import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

from sqlalchemy import text

from app import db
from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node, relationship_exists
from app.core.llm.llm_service import llm_client
from app.core.meta_data import get_formatted_time, translate_and_parse

logger = logging.getLogger(__name__)


class DataFlowService:
    """Data flow service: handles data-flow related business logic."""

    @staticmethod
    def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]:
        """
        Fetch the list of data flows.

        Args:
            page: Page number.
            page_size: Number of items per page.
            search: Search keyword.

        Returns:
            Dict containing the data flow list and pagination info.
        """
        try:
            # Query the data flow list from the graph database
            skip_count = (page - 1) * page_size

            # Build the search filter
            where_clause = ""
            params = {'skip': skip_count, 'limit': page_size}
            if search:
                where_clause = "WHERE n.name CONTAINS $search OR n.description CONTAINS $search"
                params['search'] = search

            # Query the data flow list
            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            RETURN n, id(n) as node_id
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            """

            with connect_graph().session() as session:
                list_result = session.run(query, **params).data()

                # Query the total count
                count_query = f"""
                MATCH (n:DataFlow)
                {where_clause}
                RETURN count(n) as total
                """
                count_params = {'search': search} if search else {}
                count_result = session.run(count_query, **count_params).single()
                total = count_result['total'] if count_result else 0

            # Format the results
            dataflows = []
            for record in list_result:
                node = record['n']
                dataflow = dict(node)
                dataflow['id'] = record['node_id']  # Use the node_id returned by the query
                dataflows.append(dataflow)

            return {
                'list': dataflows,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size  # ceiling division
                }
            }
        except Exception as e:
            logger.error(f"Failed to fetch data flow list: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        Fetch data flow details by ID.

        Args:
            dataflow_id: Data flow ID.

        Returns:
            Dict of data flow details, or None if it does not exist.
        """
        try:
            query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:label]-(la:data_label)
            OPTIONAL MATCH (n)-[:child]-(child)
            OPTIONAL MATCH (parent)-[:child]-(n)
            RETURN n, id(n) as node_id,
                   collect(DISTINCT {id: id(la), name: la.name}) as tags,
                   collect(DISTINCT {id: id(child), name: child.name}) as children,
                   collect(DISTINCT {id: id(parent), name: parent.name}) as parents
            """

            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

            if not result:
                return None

            record = result[0]
            node = record['n']
            dataflow = dict(node)
            dataflow['id'] = record['node_id']  # Use the node_id returned by the query
            dataflow['tags'] = record['tags']
            dataflow['children'] = record['children']
            dataflow['parents'] = record['parents']

            return dataflow
        except Exception as e:
            logger.error(f"Failed to fetch data flow details: {str(e)}")
            raise e
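
    # Illustrative call shape (added for reference; not from the original source).
    # get_dataflows returns the node properties plus pagination metadata, e.g.:
    #
    #   result = DataFlowService.get_dataflows(page=1, page_size=10, search="order")
    #   # result['list']       -> [{'id': 123, 'name': '...', ...}, ...]
    #   # result['pagination'] -> {'page': 1, 'page_size': 10, 'total': 25, 'total_pages': 3}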

    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new data flow.

        Args:
            data: Data flow configuration.

        Returns:
            Info about the created data flow.
        """
        try:
            # Validate required fields
            required_fields = ['name', 'describe']
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"Missing required field: {field}")

            dataflow_name = data['name']

            # Use the LLM to translate the name into an English name
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
            except Exception as e:
                logger.warning(f"Translation failed, falling back to a default English name: {str(e)}")
                name_en = dataflow_name.lower().replace(' ', '_')

            # Prepare the node properties. The lookup queries in this module match
            # on `name`, so store it in addition to name_zh/name_en.
            node_data = {
                'name': dataflow_name,
                'name_zh': dataflow_name,
                'name_en': name_en,
                'category': data.get('category', ''),
                'organization': data.get('organization', ''),
                'leader': data.get('leader', ''),
                'frequency': data.get('frequency', ''),
                'tag': data.get('tag', ''),
                'describe': data.get('describe', ''),
                'status': data.get('status', 'inactive'),
                'created_at': get_formatted_time(),
                'updated_at': get_formatted_time()
            }

            # Create the data flow node, rejecting duplicates by name
            existing_id = get_node('DataFlow', name=dataflow_name)
            if existing_id:
                raise ValueError(f"Data flow '{dataflow_name}' already exists")

            dataflow_id = create_or_get_node('DataFlow', **node_data)

            # Handle the tag relationship
            tag_id = data.get('tag')
            if tag_id is not None:
                try:
                    DataFlowService._handle_tag_relationship(dataflow_id, tag_id)
                except Exception as e:
                    logger.warning(f"Error while handling tag relationship: {str(e)}")

            # After the graph node is created, write the record to the PG database
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                logger.info(f"Data flow info written to PG database: {dataflow_name}")

                # Once the PG record is written, create the script relationships in Neo4j
                try:
                    DataFlowService._handle_script_relationships(data, dataflow_name, name_en)
                    logger.info(f"Script relationships created: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"Failed to create script relationships: {str(script_error)}")
            except Exception as pg_error:
                logger.error(f"Failed to write to PG database: {str(pg_error)}")
                # Note: the graph write could be rolled back here, but the graph data
                # is kept for now. A production setup may need a distributed transaction.

            # Return info about the created data flow:
            # query the created node to obtain its full properties
            query = "MATCH (n:DataFlow {name: $name}) RETURN n, id(n) as node_id"
            with connect_graph().session() as session:
                id_result = session.run(query, name=dataflow_name).single()

            if id_result:
                dataflow_node = id_result['n']
                node_id = id_result['node_id']
                # Convert the node properties to a dict
                result = dict(dataflow_node)
                result['id'] = node_id
            else:
                # Fall back to basic info if the query fails
                result = {
                    'id': dataflow_id if isinstance(dataflow_id, int) else None,
                    'name': dataflow_name,
                    'name_en': name_en,
                    'created_at': get_formatted_time()
                }

            logger.info(f"Data flow created: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"Failed to create data flow: {str(e)}")
            raise e

    @staticmethod
    def _save_to_pg_database(data: Dict[str, Any], script_name: str, name_en: str):
        """
        Save the script info to the PG database.

        Args:
            data: Data containing the script info.
            script_name: Script name.
            name_en: English name.
        """
        try:
            # Extract the script-related fields
            script_requirement = data.get('script_requirement', '')
            script_content = data.get('script_content', '')
            source_table = data.get('source_table', '')
            target_table = data.get('target_table', name_en)  # Default to the English name
            script_type = data.get('script_type', 'python')
            user_name = data.get('created_by', 'system')
            target_dt_column = data.get('target_dt_column', '')

            # Validate required fields
            if not target_table:
                target_table = name_en
            if not script_name:
                raise ValueError("script_name must not be empty")

            # Build the upsert SQL
            insert_sql = text("""
                INSERT INTO dags.data_transform_scripts
                (source_table, target_table, script_name, script_type, script_requirement,
                 script_content, user_name, create_time, update_time, target_dt_column)
                VALUES
                (:source_table, :target_table, :script_name, :script_type, :script_requirement,
                 :script_content, :user_name, :create_time, :update_time, :target_dt_column)
                ON CONFLICT (target_table, script_name)
                DO UPDATE SET
                    source_table = EXCLUDED.source_table,
                    script_type = EXCLUDED.script_type,
                    script_requirement = EXCLUDED.script_requirement,
                    script_content = EXCLUDED.script_content,
                    user_name = EXCLUDED.user_name,
                    update_time = EXCLUDED.update_time,
                    target_dt_column = EXCLUDED.target_dt_column
            """)

            # Prepare the parameters
            current_time = datetime.now()
            params = {
                'source_table': source_table,
                'target_table': target_table,
                'script_name': script_name,
                'script_type': script_type,
                'script_requirement': script_requirement,
                'script_content': script_content,
                'user_name': user_name,
                'create_time': current_time,
                'update_time': current_time,
                'target_dt_column': target_dt_column
            }

            # Execute the upsert
            db.session.execute(insert_sql, params)
            db.session.commit()

            logger.info(f"Script info written to PG database: target_table={target_table}, script_name={script_name}")
        except Exception as e:
            db.session.rollback()
            logger.error(f"Failed to write to PG database: {str(e)}")
            raise e
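
    # The ON CONFLICT clause above requires a unique constraint on
    # (target_table, script_name). The actual DDL lives in the database and may
    # differ; a plausible shape of the assumed table is sketched here:
    #
    #   CREATE TABLE IF NOT EXISTS dags.data_transform_scripts (
    #       source_table       TEXT,
    #       target_table       TEXT NOT NULL,
    #       script_name        TEXT NOT NULL,
    #       script_type        TEXT,
    #       script_requirement TEXT,
    #       script_content     TEXT,
    #       user_name          TEXT,
    #       create_time        TIMESTAMP,
    #       update_time        TIMESTAMP,
    #       target_dt_column   TEXT,
    #       UNIQUE (target_table, script_name)
    #   );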

    @staticmethod
    def _handle_children_relationships(dataflow_node, children_ids):
        """Handle child-node relationships."""
        logger.debug(f"Handling child relationships, raw children_ids: {children_ids}, type: {type(children_ids)}")

        # Normalize children_ids to a list
        if not isinstance(children_ids, (list, tuple)):
            if children_ids is not None:
                children_ids = [children_ids]  # Wrap a single value in a list
                logger.debug(f"Wrapped single value in a list: {children_ids}")
            else:
                children_ids = []  # Treat None as an empty list
                logger.debug("Converted None to an empty list")

        for child_id in children_ids:
            try:
                # Look up the child node
                query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
                with connect_graph().session() as session:
                    result = session.run(query, child_id=child_id).data()

                    if result:
                        child_node = result[0]['n']

                        # Get the dataflow node's ID
                        dataflow_id = getattr(dataflow_node, 'identity', None)
                        if dataflow_id is None:
                            # No identity attribute: look up the ID by name
                            query_id = "MATCH (n:DataFlow) WHERE n.name = $name RETURN id(n) as node_id"
                            id_result = session.run(query_id, name=dataflow_node.get('name')).single()
                            dataflow_id = id_result['node_id'] if id_result else None

                        # Create the relationship, passing IDs to relationship_exists
                        if dataflow_id and not relationship_exists(dataflow_id, 'child', child_id):
                            session.run(
                                "MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $child_id "
                                "CREATE (a)-[:child]->(b)",
                                dataflow_id=dataflow_id, child_id=child_id
                            )
                            logger.info(f"Created child relationship: {dataflow_id} -> {child_id}")
            except Exception as e:
                logger.warning(f"Failed to create child relationship {child_id}: {str(e)}")

    @staticmethod
    def _handle_tag_relationship(dataflow_id, tag_id):
        """Handle the tag relationship."""
        try:
            # Look up the tag node
            query = "MATCH (n:data_label) WHERE id(n) = $tag_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, tag_id=tag_id).data()

                if result:
                    tag_node = result[0]['n']

                    # Create the relationship, passing IDs to relationship_exists
                    if dataflow_id and not relationship_exists(dataflow_id, 'label', tag_id):
                        session.run(
                            "MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $tag_id "
                            "CREATE (a)-[:label]->(b)",
                            dataflow_id=dataflow_id, tag_id=tag_id
                        )
                        logger.info(f"Created tag relationship: {dataflow_id} -> {tag_id}")
        except Exception as e:
            logger.warning(f"Failed to create tag relationship {tag_id}: {str(e)}")

    @staticmethod
    def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Update a data flow.

        Args:
            dataflow_id: Data flow ID.
            data: Fields to update.

        Returns:
            Updated data flow info, or None if it does not exist.
        """
        try:
            # Look up the node
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

                if not result:
                    return None

                # Build the SET clause from the updatable properties
                update_fields = []
                params = {'dataflow_id': dataflow_id}

                for key, value in data.items():
                    if key not in ['id', 'created_at']:  # Protected fields
                        if key == 'config' and isinstance(value, dict):
                            value = json.dumps(value, ensure_ascii=False)
                        update_fields.append(f"n.{key} = ${key}")
                        params[key] = value

                if update_fields:
                    params['updated_at'] = get_formatted_time()
                    update_fields.append("n.updated_at = $updated_at")

                    update_query = f"""
                    MATCH (n:DataFlow)
                    WHERE id(n) = $dataflow_id
                    SET {', '.join(update_fields)}
                    RETURN n, id(n) as node_id
                    """

                    result = session.run(update_query, **params).data()

                    if result:
                        node = result[0]['n']
                        updated_dataflow = dict(node)
                        updated_dataflow['id'] = result[0]['node_id']  # Use the node_id returned by the query

                        logger.info(f"Data flow updated: ID={dataflow_id}")
                        return updated_dataflow

                return None
        except Exception as e:
            logger.error(f"Failed to update data flow: {str(e)}")
            raise e
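
    # Illustrative example (added for reference; not from the original source).
    # For data = {'status': 'active', 'leader': 'alice'}, update_dataflow builds:
    #
    #   SET n.status = $status, n.leader = $leader, n.updated_at = $updated_at
    #
    # Values travel as query parameters, but the property *keys* are formatted
    # into the Cypher text, so callers should validate keys against an
    # allowlist before passing user-supplied input through.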

    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """
        Delete a data flow.

        Args:
            dataflow_id: Data flow ID.

        Returns:
            Whether the deletion succeeded.
        """
        try:
            # Delete the node and its relationships
            query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            with connect_graph().session() as session:
                delete_result = session.run(query, dataflow_id=dataflow_id).single()
                deleted_count = delete_result['deleted_count'] if delete_result else 0

            if deleted_count and deleted_count > 0:
                logger.info(f"Data flow deleted: ID={dataflow_id}")
                return True

            return False
        except Exception as e:
            logger.error(f"Failed to delete data flow: {str(e)}")
            raise e

    @staticmethod
    def execute_dataflow(dataflow_id: int, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Execute a data flow.

        Args:
            dataflow_id: Data flow ID.
            params: Execution parameters.

        Returns:
            Execution result info.
        """
        try:
            # Check that the data flow exists
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

            if not result:
                raise ValueError(f"Data flow does not exist: ID={dataflow_id}")

            execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"

            # TODO: actually execute the data flow here.
            # For now, return a mock result.
            result = {
                'execution_id': execution_id,
                'dataflow_id': dataflow_id,
                'status': 'running',
                'started_at': datetime.now().isoformat(),
                'params': params or {},
                'progress': 0
            }

            logger.info(f"Data flow execution started: ID={dataflow_id}, execution_id={execution_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to execute data flow: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
        """
        Get the execution status of a data flow.

        Args:
            dataflow_id: Data flow ID.

        Returns:
            Execution status info.
        """
        try:
            # TODO: query the real execution status here.
            # For now, return a mock status derived from the ID.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

            if not result:
                raise ValueError(f"Data flow does not exist: ID={dataflow_id}")

            status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]

            return {
                'dataflow_id': dataflow_id,
                'status': status,
                'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100,
                'started_at': datetime.now().isoformat(),
                'completed_at': datetime.now().isoformat() if status == 'completed' else None,
                'error_message': 'An error occurred during execution' if status == 'failed' else None
            }
        except Exception as e:
            logger.error(f"Failed to get data flow status: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]:
        """
        Get the execution logs of a data flow.

        Args:
            dataflow_id: Data flow ID.
            page: Page number.
            page_size: Number of items per page.

        Returns:
            Execution log list and pagination info.
        """
        try:
            # TODO: query the real execution logs here.
            # For now, return mock logs.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

            if not result:
                raise ValueError(f"Data flow does not exist: ID={dataflow_id}")

            mock_logs = [
                {
                    'id': i,
                    'timestamp': datetime.now().isoformat(),
                    'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
                    'message': f'Data flow execution log message {i}',
                    'component': ['source', 'transform', 'target'][i % 3]
                }
                for i in range(1, 101)
            ]

            # Paginate
            total = len(mock_logs)
            start = (page - 1) * page_size
            end = start + page_size
            logs = mock_logs[start:end]

            return {
                'logs': logs,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"Failed to get data flow logs: {str(e)}")
            raise e

    @staticmethod
    def create_script(request_data: str) -> str:
        """
        Generate a script with the Deepseek model.

        Args:
            request_data: Request data; a free-text description of the user's requirements.

        Returns:
            The generated script content (plain text).
        """
        try:
            # Build the prompt
            prompt_parts = []

            # Add the system instruction
            prompt_parts.append("Please generate a data processing script that fulfils the following requirement:")

            # Append request_data directly as the requirement description
            prompt_parts.append(request_data)

            # Add the format requirement
            prompt_parts.append("\nPlease produce complete, executable script code, including the necessary comments and error handling.")

            # Assemble the prompt
            full_prompt = "\n\n".join(prompt_parts)

            logger.info(f"Calling the Deepseek model to generate a script, prompt length: {len(full_prompt)}")

            # Call the LLM service
            script_content = llm_client(full_prompt)

            if not script_content:
                raise ValueError("The Deepseek model returned empty content")

            # Ensure the return value is text
            if not isinstance(script_content, str):
                script_content = str(script_content)

            logger.info(f"Script generated successfully, content length: {len(script_content)}")
            return script_content
        except Exception as e:
            logger.error(f"Failed to generate script: {str(e)}")
            raise e
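
    # Illustrative call (added for reference; the requirement text and table
    # names are hypothetical):
    #
    #   script = DataFlowService.create_script(
    #       "Aggregate daily order totals from ods_orders into dws_order_daily"
    #   )
    #   print(script)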

    @staticmethod
    def _handle_script_relationships(data: Dict[str, Any], dataflow_name: str, name_en: str):
        """
        Handle script relationships: create a DERIVED_FROM relationship in the
        Neo4j graph database pointing from target_table back to source_table.

        Args:
            data: Dict with the script info; expected keys include script_type,
                status, source_table, target_table and update_mode.
            dataflow_name: Data flow name, used as the script name.
            name_en: English name of the data flow.
        """
        try:
            # Read the key fields from data
            script_name = dataflow_name
            script_type = data.get('script_type', 'sql')
            schedule_status = data.get('status', 'inactive')
            source_table = data.get('source_table', '')
            target_table = data.get('target_table', '')
            update_mode = data.get('update_mode', 'full')

            # Validate the required fields
            if not source_table or not target_table:
                logger.warning(f"source_table or target_table is empty, skipping relationship creation: source_table={source_table}, target_table={target_table}")
                return

            logger.info(f"Creating script relationship: {source_table} -> {target_table}")

            with connect_graph().session() as session:
                # Create or get the source_table node
                source_node_query = """
                MERGE (source:table {name: $source_table})
                ON CREATE SET source.created_at = $created_at,
                              source.type = 'source'
                RETURN source, id(source) as source_id
                """

                # Create or get the target_table node
                target_node_query = """
                MERGE (target:table {name: $target_table})
                ON CREATE SET target.created_at = $created_at,
                              target.type = 'target'
                RETURN target, id(target) as target_id
                """

                current_time = get_formatted_time()

                # Run the node-creation queries
                source_result = session.run(source_node_query,
                                            source_table=source_table,
                                            created_at=current_time).single()
                target_result = session.run(target_node_query,
                                            target_table=target_table,
                                            created_at=current_time).single()

                if source_result and target_result:
                    source_id = source_result['source_id']
                    target_id = target_result['target_id']

                    # Check whether the relationship already exists
                    relationship_check_query = """
                    MATCH (source:table)-[r:DERIVED_FROM]-(target:table)
                    WHERE id(source) = $source_id AND id(target) = $target_id
                    RETURN r
                    """
                    existing_relationship = session.run(relationship_check_query,
                                                        source_id=source_id,
                                                        target_id=target_id).single()

                    if not existing_relationship:
                        # Create the DERIVED_FROM relationship from target_table to source_table
                        create_relationship_query = """
                        MATCH (source:table), (target:table)
                        WHERE id(source) = $source_id AND id(target) = $target_id
                        CREATE (target)-[r:DERIVED_FROM]->(source)
                        SET r.script_name = $script_name,
                            r.script_type = $script_type,
                            r.schedule_status = $schedule_status,
                            r.update_mode = $update_mode,
                            r.created_at = $created_at,
                            r.updated_at = $created_at
                        RETURN r
                        """
                        session.run(create_relationship_query,
                                    source_id=source_id,
                                    target_id=target_id,
                                    script_name=script_name,
                                    script_type=script_type,
                                    schedule_status=schedule_status,
                                    update_mode=update_mode,
                                    created_at=current_time)
                        logger.info(f"DERIVED_FROM relationship created: {target_table} -> {source_table} (script: {script_name})")
                    else:
                        logger.info(f"DERIVED_FROM relationship already exists: {target_table} -> {source_table}")
                else:
                    logger.error(f"Failed to create table nodes: source_table={source_table}, target_table={target_table}")
        except Exception as e:
            logger.error(f"Failed to handle script relationships: {str(e)}")
            raise e
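

if __name__ == "__main__":
    # Minimal usage sketch, assuming a configured Flask app context plus
    # reachable Neo4j and PostgreSQL instances. All field values below are
    # illustrative, not fixtures from this project.
    demo_flow = DataFlowService.create_dataflow({
        'name': 'order_daily_aggregation',      # hypothetical data flow name
        'describe': 'Aggregate orders by day',  # required field
        'source_table': 'ods_orders',           # hypothetical source table
        'target_table': 'dws_order_daily',      # hypothetical target table
        'script_type': 'sql',
        'update_mode': 'full',
    })
    print(demo_flow.get('id'), demo_flow.get('name_en'))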