123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- from flask import request, jsonify
- from app.api.data_flow import bp
- from app.core.data_flow.dataflows import DataFlowService
- import logging
- from datetime import datetime
- import json
- from app.models.result import success, failed
- from app.core.graph.graph_operations import MyEncoder
- logger = logging.getLogger(__name__)
@bp.route('/get-dataflows-list', methods=['GET'])
def get_dataflows():
    """Return the dataflow list as a JSON string.

    Reads three query-string parameters and forwards them to
    ``DataFlowService.get_dataflows``:

    * ``page`` -- integer, defaults to 1
    * ``page_size`` -- integer, defaults to 10
    * ``search`` -- string, defaults to ``''``

    Returns:
        A JSON string built from ``success(...)`` when the service call and
        serialization succeed, otherwise one built from ``failed(...)``
        carrying the exception text.
    """
    try:
        query = request.args
        listing = DataFlowService.get_dataflows(
            page=query.get('page', 1, type=int),
            page_size=query.get('page_size', 10, type=int),
            search=query.get('search', ''),
        )
        # Serialization stays inside the try so encoder errors are reported
        # through the failure envelope as well.
        return json.dumps(
            success(listing, "success"), ensure_ascii=False, cls=MyEncoder
        )
    except Exception as exc:
        logger.error(f"获取数据流列表失败: {str(exc)}")
        envelope = failed(f'获取数据流列表失败: {str(exc)}')
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
@bp.route('/get-dataflow/<int:dataflow_id>', methods=['GET'])
def get_dataflow(dataflow_id):
    """Look up one dataflow by ID and return it as a JSON string.

    Args:
        dataflow_id: Integer ID parsed from the URL path.

    Returns:
        A JSON string: a ``success`` envelope when
        ``DataFlowService.get_dataflow_by_id`` returns a truthy value, a
        ``failed`` envelope with ``code=404`` when it returns a falsy value,
        or a ``failed`` envelope carrying the exception text when anything
        raises.
    """
    try:
        record = DataFlowService.get_dataflow_by_id(dataflow_id)
        # Guard clause: a falsy lookup result is reported as "not found".
        if not record:
            return json.dumps(
                failed("数据流不存在", code=404),
                ensure_ascii=False,
                cls=MyEncoder,
            )
        return json.dumps(
            success(record, "success"), ensure_ascii=False, cls=MyEncoder
        )
    except Exception as exc:
        logger.error(f"获取数据流详情失败: {str(exc)}")
        envelope = failed(f'获取数据流详情失败: {str(exc)}')
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
@bp.route('/add-dataflow', methods=['POST'])
def create_dataflow():
    """Create a new dataflow from the JSON request body.

    The parsed body is handed unchanged to
    ``DataFlowService.create_dataflow``.

    Returns:
        A JSON string:

        * ``failed(..., code=400)`` when the body is missing, is not valid
          JSON, or parses to a falsy value (e.g. ``{}``);
        * ``success(...)`` wrapping the service result on success;
        * ``failed(..., code=400)`` when a ``ValueError`` is raised;
        * ``failed(...)`` with the exception text for any other error.
    """
    try:
        # silent=True makes get_json() return None instead of raising on a
        # missing/non-JSON Content-Type or a malformed body. Without it the
        # explicit 400 branch below is bypassed and such requests fall
        # through to the generic ``except Exception`` handler.
        data = request.get_json(silent=True)
        if not data:
            res = failed("请求数据不能为空", code=400)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        result = DataFlowService.create_dataflow(data)
        res = success(result, "数据流创建成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except ValueError as ve:
        logger.error(f"创建数据流参数错误: {str(ve)}")
        res = failed(f'参数错误: {str(ve)}', code=400)
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"创建数据流失败: {str(e)}")
        res = failed(f'创建数据流失败: {str(e)}')
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
@bp.route('/update-dataflow/<int:dataflow_id>', methods=['PUT'])
def update_dataflow(dataflow_id):
    """Update an existing dataflow from the JSON request body.

    Args:
        dataflow_id: Integer ID parsed from the URL path.

    Returns:
        A JSON string:

        * ``failed(..., code=400)`` when the body is missing, is not valid
          JSON, or parses to a falsy value;
        * ``success(...)`` when ``DataFlowService.update_dataflow`` returns a
          truthy value;
        * ``failed(..., code=404)`` when it returns a falsy value;
        * ``failed(...)`` with the exception text when anything raises.
    """
    try:
        # silent=True makes get_json() return None instead of raising on a
        # missing/non-JSON Content-Type or a malformed body, so those
        # requests reach the explicit 400 branch rather than the generic
        # ``except Exception`` handler.
        data = request.get_json(silent=True)
        if not data:
            res = failed("请求数据不能为空", code=400)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        result = DataFlowService.update_dataflow(dataflow_id, data)
        if result:
            res = success(result, "数据流更新成功")
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
        else:
            res = failed("数据流不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"更新数据流失败: {str(e)}")
        res = failed(f'更新数据流失败: {str(e)}')
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
@bp.route('/delete-dataflow/<int:dataflow_id>', methods=['DELETE'])
def delete_dataflow(dataflow_id):
    """Delete a dataflow by ID and report the outcome as a JSON string.

    Args:
        dataflow_id: Integer ID parsed from the URL path.

    Returns:
        A JSON string: a ``success`` envelope with an empty dict when
        ``DataFlowService.delete_dataflow`` returns a truthy value, a
        ``failed`` envelope with ``code=404`` when it returns a falsy value,
        or a ``failed`` envelope carrying the exception text when anything
        raises.
    """
    try:
        deleted = DataFlowService.delete_dataflow(dataflow_id)
        envelope = (
            success({}, "数据流删除成功")
            if deleted
            else failed("数据流不存在", code=404)
        )
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
    except Exception as exc:
        logger.error(f"删除数据流失败: {str(exc)}")
        return json.dumps(
            failed(f'删除数据流失败: {str(exc)}'),
            ensure_ascii=False,
            cls=MyEncoder,
        )
@bp.route('/execute-dataflow/<int:dataflow_id>', methods=['POST'])
def execute_dataflow(dataflow_id):
    """Execute a dataflow, forwarding an optional JSON body.

    Args:
        dataflow_id: Integer ID parsed from the URL path.

    Returns:
        A JSON string: a ``success`` envelope wrapping the result of
        ``DataFlowService.execute_dataflow``, or a ``failed`` envelope
        carrying the exception text when anything raises.
    """
    try:
        # The ``or {}`` fallback shows the body is meant to be optional, but a
        # plain get_json() raises (instead of returning None) when the request
        # has no JSON Content-Type or the body is malformed, so the fallback
        # never applied. silent=True returns None in those cases.
        # NOTE(review): this also treats a malformed JSON body as "no
        # parameters" -- confirm that is acceptable for this endpoint.
        data = request.get_json(silent=True) or {}
        result = DataFlowService.execute_dataflow(dataflow_id, data)
        res = success(result, "数据流执行成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"执行数据流失败: {str(e)}")
        res = failed(f'执行数据流失败: {str(e)}')
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
@bp.route('/get-dataflow-status/<int:dataflow_id>', methods=['GET'])
def get_dataflow_status(dataflow_id):
    """Return the execution status of a dataflow as a JSON string.

    Args:
        dataflow_id: Integer ID parsed from the URL path.

    Returns:
        A JSON string: a ``success`` envelope wrapping whatever
        ``DataFlowService.get_dataflow_status`` returns, or a ``failed``
        envelope carrying the exception text when anything raises.
    """
    try:
        status = DataFlowService.get_dataflow_status(dataflow_id)
        return json.dumps(
            success(status, "success"), ensure_ascii=False, cls=MyEncoder
        )
    except Exception as exc:
        logger.error(f"获取数据流状态失败: {str(exc)}")
        envelope = failed(f'获取数据流状态失败: {str(exc)}')
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
@bp.route('/get-dataflow-logs/<int:dataflow_id>', methods=['GET'])
def get_dataflow_logs(dataflow_id):
    """Return execution logs of a dataflow as a JSON string.

    Reads ``page`` (integer, default 1) and ``page_size`` (integer,
    default 50) from the query string and forwards them, together with the
    ID, to ``DataFlowService.get_dataflow_logs``.

    Args:
        dataflow_id: Integer ID parsed from the URL path.

    Returns:
        A JSON string: a ``success`` envelope wrapping the service result, or
        a ``failed`` envelope carrying the exception text when anything
        raises.
    """
    try:
        paging = {
            'page': request.args.get('page', 1, type=int),
            'page_size': request.args.get('page_size', 50, type=int),
        }
        log_page = DataFlowService.get_dataflow_logs(dataflow_id, **paging)
        return json.dumps(
            success(log_page, "success"), ensure_ascii=False, cls=MyEncoder
        )
    except Exception as exc:
        logger.error(f"获取数据流日志失败: {str(exc)}")
        envelope = failed(f'获取数据流日志失败: {str(exc)}')
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
@bp.route('/create-script', methods=['POST'])
def create_script():
    """Generate a script from the JSON request body.

    The parsed body is passed unchanged to ``DataFlowService.create_script``
    (the original docstring says generation uses the Deepseek model; that
    happens inside the service and is not visible here).

    Returns:
        A JSON string:

        * ``failed(..., code=400)`` when the body is missing, is not valid
          JSON, or parses to a falsy value;
        * ``success(...)`` wrapping a dict with ``script_content``,
          ``format`` (always ``'txt'``) and ``generated_at`` (local-time ISO
          timestamp) on success;
        * ``failed(..., code=400)`` when a ``ValueError`` is raised;
        * ``failed(...)`` with the exception text for any other error.
    """
    try:
        # silent=True makes get_json() return None instead of raising on a
        # missing/non-JSON Content-Type or a malformed body, so those
        # requests reach the explicit 400 branch rather than the generic
        # ``except Exception`` handler.
        json_data = request.get_json(silent=True)
        if not json_data:
            res = failed("请求数据不能为空", code=400)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        # Log the received payload for debugging.
        logger.info(f"create_script接收到的数据: {json_data}")
        logger.info(f"json_data类型: {type(json_data)}")

        # Use the submitted JSON directly as the request_data argument.
        script_content = DataFlowService.create_script(json_data)

        result_data = {
            'script_content': script_content,
            'format': 'txt',
            'generated_at': datetime.now().isoformat()
        }

        res = success(result_data, "脚本生成成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except ValueError as ve:
        logger.error(f"脚本生成参数错误: {str(ve)}")
        res = failed(f'参数错误: {str(ve)}', code=400)
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"脚本生成失败: {str(e)}")
        res = failed(f'脚本生成失败: {str(e)}')
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
|