data_flow_apis.md 22 KB

DataFlow API 操作说明文档

概述

DataFlow API 提供了完整的数据流管理功能,包括数据流的创建、查询、更新、删除、执行等操作。系统基于Neo4j图数据库和PostgreSQL关系数据库的混合架构,支持复杂的数据流编排和管理。

版本: v1.0
基础URL: /api/dataflow
内容类型: application/json
字符编码: UTF-8

通用响应格式

所有API接口都遵循统一的响应格式:

{
  "code": 200,           // HTTP状态码
  "message": "操作成功",  // 操作结果消息
  "data": {}             // 返回的具体数据
}

状态码说明

状态码 含义 说明
200 成功 请求成功执行
400 请求错误 请求参数错误或缺失
404 未找到 请求的资源不存在
500 服务器错误 服务器内部错误

API 接口详情

1. 获取数据流列表

接口描述: 获取数据流列表,支持分页和搜索功能。

请求信息

  • HTTP方法: GET
  • 请求路径: /get-dataflows-list
  • 请求头: Content-Type: application/json

请求参数

参数名 类型 必填 默认值 说明
page int 1 页码,从1开始
page_size int 10 每页记录数,最大100
search string "" 搜索关键词,支持名称和描述模糊匹配

响应数据

{
  "code": 200,
  "message": "success",
  "data": {
    "list": [
      {
        "id": 1,
        "name": "用户数据ETL流程",
        "en_name": "user_data_etl_process",
        "description": "处理用户数据的ETL流程",
        "status": "active",
        "created_at": "2023-12-01 10:00:00",
        "updated_at": "2023-12-01 12:00:00",
        "created_by": "admin",
        "config": "{\"source\": \"mysql\", \"target\": \"elasticsearch\"}"
      }
    ],
    "pagination": {
      "page": 1,
      "page_size": 10,
      "total": 25,
      "total_pages": 3
    }
  }
}

示例代码

Python示例:

import requests

url = "http://your-domain/api/data-flow/get-dataflows-list"
params = {
    "page": 1,
    "page_size": 10,
    "search": "ETL"
}

response = requests.get(url, params=params)
data = response.json()

if data["code"] == 200:
    dataflows = data["data"]["list"]
    for df in dataflows:
        print(f"数据流: {df['name']} - {df['description']}")

JavaScript示例:

const params = new URLSearchParams({
    page: 1,
    page_size: 10,
    search: 'ETL'
});

fetch(`/api/data-flow/get-dataflows-list?${params}`)
    .then(response => response.json())
    .then(data => {
        if (data.code === 200) {
            console.log('数据流列表:', data.data.list);
        }
    })
    .catch(error => console.error('错误:', error));

测试数据

GET /api/data-flow/get-dataflows-list?page=1&page_size=5&search=数据处理

2. 获取数据流详情

接口描述: 根据数据流ID获取详细信息,包括关联的标签、子节点和父节点。

请求信息

  • HTTP方法: GET
  • 请求路径: /get-dataflow/{dataflow_id}
  • 请求头: Content-Type: application/json

路径参数

参数名 类型 必填 说明
dataflow_id int 数据流ID

响应数据

{
  "code": 200,
  "message": "success",
  "data": {
    "id": 1,
    "name": "用户数据ETL流程",
    "en_name": "user_data_etl_process",
    "description": "处理用户数据的ETL流程",
    "status": "active",
    "created_at": "2023-12-01 10:00:00",
    "updated_at": "2023-12-01 12:00:00",
    "created_by": "admin",
    "config": "{\"source\": \"mysql\", \"target\": \"elasticsearch\"}",
    "tags": [
      {"id": 10, "name": "数据处理"},
      {"id": 11, "name": "ETL"}
    ],
    "children": [
      {"id": 2, "name": "数据清洗子流程"},
      {"id": 3, "name": "数据转换子流程"}
    ],
    "parents": [
      {"id": 0, "name": "主数据流"}
    ]
  }
}

错误响应

{
  "code": 404,
  "message": "数据流不存在",
  "data": null
}

示例代码

