浏览代码

新增数据流API,增删改查。

maxiaolong 3 周之前
父节点
当前提交
a4e8cc2b68
共有 5 个文件被更改,包括 619 次插入0 次删除
  1. 3 0
      app/__init__.py
  2. 5 0
      app/api/data_flow/__init__.py
  3. 245 0
      app/api/data_flow/routes.py
  4. 1 0
      app/core/data_flow/__init__.py
  5. 365 0
      app/core/data_flow/dataflows.py

+ 3 - 0
app/__init__.py

@@ -30,6 +30,7 @@ def create_app():
     from app.api.system import bp as system_bp
     from app.api.data_source import bp as data_source_bp
     from app.api.data_parse import bp as data_parse_bp
+    from app.api.data_flow import bp as data_flow_bp
 
     app.register_blueprint(meta_bp, url_prefix='/api/meta')
     app.register_blueprint(resource_bp, url_prefix='/api/resource')
@@ -41,6 +42,8 @@ def create_app():
     app.register_blueprint(system_bp, url_prefix='/api/system')
     app.register_blueprint(data_source_bp, url_prefix='/api/datasource')
     app.register_blueprint(data_parse_bp, url_prefix='/api/parse')
+    app.register_blueprint(data_flow_bp, url_prefix='/api/dataflow')
+    
     # Configure logging
     configure_logging(app)
     

+ 5 - 0
app/api/data_flow/__init__.py

@@ -0,0 +1,5 @@
+from flask import Blueprint
+
+bp = Blueprint('data_flow', __name__)
+
+from app.api.data_flow import routes 

+ 245 - 0
app/api/data_flow/routes.py

