# DataFlow API 操作说明文档 ## 概述 DataFlow API 提供了完整的数据流管理功能,包括数据流的创建、查询、更新、删除、执行等操作。系统基于Neo4j图数据库和PostgreSQL关系数据库的混合架构,支持复杂的数据流编排和管理。 **版本**: v1.0 **基础URL**: `/api/data-flow` **内容类型**: `application/json` **字符编码**: UTF-8 ## 通用响应格式 所有API接口都遵循统一的响应格式: ```json { "code": 200, // HTTP状态码 "message": "操作成功", // 操作结果消息 "data": {} // 返回的具体数据 } ``` ## 状态码说明 | 状态码 | 含义 | 说明 | |--------|------|------| | 200 | 成功 | 请求成功执行 | | 400 | 请求错误 | 请求参数错误或缺失 | | 404 | 未找到 | 请求的资源不存在 | | 500 | 服务器错误 | 服务器内部错误 | --- ## API 接口详情 ### 1. 获取数据流列表 **接口描述**: 获取数据流列表,支持分页和搜索功能。 #### 请求信息 - **HTTP方法**: `GET` - **请求路径**: `/get-dataflows-list` - **请求头**: `Content-Type: application/json` #### 请求参数 | 参数名 | 类型 | 必填 | 默认值 | 说明 | |--------|------|------|--------|------| | page | int | 否 | 1 | 页码,从1开始 | | page_size | int | 否 | 10 | 每页记录数,最大100 | | search | string | 否 | "" | 搜索关键词,支持名称和描述模糊匹配 | #### 响应数据 ```json { "code": 200, "message": "success", "data": { "list": [ { "id": 1, "name": "用户数据ETL流程", "en_name": "user_data_etl_process", "description": "处理用户数据的ETL流程", "status": "active", "created_at": "2023-12-01 10:00:00", "updated_at": "2023-12-01 12:00:00", "created_by": "admin", "config": "{\"source\": \"mysql\", \"target\": \"elasticsearch\"}" } ], "pagination": { "page": 1, "page_size": 10, "total": 25, "total_pages": 3 } } } ``` #### 示例代码 **Python示例**: ```python import requests url = "http://your-domain/api/data-flow/get-dataflows-list" params = { "page": 1, "page_size": 10, "search": "ETL" } response = requests.get(url, params=params) data = response.json() if data["code"] == 200: dataflows = data["data"]["list"] for df in dataflows: print(f"数据流: {df['name']} - {df['description']}") ``` **JavaScript示例**: ```javascript const params = new URLSearchParams({ page: 1, page_size: 10, search: 'ETL' }); fetch(`/api/data-flow/get-dataflows-list?${params}`) .then(response => response.json()) .then(data => { if (data.code === 200) { console.log('数据流列表:', data.data.list); } }) .catch(error => console.error('错误:', error)); ``` #### 测试数据 ``` GET /api/data-flow/get-dataflows-list?page=1&page_size=5&search=数据处理 ``` --- ### 2. 获取数据流详情 **接口描述**: 根据数据流ID获取详细信息,包括关联的标签、子节点和父节点。 #### 请求信息 - **HTTP方法**: `GET` - **请求路径**: `/get-dataflow/{dataflow_id}` - **请求头**: `Content-Type: application/json` #### 路径参数 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | dataflow_id | int | 是 | 数据流ID | #### 响应数据 ```json { "code": 200, "message": "success", "data": { "id": 1, "name": "用户数据ETL流程", "en_name": "user_data_etl_process", "description": "处理用户数据的ETL流程", "status": "active", "created_at": "2023-12-01 10:00:00", "updated_at": "2023-12-01 12:00:00", "created_by": "admin", "config": "{\"source\": \"mysql\", \"target\": \"elasticsearch\"}", "tags": [ {"id": 10, "name": "数据处理"}, {"id": 11, "name": "ETL"} ], "children": [ {"id": 2, "name": "数据清洗子流程"}, {"id": 3, "name": "数据转换子流程"} ], "parents": [ {"id": 0, "name": "主数据流"} ] } } ``` #### 错误响应 ```json { "code": 404, "message": "数据流不存在", "data": null } ``` #### 示例代码 **Python示例**: ```python import requests dataflow_id = 1 url = f"http://your-domain/api/data-flow/get-dataflow/{dataflow_id}" response = requests.get(url) data = response.json() if data["code"] == 200: dataflow = data["data"] print(f"数据流名称: {dataflow['name']}") print(f"关联标签: {[tag['name'] for tag in dataflow['tags']]}") elif data["code"] == 404: print("数据流不存在") ``` #### 测试数据 ``` GET /api/data-flow/get-dataflow/1 ``` --- ### 3. 创建数据流 **接口描述**: 创建新的数据流,自动生成英文名称,支持标签和子节点关联。 #### 请求信息 - **HTTP方法**: `POST` - **请求路径**: `/add-dataflow` - **请求头**: `Content-Type: application/json` #### 请求参数 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | name | string | 是 | 数据流名称 | | description | string | 是 | 数据流描述 | | status | string | 否 | 状态,默认"inactive",可选值: "active", "inactive" | | created_by | string | 否 | 创建者,默认"system" | | config | object | 否 | 配置信息,JSON对象 | | children_ids | array | 否 | 子节点ID列表 | | tag_id | int | 否 | 关联标签ID | #### 请求体示例 ```json { "name": "客户数据同步流程", "description": "将客户数据从CRM同步到数据仓库", "status": "active", "created_by": "admin", "config": { "source": { "type": "mysql", "host": "192.168.1.100", "database": "crm" }, "target": { "type": "postgresql", "host": "192.168.1.200", "database": "datawarehouse" }, "schedule": "0 2 * * *" }, "children_ids": [2, 3], "tag_id": 10 } ``` #### 响应数据 ```json { "code": 200, "message": "数据流创建成功", "data": { "id": 5, "name": "客户数据同步流程", "en_name": "customer_data_sync_process", "description": "将客户数据从CRM同步到数据仓库", "status": "active", "created_at": "2023-12-01 14:30:00", "updated_at": "2023-12-01 14:30:00", "created_by": "admin", "config": "{\"source\":{\"type\":\"mysql\",\"host\":\"192.168.1.100\",\"database\":\"crm\"},\"target\":{\"type\":\"postgresql\",\"host\":\"192.168.1.200\",\"database\":\"datawarehouse\"},\"schedule\":\"0 2 * * *\"}" } } ``` #### 错误响应 ```json { "code": 400, "message": "参数错误: 缺少必填字段: name", "data": null } ``` #### 示例代码 **Python示例**: ```python import requests import json url = "http://your-domain/api/data-flow/add-dataflow" data = { "name": "订单数据处理流程", "description": "处理电商订单数据", "status": "active", "created_by": "data_team", "config": { "batch_size": 1000, "retry_count": 3 } } response = requests.post(url, json=data) result = response.json() if result["code"] == 200: print(f"创建成功,数据流ID: {result['data']['id']}") else: print(f"创建失败: {result['message']}") ``` #### 测试数据 ```json { "name": "测试数据流", "description": "用于测试的数据流", "status": "inactive", "config": {"test": true} } ``` --- ### 4. 更新数据流 **接口描述**: 更新现有数据流的信息,支持部分字段更新。 #### 请求信息 - **HTTP方法**: `PUT` - **请求路径**: `/update-dataflow/{dataflow_id}` - **请求头**: `Content-Type: application/json` #### 路径参数 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | dataflow_id | int | 是 | 数据流ID | #### 请求参数 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | name | string | 否 | 数据流名称 | | description | string | 否 | 数据流描述 | | status | string | 否 | 状态 | | config | object | 否 | 配置信息 | #### 请求体示例 ```json { "description": "更新后的数据流描述", "status": "active", "config": { "batch_size": 2000, "timeout": 300 } } ``` #### 响应数据 ```json { "code": 200, "message": "数据流更新成功", "data": { "id": 1, "name": "用户数据ETL流程", "en_name": "user_data_etl_process", "description": "更新后的数据流描述", "status": "active", "created_at": "2023-12-01 10:00:00", "updated_at": "2023-12-01 15:45:00", "created_by": "admin", "config": "{\"batch_size\":2000,\"timeout\":300}" } } ``` #### 示例代码 **Python示例**: ```python import requests dataflow_id = 1 url = f"http://your-domain/api/data-flow/update-dataflow/{dataflow_id}" data = { "status": "active", "config": {"enabled": True} } response = requests.put(url, json=data) result = response.json() if result["code"] == 200: print("更新成功") elif result["code"] == 404: print("数据流不存在") ``` --- ### 5. 删除数据流 **接口描述**: 删除指定的数据流及其所有关联关系。 #### 请求信息 - **HTTP方法**: `DELETE` - **请求路径**: `/delete-dataflow/{dataflow_id}` #### 路径参数 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | dataflow_id | int | 是 | 数据流ID | #### 响应数据 **成功响应**: ```json { "code": 200, "message": "数据流删除成功", "data": {} } ``` **失败响应**: ```json { "code": 404, "message": "数据流不存在", "data": {} } ``` #### 示例代码 **Python示例**: ```python import requests dataflow_id = 1 url = f"http://your-domain/api/data-flow/delete-dataflow/{dataflow_id}" response = requests.delete(url) result = response.json() if result["code"] == 200: print("删除成功") elif result["code"] == 404: print("数据流不存在") ``` #### 测试数据 ``` DELETE /api/data-flow/delete-dataflow/1 ``` --- ### 6. 执行数据流 **接口描述**: 启动数据流执行,支持传入执行参数。 #### 请求信息 - **HTTP方法**: `POST` - **请求路径**: `/execute-dataflow/{dataflow_id}` - **请求头**: `Content-Type: application/json` #### 路径参数 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | dataflow_id | int | 是 | 数据流ID | #### 请求参数 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | params | object | 否 | 执行参数,JSON对象 | #### 请求体示例 ```json { "params": { "start_date": "2023-12-01", "end_date": "2023-12-07", "force_refresh": true } } ``` #### 响应数据 ```json { "code": 200, "message": "数据流执行成功", "data": { "execution_id": "exec_1_1701420000", "dataflow_id": 1, "status": "running", "started_at": "2023-12-01T16:00:00", "params": { "start_date": "2023-12-01", "end_date": "2023-12-07", "force_refresh": true }, "progress": 0 } } ``` #### 示例代码 **Python示例**: ```python import requests dataflow_id = 1 url = f"http://your-domain/api/data-flow/execute-dataflow/{dataflow_id}" data = { "params": { "batch_size": 500, "parallel_workers": 4 } } response = requests.post(url, json=data) result = response.json() if result["code"] == 200: execution_id = result["data"]["execution_id"] print(f"执行开始,执行ID: {execution_id}") ``` --- ### 7. 获取数据流执行状态 **接口描述**: 查询数据流的当前执行状态和进度。 #### 请求信息 - **HTTP方法**: `GET` - **请求路径**: `/get-dataflow-status/{dataflow_id}` #### 路径参数 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | dataflow_id | int | 是 | 数据流ID | #### 响应数据 ```json { "code": 200, "message": "success", "data": { "dataflow_id": 1, "status": "running", "progress": 45, "started_at": "2023-12-01T16:00:00", "completed_at": null, "error_message": null } } ``` #### 状态说明 | 状态值 | 含义 | 说明 | |--------|------|------| | pending | 等待中 | 数据流等待执行 | | running | 运行中 | 数据流正在执行 | | completed | 已完成 | 数据流执行成功完成 | | failed | 执行失败 | 数据流执行过程中出现错误 | #### 示例代码 **JavaScript示例**: ```javascript function checkDataflowStatus(dataflowId) { fetch(`/api/data-flow/get-dataflow-status/${dataflowId}`) .then(response => response.json()) .then(data => { if (data.code === 200) { const status = data.data; console.log(`状态: ${status.status}, 进度: ${status.progress}%`); if (status.status === 'running') { // 如果还在运行,5秒后再次检查 setTimeout(() => checkDataflowStatus(dataflowId), 5000); } } }); } ``` --- ### 8. 获取数据流执行日志 **接口描述**: 获取数据流执行过程中的详细日志信息,支持分页。 #### 请求信息 - **HTTP方法**: `GET` - **请求路径**: `/get-dataflow-logs/{dataflow_id}` #### 路径参数 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | dataflow_id | int | 是 | 数据流ID | #### 请求参数 | 参数名 | 类型 | 必填 | 默认值 | 说明 | |--------|------|------|--------|------| | page | int | 否 | 1 | 页码 | | page_size | int | 否 | 50 | 每页记录数 | #### 响应数据 ```json { "code": 200, "message": "success", "data": { "logs": [ { "id": 1, "timestamp": "2023-12-01T16:00:00", "level": "INFO", "message": "数据流开始执行", "component": "source" }, { "id": 2, "timestamp": "2023-12-01T16:00:30", "level": "INFO", "message": "成功连接到数据源", "component": "source" }, { "id": 3, "timestamp": "2023-12-01T16:01:00", "level": "WARNING", "message": "发现重复数据,已自动处理", "component": "transform" } ], "pagination": { "page": 1, "page_size": 50, "total": 100, "total_pages": 2 } } } ``` #### 日志级别说明 | 级别 | 含义 | 说明 | |------|------|------| | INFO | 信息 | 一般性信息日志 | | WARNING | 警告 | 警告信息,不影响执行 | | ERROR | 错误 | 错误信息,可能导致执行失败 | #### 示例代码 **Python示例**: ```python import requests dataflow_id = 1 url = f"http://your-domain/api/data-flow/get-dataflow-logs/{dataflow_id}" params = {"page": 1, "page_size": 20} response = requests.get(url, params=params) data = response.json() if data["code"] == 200: logs = data["data"]["logs"] for log in logs: print(f"[{log['timestamp']}] {log['level']}: {log['message']}") ``` --- ### 9. 使用AI生成脚本 **接口描述**: 使用Deepseek AI模型根据文本描述生成数据处理脚本。 #### 请求信息 - **HTTP方法**: `POST` - **请求路径**: `/create-script` - **请求头**: `Content-Type: application/json` #### 请求参数 | 参数名 | 类型 | 必填 | 说明 | |--------|------|------|------| | request_data | string | 是 | 需求描述文本 | #### 请求体示例 ```json { "request_data": "请生成一个Python脚本,从MySQL数据库读取用户表数据,清洗无效数据后写入Elasticsearch。需要包含错误处理和日志记录功能。" } ``` #### 响应数据 ```json { "code": 200, "message": "脚本生成成功", "data": { "script_content": "#!/usr/bin/env python3\n# -*- coding: utf-8 -*-\n\nimport mysql.connector\nfrom elasticsearch import Elasticsearch\nimport logging\nimport json\nfrom datetime import datetime\n\n# 配置日志\nlogging.basicConfig(level=logging.INFO)\nlogger = logging.getLogger(__name__)\n\ndef main():\n try:\n # MySQL连接配置\n mysql_config = {\n 'host': 'localhost',\n 'user': 'username',\n 'password': 'password',\n 'database': 'database_name'\n }\n \n # Elasticsearch连接配置\n es = Elasticsearch([{'host': 'localhost', 'port': 9200}])\n \n # 连接MySQL\n conn = mysql.connector.connect(**mysql_config)\n cursor = conn.cursor(dictionary=True)\n \n # 查询用户数据\n query = \"SELECT * FROM users WHERE status = 'active'\"\n cursor.execute(query)\n \n # 处理数据\n for row in cursor:\n # 数据清洗\n if validate_user_data(row):\n # 写入Elasticsearch\n es.index(index='users', body=row)\n logger.info(f\"用户数据已写入ES: {row['id']}\")\n else:\n logger.warning(f\"无效用户数据跳过: {row['id']}\")\n \n except Exception as e:\n logger.error(f\"脚本执行失败: {str(e)}\")\n finally:\n if 'conn' in locals():\n conn.close()\n\ndef validate_user_data(user):\n \"\"\"验证用户数据有效性\"\"\"\n return user.get('email') and '@' in user.get('email', '')\n\nif __name__ == '__main__':\n main()", "format": "txt", "generated_at": "2023-12-01T16:30:00" } } ``` #### 示例代码 **Python示例**: ```python import requests url = "http://your-domain/api/data-flow/create-script" data = { "request_data": "生成一个数据备份脚本,每天凌晨2点自动备份PostgreSQL数据库到云存储" } response = requests.post(url, json=data) result = response.json() if result["code"] == 200: script = result["data"]["script_content"] # 保存脚本到文件 with open("backup_script.py", "w", encoding="utf-8") as f: f.write(script) print("脚本生成并保存成功") ``` #### 测试数据 ```json { "request_data": "创建一个简单的CSV文件处理脚本,读取销售数据并计算月度汇总" } ``` --- ## 错误处理 ### 常见错误码 | 错误码 | 错误类型 | 解决方案 | |--------|----------|----------| | 400 | 请求参数错误 | 检查请求参数格式和必填字段 | | 404 | 资源不存在 | 确认资源ID是否正确 | | 500 | 服务器内部错误 | 查看服务器日志,联系技术支持 | ### 错误响应示例 ```json { "code": 400, "message": "参数错误: 缺少必填字段: name", "data": null } ``` ### 错误处理最佳实践 1. **检查HTTP状态码**: 首先检查HTTP响应状态码 2. **解析错误消息**: 根据返回的message字段了解具体错误 3. **参数验证**: 发送请求前在客户端进行参数验证 4. **重试机制**: 对于网络错误实现适当的重试机制 5. **日志记录**: 记录API调用和错误信息便于排查 --- ## 使用示例 ### 完整工作流示例 以下是一个完整的数据流管理工作流示例: ```python import requests import time import json class DataFlowClient: def __init__(self, base_url): self.base_url = base_url def create_dataflow(self, name, description, config=None): """创建数据流""" url = f"{self.base_url}/add-dataflow" data = { "name": name, "description": description, "status": "active", "config": config or {} } response = requests.post(url, json=data) return response.json() def execute_dataflow(self, dataflow_id, params=None): """执行数据流""" url = f"{self.base_url}/execute-dataflow/{dataflow_id}" data = {"params": params or {}} response = requests.post(url, json=data) return response.json() def wait_for_completion(self, dataflow_id, timeout=300): """等待数据流执行完成""" start_time = time.time() while time.time() - start_time < timeout: url = f"{self.base_url}/get-dataflow-status/{dataflow_id}" response = requests.get(url) data = response.json() if data["code"] == 200: status = data["data"]["status"] progress = data["data"]["progress"] print(f"执行状态: {status}, 进度: {progress}%") if status in ["completed", "failed"]: return status time.sleep(5) return "timeout" # 使用示例 client = DataFlowClient("http://your-domain/api/data-flow") # 1. 创建数据流 result = client.create_dataflow( "销售数据分析", "分析每日销售数据并生成报表", {"source": "mysql", "target": "report"} ) if result["code"] == 200: dataflow_id = result["data"]["id"] print(f"数据流创建成功,ID: {dataflow_id}") # 2. 执行数据流 exec_result = client.execute_dataflow(dataflow_id, { "date_range": "2023-12-01,2023-12-07" }) if exec_result["code"] == 200: print("数据流开始执行") # 3. 等待执行完成 final_status = client.wait_for_completion(dataflow_id) print(f"最终状态: {final_status}") ``` --- ## 注意事项 1. **认证授权**: 生产环境中需要添加适当的认证机制 2. **限流控制**: 注意API调用频率限制 3. **数据大小**: 大量数据操作可能需要较长时间 4. **并发执行**: 同一数据流不能同时执行多个实例 5. **资源清理**: 及时清理不需要的数据流以节省资源 --- ## 更新日志 - **v1.0** (2025-06-10): 初始版本,包含基础数据流管理功能 - **v1.1** (待发布): 计划增加数据流模板功能 --- ## 技术支持 如有问题请联系技术支持团队或查看详细的开发文档。 **文档版本**: v1.0 **最后更新**: 2025-06-10