Python示例:

import requests

dataflow_id = 1
url = f"http://your-domain/api/data-flow/get-dataflow/{dataflow_id}"

response = requests.get(url)
data = response.json()

if data["code"] == 200:
    dataflow = data["data"]
    print(f"数据流名称: {dataflow['name']}")
    print(f"关联标签: {[tag['name'] for tag in dataflow['tags']]}")
elif data["code"] == 404:
    print("数据流不存在")

测试数据

GET /api/data-flow/get-dataflow/1

3. 创建数据流

接口描述: 创建新的数据流,自动生成英文名称,支持标签和子节点关联。

请求信息

  • HTTP方法: POST
  • 请求路径: /add-dataflow
  • 请求头: Content-Type: application/json

请求参数

参数名 类型 必填 说明
name string 数据流名称
description string 数据流描述
status string 状态,默认"inactive",可选值: "active", "inactive"
created_by string 创建者,默认"system"
config object 配置信息,JSON对象
children_ids array 子节点ID列表
tag_id int 关联标签ID

请求体示例

{
  "name": "客户数据同步流程",
  "description": "将客户数据从CRM同步到数据仓库",
  "status": "active",
  "created_by": "admin",
  "config": {
    "source": {
      "type": "mysql",
      "host": "192.168.1.100",
      "database": "crm"
    },
    "target": {
      "type": "postgresql",
      "host": "192.168.1.200",
      "database": "datawarehouse"
    },
    "schedule": "0 2 * * *"
  },
  "children_ids": [2, 3],
  "tag_id": 10
}

响应数据

{
  "code": 200,
  "message": "数据流创建成功",
  "data": {
    "id": 5,
    "name": "客户数据同步流程",
    "en_name": "customer_data_sync_process",
    "description": "将客户数据从CRM同步到数据仓库",
    "status": "active",
    "created_at": "2023-12-01 14:30:00",
    "updated_at": "2023-12-01 14:30:00",
    "created_by": "admin",
    "config": "{\"source\":{\"type\":\"mysql\",\"host\":\"192.168.1.100\",\"database\":\"crm\"},\"target\":{\"type\":\"postgresql\",\"host\":\"192.168.1.200\",\"database\":\"datawarehouse\"},\"schedule\":\"0 2 * * *\"}"
  }
}

错误响应

{
  "code": 400,
  "message": "参数错误: 缺少必填字段: name",
  "data": null
}

示例代码

Python示例:

import requests
import json

url = "http://your-domain/api/data-flow/add-dataflow"
data = {
    "name": "订单数据处理流程",
    "description": "处理电商订单数据",
    "status": "active",
    "created_by": "data_team",
    "config": {
        "batch_size": 1000,
        "retry_count": 3
    }
}

response = requests.post(url, json=data)
result = response.json()

if result["code"] == 200:
    print(f"创建成功,数据流ID: {result['data']['id']}")
else:
    print(f"创建失败: {result['message']}")

测试数据

{
  "name": "测试数据流",
  "description": "用于测试的数据流",
  "status": "inactive",
  "config": {"test": true}
}

4. 更新数据流

接口描述: 更新现有数据流的信息,支持部分字段更新。

请求信息

  • HTTP方法: PUT
  • 请求路径: /update-dataflow/{dataflow_id}
  • 请求头: Content-Type: application/json

路径参数

参数名 类型 必填 说明
dataflow_id int 数据流ID

请求参数

参数名 类型 必填 说明
name string 数据流名称
description string 数据流描述
status string 状态
config object 配置信息

请求体示例

{
  "description": "更新后的数据流描述",
  "status": "active",
  "config": {
    "batch_size": 2000,
    "timeout": 300
  }
}

响应数据

{
  "code": 200,
  "message": "数据流更新成功",
  "data": {
    "id": 1,
    "name": "用户数据ETL流程",
    "en_name": "user_data_etl_process",
    "description": "更新后的数据流描述",
    "status": "active",
    "created_at": "2023-12-01 10:00:00",
    "updated_at": "2023-12-01 15:45:00",
    "created_by": "admin",
    "config": "{\"batch_size\":2000,\"timeout\":300}"
  }
}