@@ -0,0 +1,245 @@
+from flask import request, jsonify
+from app.api.data_flow import bp
+from app.core.data_flow.dataflows import DataFlowService
+import logging
+from datetime import datetime
+
+logger = logging.getLogger(__name__)
+
+@bp.route('/get-dataflows-list', methods=['GET'])
+def get_dataflows():
+    """获取数据流列表"""
+    try:
+        page = request.args.get('page', 1, type=int)
+        page_size = request.args.get('page_size', 10, type=int)
+        search = request.args.get('search', '')
+        
+        result = DataFlowService.get_dataflows(page=page, page_size=page_size, search=search)
+        return jsonify({
+            'code': 200,
+            'message': 'success',
+            'data': result
+        })
+    except Exception as e:
+        logger.error(f"获取数据流列表失败: {str(e)}")
+        return jsonify({
+            'code': 500,
+            'message': f'获取数据流列表失败: {str(e)}',
+            'data': None
+        }), 500
+
+@bp.route('/get-dataflow/<int:dataflow_id>', methods=['GET'])
+def get_dataflow(dataflow_id):
+    """根据ID获取数据流详情"""
+    try:
+        result = DataFlowService.get_dataflow_by_id(dataflow_id)
+        if result:
+            return jsonify({
+                'code': 200,
+                'message': 'success',
+                'data': result
+            })
+        else:
+            return jsonify({
+                'code': 404,
+                'message': '数据流不存在',
+                'data': None
+            }), 404
+    except Exception as e:
+        logger.error(f"获取数据流详情失败: {str(e)}")
+        return jsonify({
+            'code': 500,
+            'message': f'获取数据流详情失败: {str(e)}',
+            'data': None
+        }), 500
+
+@bp.route('/add-dataflow', methods=['POST'])
+def create_dataflow():
+    """创建新的数据流"""
+    try:
+        data = request.get_json()
+        if not data:
+            return jsonify({
+                'code': 400,
+                'message': '请求数据不能为空',
+                'data': None
+            }), 400
+            
+        result = DataFlowService.create_dataflow(data)
+        return jsonify({
+            'code': 200,
+            'message': '数据流创建成功',
+            'data': result
+        })
+    except Exception as e:
+        logger.error(f"创建数据流失败: {str(e)}")
+        return jsonify({
+            'code': 500,
+            'message': f'创建数据流失败: {str(e)}',
+            'data': None
+        }), 500
+
+@bp.route('/update-dataflow/<int:dataflow_id>', methods=['PUT'])
+def update_dataflow(dataflow_id):
+    """更新数据流"""
+    try:
+        data = request.get_json()
+        if not data:
+            return jsonify({
+                'code': 400,
+                'message': '请求数据不能为空',
+                'data': None
+            }), 400
+            
+        result = DataFlowService.update_dataflow(dataflow_id, data)
+        if result:
+            return jsonify({
+                'code': 200,
+                'message': '数据流更新成功',
+                'data': result
+            })
+        else:
+            return jsonify({
+                'code': 404,
+                'message': '数据流不存在',
+                'data': None
+            }), 404
+    except Exception as e:
+        logger.error(f"更新数据流失败: {str(e)}")
+        return jsonify({
+            'code': 500,
+            'message': f'更新数据流失败: {str(e)}',
+            'data': None
+        }), 500
+
+@bp.route('/delete-dataflow/<int:dataflow_id>', methods=['DELETE'])
+def delete_dataflow(dataflow_id):
+    """删除数据流"""
+    try:
+        result = DataFlowService.delete_dataflow(dataflow_id)
+        if result:
+            return jsonify({
+                'code': 200,
+                'message': '数据流删除成功',
+                'data': None
+            })
+        else:
+            return jsonify({
+                'code': 404,
+                'message': '数据流不存在',
+                'data': None
+            }), 404
+    except Exception as e:
+        logger.error(f"删除数据流失败: {str(e)}")
+        return jsonify({
+            'code': 500,
+            'message': f'删除数据流失败: {str(e)}',
+            'data': None
+        }), 500
+
+@bp.route('/execute-dataflow/<int:dataflow_id>', methods=['POST'])
+def execute_dataflow(dataflow_id):
+    """执行数据流"""
+    try:
+        data = request.get_json() or {}
+        result = DataFlowService.execute_dataflow(dataflow_id, data)
+        return jsonify({
+            'code': 200,
+            'message': '数据流执行成功',
+            'data': result
+        })
+    except Exception as e:
+        logger.error(f"执行数据流失败: {str(e)}")
+        return jsonify({
+            'code': 500,
+            'message': f'执行数据流失败: {str(e)}',
+            'data': None
+        }), 500
+
+@bp.route('/get-dataflow-status/<int:dataflow_id>', methods=['GET'])
+def get_dataflow_status(dataflow_id):
+    """获取数据流执行状态"""
+    try:
+        result = DataFlowService.get_dataflow_status(dataflow_id)
+        return jsonify({
+            'code': 200,
+            'message': 'success',
+            'data': result
+        })
+    except Exception as e:
+        logger.error(f"获取数据流状态失败: {str(e)}")
+        return jsonify({
+            'code': 500,
+            'message': f'获取数据流状态失败: {str(e)}',
+            'data': None
+        }), 500
+
+@bp.route('/get-dataflow-logs/<int:dataflow_id>', methods=['GET'])
+def get_dataflow_logs(dataflow_id):
+    """获取数据流执行日志"""
+    try:
+        page = request.args.get('page', 1, type=int)
+        page_size = request.args.get('page_size', 50, type=int)
+        
+        result = DataFlowService.get_dataflow_logs(dataflow_id, page=page, page_size=page_size)
+        return jsonify({
+            'code': 200,
+            'message': 'success',
+            'data': result
+        })
+    except Exception as e:
+        logger.error(f"获取数据流日志失败: {str(e)}")
+        return jsonify({
+            'code': 500,
+            'message': f'获取数据流日志失败: {str(e)}',
+            'data': None
+        }), 500
+
+@bp.route('/create-script', methods=['POST'])
+def create_script():
+    """使用Deepseek模型生成脚本"""
+    try:
+        json_data = request.get_json()
+        if not json_data:
+            return jsonify({
+                'code': 400,
+                'message': '请求数据不能为空',
+                'data': None
+            }), 400
+        
+        # 提取文本描述
+        request_data = json_data.get('request_data')
+        
+        if not request_data or not isinstance(request_data, str):
+            return jsonify({
+                'code': 400,
+                'message': '请求数据必须是文本描述',
+                'data': None
+            }), 400
+        
+        # 调用DataFlowService的create_script方法
+        script_content = DataFlowService.create_script(request_data)
+        
+        return jsonify({
+            'code': 200,
+            'message': '脚本生成成功',
+            'data': {
+                'script_content': script_content,
+                'format': 'txt',
+                'generated_at': datetime.now().isoformat()
+            }
+        })
+    except ValueError as ve:
+        logger.error(f"脚本生成参数错误: {str(ve)}")
+        return jsonify({
+            'code': 400,
+            'message': f'参数错误: {str(ve)}',
+            'data': None
+        }), 400
+    except Exception as e:
+        logger.error(f"脚本生成失败: {str(e)}")
+        return jsonify({
+            'code': 500,
+            'message': f'脚本生成失败: {str(e)}',
+            'data': None
+        }), 500 

