"""HTTP routes for data flow management.

Every handler delegates to ``DataFlowService`` and returns the result wrapped
by ``success()`` / ``failed()`` and serialised to a JSON string with
``MyEncoder``.
"""
import json
import logging
from datetime import datetime

from flask import request, jsonify

from app.api.data_flow import bp
from app.core.data_flow.dataflows import DataFlowService
from app.models.result import success, failed
from app.core.graph.graph_operations import MyEncoder

logger = logging.getLogger(__name__)


def _json_response(payload):
    """Serialise a result payload to a JSON string.

    ``ensure_ascii=False`` keeps non-ASCII text (e.g. the Chinese messages)
    readable instead of escaping it; ``MyEncoder`` is the project encoder
    used for values the default encoder cannot handle.
    """
    return json.dumps(payload, ensure_ascii=False, cls=MyEncoder)


@bp.route('/get-dataflows-list', methods=['GET'])
def get_dataflows():
    """Return a page of data flows.

    Query parameters: ``page`` (int, default 1), ``page_size`` (int,
    default 10) and ``search`` (str, default empty).
    """
    try:
        page = request.args.get('page', 1, type=int)
        page_size = request.args.get('page_size', 10, type=int)
        search = request.args.get('search', '')

        result = DataFlowService.get_dataflows(
            page=page, page_size=page_size, search=search
        )
        return _json_response(success(result, "success"))
    except Exception as e:
        # Route boundary: log with traceback and report the failure to the client.
        logger.exception("获取数据流列表失败: %s", e)
        return _json_response(failed(f'获取数据流列表失败: {e}'))


# NOTE(review): the routes below bind ``dataflow_id`` from the URL. Without a
# URL variable Flask calls the view with no arguments and it fails with a
# TypeError. The ``int`` converter assumes numeric ids -- confirm against
# DataFlowService and switch to ``<dataflow_id>`` if ids are strings.
@bp.route('/get-dataflow/<int:dataflow_id>', methods=['GET'])
def get_dataflow(dataflow_id):
    """Return the details of one data flow, looked up by id.

    A falsy service result is reported as a failure with ``code=404``.
    """
    try:
        result = DataFlowService.get_dataflow_by_id(dataflow_id)
        if not result:
            return _json_response(failed("数据流不存在", code=404))
        return _json_response(success(result, "success"))
    except Exception as e:
        logger.exception("获取数据流详情失败: %s", e)
        return _json_response(failed(f'获取数据流详情失败: {e}'))


@bp.route('/add-dataflow', methods=['POST'])
def create_dataflow():
    """Create a new data flow from the JSON request body.

    A missing, empty or unparsable body is reported with ``code=400``, as is
    a ``ValueError`` raised by the service.
    """
    try:
        # silent=True: a malformed or non-JSON body yields None and is
        # reported by the empty-body branch instead of the generic handler.
        data = request.get_json(silent=True)
        if not data:
            return _json_response(failed("请求数据不能为空", code=400))

        result = DataFlowService.create_dataflow(data)
        return _json_response(success(result, "数据流创建成功"))
    except ValueError as ve:
        logger.error("创建数据流参数错误: %s", ve)
        return _json_response(failed(f'参数错误: {ve}', code=400))
    except Exception as e:
        logger.exception("创建数据流失败: %s", e)
        return _json_response(failed(f'创建数据流失败: {e}'))


@bp.route('/update-dataflow/<int:dataflow_id>', methods=['PUT'])
def update_dataflow(dataflow_id):
    """Update an existing data flow with the JSON request body.

    A missing, empty or unparsable body is reported with ``code=400``; a
    falsy service result is reported with ``code=404``.
    """
    try:
        data = request.get_json(silent=True)
        if not data:
            return _json_response(failed("请求数据不能为空", code=400))

        result = DataFlowService.update_dataflow(dataflow_id, data)
        if not result:
            return _json_response(failed("数据流不存在", code=404))
        return _json_response(success(result, "数据流更新成功"))
    except Exception as e:
        logger.exception("更新数据流失败: %s", e)
        return _json_response(failed(f'更新数据流失败: {e}'))


@bp.route('/delete-dataflow/<int:dataflow_id>', methods=['DELETE'])
def delete_dataflow(dataflow_id):
    """Delete a data flow by id.

    A falsy service result is reported as a failure with ``code=404``.
    """
    try:
        deleted = DataFlowService.delete_dataflow(dataflow_id)
        if not deleted:
            return _json_response(failed("数据流不存在", code=404))
        return _json_response(success({}, "数据流删除成功"))
    except Exception as e:
        logger.exception("删除数据流失败: %s", e)
        return _json_response(failed(f'删除数据流失败: {e}'))


@bp.route('/execute-dataflow/<int:dataflow_id>', methods=['POST'])
def execute_dataflow(dataflow_id):
    """Execute a data flow; the JSON body is optional and defaults to ``{}``."""
    try:
        # silent=True so that a request without a JSON body really falls back
        # to {} instead of raising inside get_json().
        data = request.get_json(silent=True) or {}
        result = DataFlowService.execute_dataflow(dataflow_id, data)
        return _json_response(success(result, "数据流执行成功"))
    except Exception as e:
        logger.exception("执行数据流失败: %s", e)
        return _json_response(failed(f'执行数据流失败: {e}'))


@bp.route('/get-dataflow-status/<int:dataflow_id>', methods=['GET'])
def get_dataflow_status(dataflow_id):
    """Return the execution status of a data flow."""
    try:
        result = DataFlowService.get_dataflow_status(dataflow_id)
        return _json_response(success(result, "success"))
    except Exception as e:
        logger.exception("获取数据流状态失败: %s", e)
        return _json_response(failed(f'获取数据流状态失败: {e}'))


@bp.route('/get-dataflow-logs/<int:dataflow_id>', methods=['GET'])
def get_dataflow_logs(dataflow_id):
    """Return a page of execution logs for a data flow.

    Query parameters: ``page`` (int, default 1) and ``page_size`` (int,
    default 50).
    """
    try:
        page = request.args.get('page', 1, type=int)
        page_size = request.args.get('page_size', 50, type=int)

        result = DataFlowService.get_dataflow_logs(
            dataflow_id, page=page, page_size=page_size
        )
        return _json_response(success(result, "success"))
    except Exception as e:
        logger.exception("获取数据流日志失败: %s", e)
        return _json_response(failed(f'获取数据流日志失败: {e}'))


@bp.route('/create-script', methods=['POST'])
def create_script():
    """Generate a script with the Deepseek model from the JSON request body.

    On success the payload holds ``script_content``, ``format`` (always
    ``'txt'``) and ``generated_at`` (ISO-8601 timestamp of the local clock).
    A missing, empty or unparsable body is reported with ``code=400``, as is
    a ``ValueError`` raised by the service.
    """
    try:
        json_data = request.get_json(silent=True)
        if not json_data:
            return _json_response(failed("请求数据不能为空", code=400))

        # The body submitted by the front end is passed through unchanged as
        # the service's request data.
        script_content = DataFlowService.create_script(json_data)

        result_data = {
            'script_content': script_content,
            'format': 'txt',
            'generated_at': datetime.now().isoformat(),
        }
        return _json_response(success(result_data, "脚本生成成功"))
    except ValueError as ve:
        logger.error("脚本生成参数错误: %s", ve)
        return _json_response(failed(f'参数错误: {ve}', code=400))
    except Exception as e:
        logger.exception("脚本生成失败: %s", e)
        return _json_response(failed(f'脚本生成失败: {e}'))