示例代码

Python示例:

import requests

dataflow_id = 1
url = f"http://your-domain/api/data-flow/update-dataflow/{dataflow_id}"
data = {
    "status": "active",
    "config": {"enabled": True}
}

response = requests.put(url, json=data)
result = response.json()

if result["code"] == 200:
    print("更新成功")
elif result["code"] == 404:
    print("数据流不存在")

5. 删除数据流

接口描述: 删除指定的数据流及其所有关联关系。

请求信息

  • HTTP方法: DELETE
  • 请求路径: /delete-dataflow/{dataflow_id}

路径参数

参数名 类型 必填 说明
dataflow_id int 数据流ID

响应数据

成功响应:

{
  "code": 200,
  "message": "数据流删除成功",
  "data": {}
}

失败响应:

{
  "code": 404,
  "message": "数据流不存在",
  "data": {}
}

示例代码

Python示例:

import requests

dataflow_id = 1
url = f"http://your-domain/api/data-flow/delete-dataflow/{dataflow_id}"

response = requests.delete(url)
result = response.json()

if result["code"] == 200:
    print("删除成功")
elif result["code"] == 404:
    print("数据流不存在")

测试数据

DELETE /api/data-flow/delete-dataflow/1

6. 执行数据流

接口描述: 启动数据流执行,支持传入执行参数。

请求信息

  • HTTP方法: POST
  • 请求路径: /execute-dataflow/{dataflow_id}
  • 请求头: Content-Type: application/json

路径参数

参数名 类型 必填 说明
dataflow_id int 数据流ID

请求参数

参数名 类型 必填 说明
params object 执行参数,JSON对象

请求体示例

{
  "params": {
    "start_date": "2023-12-01",
    "end_date": "2023-12-07",
    "force_refresh": true
  }
}

响应数据

{
  "code": 200,
  "message": "数据流执行成功",
  "data": {
    "execution_id": "exec_1_1701420000",
    "dataflow_id": 1,
    "status": "running",
    "started_at": "2023-12-01T16:00:00",
    "params": {
      "start_date": "2023-12-01",
      "end_date": "2023-12-07",
      "force_refresh": true
    },
    "progress": 0
  }
}

示例代码

Python示例:

import requests

dataflow_id = 1
url = f"http://your-domain/api/data-flow/execute-dataflow/{dataflow_id}"
data = {
    "params": {
        "batch_size": 500,
        "parallel_workers": 4
    }
}

response = requests.post(url, json=data)
result = response.json()

if result["code"] == 200:
    execution_id = result["data"]["execution_id"]
    print(f"执行开始,执行ID: {execution_id}")

7. 获取数据流执行状态

接口描述: 查询数据流的当前执行状态和进度。

请求信息

  • HTTP方法: GET
  • 请求路径: /get-dataflow-status/{dataflow_id}

路径参数

参数名 类型 必填 说明
dataflow_id int 数据流ID

响应数据

{
  "code": 200,
  "message": "success",
  "data": {
    "dataflow_id": 1,
    "status": "running",
    "progress": 45,
    "started_at": "2023-12-01T16:00:00",
    "completed_at": null,
    "error_message": null
  }
}

状态说明

状态值 含义 说明
pending 等待中 数据流等待执行
running 运行中 数据流正在执行
completed 已完成 数据流执行成功完成
failed 执行失败 数据流执行过程中出现错误

示例代码

JavaScript示例:

function checkDataflowStatus(dataflowId) {
    fetch(`/api/data-flow/get-dataflow-status/${dataflowId}`)
        .then(response => response.json())
        .then(data => {
            if (data.code === 200) {
                const status = data.data;
                console.log(`状态: ${status.status}, 进度: ${status.progress}%`);
                
                if (status.status === 'running') {
                    // 如果还在运行,5秒后再次检查
                    setTimeout(() => checkDataflowStatus(dataflowId), 5000);
                }
            }
        });
}

8. 获取数据流执行日志