+ 1 - 0
app/core/data_flow/__init__.py

@@ -0,0 +1 @@
+# Data Flow Core Module 

+ 365 - 0
app/core/data_flow/dataflows.py

@@ -0,0 +1,365 @@
+import logging
+from typing import Dict, List, Optional, Any
+from datetime import datetime
+import json
+from app.core.llm.llm_service import llm_client
+
+logger = logging.getLogger(__name__)
+
+class DataFlowService:
+    """数据流服务类,处理数据流相关的业务逻辑"""
+    
+    @staticmethod
+    def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]:
+        """
+        获取数据流列表
+        
+        Args:
+            page: 页码
+            page_size: 每页大小
+            search: 搜索关键词
+            
+        Returns:
+            包含数据流列表和分页信息的字典
+        """
+        try:
+            # TODO: 这里应该从数据库查询数据流列表
+            # 目前返回模拟数据
+            mock_dataflows = [
+                {
+                    'id': i,
+                    'name': f'数据流_{i}',
+                    'description': f'这是第{i}个数据流的描述',
+                    'status': 'active' if i % 2 == 0 else 'inactive',
+                    'created_at': datetime.now().isoformat(),
+                    'updated_at': datetime.now().isoformat(),
+                    'created_by': f'user_{i % 3 + 1}'
+                }
+                for i in range(1, 21)
+            ]
+            
+            # 简单的搜索过滤
+            if search:
+                mock_dataflows = [
+                    df for df in mock_dataflows 
+                    if search.lower() in df['name'].lower() or search.lower() in df['description'].lower()
+                ]
+            
+            # 分页处理
+            total = len(mock_dataflows)
+            start = (page - 1) * page_size
+            end = start + page_size
+            dataflows = mock_dataflows[start:end]
+            
+            return {
+                'list': dataflows,
+                'pagination': {
+                    'page': page,
+                    'page_size': page_size,
+                    'total': total,
+                    'total_pages': (total + page_size - 1) // page_size
+                }
+            }
+        except Exception as e:
+            logger.error(f"获取数据流列表失败: {str(e)}")
+            raise e
+    
+    @staticmethod
+    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
+        """
+        根据ID获取数据流详情
+        
+        Args:
+            dataflow_id: 数据流ID
+            
+        Returns:
+            数据流详情字典,如果不存在则返回None
+        """
+        try:
+            # TODO: 这里应该从数据库查询指定的数据流
+            # 目前返回模拟数据
+            if dataflow_id <= 0 or dataflow_id > 20:
+                return None
+                
+            return {
+                'id': dataflow_id,
+                'name': f'数据流_{dataflow_id}',
+                'description': f'这是第{dataflow_id}个数据流的详细描述',
+                'status': 'active' if dataflow_id % 2 == 0 else 'inactive',
+                'created_at': datetime.now().isoformat(),
+                'updated_at': datetime.now().isoformat(),
+                'created_by': f'user_{dataflow_id % 3 + 1}',
+                'config': {
+                    'source': {
+                        'type': 'database',
+                        'connection': 'mysql://localhost:3306/test'
+                    },
+                    'target': {
+                        'type': 'file',
+                        'path': '/data/output/'
+                    },
+                    'transformations': [
+                        {'type': 'filter', 'condition': 'age > 18'},
+                        {'type': 'aggregate', 'group_by': 'department', 'function': 'count'}
+                    ]
+                }
+            }
+        except Exception as e:
+            logger.error(f"获取数据流详情失败: {str(e)}")
+            raise e
+    
+    @staticmethod
+    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        创建新的数据流
+        
+        Args:
+            data: 数据流配置数据
+            
+        Returns:
+            创建的数据流信息
+        """
+        try:
+            # TODO: 这里应该验证数据并保存到数据库
+            # 目前返回模拟数据
+            required_fields = ['name', 'description']
+            for field in required_fields:
+                if field not in data:
+                    raise ValueError(f"缺少必填字段: {field}")
+            
+            new_dataflow = {
+                'id': 21,  # 模拟新生成的ID
+                'name': data['name'],
+                'description': data['description'],
+                'status': 'inactive',
+                'created_at': datetime.now().isoformat(),
+                'updated_at': datetime.now().isoformat(),
+                'created_by': 'current_user',
+                'config': data.get('config', {})
+            }
+            
+            logger.info(f"创建数据流成功: {new_dataflow['name']}")
+            return new_dataflow
+        except Exception as e:
+            logger.error(f"创建数据流失败: {str(e)}")
+            raise e
+    
+    @staticmethod
+    def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+        """
+        更新数据流
+        
+        Args:
+            dataflow_id: 数据流ID
+            data: 更新的数据
+            
+        Returns:
+            更新后的数据流信息,如果不存在则返回None
+        """
+        try:
+            # TODO: 这里应该更新数据库中的数据流
+            # 目前返回模拟数据
+            if dataflow_id <= 0 or dataflow_id > 20:
+                return None
+            
+            updated_dataflow = {
+                'id': dataflow_id,
+                'name': data.get('name', f'数据流_{dataflow_id}'),
+                'description': data.get('description', f'这是第{dataflow_id}个数据流的描述'),
+                'status': data.get('status', 'active' if dataflow_id % 2 == 0 else 'inactive'),
+                'created_at': datetime.now().isoformat(),
+                'updated_at': datetime.now().isoformat(),
+                'created_by': f'user_{dataflow_id % 3 + 1}',
+                'config': data.get('config', {})
+            }
+            
+            logger.info(f"更新数据流成功: ID={dataflow_id}")
+            return updated_dataflow
+        except Exception as e:
+            logger.error(f"更新数据流失败: {str(e)}")
+            raise e
+    
+    @staticmethod
+    def delete_dataflow(dataflow_id: int) -> bool:
+        """
+        删除数据流
+        
+        Args:
+            dataflow_id: 数据流ID
+            
+        Returns:
+            删除是否成功
+        """
+        try:
+            # TODO: 这里应该从数据库删除数据流
+            # 目前返回模拟结果
+            if dataflow_id <= 0 or dataflow_id > 20:
+                return False
+            
+            logger.info(f"删除数据流成功: ID={dataflow_id}")
+            return True
+        except Exception as e:
+            logger.error(f"删除数据流失败: {str(e)}")
+            raise e
+    
+    @staticmethod
+    def execute_dataflow(dataflow_id: int, params: Dict[str, Any] = None) -> Dict[str, Any]:
+        """
+        执行数据流
+        
+        Args:
+            dataflow_id: 数据流ID
+            params: 执行参数
+            
+        Returns:
+            执行结果信息
+        """
+        try:
+            # TODO: 这里应该实际执行数据流
+            # 目前返回模拟结果
+            if dataflow_id <= 0 or dataflow_id > 20:
+                raise ValueError(f"数据流不存在: ID={dataflow_id}")
+            
+            execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
+            
+            result = {
+                'execution_id': execution_id,
+                'dataflow_id': dataflow_id,
+                'status': 'running',
+                'started_at': datetime.now().isoformat(),
+                'params': params or {},
+                'progress': 0
+            }
+            
+            logger.info(f"开始执行数据流: ID={dataflow_id}, execution_id={execution_id}")
+            return result
+        except Exception as e:
+            logger.error(f"执行数据流失败: {str(e)}")
+            raise e
+    
+    @staticmethod
+    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
+        """
+        获取数据流执行状态
+        
+        Args:
+            dataflow_id: 数据流ID
+            
+        Returns:
+            执行状态信息
+        """
+        try:
+            # TODO: 这里应该查询实际的执行状态
+            # 目前返回模拟状态
+            if dataflow_id <= 0 or dataflow_id > 20:
+                raise ValueError(f"数据流不存在: ID={dataflow_id}")
+            
+            status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]
+            
+            return {
+                'dataflow_id': dataflow_id,
+                'status': status,
+                'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100,
+                'started_at': datetime.now().isoformat(),
+                'completed_at': datetime.now().isoformat() if status == 'completed' else None,
+                'error_message': '执行过程中发生错误' if status == 'failed' else None
+            }
+        except Exception as e:
+            logger.error(f"获取数据流状态失败: {str(e)}")
+            raise e
+    
+    @staticmethod
+    def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]:
+        """
+        获取数据流执行日志
+        
+        Args:
+            dataflow_id: 数据流ID
+            page: 页码
+            page_size: 每页大小
+            
+        Returns:
+            执行日志列表和分页信息
+        """
+        try:
+            # TODO: 这里应该查询实际的执行日志
+            # 目前返回模拟日志
+            if dataflow_id <= 0 or dataflow_id > 20:
+                raise ValueError(f"数据流不存在: ID={dataflow_id}")
+            
+            mock_logs = [
+                {
+                    'id': i,
+                    'timestamp': datetime.now().isoformat(),
+                    'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
+                    'message': f'数据流执行日志消息 {i}',
+                    'component': ['source', 'transform', 'target'][i % 3]
+                }
+                for i in range(1, 101)
+            ]
+            
+            # 分页处理
+            total = len(mock_logs)
+            start = (page - 1) * page_size
+            end = start + page_size
+            logs = mock_logs[start:end]
+            
+            return {
+                'logs': logs,
+                'pagination': {
+                    'page': page,
+                    'page_size': page_size,
+                    'total': total,
+                    'total_pages': (total + page_size - 1) // page_size
+                }
+            }
+        except Exception as e:
+            logger.error(f"获取数据流日志失败: {str(e)}")
+            raise e
+
+    @staticmethod
+    def create_script(request_data: str) -> str:
+        """
+        使用Deepseek模型生成脚本
+        
+        Args:
+            request_data: 请求数据,用户需求的文本描述
+            
+        Returns:
+            生成的脚本内容(TXT格式)
+        """
+        try:
+            # 构建prompt
+            prompt_parts = []
+            
+            # 添加系统提示
+            prompt_parts.append("请根据以下需求生成相应的数据处理脚本:")
+            
+            # 直接将request_data作为文本描述添加到prompt中
+            prompt_parts.append(request_data)
+            
+            # 添加格式要求
+            prompt_parts.append("\n请生成完整可执行的脚本代码,包含必要的注释和错误处理。")
+            
+            # 组合prompt
+            full_prompt = "\n\n".join(prompt_parts)
+            
+            logger.info(f"开始调用Deepseek模型生成脚本,prompt长度: {len(full_prompt)}")
+            
+            # 调用LLM服务
+            script_content = llm_client(full_prompt)
+            
+            if not script_content:
+                raise ValueError("Deepseek模型返回空内容")
+            
+            # 确保返回的是文本格式
+            if not isinstance(script_content, str):
+                script_content = str(script_content)
+            
+            logger.info(f"脚本生成成功,内容长度: {len(script_content)}")
+            
+            return script_content
+            
+        except Exception as e:
+            logger.error(f"生成脚本失败: {str(e)}")
+            raise e