import json import logging from datetime import datetime from typing import Any, Dict, List, Optional, Union from sqlalchemy import text from app import db from app.core.data_service.data_product_service import DataProductService from app.core.graph.graph_operations import ( connect_graph, create_or_get_node, get_node, relationship_exists, ) from app.core.llm.llm_service import llm_sql from app.core.meta_data import get_formatted_time, translate_and_parse logger = logging.getLogger(__name__) class DataFlowService: """数据流服务类,处理数据流相关的业务逻辑""" @staticmethod def get_dataflows( page: int = 1, page_size: int = 10, search: str = "", ) -> Dict[str, Any]: """ 获取数据流列表 Args: page: 页码 page_size: 每页大小 search: 搜索关键词 Returns: 包含数据流列表和分页信息的字典 """ try: # 从图数据库查询数据流列表 skip_count = (page - 1) * page_size # 构建搜索条件 where_clause = "" params: Dict[str, Union[int, str]] = { "skip": skip_count, "limit": page_size, } if search: where_clause = ( "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search" ) params["search"] = search # 查询数据流列表(包含标签数组) # 使用WITH子句先分页,再聚合标签,避免分页结果不准确 query = f""" MATCH (n:DataFlow) {where_clause} WITH n ORDER BY n.created_at DESC SKIP $skip LIMIT $limit OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel) RETURN n, id(n) as node_id, n.created_at as created_at, collect({{ id: id(label), name_zh: label.name_zh, name_en: label.name_en }}) as tags """ # 获取Neo4j驱动(如果连接失败会抛出ConnectionError异常) try: with connect_graph().session() as session: list_result = session.run(query, params).data() # 查询总数 count_query = f""" MATCH (n:DataFlow) {where_clause} RETURN count(n) as total """ count_params = {"search": search} if search else {} count_result = session.run(count_query, count_params).single() total = count_result["total"] if count_result else 0 except Exception as e: # 确保 driver 被正确关闭,避免资源泄漏 - 这里不再需要手动关闭 # driver,因为connect_graph可能返回单例或新实例。如果是新实例, # 我们没有引用它去关闭;若connect_graph每次返回新实例且需要关闭, # 之前的代码是对的。如果connect_graph返回单例,则不应关闭。 # 用户反馈:The driver.close() call prematurely closes a shared # driver instance,所以直接使用 session,并不关闭 driver。 logger.error(f"查询数据流失败: {str(e)}") raise e # 格式化结果 dataflows = [] for record in list_result: node = record["n"] dataflow = dict(node) dataflow["id"] = record["node_id"] # 使用查询返回的node_id # 处理标签数组,过滤掉空标签 tags = record.get("tags", []) dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None] dataflows.append(dataflow) return { "list": dataflows, "pagination": { "page": page, "page_size": page_size, "total": total, "total_pages": (total + page_size - 1) // page_size, }, } except Exception as e: logger.error(f"获取数据流列表失败: {str(e)}") raise e @staticmethod def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]: """ 根据ID获取数据流详情 Args: dataflow_id: 数据流ID Returns: 数据流详情字典,如果不存在则返回None """ try: # 从Neo4j获取DataFlow节点的所有属性(包含标签数组) neo4j_query = """ MATCH (n:DataFlow) WHERE id(n) = $dataflow_id OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel) RETURN n, id(n) as node_id, collect({ id: id(label), name_zh: label.name_zh, name_en: label.name_en }) as tags """ with connect_graph().session() as session: neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data() if not neo4j_result: logger.warning(f"未找到ID为 {dataflow_id} 的DataFlow节点") return None record = neo4j_result[0] node = record["n"] # 将节点属性转换为字典 dataflow = dict(node) dataflow["id"] = record["node_id"] # 处理标签数组,过滤掉空标签 tags = record.get("tags", []) dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None] # 处理 script_requirement:如果是JSON字符串,解析为对象 script_requirement_str = dataflow.get("script_requirement", "") if script_requirement_str: try: # 尝试解析JSON字符串 script_requirement_obj = json.loads(script_requirement_str) dataflow["script_requirement"] = script_requirement_obj logger.debug( "成功解析script_requirement: %s", script_requirement_obj, ) except (json.JSONDecodeError, TypeError) as e: logger.warning(f"script_requirement解析失败,保持原值: {e}") # 保持原值(字符串) dataflow["script_requirement"] = script_requirement_str else: # 如果为空,设置为None dataflow["script_requirement"] = None logger.info( "成功获取DataFlow详情,ID: %s, 名称: %s", dataflow_id, dataflow.get("name_zh"), ) return dataflow except Exception as e: logger.error(f"获取数据流详情失败: {str(e)}") raise e @staticmethod def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]: """ 创建新的数据流 Args: data: 数据流配置数据 Returns: 创建的数据流信息 """ try: # 验证必填字段 required_fields = ["name_zh", "describe"] for field in required_fields: if field not in data: raise ValueError(f"缺少必填字段: {field}") dataflow_name = data["name_zh"] # 使用LLM翻译名称生成英文名 try: result_list = translate_and_parse(dataflow_name) name_en = ( result_list[0] if result_list else dataflow_name.lower().replace(" ", "_") ) except Exception as e: logger.warning(f"翻译失败,使用默认英文名: {str(e)}") name_en = dataflow_name.lower().replace(" ", "_") # 处理 script_requirement,将其转换为 JSON 字符串 script_requirement = data.get("script_requirement") if script_requirement is not None: # 如果是字典或列表,转换为 JSON 字符串 if isinstance(script_requirement, (dict, list)): script_requirement_str = json.dumps( script_requirement, ensure_ascii=False ) else: # 如果已经是字符串,直接使用 script_requirement_str = str(script_requirement) else: script_requirement_str = "" # 准备节点数据(tag不作为节点属性存储,而是通过LABEL关系关联) node_data = { "name_zh": dataflow_name, "name_en": name_en, "category": data.get("category", ""), "organization": data.get("organization", ""), "leader": data.get("leader", ""), "frequency": data.get("frequency", ""), "describe": data.get("describe", ""), "status": data.get("status", "inactive"), "update_mode": data.get("update_mode", "append"), "script_type": data.get("script_type", "python"), "script_requirement": script_requirement_str, "created_at": get_formatted_time(), "updated_at": get_formatted_time(), } # 创建或获取数据流节点 dataflow_id = get_node("DataFlow", name=dataflow_name) if dataflow_id: raise ValueError(f"数据流 '{dataflow_name}' 已存在") dataflow_id = create_or_get_node("DataFlow", **node_data) # 处理标签关系(支持多标签数组) tag_list = data.get("tag", []) if tag_list: try: DataFlowService._handle_tag_relationships(dataflow_id, tag_list) except Exception as e: logger.warning(f"处理标签关系时出错: {str(e)}") # 成功创建图数据库节点后,写入PG数据库 try: DataFlowService._save_to_pg_database(data, dataflow_name, name_en) logger.info(f"数据流信息已写入PG数据库: {dataflow_name}") # PG数据库记录成功写入后,在neo4j图数据库中创建script关系 try: DataFlowService._handle_script_relationships( data, dataflow_name, name_en ) logger.info(f"脚本关系创建成功: {dataflow_name}") except Exception as script_error: logger.warning(f"创建脚本关系失败: {str(script_error)}") except Exception as pg_error: logger.error(f"写入PG数据库失败: {str(pg_error)}") # 注意:这里可以选择回滚图数据库操作,但目前保持图数据库数据 # 在实际应用中,可能需要考虑分布式事务 # 返回创建的数据流信息 # 查询创建的节点获取完整信息 query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id" with connect_graph().session() as session: id_result = session.run(query, name_zh=dataflow_name).single() if id_result: dataflow_node = id_result["n"] node_id = id_result["node_id"] # 将节点属性转换为字典 result = dict(dataflow_node) result["id"] = node_id else: # 如果查询失败,返回基本信息 result = { "id": (dataflow_id if isinstance(dataflow_id, int) else None), "name_zh": dataflow_name, "name_en": name_en, "created_at": get_formatted_time(), } # 注册数据产品到数据服务 try: DataFlowService._register_data_product( data=data, dataflow_name=dataflow_name, name_en=name_en, dataflow_id=result.get("id"), ) logger.info(f"数据产品注册成功: {dataflow_name}") except Exception as product_error: logger.warning(f"注册数据产品失败: {str(product_error)}") # 不影响主流程,仅记录警告 logger.info(f"创建数据流成功: {dataflow_name}") return result except Exception as e: logger.error(f"创建数据流失败: {str(e)}") raise e @staticmethod def _save_to_pg_database( data: Dict[str, Any], script_name: str, name_en: str, ): """ 将脚本信息保存到PG数据库 Args: data: 包含脚本信息的数据 script_name: 脚本名称 name_en: 英文名称 """ try: # 提取脚本相关信息 # 处理 script_requirement,确保保存为 JSON 字符串 script_requirement_raw = data.get("script_requirement") # 用于保存从 script_requirement 中提取的 rule rule_from_requirement = "" if script_requirement_raw is not None: # 如果是字典,提取 rule 字段 if isinstance(script_requirement_raw, dict): rule_from_requirement = script_requirement_raw.get("rule", "") script_requirement = json.dumps( script_requirement_raw, ensure_ascii=False ) elif isinstance(script_requirement_raw, list): script_requirement = json.dumps( script_requirement_raw, ensure_ascii=False ) else: # 如果已经是字符串,尝试解析以提取 rule script_requirement = str(script_requirement_raw) try: parsed_req = json.loads(script_requirement) if isinstance(parsed_req, dict): rule_from_requirement = parsed_req.get("rule", "") except (json.JSONDecodeError, TypeError): pass else: script_requirement = "" # 处理 script_content:优先使用前端传入的值,如果为空则使用从 script_requirement 提取的 rule script_content = data.get("script_content", "") if not script_content and rule_from_requirement: script_content = rule_from_requirement logger.info( "script_content为空,使用从script_requirement提取的rule: %s", rule_from_requirement, ) # 安全处理 source_table 和 target_table(避免 None 值导致的 'in' 操作错误) source_table_raw = data.get("source_table") or "" source_table = ( source_table_raw.split(":")[-1] if ":" in source_table_raw else source_table_raw ) target_table_raw = data.get("target_table") or "" target_table = ( target_table_raw.split(":")[-1] if ":" in target_table_raw else (target_table_raw or name_en) ) script_type = data.get("script_type", "python") user_name = data.get("created_by", "system") target_dt_column = data.get("target_dt_column", "") # 验证必需字段 if not target_table: target_table = name_en if not script_name: raise ValueError("script_name不能为空") # 构建插入SQL insert_sql = text( """ INSERT INTO dags.data_transform_scripts (source_table, target_table, script_name, script_type, script_requirement, script_content, user_name, create_time, update_time, target_dt_column) VALUES (:source_table, :target_table, :script_name, :script_type, :script_requirement, :script_content, :user_name, :create_time, :update_time, :target_dt_column) ON CONFLICT (target_table, script_name) DO UPDATE SET source_table = EXCLUDED.source_table, script_type = EXCLUDED.script_type, script_requirement = EXCLUDED.script_requirement, script_content = EXCLUDED.script_content, user_name = EXCLUDED.user_name, update_time = EXCLUDED.update_time, target_dt_column = EXCLUDED.target_dt_column """ ) # 准备参数 current_time = datetime.now() params = { "source_table": source_table, "target_table": target_table, "script_name": script_name, "script_type": script_type, "script_requirement": script_requirement, "script_content": script_content, "user_name": user_name, "create_time": current_time, "update_time": current_time, "target_dt_column": target_dt_column, } # 执行插入操作 db.session.execute(insert_sql, params) # 新增:保存到task_list表 try: # 1. 解析script_requirement并构建详细的任务描述 task_description_md = script_requirement try: # 尝试解析JSON try: req_json = json.loads(script_requirement) except (json.JSONDecodeError, TypeError): req_json = None if isinstance(req_json, dict): # 1. 从script_requirement中提取rule字段作为request_content_str request_content_str = req_json.get("rule", "") # 2. 从script_requirement中提取source_table和 # target_table字段信息 source_table_ids = req_json.get("source_table", []) target_table_ids = req_json.get("target_table", []) # 确保是列表格式 if not isinstance(source_table_ids, list): source_table_ids = ( [source_table_ids] if source_table_ids else [] ) if not isinstance(target_table_ids, list): target_table_ids = ( [target_table_ids] if target_table_ids else [] ) # 合并所有BusinessDomain ID all_bd_ids = source_table_ids + target_table_ids # 4. 从data参数中提取update_mode update_mode = data.get("update_mode", "append") # 生成Business Domain DDLs source_ddls = [] target_ddls = [] data_source_info = None if all_bd_ids: try: with connect_graph().session() as session: # 处理source tables for bd_id in source_table_ids: ddl_info = DataFlowService._generate_businessdomain_ddl( session, bd_id, is_target=False, ) if ddl_info: source_ddls.append(ddl_info["ddl"]) # 3. 如果BELONGS_TO关系连接的是 # "数据资源",获取数据源信息 if ( ddl_info.get("data_source") and not data_source_info ): data_source_info = ddl_info[ "data_source" ] # 处理target tables(5. 目标表缺省要有create_time字段) for bd_id in target_table_ids: ddl_info = DataFlowService._generate_businessdomain_ddl( session, bd_id, is_target=True, update_mode=update_mode, ) if ddl_info: target_ddls.append(ddl_info["ddl"]) # 同样检查BELONGS_TO关系,获取数据源信息 if ( ddl_info.get("data_source") and not data_source_info ): data_source_info = ddl_info[ "data_source" ] except Exception as neo_e: logger.error( f"获取BusinessDomain DDL失败: {str(neo_e)}" ) # 构建Markdown格式的任务描述 task_desc_parts = [f"# Task: {script_name}\n"] # 添加数据源信息 if data_source_info: task_desc_parts.append("## Data Source") task_desc_parts.append( f"- **Type**: {data_source_info.get('type', 'N/A')}" ) task_desc_parts.append( f"- **Host**: {data_source_info.get('host', 'N/A')}" ) task_desc_parts.append( f"- **Port**: {data_source_info.get('port', 'N/A')}" ) task_desc_parts.append( f"- **Database**: " f"{data_source_info.get('database', 'N/A')}\n" ) # 添加源表DDL if source_ddls: task_desc_parts.append("## Source Tables (DDL)") for ddl in source_ddls: task_desc_parts.append(f"```sql\n{ddl}\n```\n") # 添加目标表DDL if target_ddls: task_desc_parts.append("## Target Tables (DDL)") for ddl in target_ddls: task_desc_parts.append(f"```sql\n{ddl}\n```\n") # 添加更新模式说明 task_desc_parts.append("## Update Mode") if update_mode == "append": task_desc_parts.append("- **Mode**: Append (追加模式)") task_desc_parts.append( "- **Description**: 新数据将追加到目标表,不删除现有数据\n" ) else: task_desc_parts.append( "- **Mode**: Full Refresh (全量更新)" ) task_desc_parts.append( "- **Description**: 目标表将被清空后重新写入数据\n" ) # 添加请求内容(rule) if request_content_str: task_desc_parts.append("## Request Content") task_desc_parts.append(f"{request_content_str}\n") # 添加实施步骤(根据任务类型优化) task_desc_parts.append("## Implementation Steps") # 判断是否为远程数据源导入任务 if data_source_info: # 从远程数据源导入数据的简化步骤 task_desc_parts.append( "1. Create an n8n workflow to execute the " "data import task" ) task_desc_parts.append( "2. Configure the workflow to call " "`import_resource_data.py` Python script" ) task_desc_parts.append( "3. Pass the following parameters to the " "Python execution node:" ) task_desc_parts.append( " - `--source-config`: JSON configuration " "for the remote data source" ) task_desc_parts.append( " - `--target-table`: Target table name " "(data resource English name)" ) task_desc_parts.append( f" - `--update-mode`: {update_mode}" ) task_desc_parts.append( "4. The Python script will automatically:" ) task_desc_parts.append( " - Connect to the remote data source" ) task_desc_parts.append( " - Extract data from the source table" ) task_desc_parts.append( f" - Write data to target table using " f"{update_mode} mode" ) else: # 数据转换任务的完整步骤 task_desc_parts.append( "1. Extract data from source tables as " "specified in the DDL" ) task_desc_parts.append( "2. Apply transformation logic according to the rule:" ) if request_content_str: task_desc_parts.append( f" - Rule: {request_content_str}" ) task_desc_parts.append( "3. Generate Python program to implement the " "data transformation logic" ) task_desc_parts.append( f"4. Write transformed data to target table " f"using {update_mode} mode" ) task_desc_parts.append( "5. Create an n8n workflow to schedule and " "execute the Python program" ) task_description_md = "\n".join(task_desc_parts) except Exception as parse_e: logger.warning( f"解析任务描述详情失败,使用原始描述: {str(parse_e)}" ) task_description_md = script_requirement # 假设运行根目录为项目根目录,dataflows.py在app/core/data_flow/ code_path = "app/core/data_flow" task_insert_sql = text( "INSERT INTO public.task_list\n" "(task_name, task_description, status, code_name, " "code_path, create_by, create_time, update_time)\n" "VALUES\n" "(:task_name, :task_description, :status, :code_name, " ":code_path, :create_by, :create_time, :update_time)" ) task_params = { "task_name": script_name, "task_description": task_description_md, "status": "pending", "code_name": script_name, "code_path": code_path, "create_by": "cursor", "create_time": current_time, "update_time": current_time, } # 使用嵌套事务,确保task_list插入失败不影响主流程 with db.session.begin_nested(): db.session.execute(task_insert_sql, task_params) logger.info(f"成功将任务信息写入task_list表: task_name={script_name}") except Exception as task_error: # 记录错误但不中断主流程 logger.error(f"写入task_list表失败: {str(task_error)}") # 如果要求必须成功写入任务列表,则这里应该raise task_error # raise task_error db.session.commit() logger.info( "成功将脚本信息写入PG数据库: target_table=%s, script_name=%s", target_table, script_name, ) except Exception as e: db.session.rollback() logger.error(f"写入PG数据库失败: {str(e)}") raise e @staticmethod def _handle_children_relationships(dataflow_node, children_ids): """处理子节点关系""" logger.debug( "处理子节点关系,原始children_ids: %s, 类型: %s", children_ids, type(children_ids), ) # 确保children_ids是列表格式 if not isinstance(children_ids, (list, tuple)): if children_ids is not None: children_ids = [children_ids] # 如果是单个值,转换为列表 logger.debug(f"将单个值转换为列表: {children_ids}") else: children_ids = [] # 如果是None,转换为空列表 logger.debug("将None转换为空列表") for child_id in children_ids: try: # 查找子节点 query = "MATCH (n) WHERE id(n) = $child_id RETURN n" with connect_graph().session() as session: result = session.run(query, child_id=child_id).data() if result: # 获取dataflow_node的ID dataflow_id = getattr(dataflow_node, "identity", None) if dataflow_id is None: # 如果没有identity属性,从名称查询ID query_id = ( "MATCH (n:DataFlow) WHERE n.name_zh = " "$name_zh RETURN id(n) as node_id" ) id_result = session.run( query_id, name_zh=dataflow_node.get("name_zh"), ).single() dataflow_id = id_result["node_id"] if id_result else None # 创建关系 - 使用ID调用relationship_exists if dataflow_id and not relationship_exists( dataflow_id, "child", child_id ): session.run( "MATCH (a), (b) WHERE id(a) = $dataflow_id " "AND id(b) = $child_id " "CREATE (a)-[:child]->(b)", dataflow_id=dataflow_id, child_id=child_id, ) logger.info(f"创建子节点关系: {dataflow_id} -> {child_id}") except Exception as e: logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}") @staticmethod def _handle_tag_relationships(dataflow_id, tag_list): """ 处理多标签关系 Args: dataflow_id: 数据流节点ID tag_list: 标签列表,可以是ID数组或包含id字段的对象数组 """ # 确保tag_list是列表格式 if not isinstance(tag_list, list): tag_list = [tag_list] if tag_list else [] for tag_item in tag_list: tag_id = None if isinstance(tag_item, dict) and "id" in tag_item: tag_id = int(tag_item["id"]) elif isinstance(tag_item, (int, str)): try: tag_id = int(tag_item) except (ValueError, TypeError): pass if tag_id: DataFlowService._handle_single_tag_relationship(dataflow_id, tag_id) @staticmethod def _handle_single_tag_relationship(dataflow_id, tag_id): """处理单个标签关系""" try: # 查找标签节点 query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n" with connect_graph().session() as session: result = session.run(query, tag_id=tag_id).data() if result: # 创建关系 - 使用ID调用relationship_exists if dataflow_id and not relationship_exists( dataflow_id, "LABEL", tag_id ): session.run( "MATCH (a), (b) WHERE id(a) = $dataflow_id " "AND id(b) = $tag_id " "CREATE (a)-[:LABEL]->(b)", dataflow_id=dataflow_id, tag_id=tag_id, ) logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}") except Exception as e: logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}") @staticmethod def update_dataflow( dataflow_id: int, data: Dict[str, Any], ) -> Optional[Dict[str, Any]]: """ 更新数据流 Args: dataflow_id: 数据流ID data: 更新的数据 Returns: 更新后的数据流信息,如果不存在则返回None """ try: # 提取 tag 数组(不作为节点属性存储) tag_list = data.pop("tag", None) # 查找节点 query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n" with connect_graph().session() as session: result = session.run(query, dataflow_id=dataflow_id).data() if not result: return None # 更新节点属性 update_fields = [] params: Dict[str, Any] = {"dataflow_id": dataflow_id} for key, value in data.items(): if key not in ["id", "created_at"]: # 保护字段 # 复杂对象序列化为 JSON 字符串 if key in ["config", "script_requirement"]: if isinstance(value, dict): value = json.dumps(value, ensure_ascii=False) update_fields.append(f"n.{key} = ${key}") params[key] = value if update_fields: params["updated_at"] = get_formatted_time() update_fields.append("n.updated_at = $updated_at") update_query = f""" MATCH (n:DataFlow) WHERE id(n) = $dataflow_id SET {", ".join(update_fields)} RETURN n, id(n) as node_id """ result = session.run(update_query, params).data() # 处理 tag 关系(支持多标签数组) if tag_list is not None: # 确保是列表格式 if not isinstance(tag_list, list): tag_list = [tag_list] if tag_list else [] # 先删除现有的 LABEL 关系 delete_query = """ MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel) WHERE id(n) = $dataflow_id DELETE r """ session.run(delete_query, dataflow_id=dataflow_id) logger.info(f"删除数据流 {dataflow_id} 的现有标签关系") # 为每个 tag 创建新的 LABEL 关系 for tag_item in tag_list: tag_id = None if isinstance(tag_item, dict) and "id" in tag_item: tag_id = int(tag_item["id"]) elif isinstance(tag_item, (int, str)): try: tag_id = int(tag_item) except (ValueError, TypeError): pass if tag_id: DataFlowService._handle_single_tag_relationship( dataflow_id, tag_id ) if result: node = result[0]["n"] updated_dataflow = dict(node) # 使用查询返回的node_id updated_dataflow["id"] = result[0]["node_id"] # 查询并添加标签数组到返回数据 tags_query = """ MATCH (n:DataFlow) WHERE id(n) = $dataflow_id OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel) RETURN collect({ id: id(label), name_zh: label.name_zh, name_en: label.name_en }) as tags """ tags_result = session.run( tags_query, dataflow_id=dataflow_id ).single() if tags_result: tags = tags_result.get("tags", []) updated_dataflow["tag"] = [ tag for tag in tags if tag.get("id") is not None ] else: updated_dataflow["tag"] = [] logger.info(f"更新数据流成功: ID={dataflow_id}") return updated_dataflow return None except Exception as e: logger.error(f"更新数据流失败: {str(e)}") raise e @staticmethod def delete_dataflow(dataflow_id: int) -> bool: """ 删除数据流 Args: dataflow_id: 数据流ID Returns: 删除是否成功 """ try: # 删除节点及其关系 query = """ MATCH (n:DataFlow) WHERE id(n) = $dataflow_id DETACH DELETE n RETURN count(n) as deleted_count """ with connect_graph().session() as session: delete_result = session.run(query, dataflow_id=dataflow_id).single() result = delete_result["deleted_count"] if delete_result else 0 if result and result > 0: logger.info(f"删除数据流成功: ID={dataflow_id}") return True return False except Exception as e: logger.error(f"删除数据流失败: {str(e)}") raise e @staticmethod def execute_dataflow( dataflow_id: int, params: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ 执行数据流 Args: dataflow_id: 数据流ID params: 执行参数 Returns: 执行结果信息 """ try: # 检查数据流是否存在 query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n" with connect_graph().session() as session: result = session.run(query, dataflow_id=dataflow_id).data() if not result: raise ValueError(f"数据流不存在: ID={dataflow_id}") execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}" # TODO: 这里应该实际执行数据流 # 目前返回模拟结果 result = { "execution_id": execution_id, "dataflow_id": dataflow_id, "status": "running", "started_at": datetime.now().isoformat(), "params": params or {}, "progress": 0, } logger.info( "开始执行数据流: ID=%s, execution_id=%s", dataflow_id, execution_id, ) return result except Exception as e: logger.error(f"执行数据流失败: {str(e)}") raise e @staticmethod def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]: """ 获取数据流执行状态 Args: dataflow_id: 数据流ID Returns: 执行状态信息 """ try: # TODO: 这里应该查询实际的执行状态 # 目前返回模拟状态 query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n" with connect_graph().session() as session: result = session.run(query, dataflow_id=dataflow_id).data() if not result: raise ValueError(f"数据流不存在: ID={dataflow_id}") status = ["running", "completed", "failed", "pending"][dataflow_id % 4] return { "dataflow_id": dataflow_id, "status": status, "progress": ( 100 if status == "completed" else (dataflow_id * 10) % 100 ), "started_at": datetime.now().isoformat(), "completed_at": ( datetime.now().isoformat() if status == "completed" else None ), "error_message": ("执行过程中发生错误" if status == "failed" else None), } except Exception as e: logger.error(f"获取数据流状态失败: {str(e)}") raise e @staticmethod def get_dataflow_logs( dataflow_id: int, page: int = 1, page_size: int = 50, ) -> Dict[str, Any]: """ 获取数据流执行日志 Args: dataflow_id: 数据流ID page: 页码 page_size: 每页大小 Returns: 执行日志列表和分页信息 """ try: # TODO: 这里应该查询实际的执行日志 # 目前返回模拟日志 query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n" with connect_graph().session() as session: result = session.run(query, dataflow_id=dataflow_id).data() if not result: raise ValueError(f"数据流不存在: ID={dataflow_id}") mock_logs = [ { "id": i, "timestamp": datetime.now().isoformat(), "level": ["INFO", "WARNING", "ERROR"][i % 3], "message": f"数据流执行日志消息 {i}", "component": ["source", "transform", "target"][i % 3], } for i in range(1, 101) ] # 分页处理 total = len(mock_logs) start = (page - 1) * page_size end = start + page_size logs = mock_logs[start:end] return { "logs": logs, "pagination": { "page": page, "page_size": page_size, "total": total, "total_pages": (total + page_size - 1) // page_size, }, } except Exception as e: logger.error(f"获取数据流日志失败: {str(e)}") raise e @staticmethod def create_script(request_data: Union[Dict[str, Any], str]) -> str: """ 使用Deepseek模型生成SQL脚本 Args: request_data: 包含input, output, request_content的请求数据字典,或JSON字符串 Returns: 生成的SQL脚本内容 """ try: logger.info(f"开始处理脚本生成请求: {request_data}") logger.info(f"request_data类型: {type(request_data)}") # 类型检查和处理 if isinstance(request_data, str): logger.warning(f"request_data是字符串,尝试解析为JSON: {request_data}") try: import json request_data = json.loads(request_data) except json.JSONDecodeError as e: raise ValueError(f"无法解析request_data为JSON: {str(e)}") if not isinstance(request_data, dict): raise ValueError( f"request_data必须是字典类型,实际类型: {type(request_data)}" ) # 1. 从传入的request_data中解析input, output, request_content内容 input_data = request_data.get("input", "") output_data = request_data.get("output", "") request_content = request_data.get("request_data", "") # 如果request_content是HTML格式,提取纯文本 if request_content and ( request_content.startswith("
") or "<" in request_content ): # 简单的HTML标签清理 import re request_content = re.sub(r"<[^>]+>", "", request_content).strip() if not input_data or not output_data or not request_content: raise ValueError( "缺少必要参数:input='{}', output='{}', " "request_content='{}' 不能为空".format( input_data, output_data, request_content[:100] if request_content else "", ) ) logger.info( "解析得到 - input: %s, output: %s, request_content: %s", input_data, output_data, request_content, ) # 2. 解析input中的多个数据表并生成源表DDL source_tables_ddl = [] input_tables = [] if input_data: tables = [ table.strip() for table in input_data.split(",") if table.strip() ] for table in tables: ddl = DataFlowService._parse_table_and_get_ddl(table, "input") if ddl: input_tables.append(table) source_tables_ddl.append(ddl) else: logger.warning(f"无法获取输入表 {table} 的DDL结构") # 3. 解析output中的数据表并生成目标表DDL target_table_ddl = "" if output_data: target_table_ddl = DataFlowService._parse_table_and_get_ddl( output_data.strip(), "output" ) if not target_table_ddl: logger.warning(f"无法获取输出表 {output_data} 的DDL结构") # 4. 按照Deepseek-prompt.txt的框架构建提示语 prompt_parts = [] # 开场白 - 角色定义 prompt_parts.append( "你是一名数据库工程师,正在构建一个PostgreSQL数据中的汇总逻辑。" "请为以下需求生成一段标准的 PostgreSQL SQL 脚本:" ) # 动态生成源表部分(第1点) for i, (table, ddl) in enumerate(zip(input_tables, source_tables_ddl), 1): table_name = table.split(":")[-1] if ":" in table else table prompt_parts.append(f"{i}.有一个源表: {table_name},它的定义语句如下:") prompt_parts.append(ddl) prompt_parts.append("") # 添加空行分隔 # 动态生成目标表部分(第2点) if target_table_ddl: target_table_name = ( output_data.split(":")[-1] if ":" in output_data else output_data ) next_index = len(input_tables) + 1 prompt_parts.append( f"{next_index}.有一个目标表:{target_table_name},它的定义语句如下:" ) prompt_parts.append(target_table_ddl) prompt_parts.append("") # 添加空行分隔 # 动态生成处理逻辑部分(第3点) next_index = ( len(input_tables) + 2 if target_table_ddl else len(input_tables) + 1 ) prompt_parts.append(f"{next_index}.处理逻辑为:{request_content}") prompt_parts.append("") # 添加空行分隔 # 固定的技术要求部分(第4-8点) tech_requirements = [ ( f"{next_index + 1}.脚本应使用标准的 PostgreSQL 语法," "适合在 Airflow、Python 脚本、或调度系统中调用;" ), f"{next_index + 2}.无需使用 UPSERT 或 ON CONFLICT", f"{next_index + 3}.请直接输出SQL,无需进行解释。", ( f'{next_index + 4}.请给这段sql起个英文名,不少于三个英文单词,使用"_"分隔,' "采用蛇形命名法。把sql的名字作为注释写在返回的sql中。" ), ( f"{next_index + 5}.生成的sql在向目标表插入数据的时候,向create_time字段写入当前日期" "时间now(),不用处理update_time字段" ), ] prompt_parts.extend(tech_requirements) # 组合完整的提示语 full_prompt = "\n".join(prompt_parts) logger.info(f"构建的完整提示语长度: {len(full_prompt)}") logger.info(f"完整提示语内容: {full_prompt}") # 5. 调用LLM生成SQL脚本 logger.info("开始调用Deepseek模型生成SQL脚本") script_content = llm_sql(full_prompt) if not script_content: raise ValueError("Deepseek模型返回空内容") # 确保返回的是文本格式 if not isinstance(script_content, str): script_content = str(script_content) logger.info(f"SQL脚本生成成功,内容长度: {len(script_content)}") return script_content except Exception as e: logger.error(f"生成SQL脚本失败: {str(e)}") raise e @staticmethod def _parse_table_and_get_ddl(table_str: str, table_type: str) -> str: """ 解析表格式(A:B)并从Neo4j查询元数据生成DDL Args: table_str: 表格式字符串,格式为"label:name_en" table_type: 表类型,用于日志记录(input/output) Returns: DDL格式的表结构字符串 """ try: # 解析A:B格式 if ":" not in table_str: logger.error(f"表格式错误,应为'label:name_en'格式: {table_str}") return "" parts = table_str.split(":", 1) if len(parts) != 2: logger.error(f"表格式解析失败: {table_str}") return "" label = parts[0].strip() name_en = parts[1].strip() if not label or not name_en: logger.error(f"标签或英文名为空: label={label}, name_en={name_en}") return "" logger.info(f"开始查询{table_type}表: label={label}, name_en={name_en}") # 从Neo4j查询节点及其关联的元数据 with connect_graph().session() as session: # 查询节点及其关联的元数据 cypher = f""" MATCH (n:{label} {{name_en: $name_en}}) OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta) RETURN n, collect(m) as metadata """ result = session.run( cypher, # type: ignore[arg-type] {"name_en": name_en}, ) record = result.single() if not record: logger.error(f"未找到节点: label={label}, name_en={name_en}") return "" node = record["n"] metadata = record["metadata"] logger.info(f"找到节点,关联元数据数量: {len(metadata)}") # 生成DDL格式的表结构 ddl_lines = [] ddl_lines.append(f"CREATE TABLE {name_en} (") if metadata: column_definitions = [] for meta in metadata: if meta: # 确保meta不为空 meta_props = dict(meta) column_name = meta_props.get( "name_en", meta_props.get("name_zh", "unknown_column"), ) data_type = meta_props.get("data_type", "VARCHAR(255)") comment = meta_props.get("name_zh", "") # 构建列定义 column_def = f" {column_name} {data_type}" if comment: column_def += f" COMMENT '{comment}'" column_definitions.append(column_def) if column_definitions: ddl_lines.append(",\n".join(column_definitions)) else: ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'") else: # 如果没有元数据,添加默认列 ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'") ddl_lines.append(");") # 添加表注释 node_props = dict(node) table_comment = node_props.get( "name_zh", node_props.get("describe", name_en) ) if table_comment and table_comment != name_en: ddl_lines.append( f"COMMENT ON TABLE {name_en} IS '{table_comment}';" ) ddl_content = "\n".join(ddl_lines) logger.info(f"{table_type}表DDL生成成功: {name_en}") logger.debug(f"生成的DDL: {ddl_content}") return ddl_content except Exception as e: logger.error(f"解析表格式和生成DDL失败: {str(e)}") return "" @staticmethod def _generate_businessdomain_ddl( session, bd_id: int, is_target: bool = False, update_mode: str = "append", ) -> Optional[Dict[str, Any]]: """ 根据BusinessDomain节点ID生成DDL Args: session: Neo4j session对象 bd_id: BusinessDomain节点ID is_target: 是否为目标表(目标表需要添加create_time字段) update_mode: 更新模式(append或full) Returns: 包含ddl和data_source信息的字典,如果节点不存在则返回None """ try: # 查询BusinessDomain节点、元数据、标签关系和数据源关系 cypher = """ MATCH (bd:BusinessDomain) WHERE id(bd) = $bd_id OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta) OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel) OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource) RETURN bd, collect(DISTINCT m) as metadata, collect(DISTINCT { id: id(label), name_zh: label.name_zh, name_en: label.name_en }) as labels, ds.type as ds_type, ds.host as ds_host, ds.port as ds_port, ds.database as ds_database """ result = session.run(cypher, bd_id=bd_id).single() if not result or not result["bd"]: logger.warning(f"未找到ID为 {bd_id} 的BusinessDomain节点") return None node = result["bd"] metadata = result["metadata"] # 处理标签数组,获取第一个有效标签名称用于判断 labels = result.get("labels", []) valid_labels = [label for label in labels if label.get("id") is not None] label_name = valid_labels[0].get("name_zh") if valid_labels else None # 生成DDL node_props = dict(node) table_name = node_props.get("name_en", f"table_{bd_id}") ddl_lines = [] ddl_lines.append(f"CREATE TABLE {table_name} (") column_definitions = [] # 添加元数据列 if metadata: for meta in metadata: if meta: meta_props = dict(meta) column_name = meta_props.get( "name_en", meta_props.get("name_zh", "unknown_column"), ) data_type = meta_props.get("data_type", "VARCHAR(255)") comment = meta_props.get("name_zh", "") column_def = f" {column_name} {data_type}" if comment: column_def += f" COMMENT '{comment}'" column_definitions.append(column_def) # 如果没有元数据,添加默认主键 if not column_definitions: column_definitions.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'") # 5. 如果是目标表,添加create_time字段 if is_target: column_definitions.append( " create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP " "COMMENT '数据创建时间'" ) ddl_lines.append(",\n".join(column_definitions)) ddl_lines.append(");") # 添加表注释 table_comment = node_props.get( "name_zh", node_props.get("describe", table_name) ) if table_comment and table_comment != table_name: ddl_lines.append(f"COMMENT ON TABLE {table_name} IS '{table_comment}';") ddl_content = "\n".join(ddl_lines) # 3. 检查BELONGS_TO关系是否连接"数据资源",如果是则返回数据源信息 data_source = None if label_name == "数据资源" and result["ds_type"]: data_source = { "type": result["ds_type"], "host": result["ds_host"], "port": result["ds_port"], "database": result["ds_database"], } logger.info(f"获取到数据源信息: {data_source}") logger.debug( f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}" ) return { "ddl": ddl_content, "table_name": table_name, "data_source": data_source, } except Exception as e: logger.error(f"生成BusinessDomain DDL失败,ID={bd_id}: {str(e)}") return None @staticmethod def _handle_script_relationships( data: Dict[str, Any], dataflow_name: str, name_en: str, ): """ 处理脚本关系,在Neo4j图数据库中创建从source_table到target_table之间的 DERIVED_FROM关系 Args: data: 包含脚本信息的数据字典,应包含script_name, script_type, schedule_status, source_table, target_table, update_mode """ try: # 从data中读取键值对 script_name = (dataflow_name,) script_type = data.get("script_type", "sql") schedule_status = data.get("status", "inactive") source_table_full = data.get("source_table", "") target_table_full = data.get("target_table", "") update_mode = data.get("update_mode", "full") # 处理source_table和target_table的格式 source_table = ( source_table_full.split(":")[-1] if ":" in source_table_full else source_table_full ) target_table = ( target_table_full.split(":")[-1] if ":" in target_table_full else target_table_full ) source_label = ( source_table_full.split(":")[0] if ":" in source_table_full else source_table_full ) target_label = ( target_table_full.split(":")[0] if ":" in target_table_full else target_table_full ) # 验证必要字段 if not source_table or not target_table: logger.warning( "source_table或target_table为空,跳过关系创建: " "source_table=%s, target_table=%s", source_table, target_table, ) return logger.info(f"开始创建脚本关系: {source_table} -> {target_table}") with connect_graph().session() as session: # 创建或获取source和target节点 create_nodes_query = f""" MERGE (source:{source_label} {{name: $source_table}}) ON CREATE SET source.created_at = $created_at, source.type = 'source' WITH source MERGE (target:{target_label} {{name: $target_table}}) ON CREATE SET target.created_at = $created_at, target.type = 'target' RETURN source, target, id(source) as source_id, id(target) as target_id """ # 执行创建节点的查询 result = session.run( create_nodes_query, # type: ignore[arg-type] { "source_table": source_table, "target_table": target_table, "created_at": get_formatted_time(), }, ).single() if result: source_id = result["source_id"] target_id = result["target_id"] # 检查并创建关系 create_relationship_query = f""" MATCH (source:{source_label}), (target:{target_label}) WHERE id(source) = $source_id AND id(target) = $target_id AND NOT EXISTS((target)-[:DERIVED_FROM]->(source)) CREATE (target)-[r:DERIVED_FROM]->(source) SET r.script_name = $script_name, r.script_type = $script_type, r.schedule_status = $schedule_status, r.update_mode = $update_mode, r.created_at = $created_at, r.updated_at = $created_at RETURN r """ relationship_result = session.run( create_relationship_query, # type: ignore[arg-type] { "source_id": source_id, "target_id": target_id, "script_name": script_name, "script_type": script_type, "schedule_status": schedule_status, "update_mode": update_mode, "created_at": get_formatted_time(), }, ).single() if relationship_result: logger.info( "成功创建DERIVED_FROM关系: %s -> %s (script: %s)", target_table, source_table, script_name, ) else: logger.info( "DERIVED_FROM关系已存在: %s -> %s", target_table, source_table, ) else: logger.error( "创建表节点失败: source_table=%s, target_table=%s", source_table, target_table, ) except Exception as e: logger.error(f"处理脚本关系失败: {str(e)}") raise e @staticmethod def get_business_domain_list() -> List[Dict[str, Any]]: """ 获取BusinessDomain节点列表 Returns: BusinessDomain节点列表,每个节点包含 id, name_zh, name_en, tag """ try: logger.info("开始查询BusinessDomain节点列表") with connect_graph().session() as session: # 查询所有BusinessDomain节点及其LABEL关系指向的标签(支持多标签) query = """ MATCH (bd:BusinessDomain) OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel) RETURN id(bd) as id, bd.name_zh as name_zh, bd.name_en as name_en, bd.create_time as create_time, collect({ id: id(label), name_zh: label.name_zh, name_en: label.name_en }) as tags ORDER BY create_time DESC """ result = session.run(query) bd_list = [] for record in result: # 处理标签数组,过滤掉空标签 tags = record.get("tags", []) tag_list = [tag for tag in tags if tag.get("id") is not None] bd_item = { "id": record["id"], "name_zh": record.get("name_zh", "") or "", "name_en": record.get("name_en", "") or "", "tag": tag_list, } bd_list.append(bd_item) logger.info(f"成功查询到 {len(bd_list)} 个BusinessDomain节点") return bd_list except Exception as e: logger.error(f"查询BusinessDomain节点列表失败: {str(e)}") raise e @staticmethod def _register_data_product( data: Dict[str, Any], dataflow_name: str, name_en: str, dataflow_id: Optional[int] = None, ) -> None: """ 注册数据产品到数据服务 当数据流创建成功后,自动将其注册为数据产品, 以便在数据服务模块中展示和管理。 Args: data: 数据流配置数据 dataflow_name: 数据流名称(中文) name_en: 数据流英文名 dataflow_id: 数据流ID(Neo4j节点ID) """ try: # 解析目标表信息 target_table_raw = data.get("target_table") or "" target_table = ( target_table_raw.split(":")[-1] if ":" in target_table_raw else target_table_raw ) # 如果没有指定目标表,使用英文名作为目标表名 if not target_table: target_table = name_en # 解析目标schema(默认为public) target_schema = "public" # 从script_requirement中尝试获取更多信息 script_requirement = data.get("script_requirement") description = data.get("describe", "") if script_requirement: if isinstance(script_requirement, dict): # 如果有rule字段,添加到描述中 rule = script_requirement.get("rule", "") if rule and not description: description = rule elif isinstance(script_requirement, str): try: req_json = json.loads(script_requirement) if isinstance(req_json, dict): rule = req_json.get("rule", "") if rule and not description: description = rule except (json.JSONDecodeError, TypeError): pass # 调用数据产品服务进行注册 DataProductService.register_data_product( product_name=dataflow_name, product_name_en=name_en, target_table=target_table, target_schema=target_schema, description=description, source_dataflow_id=dataflow_id, source_dataflow_name=dataflow_name, created_by=data.get("created_by", "dataflow"), ) logger.info( f"数据产品注册成功: {dataflow_name} -> {target_schema}.{target_table}" ) except Exception as e: logger.error(f"注册数据产品失败: {str(e)}") raise