The DataFlow API provides complete data flow management functionality, including creating, querying, updating, deleting, and executing data flows. The system is built on a hybrid architecture that combines the Neo4j graph database with the PostgreSQL relational database, and supports complex data flow orchestration and management.
Version: v1.0
Base URL: /api/data-flow
Content type: application/json
Character encoding: UTF-8
All API endpoints follow a unified response format:
{
  "code": 200,                       // HTTP status code
  "message": "Operation successful", // Result message
  "data": {}                         // Returned payload
}
Status code | Meaning | Description |
---|---|---|
200 | Success | The request was executed successfully |
400 | Bad request | Request parameters are invalid or missing |
404 | Not found | The requested resource does not exist |
500 | Server error | Internal server error |
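Because every endpoint wraps its payload in this same envelope, client code can centralize the status check. The helper below is a minimal sketch of that pattern; the base URL and the exception type are placeholders, not part of the API:

import requests

BASE_URL = "http://your-domain/api/data-flow"  # placeholder, replace with your deployment

class DataFlowApiError(Exception):
    """Raised when the API returns a code other than 200 (illustrative helper)."""

def call_api(method, path, **kwargs):
    """Send a request and unwrap the unified {code, message, data} envelope."""
    response = requests.request(method, f"{BASE_URL}{path}", **kwargs)
    body = response.json()
    if body["code"] != 200:
        raise DataFlowApiError(f"{body['code']}: {body['message']}")
    return body["data"]

# Example: list data flows through the helper
# dataflows = call_api("GET", "/get-dataflows-list", params={"page": 1})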
Endpoint description: Retrieve the list of data flows, with support for pagination and search.
GET /get-dataflows-list
Content-Type: application/json
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
page | int | No | 1 | Page number, starting from 1 |
page_size | int | No | 10 | Records per page, maximum 100 |
search | string | No | "" | Search keyword; fuzzy match on name and description |
{
  "code": 200,
  "message": "success",
  "data": {
    "list": [
      {
        "id": 1,
        "name": "User Data ETL Process",
        "en_name": "user_data_etl_process",
        "description": "ETL process for handling user data",
        "status": "active",
        "created_at": "2023-12-01 10:00:00",
        "updated_at": "2023-12-01 12:00:00",
        "created_by": "admin",
        "config": "{\"source\": \"mysql\", \"target\": \"elasticsearch\"}"
      }
    ],
    "pagination": {
      "page": 1,
      "page_size": 10,
      "total": 25,
      "total_pages": 3
    }
  }
}
Python example:
import requests

url = "http://your-domain/api/data-flow/get-dataflows-list"
params = {
    "page": 1,
    "page_size": 10,
    "search": "ETL"
}
response = requests.get(url, params=params)
data = response.json()
if data["code"] == 200:
    dataflows = data["data"]["list"]
    for df in dataflows:
        print(f"Data flow: {df['name']} - {df['description']}")
JavaScript example:
const params = new URLSearchParams({
    page: 1,
    page_size: 10,
    search: 'ETL'
});
fetch(`/api/data-flow/get-dataflows-list?${params}`)
    .then(response => response.json())
    .then(data => {
        if (data.code === 200) {
            console.log('Data flow list:', data.data.list);
        }
    })
    .catch(error => console.error('Error:', error));
GET /api/data-flow/get-dataflows-list?page=1&page_size=5&search=数据处理
Endpoint description: Retrieve the details of a data flow by its ID, including its associated tags, child nodes, and parent nodes.
GET /get-dataflow/{dataflow_id}
Content-Type: application/json
Parameter | Type | Required | Description |
---|---|---|---|
dataflow_id | int | Yes | Data flow ID |
{
  "code": 200,
  "message": "success",
  "data": {
    "id": 1,
    "name": "User Data ETL Process",
    "en_name": "user_data_etl_process",
    "description": "ETL process for handling user data",
    "status": "active",
    "created_at": "2023-12-01 10:00:00",
    "updated_at": "2023-12-01 12:00:00",
    "created_by": "admin",
    "config": "{\"source\": \"mysql\", \"target\": \"elasticsearch\"}",
    "tags": [
      {"id": 10, "name": "Data Processing"},
      {"id": 11, "name": "ETL"}
    ],
    "children": [
      {"id": 2, "name": "Data Cleansing Subprocess"},
      {"id": 3, "name": "Data Transformation Subprocess"}
    ],
    "parents": [
      {"id": 0, "name": "Main Data Flow"}
    ]
  }
}
{
  "code": 404,
  "message": "Data flow does not exist",
  "data": null
}
Python example:
import requests

dataflow_id = 1
url = f"http://your-domain/api/data-flow/get-dataflow/{dataflow_id}"
response = requests.get(url)
data = response.json()
if data["code"] == 200:
    dataflow = data["data"]
    print(f"Data flow name: {dataflow['name']}")
    print(f"Associated tags: {[tag['name'] for tag in dataflow['tags']]}")
elif data["code"] == 404:
    print("Data flow does not exist")
GET /api/data-flow/get-dataflow/1
Endpoint description: Create a new data flow. The English name is generated automatically, and tag and child node associations are supported.
POST /add-dataflow
Content-Type: application/json
Parameter | Type | Required | Description |
---|---|---|---|
name | string | Yes | Data flow name |
description | string | Yes | Data flow description |
status | string | No | Status, default "inactive"; allowed values: "active", "inactive" |
created_by | string | No | Creator, default "system" |
config | object | No | Configuration, a JSON object |
children_ids | array | No | List of child node IDs |
tag_id | int | No | Associated tag ID |
{
  "name": "Customer Data Sync Process",
  "description": "Synchronize customer data from the CRM to the data warehouse",
  "status": "active",
  "created_by": "admin",
  "config": {
    "source": {
      "type": "mysql",
      "host": "192.168.1.100",
      "database": "crm"
    },
    "target": {
      "type": "postgresql",
      "host": "192.168.1.200",
      "database": "datawarehouse"
    },
    "schedule": "0 2 * * *"
  },
  "children_ids": [2, 3],
  "tag_id": 10
}
{
  "code": 200,
  "message": "Data flow created successfully",
  "data": {
    "id": 5,
    "name": "Customer Data Sync Process",
    "en_name": "customer_data_sync_process",
    "description": "Synchronize customer data from the CRM to the data warehouse",
    "status": "active",
    "created_at": "2023-12-01 14:30:00",
    "updated_at": "2023-12-01 14:30:00",
    "created_by": "admin",
    "config": "{\"source\":{\"type\":\"mysql\",\"host\":\"192.168.1.100\",\"database\":\"crm\"},\"target\":{\"type\":\"postgresql\",\"host\":\"192.168.1.200\",\"database\":\"datawarehouse\"},\"schedule\":\"0 2 * * *\"}"
  }
}
{
  "code": 400,
  "message": "Parameter error: missing required field: name",
  "data": null
}
Python example:
import requests

url = "http://your-domain/api/data-flow/add-dataflow"
data = {
    "name": "Order Data Processing Flow",
    "description": "Process e-commerce order data",
    "status": "active",
    "created_by": "data_team",
    "config": {
        "batch_size": 1000,
        "retry_count": 3
    }
}
response = requests.post(url, json=data)
result = response.json()
if result["code"] == 200:
    print(f"Created successfully, data flow ID: {result['data']['id']}")
else:
    print(f"Creation failed: {result['message']}")
{
  "name": "Test Data Flow",
  "description": "A data flow used for testing",
  "status": "inactive",
  "config": {"test": true}
}
Endpoint description: Update an existing data flow; partial field updates are supported.
PUT /update-dataflow/{dataflow_id}
Content-Type: application/json
Parameter | Type | Required | Description |
---|---|---|---|
dataflow_id | int | Yes | Data flow ID |
Parameter | Type | Required | Description |
---|---|---|---|
name | string | No | Data flow name |
description | string | No | Data flow description |
status | string | No | Status |
config | object | No | Configuration |
{
  "description": "Updated data flow description",
  "status": "active",
  "config": {
    "batch_size": 2000,
    "timeout": 300
  }
}
{
  "code": 200,
  "message": "Data flow updated successfully",
  "data": {
    "id": 1,
    "name": "User Data ETL Process",
    "en_name": "user_data_etl_process",
    "description": "Updated data flow description",
    "status": "active",
    "created_at": "2023-12-01 10:00:00",
    "updated_at": "2023-12-01 15:45:00",
    "created_by": "admin",
    "config": "{\"batch_size\":2000,\"timeout\":300}"
  }
}
Python example:
import requests

dataflow_id = 1
url = f"http://your-domain/api/data-flow/update-dataflow/{dataflow_id}"
data = {
    "status": "active",
    "config": {"enabled": True}
}
response = requests.put(url, json=data)
result = response.json()
if result["code"] == 200:
    print("Update successful")
elif result["code"] == 404:
    print("Data flow does not exist")
Endpoint description: Delete the specified data flow and all of its associations.
DELETE /delete-dataflow/{dataflow_id}
Parameter | Type | Required | Description |
---|---|---|---|
dataflow_id | int | Yes | Data flow ID |
Success response:
{
  "code": 200,
  "message": "Data flow deleted successfully",
  "data": {}
}
Failure response:
{
  "code": 404,
  "message": "Data flow does not exist",
  "data": {}
}
Python example:
import requests

dataflow_id = 1
url = f"http://your-domain/api/data-flow/delete-dataflow/{dataflow_id}"
response = requests.delete(url)
result = response.json()
if result["code"] == 200:
    print("Deletion successful")
elif result["code"] == 404:
    print("Data flow does not exist")
DELETE /api/data-flow/delete-dataflow/1
Endpoint description: Start execution of a data flow; execution parameters can be passed in.
POST /execute-dataflow/{dataflow_id}
Content-Type: application/json
Parameter | Type | Required | Description |
---|---|---|---|
dataflow_id | int | Yes | Data flow ID |
Parameter | Type | Required | Description |
---|---|---|---|
params | object | No | Execution parameters, a JSON object |
{
  "params": {
    "start_date": "2023-12-01",
    "end_date": "2023-12-07",
    "force_refresh": true
  }
}
{
  "code": 200,
  "message": "Data flow execution started successfully",
  "data": {
    "execution_id": "exec_1_1701420000",
    "dataflow_id": 1,
    "status": "running",
    "started_at": "2023-12-01T16:00:00",
    "params": {
      "start_date": "2023-12-01",
      "end_date": "2023-12-07",
      "force_refresh": true
    },
    "progress": 0
  }
}
Python example:
import requests

dataflow_id = 1
url = f"http://your-domain/api/data-flow/execute-dataflow/{dataflow_id}"
data = {
    "params": {
        "batch_size": 500,
        "parallel_workers": 4
    }
}
response = requests.post(url, json=data)
result = response.json()
if result["code"] == 200:
    execution_id = result["data"]["execution_id"]
    print(f"Execution started, execution ID: {execution_id}")
Endpoint description: Query the current execution status and progress of a data flow.
GET /get-dataflow-status/{dataflow_id}
Parameter | Type | Required | Description |
---|---|---|---|
dataflow_id | int | Yes | Data flow ID |
{
  "code": 200,
  "message": "success",
  "data": {
    "dataflow_id": 1,
    "status": "running",
    "progress": 45,
    "started_at": "2023-12-01T16:00:00",
    "completed_at": null,
    "error_message": null
  }
}
Status value | Meaning | Description |
---|---|---|
pending | Pending | The data flow is waiting to be executed |
running | Running | The data flow is currently executing |
completed | Completed | The data flow finished successfully |
failed | Failed | An error occurred during execution |
JavaScript example:
function checkDataflowStatus(dataflowId) {
    fetch(`/api/data-flow/get-dataflow-status/${dataflowId}`)
        .then(response => response.json())
        .then(data => {
            if (data.code === 200) {
                const status = data.data;
                console.log(`Status: ${status.status}, progress: ${status.progress}%`);
                if (status.status === 'running') {
                    // Still running, check again in 5 seconds
                    setTimeout(() => checkDataflowStatus(dataflowId), 5000);
                }
            }
        });
}
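The same polling pattern can be written synchronously in Python for batch scripts; the sketch below simply walks the status values listed above. The 5-second interval and 600-second timeout are illustrative choices, not API requirements:

import time
import requests

def wait_for_dataflow(dataflow_id, interval=5, timeout=600):
    """Poll the status endpoint until the data flow completes, fails, or the timeout expires."""
    url = f"http://your-domain/api/data-flow/get-dataflow-status/{dataflow_id}"
    deadline = time.time() + timeout
    while time.time() < deadline:
        data = requests.get(url).json()
        if data["code"] == 200:
            status = data["data"]["status"]
            print(f"Status: {status}, progress: {data['data']['progress']}%")
            if status in ("completed", "failed"):
                return status
        time.sleep(interval)
    return "timeout"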
Endpoint description: Retrieve detailed log entries produced during data flow execution, with pagination support.
GET /get-dataflow-logs/{dataflow_id}
Parameter | Type | Required | Description |
---|---|---|---|
dataflow_id | int | Yes | Data flow ID |
Parameter | Type | Required | Default | Description |
---|---|---|---|---|
page | int | No | 1 | Page number |
page_size | int | No | 50 | Records per page |
{
  "code": 200,
  "message": "success",
  "data": {
    "logs": [
      {
        "id": 1,
        "timestamp": "2023-12-01T16:00:00",
        "level": "INFO",
        "message": "Data flow execution started",
        "component": "source"
      },
      {
        "id": 2,
        "timestamp": "2023-12-01T16:00:30",
        "level": "INFO",
        "message": "Connected to the data source",
        "component": "source"
      },
      {
        "id": 3,
        "timestamp": "2023-12-01T16:01:00",
        "level": "WARNING",
        "message": "Duplicate data found and handled automatically",
        "component": "transform"
      }
    ],
    "pagination": {
      "page": 1,
      "page_size": 50,
      "total": 100,
      "total_pages": 2
    }
  }
}
Level | Meaning | Description |
---|---|---|
INFO | Information | General informational log entries |
WARNING | Warning | Warnings that do not affect execution |
ERROR | Error | Errors that may cause execution to fail |
Python example:
import requests

dataflow_id = 1
url = f"http://your-domain/api/data-flow/get-dataflow-logs/{dataflow_id}"
params = {"page": 1, "page_size": 20}
response = requests.get(url, params=params)
data = response.json()
if data["code"] == 200:
    logs = data["data"]["logs"]
    for log in logs:
        print(f"[{log['timestamp']}] {log['level']}: {log['message']}")
Endpoint description: Use the Deepseek AI model to generate a data processing script from a text description.
POST /create-script
Content-Type: application/json
Parameter | Type | Required | Description |
---|---|---|---|
request_data | string | Yes | Requirement description text |
{
  "request_data": "Generate a Python script that reads the user table from a MySQL database, cleans invalid records, and writes the results to Elasticsearch. Include error handling and logging."
}
{
  "code": 200,
  "message": "Script generated successfully",
  "data": {
    "script_content": "#!/usr/bin/env python3\n# -*- coding: utf-8 -*-\n\nimport mysql.connector\nfrom elasticsearch import Elasticsearch\nimport logging\nimport json\nfrom datetime import datetime\n\n# Configure logging\nlogging.basicConfig(level=logging.INFO)\nlogger = logging.getLogger(__name__)\n\ndef main():\n    try:\n        # MySQL connection config\n        mysql_config = {\n            'host': 'localhost',\n            'user': 'username',\n            'password': 'password',\n            'database': 'database_name'\n        }\n\n        # Elasticsearch connection config\n        es = Elasticsearch([{'host': 'localhost', 'port': 9200}])\n\n        # Connect to MySQL\n        conn = mysql.connector.connect(**mysql_config)\n        cursor = conn.cursor(dictionary=True)\n\n        # Query user data\n        query = \"SELECT * FROM users WHERE status = 'active'\"\n        cursor.execute(query)\n\n        # Process each row\n        for row in cursor:\n            # Data cleansing\n            if validate_user_data(row):\n                # Write to Elasticsearch\n                es.index(index='users', body=row)\n                logger.info(f\"User data written to ES: {row['id']}\")\n            else:\n                logger.warning(f\"Skipping invalid user data: {row['id']}\")\n\n    except Exception as e:\n        logger.error(f\"Script execution failed: {str(e)}\")\n    finally:\n        if 'conn' in locals():\n            conn.close()\n\ndef validate_user_data(user):\n    \"\"\"Validate that a user record is usable\"\"\"\n    return user.get('email') and '@' in user.get('email', '')\n\nif __name__ == '__main__':\n    main()",
    "format": "txt",
    "generated_at": "2023-12-01T16:30:00"
  }
}
Python example:
import requests

url = "http://your-domain/api/data-flow/create-script"
data = {
    "request_data": "Generate a data backup script that automatically backs up a PostgreSQL database to cloud storage at 2:00 AM every day"
}
response = requests.post(url, json=data)
result = response.json()
if result["code"] == 200:
    script = result["data"]["script_content"]
    # Save the script to a file
    with open("backup_script.py", "w", encoding="utf-8") as f:
        f.write(script)
    print("Script generated and saved successfully")
{
  "request_data": "Create a simple CSV processing script that reads sales data and computes monthly summaries"
}
Error code | Error type | Resolution |
---|---|---|
400 | Invalid request parameters | Check the request parameter format and required fields |
404 | Resource not found | Verify that the resource ID is correct |
500 | Internal server error | Check the server logs and contact technical support |
{
  "code": 400,
  "message": "Parameter error: missing required field: name",
  "data": null
}
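In client code, the three error codes above usually map to different recovery actions. The sketch below is illustrative only; the exception choices are assumptions, not part of the API:

def handle_dataflow_response(response):
    """Unwrap a DataFlow API response and branch on the documented error codes."""
    body = response.json()
    if body["code"] == 200:
        return body["data"]
    if body["code"] == 400:
        # Invalid or missing parameters: fix the request, retrying will not help
        raise ValueError(f"Bad request: {body['message']}")
    if body["code"] == 404:
        # The resource does not exist: verify the data flow ID
        raise LookupError(f"Not found: {body['message']}")
    # 500 or anything unexpected: check the server logs or contact technical support
    raise RuntimeError(f"Server error {body['code']}: {body['message']}")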
The following is a complete data flow management workflow example:
import requests
import time

class DataFlowClient:
    def __init__(self, base_url):
        self.base_url = base_url

    def create_dataflow(self, name, description, config=None):
        """Create a data flow"""
        url = f"{self.base_url}/add-dataflow"
        data = {
            "name": name,
            "description": description,
            "status": "active",
            "config": config or {}
        }
        response = requests.post(url, json=data)
        return response.json()

    def execute_dataflow(self, dataflow_id, params=None):
        """Execute a data flow"""
        url = f"{self.base_url}/execute-dataflow/{dataflow_id}"
        data = {"params": params or {}}
        response = requests.post(url, json=data)
        return response.json()

    def wait_for_completion(self, dataflow_id, timeout=300):
        """Wait for a data flow execution to finish"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            url = f"{self.base_url}/get-dataflow-status/{dataflow_id}"
            response = requests.get(url)
            data = response.json()
            if data["code"] == 200:
                status = data["data"]["status"]
                progress = data["data"]["progress"]
                print(f"Execution status: {status}, progress: {progress}%")
                if status in ["completed", "failed"]:
                    return status
            time.sleep(5)
        return "timeout"

# Usage example
client = DataFlowClient("http://your-domain/api/data-flow")

# 1. Create a data flow
result = client.create_dataflow(
    "Sales Data Analysis",
    "Analyze daily sales data and generate reports",
    {"source": "mysql", "target": "report"}
)
if result["code"] == 200:
    dataflow_id = result["data"]["id"]
    print(f"Data flow created successfully, ID: {dataflow_id}")

    # 2. Execute the data flow
    exec_result = client.execute_dataflow(dataflow_id, {
        "date_range": "2023-12-01,2023-12-07"
    })
    if exec_result["code"] == 200:
        print("Data flow execution started")

        # 3. Wait for completion
        final_status = client.wait_for_completion(dataflow_id)
        print(f"Final status: {final_status}")
If you have any questions, please contact the technical support team or consult the detailed developer documentation.
Document version: v1.0
Last updated: 2025-06-10