接口描述: 获取数据流执行过程中的详细日志信息,支持分页。

请求信息

  • HTTP方法: GET
  • 请求路径: /get-dataflow-logs/{dataflow_id}

路径参数

参数名 类型 必填 说明
dataflow_id int 数据流ID

请求参数

参数名 类型 必填 默认值 说明
page int 1 页码
page_size int 50 每页记录数

响应数据

{
  "code": 200,
  "message": "success",
  "data": {
    "logs": [
      {
        "id": 1,
        "timestamp": "2023-12-01T16:00:00",
        "level": "INFO",
        "message": "数据流开始执行",
        "component": "source"
      },
      {
        "id": 2,
        "timestamp": "2023-12-01T16:00:30",
        "level": "INFO",
        "message": "成功连接到数据源",
        "component": "source"
      },
      {
        "id": 3,
        "timestamp": "2023-12-01T16:01:00",
        "level": "WARNING",
        "message": "发现重复数据,已自动处理",
        "component": "transform"
      }
    ],
    "pagination": {
      "page": 1,
      "page_size": 50,
      "total": 100,
      "total_pages": 2
    }
  }
}

日志级别说明

级别 含义 说明
INFO 信息 一般性信息日志
WARNING 警告 警告信息,不影响执行
ERROR 错误 错误信息,可能导致执行失败

示例代码

Python示例:

import requests

dataflow_id = 1
url = f"http://your-domain/api/data-flow/get-dataflow-logs/{dataflow_id}"
params = {"page": 1, "page_size": 20}

response = requests.get(url, params=params)
data = response.json()

if data["code"] == 200:
    logs = data["data"]["logs"]
    for log in logs:
        print(f"[{log['timestamp']}] {log['level']}: {log['message']}")

9. 使用AI生成脚本

接口描述: 使用Deepseek AI模型根据文本描述生成数据处理脚本。

请求信息

  • HTTP方法: POST
  • 请求路径: /create-script
  • 请求头: Content-Type: application/json

请求参数

参数名 类型 必填 说明
request_data string 需求描述文本

请求体示例

{
  "request_data": "请生成一个Python脚本,从MySQL数据库读取用户表数据,清洗无效数据后写入Elasticsearch。需要包含错误处理和日志记录功能。"
}

响应数据

{
  "code": 200,
  "message": "脚本生成成功",
  "data": {
    "script_content": "#!/usr/bin/env python3\n# -*- coding: utf-8 -*-\n\nimport mysql.connector\nfrom elasticsearch import Elasticsearch\nimport logging\nimport json\nfrom datetime import datetime\n\n# 配置日志\nlogging.basicConfig(level=logging.INFO)\nlogger = logging.getLogger(__name__)\n\ndef main():\n    try:\n        # MySQL连接配置\n        mysql_config = {\n            'host': 'localhost',\n            'user': 'username',\n            'password': 'password',\n            'database': 'database_name'\n        }\n        \n        # Elasticsearch连接配置\n        es = Elasticsearch([{'host': 'localhost', 'port': 9200}])\n        \n        # 连接MySQL\n        conn = mysql.connector.connect(**mysql_config)\n        cursor = conn.cursor(dictionary=True)\n        \n        # 查询用户数据\n        query = \"SELECT * FROM users WHERE status = 'active'\"\n        cursor.execute(query)\n        \n        # 处理数据\n        for row in cursor:\n            # 数据清洗\n            if validate_user_data(row):\n                # 写入Elasticsearch\n                es.index(index='users', body=row)\n                logger.info(f\"用户数据已写入ES: {row['id']}\")\n            else:\n                logger.warning(f\"无效用户数据跳过: {row['id']}\")\n                \n    except Exception as e:\n        logger.error(f\"脚本执行失败: {str(e)}\")\n    finally:\n        if 'conn' in locals():\n            conn.close()\n\ndef validate_user_data(user):\n    \"\"\"验证用户数据有效性\"\"\"\n    return user.get('email') and '@' in user.get('email', '')\n\nif __name__ == '__main__':\n    main()",
    "format": "txt",
    "generated_at": "2023-12-01T16:30:00"
  }
}

