import logging from typing import Dict, List, Optional, Any from datetime import datetime import json from app.core.llm.llm_service import llm_client logger = logging.getLogger(__name__) class DataFlowService: """数据流服务类,处理数据流相关的业务逻辑""" @staticmethod def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]: """ 获取数据流列表 Args: page: 页码 page_size: 每页大小 search: 搜索关键词 Returns: 包含数据流列表和分页信息的字典 """ try: # TODO: 这里应该从数据库查询数据流列表 # 目前返回模拟数据 mock_dataflows = [ { 'id': i, 'name': f'数据流_{i}', 'description': f'这是第{i}个数据流的描述', 'status': 'active' if i % 2 == 0 else 'inactive', 'created_at': datetime.now().isoformat(), 'updated_at': datetime.now().isoformat(), 'created_by': f'user_{i % 3 + 1}' } for i in range(1, 21) ] # 简单的搜索过滤 if search: mock_dataflows = [ df for df in mock_dataflows if search.lower() in df['name'].lower() or search.lower() in df['description'].lower() ] # 分页处理 total = len(mock_dataflows) start = (page - 1) * page_size end = start + page_size dataflows = mock_dataflows[start:end] return { 'list': dataflows, 'pagination': { 'page': page, 'page_size': page_size, 'total': total, 'total_pages': (total + page_size - 1) // page_size } } except Exception as e: logger.error(f"获取数据流列表失败: {str(e)}") raise e @staticmethod def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]: """ 根据ID获取数据流详情 Args: dataflow_id: 数据流ID Returns: 数据流详情字典,如果不存在则返回None """ try: # TODO: 这里应该从数据库查询指定的数据流 # 目前返回模拟数据 if dataflow_id <= 0 or dataflow_id > 20: return None return { 'id': dataflow_id, 'name': f'数据流_{dataflow_id}', 'description': f'这是第{dataflow_id}个数据流的详细描述', 'status': 'active' if dataflow_id % 2 == 0 else 'inactive', 'created_at': datetime.now().isoformat(), 'updated_at': datetime.now().isoformat(), 'created_by': f'user_{dataflow_id % 3 + 1}', 'config': { 'source': { 'type': 'database', 'connection': 'mysql://localhost:3306/test' }, 'target': { 'type': 'file', 'path': '/data/output/' }, 'transformations': [ {'type': 'filter', 'condition': 'age > 18'}, {'type': 'aggregate', 'group_by': 'department', 'function': 'count'} ] } } except Exception as e: logger.error(f"获取数据流详情失败: {str(e)}") raise e @staticmethod def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]: """ 创建新的数据流 Args: data: 数据流配置数据 Returns: 创建的数据流信息 """ try: # TODO: 这里应该验证数据并保存到数据库 # 目前返回模拟数据 required_fields = ['name', 'description'] for field in required_fields: if field not in data: raise ValueError(f"缺少必填字段: {field}") new_dataflow = { 'id': 21, # 模拟新生成的ID 'name': data['name'], 'description': data['description'], 'status': 'inactive', 'created_at': datetime.now().isoformat(), 'updated_at': datetime.now().isoformat(), 'created_by': 'current_user', 'config': data.get('config', {}) } logger.info(f"创建数据流成功: {new_dataflow['name']}") return new_dataflow except Exception as e: logger.error(f"创建数据流失败: {str(e)}") raise e @staticmethod def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ 更新数据流 Args: dataflow_id: 数据流ID data: 更新的数据 Returns: 更新后的数据流信息,如果不存在则返回None """ try: # TODO: 这里应该更新数据库中的数据流 # 目前返回模拟数据 if dataflow_id <= 0 or dataflow_id > 20: return None updated_dataflow = { 'id': dataflow_id, 'name': data.get('name', f'数据流_{dataflow_id}'), 'description': data.get('description', f'这是第{dataflow_id}个数据流的描述'), 'status': data.get('status', 'active' if dataflow_id % 2 == 0 else 'inactive'), 'created_at': datetime.now().isoformat(), 'updated_at': datetime.now().isoformat(), 'created_by': f'user_{dataflow_id % 3 + 1}', 'config': data.get('config', {}) } logger.info(f"更新数据流成功: ID={dataflow_id}") return updated_dataflow except Exception as e: logger.error(f"更新数据流失败: {str(e)}") raise e @staticmethod def delete_dataflow(dataflow_id: int) -> bool: """ 删除数据流 Args: dataflow_id: 数据流ID Returns: 删除是否成功 """ try: # TODO: 这里应该从数据库删除数据流 # 目前返回模拟结果 if dataflow_id <= 0 or dataflow_id > 20: return False logger.info(f"删除数据流成功: ID={dataflow_id}") return True except Exception as e: logger.error(f"删除数据流失败: {str(e)}") raise e @staticmethod def execute_dataflow(dataflow_id: int, params: Dict[str, Any] = None) -> Dict[str, Any]: """ 执行数据流 Args: dataflow_id: 数据流ID params: 执行参数 Returns: 执行结果信息 """ try: # TODO: 这里应该实际执行数据流 # 目前返回模拟结果 if dataflow_id <= 0 or dataflow_id > 20: raise ValueError(f"数据流不存在: ID={dataflow_id}") execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}" result = { 'execution_id': execution_id, 'dataflow_id': dataflow_id, 'status': 'running', 'started_at': datetime.now().isoformat(), 'params': params or {}, 'progress': 0 } logger.info(f"开始执行数据流: ID={dataflow_id}, execution_id={execution_id}") return result except Exception as e: logger.error(f"执行数据流失败: {str(e)}") raise e @staticmethod def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]: """ 获取数据流执行状态 Args: dataflow_id: 数据流ID Returns: 执行状态信息 """ try: # TODO: 这里应该查询实际的执行状态 # 目前返回模拟状态 if dataflow_id <= 0 or dataflow_id > 20: raise ValueError(f"数据流不存在: ID={dataflow_id}") status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4] return { 'dataflow_id': dataflow_id, 'status': status, 'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100, 'started_at': datetime.now().isoformat(), 'completed_at': datetime.now().isoformat() if status == 'completed' else None, 'error_message': '执行过程中发生错误' if status == 'failed' else None } except Exception as e: logger.error(f"获取数据流状态失败: {str(e)}") raise e @staticmethod def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]: """ 获取数据流执行日志 Args: dataflow_id: 数据流ID page: 页码 page_size: 每页大小 Returns: 执行日志列表和分页信息 """ try: # TODO: 这里应该查询实际的执行日志 # 目前返回模拟日志 if dataflow_id <= 0 or dataflow_id > 20: raise ValueError(f"数据流不存在: ID={dataflow_id}") mock_logs = [ { 'id': i, 'timestamp': datetime.now().isoformat(), 'level': ['INFO', 'WARNING', 'ERROR'][i % 3], 'message': f'数据流执行日志消息 {i}', 'component': ['source', 'transform', 'target'][i % 3] } for i in range(1, 101) ] # 分页处理 total = len(mock_logs) start = (page - 1) * page_size end = start + page_size logs = mock_logs[start:end] return { 'logs': logs, 'pagination': { 'page': page, 'page_size': page_size, 'total': total, 'total_pages': (total + page_size - 1) // page_size } } except Exception as e: logger.error(f"获取数据流日志失败: {str(e)}") raise e @staticmethod def create_script(request_data: str) -> str: """ 使用Deepseek模型生成脚本 Args: request_data: 请求数据,用户需求的文本描述 Returns: 生成的脚本内容(TXT格式) """ try: # 构建prompt prompt_parts = [] # 添加系统提示 prompt_parts.append("请根据以下需求生成相应的数据处理脚本:") # 直接将request_data作为文本描述添加到prompt中 prompt_parts.append(request_data) # 添加格式要求 prompt_parts.append("\n请生成完整可执行的脚本代码,包含必要的注释和错误处理。") # 组合prompt full_prompt = "\n\n".join(prompt_parts) logger.info(f"开始调用Deepseek模型生成脚本,prompt长度: {len(full_prompt)}") # 调用LLM服务 script_content = llm_client(full_prompt) if not script_content: raise ValueError("Deepseek模型返回空内容") # 确保返回的是文本格式 if not isinstance(script_content, str): script_content = str(script_content) logger.info(f"脚本生成成功,内容长度: {len(script_content)}") return script_content except Exception as e: logger.error(f"生成脚本失败: {str(e)}") raise e