import contextlib
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from sqlalchemy import text

from app import db
from app.core.data_service.data_product_service import DataProductService
from app.core.graph.graph_operations import (
    connect_graph,
    create_or_get_node,
    get_node,
    relationship_exists,
)
from app.core.meta_data import get_formatted_time, translate_and_parse

logger = logging.getLogger(__name__)

# 项目根目录
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent


class DataFlowService:
    """数据流服务类,处理数据流相关的业务逻辑"""

    @staticmethod
    def get_dataflows(
        page: int = 1,
        page_size: int = 10,
        search: str = "",
    ) -> Dict[str, Any]:
        """
        获取数据流列表

        Args:
            page: 页码
            page_size: 每页大小
            search: 搜索关键词

        Returns:
            包含数据流列表和分页信息的字典
        """
        try:
            # 从图数据库查询数据流列表
            skip_count = (page - 1) * page_size

            # 构建搜索条件
            where_clause = ""
            params: Dict[str, Union[int, str]] = {
                "skip": skip_count,
                "limit": page_size,
            }
            if search:
                where_clause = (
                    "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search"
                )
                params["search"] = search

            # 查询数据流列表(包含标签数组)
            # 使用WITH子句先分页,再聚合标签,避免分页结果不准确
            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            WITH n
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
            RETURN n, id(n) as node_id,
                   n.created_at as created_at,
                   collect({{
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }}) as tags
            """

            # 获取Neo4j驱动(如果连接失败会抛出ConnectionError异常)
            try:
                with connect_graph().session() as session:
                    list_result = session.run(query, params).data()

                    # 查询总数
                    count_query = f"""
                    MATCH (n:DataFlow)
                    {where_clause}
                    RETURN count(n) as total
                    """
                    count_params = {"search": search} if search else {}
                    count_result = session.run(count_query, count_params).single()
                    total = count_result["total"] if count_result else 0
            except Exception as e:
                # connect_graph() 可能返回共享的 driver 实例,因此这里只通过
                # with 块管理 session 的生命周期,不主动关闭 driver,
                # 以免提前关闭其他调用方正在使用的连接。
                logger.error(f"查询数据流失败: {str(e)}")
                raise e

            # 格式化结果
            dataflows = []
            for record in list_result:
                node = record["n"]
                dataflow = dict(node)
                dataflow["id"] = record["node_id"]  # 使用查询返回的node_id

                # 处理标签数组,过滤掉空标签
                tags = record.get("tags", [])
                dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]

                dataflows.append(dataflow)

            return {
                "list": dataflows,
                "pagination": {
                    "page": page,
                    "page_size": page_size,
                    "total": total,
                    "total_pages": (total + page_size - 1) // page_size,
                },
            }
        except Exception as e:
            logger.error(f"获取数据流列表失败: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        根据ID获取数据流详情

        Args:
            dataflow_id: 数据流ID

        Returns:
            数据流详情字典,如果不存在则返回None
        """
        try:
            # 从Neo4j获取DataFlow节点的所有属性(包含标签数组)
            neo4j_query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
            RETURN n, id(n) as node_id,
                   collect({
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }) as tags
            """
            with connect_graph().session() as session:
                neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data()

            if not neo4j_result:
                logger.warning(f"未找到ID为 {dataflow_id} 的DataFlow节点")
                return None

            record = neo4j_result[0]
            node = record["n"]

            # 将节点属性转换为字典
            dataflow = dict(node)
            dataflow["id"] = record["node_id"]

            # 处理标签数组,过滤掉空标签
            tags = record.get("tags", [])
            dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]

            # 处理
script_requirement:如果是JSON字符串,解析为对象 script_requirement_str = dataflow.get("script_requirement", "") if script_requirement_str: try: # 尝试解析JSON字符串 script_requirement_obj = json.loads(script_requirement_str) dataflow["script_requirement"] = script_requirement_obj logger.debug( "成功解析script_requirement: %s", script_requirement_obj, ) except (json.JSONDecodeError, TypeError) as e: logger.warning(f"script_requirement解析失败,保持原值: {e}") # 保持原值(字符串) dataflow["script_requirement"] = script_requirement_str else: # 如果为空,设置为None dataflow["script_requirement"] = None logger.info( "成功获取DataFlow详情,ID: %s, 名称: %s", dataflow_id, dataflow.get("name_zh"), ) return dataflow except Exception as e: logger.error(f"获取数据流详情失败: {str(e)}") raise e @staticmethod def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]: """ 创建新的数据流 Args: data: 数据流配置数据 Returns: 创建的数据流信息 """ try: # 验证必填字段 required_fields = ["name_zh", "describe"] for field in required_fields: if field not in data: raise ValueError(f"缺少必填字段: {field}") dataflow_name = data["name_zh"] # 使用LLM翻译名称生成英文名 try: result_list = translate_and_parse(dataflow_name) name_en = ( result_list[0] if result_list else dataflow_name.lower().replace(" ", "_") ) except Exception as e: logger.warning(f"翻译失败,使用默认英文名: {str(e)}") name_en = dataflow_name.lower().replace(" ", "_") # 处理 script_requirement,将其转换为 JSON 字符串 script_requirement = data.get("script_requirement") if script_requirement is not None: # 如果是字典或列表,转换为 JSON 字符串 if isinstance(script_requirement, (dict, list)): script_requirement_str = json.dumps( script_requirement, ensure_ascii=False ) else: # 如果已经是字符串,直接使用 script_requirement_str = str(script_requirement) else: script_requirement_str = "" # 准备节点数据(tag不作为节点属性存储,而是通过LABEL关系关联) node_data = { "name_zh": dataflow_name, "name_en": name_en, "category": data.get("category", ""), "organization": data.get("organization", ""), "leader": data.get("leader", ""), "frequency": data.get("frequency", ""), "describe": data.get("describe", ""), "status": data.get("status", "inactive"), "update_mode": data.get("update_mode", "append"), "script_type": data.get("script_type", "python"), "script_requirement": script_requirement_str, "script_path": "", # 脚本路径,任务完成后更新 "created_at": get_formatted_time(), "updated_at": get_formatted_time(), } # 创建或获取数据流节点 dataflow_id = get_node("DataFlow", name=dataflow_name) if dataflow_id: raise ValueError(f"数据流 '{dataflow_name}' 已存在") dataflow_id = create_or_get_node("DataFlow", **node_data) # 处理标签关系(支持多标签数组) tag_list = data.get("tag", []) if tag_list: try: DataFlowService._handle_tag_relationships(dataflow_id, tag_list) except Exception as e: logger.warning(f"处理标签关系时出错: {str(e)}") # 成功创建图数据库节点后,写入PG数据库 try: DataFlowService._save_to_pg_database(data, dataflow_name, name_en) logger.info(f"数据流信息已写入PG数据库: {dataflow_name}") # PG数据库记录成功写入后,在neo4j图数据库中创建script关系 try: DataFlowService._handle_script_relationships( data, dataflow_name, name_en ) logger.info(f"脚本关系创建成功: {dataflow_name}") except Exception as script_error: logger.warning(f"创建脚本关系失败: {str(script_error)}") except Exception as pg_error: logger.error(f"写入PG数据库失败: {str(pg_error)}") # 注意:这里可以选择回滚图数据库操作,但目前保持图数据库数据 # 在实际应用中,可能需要考虑分布式事务 # 返回创建的数据流信息 # 查询创建的节点获取完整信息 query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id" with connect_graph().session() as session: id_result = session.run(query, name_zh=dataflow_name).single() if id_result: dataflow_node = id_result["n"] node_id = id_result["node_id"] # 将节点属性转换为字典 result = dict(dataflow_node) result["id"] = node_id else: # 如果查询失败,返回基本信息 
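                    # create_or_get_node may return either a node object or a
                    # numeric ID depending on the graph layer, hence the
                    # isinstance guard on dataflow_id below.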
                    result = {
                        "id": (dataflow_id if isinstance(dataflow_id, int) else None),
                        "name_zh": dataflow_name,
                        "name_en": name_en,
                        "created_at": get_formatted_time(),
                    }

            # 注册数据产品到数据服务
            try:
                DataFlowService._register_data_product(
                    data=data,
                    dataflow_name=dataflow_name,
                    name_en=name_en,
                    dataflow_id=result.get("id"),
                )
                logger.info(f"数据产品注册成功: {dataflow_name}")
            except Exception as product_error:
                logger.warning(f"注册数据产品失败: {str(product_error)}")
                # 不影响主流程,仅记录警告

            logger.info(f"创建数据流成功: {dataflow_name}")
            return result

        except Exception as e:
            logger.error(f"创建数据流失败: {str(e)}")
            raise e

    @staticmethod
    def _save_to_pg_database(
        data: Dict[str, Any],
        script_name: str,
        name_en: str,
    ):
        """
        将任务信息保存到PG数据库的task_list表

        Args:
            data: 包含脚本信息的数据
            script_name: 脚本名称
            name_en: 英文名称
        """
        from app.config.config import config, current_env

        try:
            # 获取当前环境的配置
            current_config = config.get(current_env, config["default"])
            dataflow_schema = getattr(current_config, "DATAFLOW_SCHEMA", "dags")

            # 提取脚本相关信息
            # 处理 script_requirement,确保保存为 JSON 字符串
            script_requirement_raw = data.get("script_requirement")
            if script_requirement_raw is not None:
                if isinstance(script_requirement_raw, (dict, list)):
                    script_requirement = json.dumps(
                        script_requirement_raw, ensure_ascii=False
                    )
                else:
                    script_requirement = str(script_requirement_raw)
            else:
                script_requirement = ""

            # 验证必需字段
            if not script_name:
                raise ValueError("script_name不能为空")

            current_time = datetime.now()

            # 保存到task_list表
            try:
                # 1. 解析script_requirement并构建详细的任务描述
                task_description_md = script_requirement
                # Defaults up front: data_source_info and update_mode are read
                # after the parsing block below even when req_json is not a
                # dict, so they must be initialized before parsing starts.
                data_source_info = None
                update_mode = data.get("update_mode", "append")
                try:
                    # 尝试解析JSON
                    try:
                        req_json = json.loads(script_requirement)
                    except (json.JSONDecodeError, TypeError):
                        req_json = None

                    if isinstance(req_json, dict):
                        # 1. 从script_requirement中提取rule字段作为request_content_str
                        request_content_str = req_json.get("rule", "")

                        # 2. 从script_requirement中提取source_table和
                        # target_table字段信息
                        source_table_ids = req_json.get("source_table", [])
                        target_table_ids = req_json.get("target_table", [])

                        # 确保是列表格式
                        if not isinstance(source_table_ids, list):
                            source_table_ids = (
                                [source_table_ids] if source_table_ids else []
                            )
                        if not isinstance(target_table_ids, list):
                            target_table_ids = (
                                [target_table_ids] if target_table_ids else []
                            )

                        # 合并所有BusinessDomain ID
                        all_bd_ids = source_table_ids + target_table_ids

                        # 4. 从data参数中提取update_mode
                        update_mode = data.get("update_mode", "append")

                        # 生成Business Domain DDLs
                        source_ddls = []
                        target_ddls = []

                        if all_bd_ids:
                            try:
                                with connect_graph().session() as session:
                                    # 处理source tables
                                    for bd_id in source_table_ids:
                                        ddl_info = DataFlowService._generate_businessdomain_ddl(
                                            session,
                                            bd_id,
                                            is_target=False,
                                        )
                                        if ddl_info:
                                            source_ddls.append(ddl_info["ddl"])
                                            # 3. 如果BELONGS_TO关系连接的是
                                            # "数据资源",获取数据源信息
                                            if (
                                                ddl_info.get("data_source")
                                                and not data_source_info
                                            ):
                                                data_source_info = ddl_info[
                                                    "data_source"
                                                ]

                                    # 处理target tables(5. 
目标表缺省要有create_time字段) for bd_id in target_table_ids: ddl_info = DataFlowService._generate_businessdomain_ddl( session, bd_id, is_target=True, update_mode=update_mode, ) if ddl_info: target_ddls.append(ddl_info["ddl"]) # 同样检查BELONGS_TO关系,获取数据源信息 if ( ddl_info.get("data_source") and not data_source_info ): data_source_info = ddl_info[ "data_source" ] except Exception as neo_e: logger.error( f"获取BusinessDomain DDL失败: {str(neo_e)}" ) # 构建Markdown格式的任务描述 task_desc_parts = [f"# Task: {script_name}\n"] # 添加DataFlow Schema配置信息 task_desc_parts.append("## DataFlow Configuration") task_desc_parts.append(f"- **Schema**: {dataflow_schema}\n") # 添加数据源信息 if data_source_info: task_desc_parts.append("## Data Source") task_desc_parts.append( f"- **Type**: {data_source_info.get('type', 'N/A')}" ) task_desc_parts.append( f"- **Host**: {data_source_info.get('host', 'N/A')}" ) task_desc_parts.append( f"- **Port**: {data_source_info.get('port', 'N/A')}" ) task_desc_parts.append( f"- **Database**: " f"{data_source_info.get('database', 'N/A')}\n" ) # 添加源表DDL if source_ddls: task_desc_parts.append("## Source Tables (DDL)") for ddl in source_ddls: task_desc_parts.append(f"```sql\n{ddl}\n```\n") # 添加目标表DDL if target_ddls: task_desc_parts.append("## Target Tables (DDL)") for ddl in target_ddls: task_desc_parts.append(f"```sql\n{ddl}\n```\n") # 添加更新模式说明 task_desc_parts.append("## Update Mode") if update_mode == "append": task_desc_parts.append("- **Mode**: Append (追加模式)") task_desc_parts.append( "- **Description**: 新数据将追加到目标表,不删除现有数据\n" ) else: task_desc_parts.append( "- **Mode**: Full Refresh (全量更新)" ) task_desc_parts.append( "- **Description**: 目标表将被清空后重新写入数据\n" ) # 添加请求内容(rule) if request_content_str: task_desc_parts.append("## Request Content") task_desc_parts.append(f"{request_content_str}\n") # 添加实施步骤(根据任务类型优化) task_desc_parts.append("## Implementation Steps") # 判断是否为远程数据源导入任务 if data_source_info: # 从远程数据源导入数据的简化步骤 task_desc_parts.append( "1. Create an n8n workflow to execute the " "data import task" ) task_desc_parts.append( "2. Configure the workflow to call " "`import_resource_data.py` Python script" ) task_desc_parts.append( "3. Pass the following parameters to the " "Python execution node:" ) task_desc_parts.append( " - `--source-config`: JSON configuration " "for the remote data source" ) task_desc_parts.append( " - `--target-table`: Target table name " "(data resource English name)" ) task_desc_parts.append( f" - `--update-mode`: {update_mode}" ) task_desc_parts.append( "4. The Python script will automatically:" ) task_desc_parts.append( " - Connect to the remote data source" ) task_desc_parts.append( " - Extract data from the source table" ) task_desc_parts.append( f" - Write data to target table using " f"{update_mode} mode" ) else: # 数据转换任务的完整步骤 task_desc_parts.append( "1. Extract data from source tables as " "specified in the DDL" ) task_desc_parts.append( "2. Apply transformation logic according to the rule:" ) if request_content_str: task_desc_parts.append( f" - Rule: {request_content_str}" ) task_desc_parts.append( "3. Generate Python program to implement the " "data transformation logic" ) task_desc_parts.append( f"4. Write transformed data to target table " f"using {update_mode} mode" ) task_desc_parts.append( "5. 
Create an n8n workflow to schedule and " "execute the Python program" ) task_description_md = "\n".join(task_desc_parts) except Exception as parse_e: logger.warning( f"解析任务描述详情失败,使用原始描述: {str(parse_e)}" ) task_description_md = script_requirement # 判断任务类型并设置code_path和code_name # 如果是远程数据源导入任务,使用通用的import_resource_data.py脚本 if data_source_info: # 远程数据源导入任务 code_path = "datafactory/scripts" code_name = "import_resource_data.py" logger.info( f"检测到远程数据源导入任务,使用通用脚本: " f"{code_path}/{code_name}" ) else: # 数据转换任务,需要生成专用脚本 code_path = "datafactory/scripts" code_name = script_name task_insert_sql = text( "INSERT INTO public.task_list\n" "(task_name, task_description, status, code_name, " "code_path, create_by, create_time, update_time)\n" "VALUES\n" "(:task_name, :task_description, :status, :code_name, " ":code_path, :create_by, :create_time, :update_time)" ) task_params = { "task_name": script_name, "task_description": task_description_md, "status": "pending", "code_name": code_name, "code_path": code_path, "create_by": "cursor", "create_time": current_time, "update_time": current_time, } db.session.execute(task_insert_sql, task_params) db.session.commit() logger.info(f"成功将任务信息写入task_list表: task_name={script_name}") # 自动生成 n8n 工作流 JSON 文件 try: DataFlowService._generate_n8n_workflow( script_name=script_name, code_name=code_name, code_path=code_path, update_mode=update_mode, is_import_task=bool(data_source_info), ) except Exception as wf_error: logger.warning(f"生成n8n工作流文件失败: {str(wf_error)}") # 不影响主流程 except Exception as task_error: db.session.rollback() logger.error(f"写入task_list表失败: {str(task_error)}") raise task_error except Exception as e: db.session.rollback() logger.error(f"保存到PG数据库失败: {str(e)}") raise e @staticmethod def _generate_n8n_workflow( script_name: str, code_name: str, code_path: str, update_mode: str = "append", is_import_task: bool = False, ) -> Optional[str]: """ 自动生成 n8n 工作流 JSON 文件 Args: script_name: 脚本/任务名称 code_name: 代码文件名 code_path: 代码路径 update_mode: 更新模式 is_import_task: 是否为数据导入任务 Returns: 生成的工作流文件路径,失败返回 None """ try: # 确保工作流目录存在 workflows_dir = PROJECT_ROOT / "datafactory" / "workflows" workflows_dir.mkdir(parents=True, exist_ok=True) # 生成工作流文件名 workflow_filename = f"{script_name}_workflow.json" workflow_path = workflows_dir / workflow_filename # 生成唯一ID def gen_id(): return str(uuid.uuid4()) # 构建完整的 SSH 命令,包含激活 venv # 注意:由于 n8n 服务器与应用服务器分离,必须使用 SSH 节点 ssh_command = ( f"cd /opt/dataops-platform && source venv/bin/activate && " f"python {code_path}/{code_name}" ) workflow_json = { "name": f"{script_name}_工作流", "nodes": [ { "parameters": {}, "id": gen_id(), "name": "Manual Trigger", "type": "n8n-nodes-base.manualTrigger", "typeVersion": 1, "position": [250, 300], }, { "parameters": { "resource": "command", "operation": "execute", "command": ssh_command, "cwd": "/opt/dataops-platform", }, "id": gen_id(), "name": "Execute Script", "type": "n8n-nodes-base.ssh", "typeVersion": 1, "position": [450, 300], "credentials": { "sshPassword": { "id": "pYTwwuyC15caQe6y", "name": "SSH Password account", } }, }, { "parameters": { "conditions": { "options": { "caseSensitive": True, "leftValue": "", "typeValidation": "strict", }, "conditions": [ { "id": "condition-success", "leftValue": "={{ $json.code }}", "rightValue": 0, "operator": { "type": "number", "operation": "equals", }, } ], "combinator": "and", } }, "id": gen_id(), "name": "Check Result", "type": "n8n-nodes-base.if", "typeVersion": 2, "position": [650, 300], }, { "parameters": { "assignments": { "assignments": [ { "id": 
"result-success", "name": "status", "value": "success", "type": "string", }, { "id": "result-message", "name": "message", "value": f"{script_name} 执行成功", "type": "string", }, { "id": "result-output", "name": "output", "value": "={{ $json.stdout }}", "type": "string", }, { "id": "result-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string", }, ] } }, "id": gen_id(), "name": "Success Response", "type": "n8n-nodes-base.set", "typeVersion": 3.4, "position": [850, 200], }, { "parameters": { "assignments": { "assignments": [ { "id": "error-status", "name": "status", "value": "error", "type": "string", }, { "id": "error-message", "name": "message", "value": f"{script_name} 执行失败", "type": "string", }, { "id": "error-output", "name": "error", "value": "={{ $json.stderr }}", "type": "string", }, { "id": "error-code", "name": "exitCode", "value": "={{ $json.code }}", "type": "number", }, { "id": "error-time", "name": "executionTime", "value": "={{ $now.toISO() }}", "type": "string", }, ] } }, "id": gen_id(), "name": "Error Response", "type": "n8n-nodes-base.set", "typeVersion": 3.4, "position": [850, 400], }, ], "connections": { "Manual Trigger": { "main": [ [ { "node": "Execute Script", "type": "main", "index": 0, } ] ] }, "Execute Script": { "main": [ [ { "node": "Check Result", "type": "main", "index": 0, } ] ] }, "Check Result": { "main": [ [ { "node": "Success Response", "type": "main", "index": 0, } ], [ { "node": "Error Response", "type": "main", "index": 0, } ], ] }, }, "active": False, "settings": {"executionOrder": "v1"}, "versionId": "1", "meta": { "templateCredsSetupCompleted": False, "instanceId": "dataops-platform", }, "tags": [ { "createdAt": datetime.now().isoformat() + "Z", "updatedAt": datetime.now().isoformat() + "Z", "id": "1", "name": "数据导入" if is_import_task else "数据流程", } ], } # 写入文件 with open(workflow_path, "w", encoding="utf-8") as f: json.dump(workflow_json, f, ensure_ascii=False, indent=2) logger.info(f"成功生成n8n工作流文件: {workflow_path}") return str(workflow_path) except Exception as e: logger.error(f"生成n8n工作流失败: {str(e)}") return None @staticmethod def _handle_children_relationships(dataflow_node, children_ids): """处理子节点关系""" logger.debug( "处理子节点关系,原始children_ids: %s, 类型: %s", children_ids, type(children_ids), ) # 确保children_ids是列表格式 if not isinstance(children_ids, (list, tuple)): if children_ids is not None: children_ids = [children_ids] # 如果是单个值,转换为列表 logger.debug(f"将单个值转换为列表: {children_ids}") else: children_ids = [] # 如果是None,转换为空列表 logger.debug("将None转换为空列表") for child_id in children_ids: try: # 查找子节点 query = "MATCH (n) WHERE id(n) = $child_id RETURN n" with connect_graph().session() as session: result = session.run(query, child_id=child_id).data() if result: # 获取dataflow_node的ID dataflow_id = getattr(dataflow_node, "identity", None) if dataflow_id is None: # 如果没有identity属性,从名称查询ID query_id = ( "MATCH (n:DataFlow) WHERE n.name_zh = " "$name_zh RETURN id(n) as node_id" ) id_result = session.run( query_id, name_zh=dataflow_node.get("name_zh"), ).single() dataflow_id = id_result["node_id"] if id_result else None # 创建关系 - 使用ID调用relationship_exists if dataflow_id and not relationship_exists( dataflow_id, "child", child_id ): session.run( "MATCH (a), (b) WHERE id(a) = $dataflow_id " "AND id(b) = $child_id " "CREATE (a)-[:child]->(b)", dataflow_id=dataflow_id, child_id=child_id, ) logger.info(f"创建子节点关系: {dataflow_id} -> {child_id}") except Exception as e: logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}") @staticmethod def 
_handle_tag_relationships(dataflow_id, tag_list): """ 处理多标签关系 Args: dataflow_id: 数据流节点ID tag_list: 标签列表,可以是ID数组或包含id字段的对象数组 """ # 确保tag_list是列表格式 if not isinstance(tag_list, list): tag_list = [tag_list] if tag_list else [] for tag_item in tag_list: tag_id = None if isinstance(tag_item, dict) and "id" in tag_item: tag_id = int(tag_item["id"]) elif isinstance(tag_item, (int, str)): with contextlib.suppress(ValueError, TypeError): tag_id = int(tag_item) if tag_id: DataFlowService._handle_single_tag_relationship(dataflow_id, tag_id) @staticmethod def _handle_single_tag_relationship(dataflow_id, tag_id): """处理单个标签关系""" try: # 查找标签节点 query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n" with connect_graph().session() as session: result = session.run(query, tag_id=tag_id).data() # 创建关系 - 使用ID调用relationship_exists if ( result and dataflow_id and not relationship_exists(dataflow_id, "LABEL", tag_id) ): session.run( "MATCH (a), (b) WHERE id(a) = $dataflow_id " "AND id(b) = $tag_id " "CREATE (a)-[:LABEL]->(b)", dataflow_id=dataflow_id, tag_id=tag_id, ) logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}") except Exception as e: logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}") @staticmethod def update_dataflow_script_path( dataflow_name: str, script_path: str, ) -> bool: """ 更新 DataFlow 节点的脚本路径 当任务完成后,将创建的 Python 脚本路径更新到 DataFlow 节点 Args: dataflow_name: 数据流名称(中文名) script_path: Python 脚本的完整路径 Returns: 是否更新成功 """ try: query = """ MATCH (n:DataFlow {name_zh: $name_zh}) SET n.script_path = $script_path, n.updated_at = $updated_at RETURN n """ with connect_graph().session() as session: result = session.run( query, name_zh=dataflow_name, script_path=script_path, updated_at=get_formatted_time(), ).single() if result: logger.info( f"已更新 DataFlow 脚本路径: {dataflow_name} -> {script_path}" ) return True else: logger.warning(f"未找到 DataFlow 节点: {dataflow_name}") return False except Exception as e: logger.error(f"更新 DataFlow 脚本路径失败: {str(e)}") return False @staticmethod def get_script_content(dataflow_id: int) -> Dict[str, Any]: """ 根据 DataFlow ID 获取关联的脚本内容 Args: dataflow_id: 数据流ID Returns: 包含脚本内容和元信息的字典: - script_path: 脚本路径 - script_content: 脚本内容 - script_type: 脚本类型(如 python) - dataflow_name: 数据流名称 Raises: ValueError: 当 DataFlow 不存在或脚本路径为空时 FileNotFoundError: 当脚本文件不存在时 """ from pathlib import Path try: # 从 Neo4j 获取 DataFlow 节点 query = """ MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n, id(n) as node_id """ with connect_graph().session() as session: result = session.run(query, dataflow_id=dataflow_id).single() if not result: raise ValueError(f"未找到 ID 为 {dataflow_id} 的 DataFlow 节点") node = result["n"] node_props = dict(node) # 获取脚本路径 script_path = node_props.get("script_path", "") if not script_path: raise ValueError( f"DataFlow (ID: {dataflow_id}) 的 script_path 属性为空" ) # 确定脚本文件的完整路径 # script_path 可能是相对路径或绝对路径 script_file = Path(script_path) # 如果是相对路径,相对于项目根目录 if not script_file.is_absolute(): # 获取项目根目录(假设 app 目录的父目录是项目根) project_root = Path(__file__).parent.parent.parent.parent script_file = project_root / script_path # 检查文件是否存在 if not script_file.exists(): raise FileNotFoundError(f"脚本文件不存在: {script_file}") # 读取脚本内容 with script_file.open("r", encoding="utf-8") as f: script_content = f.read() # 确定脚本类型 suffix = script_file.suffix.lower() script_type_map = { ".py": "python", ".js": "javascript", ".ts": "typescript", ".sql": "sql", ".sh": "shell", } script_type = script_type_map.get(suffix, "text") logger.info( f"成功读取脚本内容: DataFlow ID={dataflow_id}, " f"路径={script_path}, 类型={script_type}" ) return { 
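                # Everything a caller needs to render the script: file
                # location, raw content, a language hint, and the owning
                # DataFlow's identifiers.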
"script_path": script_path, "script_content": script_content, "script_type": script_type, "dataflow_id": dataflow_id, "dataflow_name": node_props.get("name_zh", ""), "dataflow_name_en": node_props.get("name_en", ""), } except (ValueError, FileNotFoundError): raise except Exception as e: logger.error(f"获取脚本内容失败: {str(e)}") raise @staticmethod def update_dataflow( dataflow_id: int, data: Dict[str, Any], ) -> Optional[Dict[str, Any]]: """ 更新数据流 Args: dataflow_id: 数据流ID data: 更新的数据 Returns: 更新后的数据流信息,如果不存在则返回None """ try: # 提取 tag 数组(不作为节点属性存储) tag_list = data.pop("tag", None) # 查找节点 query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n" with connect_graph().session() as session: result = session.run(query, dataflow_id=dataflow_id).data() if not result: return None # 更新节点属性 update_fields = [] params: Dict[str, Any] = {"dataflow_id": dataflow_id} for key, value in data.items(): if key not in ["id", "created_at"]: # 保护字段 # 复杂对象序列化为 JSON 字符串 if key in ["config", "script_requirement"] and isinstance( value, dict ): value = json.dumps(value, ensure_ascii=False) update_fields.append(f"n.{key} = ${key}") params[key] = value if update_fields: params["updated_at"] = get_formatted_time() update_fields.append("n.updated_at = $updated_at") update_query = f""" MATCH (n:DataFlow) WHERE id(n) = $dataflow_id SET {", ".join(update_fields)} RETURN n, id(n) as node_id """ result = session.run(update_query, params).data() # 处理 tag 关系(支持多标签数组) if tag_list is not None: # 确保是列表格式 if not isinstance(tag_list, list): tag_list = [tag_list] if tag_list else [] # 先删除现有的 LABEL 关系 delete_query = """ MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel) WHERE id(n) = $dataflow_id DELETE r """ session.run(delete_query, dataflow_id=dataflow_id) logger.info(f"删除数据流 {dataflow_id} 的现有标签关系") # 为每个 tag 创建新的 LABEL 关系 for tag_item in tag_list: tag_id = None if isinstance(tag_item, dict) and "id" in tag_item: tag_id = int(tag_item["id"]) elif isinstance(tag_item, (int, str)): with contextlib.suppress(ValueError, TypeError): tag_id = int(tag_item) if tag_id: DataFlowService._handle_single_tag_relationship( dataflow_id, tag_id ) if result: node = result[0]["n"] updated_dataflow = dict(node) # 使用查询返回的node_id updated_dataflow["id"] = result[0]["node_id"] # 查询并添加标签数组到返回数据 tags_query = """ MATCH (n:DataFlow) WHERE id(n) = $dataflow_id OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel) RETURN collect({ id: id(label), name_zh: label.name_zh, name_en: label.name_en }) as tags """ tags_result = session.run( tags_query, dataflow_id=dataflow_id ).single() if tags_result: tags = tags_result.get("tags", []) updated_dataflow["tag"] = [ tag for tag in tags if tag.get("id") is not None ] else: updated_dataflow["tag"] = [] logger.info(f"更新数据流成功: ID={dataflow_id}") return updated_dataflow return None except Exception as e: logger.error(f"更新数据流失败: {str(e)}") raise e @staticmethod def delete_dataflow(dataflow_id: int) -> bool: """ 删除数据流 Args: dataflow_id: 数据流ID Returns: 删除是否成功 """ try: # 删除节点及其关系 query = """ MATCH (n:DataFlow) WHERE id(n) = $dataflow_id DETACH DELETE n RETURN count(n) as deleted_count """ with connect_graph().session() as session: delete_result = session.run(query, dataflow_id=dataflow_id).single() result = delete_result["deleted_count"] if delete_result else 0 if result and result > 0: logger.info(f"删除数据流成功: ID={dataflow_id}") return True return False except Exception as e: logger.error(f"删除数据流失败: {str(e)}") raise e @staticmethod def execute_dataflow( dataflow_id: int, params: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ 执行数据流 
Args: dataflow_id: 数据流ID params: 执行参数 Returns: 执行结果信息 """ try: # 检查数据流是否存在 query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n" with connect_graph().session() as session: result = session.run(query, dataflow_id=dataflow_id).data() if not result: raise ValueError(f"数据流不存在: ID={dataflow_id}") execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}" # TODO: 这里应该实际执行数据流 # 目前返回模拟结果 result = { "execution_id": execution_id, "dataflow_id": dataflow_id, "status": "running", "started_at": datetime.now().isoformat(), "params": params or {}, "progress": 0, } logger.info( "开始执行数据流: ID=%s, execution_id=%s", dataflow_id, execution_id, ) return result except Exception as e: logger.error(f"执行数据流失败: {str(e)}") raise e @staticmethod def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]: """ 获取数据流执行状态 Args: dataflow_id: 数据流ID Returns: 执行状态信息 """ try: # TODO: 这里应该查询实际的执行状态 # 目前返回模拟状态 query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n" with connect_graph().session() as session: result = session.run(query, dataflow_id=dataflow_id).data() if not result: raise ValueError(f"数据流不存在: ID={dataflow_id}") status = ["running", "completed", "failed", "pending"][dataflow_id % 4] return { "dataflow_id": dataflow_id, "status": status, "progress": ( 100 if status == "completed" else (dataflow_id * 10) % 100 ), "started_at": datetime.now().isoformat(), "completed_at": ( datetime.now().isoformat() if status == "completed" else None ), "error_message": ("执行过程中发生错误" if status == "failed" else None), } except Exception as e: logger.error(f"获取数据流状态失败: {str(e)}") raise e @staticmethod def get_dataflow_logs( dataflow_id: int, page: int = 1, page_size: int = 50, ) -> Dict[str, Any]: """ 获取数据流执行日志 Args: dataflow_id: 数据流ID page: 页码 page_size: 每页大小 Returns: 执行日志列表和分页信息 """ try: # TODO: 这里应该查询实际的执行日志 # 目前返回模拟日志 query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n" with connect_graph().session() as session: result = session.run(query, dataflow_id=dataflow_id).data() if not result: raise ValueError(f"数据流不存在: ID={dataflow_id}") mock_logs = [ { "id": i, "timestamp": datetime.now().isoformat(), "level": ["INFO", "WARNING", "ERROR"][i % 3], "message": f"数据流执行日志消息 {i}", "component": ["source", "transform", "target"][i % 3], } for i in range(1, 101) ] # 分页处理 total = len(mock_logs) start = (page - 1) * page_size end = start + page_size logs = mock_logs[start:end] return { "logs": logs, "pagination": { "page": page, "page_size": page_size, "total": total, "total_pages": (total + page_size - 1) // page_size, }, } except Exception as e: logger.error(f"获取数据流日志失败: {str(e)}") raise e @staticmethod def _generate_businessdomain_ddl( session, bd_id: int, is_target: bool = False, update_mode: str = "append", ) -> Optional[Dict[str, Any]]: """ 根据BusinessDomain节点ID生成DDL Args: session: Neo4j session对象 bd_id: BusinessDomain节点ID is_target: 是否为目标表(目标表需要添加create_time字段) update_mode: 更新模式(append或full) Returns: 包含ddl和data_source信息的字典,如果节点不存在则返回None """ try: # 查询BusinessDomain节点、元数据、标签关系和数据源关系 cypher = """ MATCH (bd:BusinessDomain) WHERE id(bd) = $bd_id OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta) OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel) OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource) RETURN bd, collect(DISTINCT m) as metadata, collect(DISTINCT { id: id(label), name_zh: label.name_zh, name_en: label.name_en }) as labels, ds.type as ds_type, ds.host as ds_host, ds.port as ds_port, ds.database as ds_database """ result = session.run(cypher, bd_id=bd_id).single() if not result or not result["bd"]: 
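                # BusinessDomain node not found: warn and return None so that
                # callers can simply skip this table when assembling DDLs.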
logger.warning(f"未找到ID为 {bd_id} 的BusinessDomain节点") return None node = result["bd"] metadata = result["metadata"] # 处理标签数组,获取第一个有效标签名称用于判断 labels = result.get("labels", []) valid_labels = [label for label in labels if label.get("id") is not None] label_name = valid_labels[0].get("name_zh") if valid_labels else None # 生成DDL node_props = dict(node) table_name = node_props.get("name_en", f"table_{bd_id}") ddl_lines = [] ddl_lines.append(f"CREATE TABLE {table_name} (") column_definitions = [] # 添加元数据列 if metadata: for meta in metadata: if meta: meta_props = dict(meta) column_name = meta_props.get( "name_en", meta_props.get("name_zh", "unknown_column"), ) data_type = meta_props.get("data_type", "VARCHAR(255)") comment = meta_props.get("name_zh", "") column_def = f" {column_name} {data_type}" if comment: column_def += f" COMMENT '{comment}'" column_definitions.append(column_def) # 如果没有元数据,添加默认主键 if not column_definitions: column_definitions.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'") # 5. 如果是目标表,添加create_time字段 if is_target: column_definitions.append( " create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP " "COMMENT '数据创建时间'" ) ddl_lines.append(",\n".join(column_definitions)) ddl_lines.append(");") # 添加表注释 table_comment = node_props.get( "name_zh", node_props.get("describe", table_name) ) if table_comment and table_comment != table_name: ddl_lines.append(f"COMMENT ON TABLE {table_name} IS '{table_comment}';") ddl_content = "\n".join(ddl_lines) # 3. 检查BELONGS_TO关系是否连接"数据资源",如果是则返回数据源信息 data_source = None if label_name == "数据资源" and result["ds_type"]: data_source = { "type": result["ds_type"], "host": result["ds_host"], "port": result["ds_port"], "database": result["ds_database"], } logger.info(f"获取到数据源信息: {data_source}") logger.debug( f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}" ) return { "ddl": ddl_content, "table_name": table_name, "data_source": data_source, } except Exception as e: logger.error(f"生成BusinessDomain DDL失败,ID={bd_id}: {str(e)}") return None @staticmethod def _handle_script_relationships( data: Dict[str, Any], dataflow_name: str, name_en: str, ): """ 处理脚本关系,在Neo4j图数据库中创建从source BusinessDomain到DataFlow的 INPUT关系,以及从DataFlow到target BusinessDomain的OUTPUT关系。 关系模型: - (source:BusinessDomain)-[:INPUT]->(dataflow:DataFlow) - (dataflow:DataFlow)-[:OUTPUT]->(target:BusinessDomain) Args: data: 包含脚本信息的数据字典,应包含script_name, script_type, schedule_status, source_table, target_table, update_mode """ try: # 从data中读取键值对 source_table_full = data.get("source_table", "") target_table_full = data.get("target_table", "") # 处理source_table和target_table的格式 # 格式: "label:name" 或 直接 "name" source_table = ( source_table_full.split(":")[-1] if ":" in source_table_full else source_table_full ) target_table = ( target_table_full.split(":")[-1] if ":" in target_table_full else target_table_full ) source_label = ( source_table_full.split(":")[0] if ":" in source_table_full else "BusinessDomain" ) target_label = ( target_table_full.split(":")[0] if ":" in target_table_full else "BusinessDomain" ) # 验证必要字段 if not source_table or not target_table: logger.warning( "source_table或target_table为空,跳过关系创建: " "source_table=%s, target_table=%s", source_table, target_table, ) return logger.info( "开始创建INPUT/OUTPUT关系: %s -[INPUT]-> %s -[OUTPUT]-> %s", source_table, dataflow_name, target_table, ) with connect_graph().session() as session: # 步骤1:获取DataFlow节点ID dataflow_query = """ MATCH (df:DataFlow {name_zh: $dataflow_name}) RETURN id(df) as dataflow_id """ df_result = session.run( dataflow_query, # type: 
ignore[arg-type] {"dataflow_name": dataflow_name}, ).single() if not df_result: logger.error(f"未找到DataFlow节点: {dataflow_name}") return dataflow_id = df_result["dataflow_id"] # 步骤2:获取或创建source节点 # 优先通过name_en匹配,其次通过name匹配 source_query = f""" MATCH (source:{source_label}) WHERE source.name_en = $source_table OR source.name = $source_table RETURN id(source) as source_id LIMIT 1 """ source_result = session.run( source_query, # type: ignore[arg-type] {"source_table": source_table}, ).single() if not source_result: logger.warning( "未找到source节点: %s,将创建新节点", source_table, ) # 创建source节点 create_source_query = f""" CREATE (source:{source_label} {{ name: $source_table, name_en: $source_table, created_at: $created_at, type: 'source' }}) RETURN id(source) as source_id """ source_result = session.run( create_source_query, # type: ignore[arg-type] { "source_table": source_table, "created_at": get_formatted_time(), }, ).single() source_id = source_result["source_id"] if source_result else None # 步骤3:获取或创建target节点 target_query = f""" MATCH (target:{target_label}) WHERE target.name_en = $target_table OR target.name = $target_table RETURN id(target) as target_id LIMIT 1 """ target_result = session.run( target_query, # type: ignore[arg-type] {"target_table": target_table}, ).single() if not target_result: logger.warning( "未找到target节点: %s,将创建新节点", target_table, ) # 创建target节点 create_target_query = f""" CREATE (target:{target_label} {{ name: $target_table, name_en: $target_table, created_at: $created_at, type: 'target' }}) RETURN id(target) as target_id """ target_result = session.run( create_target_query, # type: ignore[arg-type] { "target_table": target_table, "created_at": get_formatted_time(), }, ).single() target_id = target_result["target_id"] if target_result else None if not source_id or not target_id: logger.error( "无法获取source或target节点ID: source_id=%s, target_id=%s", source_id, target_id, ) return # 步骤4:创建 INPUT 关系 (source)-[:INPUT]->(dataflow) create_input_query = """ MATCH (source), (dataflow:DataFlow) WHERE id(source) = $source_id AND id(dataflow) = $dataflow_id MERGE (source)-[r:INPUT]->(dataflow) ON CREATE SET r.created_at = $created_at ON MATCH SET r.updated_at = $created_at RETURN r """ input_result = session.run( create_input_query, # type: ignore[arg-type] { "source_id": source_id, "dataflow_id": dataflow_id, "created_at": get_formatted_time(), }, ).single() if input_result: logger.info( "成功创建INPUT关系: %s -> %s", source_table, dataflow_name, ) else: logger.warning( "INPUT关系创建失败或已存在: %s -> %s", source_table, dataflow_name, ) # 步骤5:创建 OUTPUT 关系 (dataflow)-[:OUTPUT]->(target) create_output_query = """ MATCH (dataflow:DataFlow), (target) WHERE id(dataflow) = $dataflow_id AND id(target) = $target_id MERGE (dataflow)-[r:OUTPUT]->(target) ON CREATE SET r.created_at = $created_at ON MATCH SET r.updated_at = $created_at RETURN r """ output_result = session.run( create_output_query, # type: ignore[arg-type] { "dataflow_id": dataflow_id, "target_id": target_id, "created_at": get_formatted_time(), }, ).single() if output_result: logger.info( "成功创建OUTPUT关系: %s -> %s", dataflow_name, target_table, ) else: logger.warning( "OUTPUT关系创建失败或已存在: %s -> %s", dataflow_name, target_table, ) logger.info( "血缘关系创建完成: %s -[INPUT]-> %s -[OUTPUT]-> %s", source_table, dataflow_name, target_table, ) except Exception as e: logger.error(f"处理脚本关系失败: {str(e)}") raise e @staticmethod def get_business_domain_list() -> List[Dict[str, Any]]: """ 获取BusinessDomain节点列表 Returns: BusinessDomain节点列表,每个节点包含 id, name_zh, name_en, tag """ 
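        # A single Cypher round-trip: collect() over the OPTIONAL MATCH yields
        # [{id: null, ...}] for nodes without labels, which is why the null-id
        # filter below is required.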
try: logger.info("开始查询BusinessDomain节点列表") with connect_graph().session() as session: # 查询所有BusinessDomain节点及其LABEL关系指向的标签(支持多标签) query = """ MATCH (bd:BusinessDomain) OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel) RETURN id(bd) as id, bd.name_zh as name_zh, bd.name_en as name_en, bd.create_time as create_time, collect({ id: id(label), name_zh: label.name_zh, name_en: label.name_en }) as tags ORDER BY create_time DESC """ result = session.run(query) bd_list = [] for record in result: # 处理标签数组,过滤掉空标签 tags = record.get("tags", []) tag_list = [tag for tag in tags if tag.get("id") is not None] bd_item = { "id": record["id"], "name_zh": record.get("name_zh", "") or "", "name_en": record.get("name_en", "") or "", "tag": tag_list, } bd_list.append(bd_item) logger.info(f"成功查询到 {len(bd_list)} 个BusinessDomain节点") return bd_list except Exception as e: logger.error(f"查询BusinessDomain节点列表失败: {str(e)}") raise e @staticmethod def _register_data_product( data: Dict[str, Any], dataflow_name: str, name_en: str, dataflow_id: Optional[int] = None, ) -> None: """ 注册数据产品到数据服务 当数据流创建成功后,自动将其注册为数据产品, 以便在数据服务模块中展示和管理。 从 script_requirement.target_table 中获取 BusinessDomain ID, 然后查询 Neo4j 获取对应节点的 name_zh 和 name_en 作为数据产品名称。 Args: data: 数据流配置数据 dataflow_name: 数据流名称(中文) name_en: 数据流英文名 dataflow_id: 数据流ID(Neo4j节点ID) """ try: # 从script_requirement中获取target_table(BusinessDomain ID列表) script_requirement = data.get("script_requirement") description = data.get("describe", "") # 解析 script_requirement req_json: Optional[Dict[str, Any]] = None if script_requirement: if isinstance(script_requirement, dict): req_json = script_requirement elif isinstance(script_requirement, str): try: parsed = json.loads(script_requirement) if isinstance(parsed, dict): req_json = parsed except (json.JSONDecodeError, TypeError): pass # 获取target_table中的BusinessDomain ID列表 target_bd_ids: List[int] = [] if req_json: target_table_ids = req_json.get("target_table", []) if isinstance(target_table_ids, list): target_bd_ids = [ int(bid) for bid in target_table_ids if bid is not None ] elif target_table_ids is not None: target_bd_ids = [int(target_table_ids)] # 如果有rule字段,添加到描述中 rule = req_json.get("rule", "") if rule and not description: description = rule # 如果没有target_table ID,则不注册数据产品 if not target_bd_ids: logger.warning( f"数据流 {dataflow_name} 没有指定target_table,跳过数据产品注册" ) return # 从Neo4j查询每个BusinessDomain节点的name_zh和name_en with connect_graph().session() as session: for bd_id in target_bd_ids: try: query = """ MATCH (bd:BusinessDomain) WHERE id(bd) = $bd_id RETURN bd.name_zh as name_zh, bd.name_en as name_en, bd.describe as describe """ result = session.run(query, bd_id=bd_id).single() if not result: logger.warning( f"未找到ID为 {bd_id} 的BusinessDomain节点,跳过" ) continue # 使用BusinessDomain节点的name_zh和name_en product_name = result.get("name_zh") or "" product_name_en = result.get("name_en") or "" # 如果没有name_zh,使用name_en if not product_name: product_name = product_name_en # 如果没有name_en,使用name_zh转换 if not product_name_en: product_name_en = product_name.lower().replace(" ", "_") # 目标表名使用BusinessDomain的name_en target_table = product_name_en # 如果BusinessDomain有describe且当前description为空,使用它 bd_describe = result.get("describe") or "" if bd_describe and not description: description = bd_describe # 解析目标schema(默认为public) target_schema = "public" # 调用数据产品服务进行注册 DataProductService.register_data_product( product_name=product_name, product_name_en=product_name_en, target_table=target_table, target_schema=target_schema, description=description, source_dataflow_id=dataflow_id, 
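                            # Ties the product back to the originating
                            # DataFlow node (Neo4j ID) for lineage tracking.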
source_dataflow_name=dataflow_name, created_by=data.get("created_by", "dataflow"), ) logger.info( f"数据产品注册成功: {product_name} -> " f"{target_schema}.{target_table}" ) except Exception as bd_error: logger.error( f"处理BusinessDomain {bd_id} 失败: {str(bd_error)}" ) # 继续处理下一个 except Exception as e: logger.error(f"注册数据产品失败: {str(e)}") raise
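

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, kept commented out): how a Flask blueprint
# might expose the read path of this service. The blueprint name and URL
# prefix are assumptions, not existing routes in this repo.
# ---------------------------------------------------------------------------
#
# from flask import Blueprint, jsonify, request
#
# dataflow_bp = Blueprint("dataflow", __name__, url_prefix="/api/dataflow")
#
#
# @dataflow_bp.get("")
# def list_dataflows():
#     # Pagination and search parameters mirror get_dataflows() defaults.
#     return jsonify(
#         DataFlowService.get_dataflows(
#             page=request.args.get("page", 1, type=int),
#             page_size=request.args.get("page_size", 10, type=int),
#             search=request.args.get("search", "", type=str),
#         )
#     )
#
#
# @dataflow_bp.get("/<int:dataflow_id>")
# def dataflow_detail(dataflow_id: int):
#     # get_dataflow_by_id() returns None for a missing node, mapped to 404.
#     detail = DataFlowService.get_dataflow_by_id(dataflow_id)
#     if detail is None:
#         return jsonify({"error": "not found"}), 404
#     return jsonify(detail)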