示例代码

Python示例:

import requests

url = "http://your-domain/api/data-flow/create-script"
data = {
    "request_data": "生成一个数据备份脚本,每天凌晨2点自动备份PostgreSQL数据库到云存储"
}

response = requests.post(url, json=data)
result = response.json()

if result["code"] == 200:
    script = result["data"]["script_content"]
    # 保存脚本到文件
    with open("backup_script.py", "w", encoding="utf-8") as f:
        f.write(script)
    print("脚本生成并保存成功")

测试数据

{
  "request_data": "创建一个简单的CSV文件处理脚本,读取销售数据并计算月度汇总"
}

错误处理

常见错误码

错误码 错误类型 解决方案
400 请求参数错误 检查请求参数格式和必填字段
404 资源不存在 确认资源ID是否正确
500 服务器内部错误 查看服务器日志,联系技术支持

错误响应示例

{
  "code": 400,
  "message": "参数错误: 缺少必填字段: name",
  "data": null
}

错误处理最佳实践

  1. 检查HTTP状态码: 首先检查HTTP响应状态码
  2. 解析错误消息: 根据返回的message字段了解具体错误
  3. 参数验证: 发送请求前在客户端进行参数验证
  4. 重试机制: 对于网络错误实现适当的重试机制
  5. 日志记录: 记录API调用和错误信息便于排查

使用示例

完整工作流示例

以下是一个完整的数据流管理工作流示例:

import requests
import time
import json

class DataFlowClient:
    def __init__(self, base_url):
        self.base_url = base_url
    
    def create_dataflow(self, name, description, config=None):
        """创建数据流"""
        url = f"{self.base_url}/add-dataflow"
        data = {
            "name": name,
            "description": description,
            "status": "active",
            "config": config or {}
        }
        response = requests.post(url, json=data)
        return response.json()
    
    def execute_dataflow(self, dataflow_id, params=None):
        """执行数据流"""
        url = f"{self.base_url}/execute-dataflow/{dataflow_id}"
        data = {"params": params or {}}
        response = requests.post(url, json=data)
        return response.json()
    
    def wait_for_completion(self, dataflow_id, timeout=300):
        """等待数据流执行完成"""
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            url = f"{self.base_url}/get-dataflow-status/{dataflow_id}"
            response = requests.get(url)
            data = response.json()
            
            if data["code"] == 200:
                status = data["data"]["status"]
                progress = data["data"]["progress"]
                
                print(f"执行状态: {status}, 进度: {progress}%")
                
                if status in ["completed", "failed"]:
                    return status
            
            time.sleep(5)
        
        return "timeout"

# 使用示例
client = DataFlowClient("http://your-domain/api/data-flow")

# 1. 创建数据流
result = client.create_dataflow(
    "销售数据分析",
    "分析每日销售数据并生成报表",
    {"source": "mysql", "target": "report"}
)

if result["code"] == 200:
    dataflow_id = result["data"]["id"]
    print(f"数据流创建成功,ID: {dataflow_id}")
    
    # 2. 执行数据流
    exec_result = client.execute_dataflow(dataflow_id, {
        "date_range": "2023-12-01,2023-12-07"
    })
    
    if exec_result["code"] == 200:
        print("数据流开始执行")
        
        # 3. 等待执行完成
        final_status = client.wait_for_completion(dataflow_id)
        print(f"最终状态: {final_status}")

注意事项

  1. 认证授权: 生产环境中需要添加适当的认证机制
  2. 限流控制: 注意API调用频率限制
  3. 数据大小: 大量数据操作可能需要较长时间
  4. 并发执行: 同一数据流不能同时执行多个实例
  5. 资源清理: 及时清理不需要的数据流以节省资源

更新日志

  • v1.0 (2025-06-10): 初始版本,包含基础数据流管理功能
  • v1.1 (待发布): 计划增加数据流模板功能

技术支持

如有问题请联系技术支持团队或查看详细的开发文档。

文档版本: v1.0
最后更新: 2025-06-10