- import contextlib
- import json
- import logging
- import uuid
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Dict, List, Optional, Union
- from sqlalchemy import text
- from app import db
- from app.core.data_service.data_product_service import DataProductService
- from app.core.graph.graph_operations import (
- connect_graph,
- create_or_get_node,
- get_node,
- relationship_exists,
- )
- from app.core.meta_data import get_formatted_time, translate_and_parse
- logger = logging.getLogger(__name__)
- # Project root directory
- PROJECT_ROOT = Path(__file__).parent.parent.parent.parent
- class DataFlowService:
- """数据流服务类,处理数据流相关的业务逻辑"""
- @staticmethod
- def get_dataflows(
- page: int = 1,
- page_size: int = 10,
- search: str = "",
- ) -> Dict[str, Any]:
- """
- 获取数据流列表
- Args:
- page: 页码
- page_size: 每页大小
- search: 搜索关键词
- Returns:
- 包含数据流列表和分页信息的字典
- """
- try:
- # Query the dataflow list from the graph database
- skip_count = (page - 1) * page_size
- # Build the search filter
- where_clause = ""
- params: Dict[str, Union[int, str]] = {
- "skip": skip_count,
- "limit": page_size,
- }
- if search:
- where_clause = (
- "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search"
- )
- params["search"] = search
- # Query the dataflow list (including the tag array).
- # Use a WITH clause to paginate first, then aggregate tags, so pagination stays accurate
- query = f"""
- MATCH (n:DataFlow)
- {where_clause}
- WITH n
- ORDER BY n.created_at DESC
- SKIP $skip
- LIMIT $limit
- OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
- RETURN n, id(n) as node_id,
- n.created_at as created_at,
- collect({{
- id: id(label),
- name_zh: label.name_zh,
- name_en: label.name_en
- }}) as tags
- """
- # Get the Neo4j driver (connect_graph raises ConnectionError if the connection fails)
- try:
- with connect_graph().session() as session:
- list_result = session.run(query, params).data()
- # 查询总数
- count_query = f"""
- MATCH (n:DataFlow)
- {where_clause}
- RETURN count(n) as total
- """
- count_params = {"search": search} if search else {}
- count_result = session.run(count_query, count_params).single()
- total = count_result["total"] if count_result else 0
- except Exception as e:
- # Do not close the driver here: connect_graph() may return a shared
- # instance, and closing it would prematurely break other sessions.
- # The session itself is released by the context manager above.
- logger.error(f"Failed to query dataflows: {str(e)}")
- raise
- # Format the results
- dataflows = []
- for record in list_result:
- node = record["n"]
- dataflow = dict(node)
- dataflow["id"] = record["node_id"] # 使用查询返回的node_id
- # Process the tag array, filtering out empty tags
- tags = record.get("tags", [])
- dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]
- dataflows.append(dataflow)
- return {
- "list": dataflows,
- "pagination": {
- "page": page,
- "page_size": page_size,
- "total": total,
- "total_pages": (total + page_size - 1) // page_size,
- },
- }
- except Exception as e:
- logger.error(f"Failed to get the dataflow list: {str(e)}")
- raise
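- # Usage sketch (illustrative only; assumes a reachable Neo4j instance
- # behind connect_graph and hypothetical data):
- #
- #     page = DataFlowService.get_dataflows(page=2, page_size=10, search="sales")
- #     for df in page["list"]:
- #         print(df["id"], df["name_zh"], [t["name_en"] for t in df["tag"]])
- #     # total_pages is ceil(total / page_size), i.e.
- #     # (total + page_size - 1) // page_size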
- @staticmethod
- def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
- """
- 根据ID获取数据流详情
- Args:
- dataflow_id: 数据流ID
- Returns:
- 数据流详情字典,如果不存在则返回None
- """
- try:
- # Fetch all properties of the DataFlow node from Neo4j (including the tag array)
- neo4j_query = """
- MATCH (n:DataFlow)
- WHERE id(n) = $dataflow_id
- OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
- RETURN n, id(n) as node_id,
- collect({
- id: id(label),
- name_zh: label.name_zh,
- name_en: label.name_en
- }) as tags
- """
- with connect_graph().session() as session:
- neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data()
- if not neo4j_result:
- logger.warning(f"未找到ID为 {dataflow_id} 的DataFlow节点")
- return None
- record = neo4j_result[0]
- node = record["n"]
- # Convert the node properties to a dict
- dataflow = dict(node)
- dataflow["id"] = record["node_id"]
- # Process the tag array, filtering out empty tags
- tags = record.get("tags", [])
- dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]
- # script_requirement: if it is a JSON string, parse it into an object
- script_requirement_str = dataflow.get("script_requirement", "")
- if script_requirement_str:
- try:
- # Try to parse the JSON string
- script_requirement_obj = json.loads(script_requirement_str)
- dataflow["script_requirement"] = script_requirement_obj
- logger.debug(
- "Parsed script_requirement: %s",
- script_requirement_obj,
- )
- except (json.JSONDecodeError, TypeError) as e:
- logger.warning(f"Failed to parse script_requirement, keeping the original value: {e}")
- # Keep the original (string) value
- dataflow["script_requirement"] = script_requirement_str
- else:
- # Empty value: normalize to None
- dataflow["script_requirement"] = None
- logger.info(
- "Fetched DataFlow details, ID: %s, name: %s",
- dataflow_id,
- dataflow.get("name_zh"),
- )
- return dataflow
- except Exception as e:
- logger.error(f"Failed to get dataflow details: {str(e)}")
- raise
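- # Sketch of the script_requirement round-trip performed above (standalone,
- # hypothetical values, no database needed):
- #
- #     stored = '{"rule": "dedupe by id", "source_table": [1], "target_table": [2]}'
- #     parsed = json.loads(stored)   # dict on success, returned to the caller
- #     assert parsed["target_table"] == [2]
- #     # json.loads("not json") raises json.JSONDecodeError, in which case
- #     # the raw string is kept instead of a parsed object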
- @staticmethod
- def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
- """
- 创建新的数据流
- Args:
- data: 数据流配置数据
- Returns:
- 创建的数据流信息
- """
- try:
- # Validate required fields
- required_fields = ["name_zh", "describe"]
- for field in required_fields:
- if field not in data:
- raise ValueError(f"Missing required field: {field}")
- dataflow_name = data["name_zh"]
- # Translate the name via LLM to generate the English name
- try:
- result_list = translate_and_parse(dataflow_name)
- name_en = (
- result_list[0]
- if result_list
- else dataflow_name.lower().replace(" ", "_")
- )
- except Exception as e:
- logger.warning(f"Translation failed, using a default English name: {str(e)}")
- name_en = dataflow_name.lower().replace(" ", "_")
- # Normalize script_requirement to a JSON string
- script_requirement = data.get("script_requirement")
- if script_requirement is not None:
- # Dicts and lists are serialized to JSON strings
- if isinstance(script_requirement, (dict, list)):
- script_requirement_str = json.dumps(
- script_requirement, ensure_ascii=False
- )
- else:
- # Already a string: use it as-is
- script_requirement_str = str(script_requirement)
- else:
- script_requirement_str = ""
- # Prepare the node data (tags are linked via LABEL relationships, not stored as node properties)
- node_data = {
- "name_zh": dataflow_name,
- "name_en": name_en,
- "category": data.get("category", ""),
- "organization": data.get("organization", ""),
- "leader": data.get("leader", ""),
- "frequency": data.get("frequency", ""),
- "describe": data.get("describe", ""),
- "status": data.get("status", "inactive"),
- "update_mode": data.get("update_mode", "append"),
- "script_type": data.get("script_type", "python"),
- "script_requirement": script_requirement_str,
- "script_path": "", # 脚本路径,任务完成后更新
- "created_at": get_formatted_time(),
- "updated_at": get_formatted_time(),
- }
- # Create the dataflow node, failing if one with the same name exists
- # (nodes store the Chinese name under name_zh)
- dataflow_id = get_node("DataFlow", name_zh=dataflow_name)
- if dataflow_id:
- raise ValueError(f"Dataflow '{dataflow_name}' already exists")
- dataflow_id = create_or_get_node("DataFlow", **node_data)
- # Handle tag relationships (supports an array of tags)
- tag_list = data.get("tag", [])
- if tag_list:
- try:
- DataFlowService._handle_tag_relationships(dataflow_id, tag_list)
- except Exception as e:
- logger.warning(f"Failed to handle tag relationships: {str(e)}")
- # After the graph node is created, persist to the PG database
- try:
- DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
- logger.info(f"Dataflow info written to the PG database: {dataflow_name}")
- # Once the PG record is written, create script relationships in the Neo4j graph
- try:
- DataFlowService._handle_script_relationships(
- data, dataflow_name, name_en
- )
- logger.info(f"Script relationships created: {dataflow_name}")
- except Exception as script_error:
- logger.warning(f"Failed to create script relationships: {str(script_error)}")
- except Exception as pg_error:
- logger.error(f"Failed to write to the PG database: {str(pg_error)}")
- # Note: the graph operation could be rolled back here, but for now the graph data is kept;
- # a production deployment may need a distributed transaction
- # Return the created dataflow info
- # Query the created node for its full properties
- query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id"
- with connect_graph().session() as session:
- id_result = session.run(query, name_zh=dataflow_name).single()
- if id_result:
- dataflow_node = id_result["n"]
- node_id = id_result["node_id"]
- # Convert the node properties to a dict
- result = dict(dataflow_node)
- result["id"] = node_id
- else:
- # Query failed: return basic info only
- result = {
- "id": (dataflow_id if isinstance(dataflow_id, int) else None),
- "name_zh": dataflow_name,
- "name_en": name_en,
- "created_at": get_formatted_time(),
- }
- # Register the data product with the data service
- try:
- DataFlowService._register_data_product(
- data=data,
- dataflow_name=dataflow_name,
- name_en=name_en,
- dataflow_id=result.get("id"),
- )
- logger.info(f"Data product registered: {dataflow_name}")
- except Exception as product_error:
- logger.warning(f"Failed to register the data product: {str(product_error)}")
- # Does not affect the main flow; only a warning is logged
- logger.info(f"Dataflow created: {dataflow_name}")
- return result
- except Exception as e:
- logger.error(f"Failed to create dataflow: {str(e)}")
- raise
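- # A minimal create_dataflow payload sketch (all IDs and names are
- # hypothetical; BusinessDomain nodes 101 and 202 are assumed to exist):
- #
- #     payload = {
- #         "name_zh": "daily sales rollup",
- #         "describe": "Aggregate daily sales",
- #         "update_mode": "append",
- #         "tag": [{"id": 7}],
- #         "script_requirement": {
- #             "rule": "sum amount by day",
- #             "source_table": [101],
- #             "target_table": [202],
- #         },
- #     }
- #     created = DataFlowService.create_dataflow(payload)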
- @staticmethod
- def _save_to_pg_database(
- data: Dict[str, Any],
- script_name: str,
- name_en: str,
- ):
- """
- 将任务信息保存到PG数据库的task_list表
- Args:
- data: 包含脚本信息的数据
- script_name: 脚本名称
- name_en: 英文名称
- """
- from app.config.config import config, current_env
- try:
- # Get the configuration for the current environment
- current_config = config.get(current_env, config["default"])
- dataflow_schema = getattr(current_config, "DATAFLOW_SCHEMA", "dags")
- # Extract script-related info.
- # Normalize script_requirement to a JSON string before saving
- script_requirement_raw = data.get("script_requirement")
- if script_requirement_raw is not None:
- if isinstance(script_requirement_raw, (dict, list)):
- script_requirement = json.dumps(
- script_requirement_raw, ensure_ascii=False
- )
- else:
- script_requirement = str(script_requirement_raw)
- else:
- script_requirement = ""
- # Validate required fields
- if not script_name:
- raise ValueError("script_name must not be empty")
- current_time = datetime.now()
- # Save to the task_list table
- try:
- # Parse script_requirement and build a detailed task description
- task_description_md = script_requirement
- # Defaults, so these names stay bound even if JSON parsing fails below
- update_mode = data.get("update_mode", "append")
- data_source_info = None
- try:
- # Try to parse the JSON
- try:
- req_json = json.loads(script_requirement)
- except (json.JSONDecodeError, TypeError):
- req_json = None
- if isinstance(req_json, dict):
- # 1. Extract the rule field from script_requirement as request_content_str
- request_content_str = req_json.get("rule", "")
- # 2. Extract the source_table and target_table info
- # from script_requirement
- source_table_ids = req_json.get("source_table", [])
- target_table_ids = req_json.get("target_table", [])
- # Ensure list format
- if not isinstance(source_table_ids, list):
- source_table_ids = (
- [source_table_ids] if source_table_ids else []
- )
- if not isinstance(target_table_ids, list):
- target_table_ids = (
- [target_table_ids] if target_table_ids else []
- )
- # Merge all BusinessDomain IDs
- all_bd_ids = source_table_ids + target_table_ids
- # 4. Extract update_mode from the data argument
- update_mode = data.get("update_mode", "append")
- # Generate the BusinessDomain DDLs
- source_ddls = []
- target_ddls = []
- data_source_info = None
- if all_bd_ids:
- try:
- with connect_graph().session() as session:
- # Process source tables
- for bd_id in source_table_ids:
- ddl_info = DataFlowService._generate_businessdomain_ddl(
- session,
- bd_id,
- is_target=False,
- )
- if ddl_info:
- source_ddls.append(ddl_info["ddl"])
- # 3. If the BELONGS_TO relationship points to the
- # "数据资源" (data resource) label, capture the data source info
- if (
- ddl_info.get("data_source")
- and not data_source_info
- ):
- data_source_info = ddl_info[
- "data_source"
- ]
- # Process target tables (5. target tables get a create_time column by default)
- for bd_id in target_table_ids:
- ddl_info = DataFlowService._generate_businessdomain_ddl(
- session,
- bd_id,
- is_target=True,
- update_mode=update_mode,
- )
- if ddl_info:
- target_ddls.append(ddl_info["ddl"])
- # Same BELONGS_TO check to capture the data source info
- if (
- ddl_info.get("data_source")
- and not data_source_info
- ):
- data_source_info = ddl_info[
- "data_source"
- ]
- except Exception as neo_e:
- logger.error(
- f"Failed to get BusinessDomain DDL: {str(neo_e)}"
- )
- # Build the task description in Markdown
- task_desc_parts = [f"# Task: {script_name}\n"]
- # Add the DataFlow schema configuration
- task_desc_parts.append("## DataFlow Configuration")
- task_desc_parts.append(f"- **Schema**: {dataflow_schema}\n")
- # Add the data source info
- if data_source_info:
- task_desc_parts.append("## Data Source")
- task_desc_parts.append(
- f"- **Type**: {data_source_info.get('type', 'N/A')}"
- )
- task_desc_parts.append(
- f"- **Host**: {data_source_info.get('host', 'N/A')}"
- )
- task_desc_parts.append(
- f"- **Port**: {data_source_info.get('port', 'N/A')}"
- )
- task_desc_parts.append(
- f"- **Database**: "
- f"{data_source_info.get('database', 'N/A')}\n"
- )
- # Add the source table DDLs
- if source_ddls:
- task_desc_parts.append("## Source Tables (DDL)")
- for ddl in source_ddls:
- task_desc_parts.append(f"```sql\n{ddl}\n```\n")
- # Add the target table DDLs
- if target_ddls:
- task_desc_parts.append("## Target Tables (DDL)")
- for ddl in target_ddls:
- task_desc_parts.append(f"```sql\n{ddl}\n```\n")
- # Add the update-mode explanation
- task_desc_parts.append("## Update Mode")
- if update_mode == "append":
- task_desc_parts.append("- **Mode**: Append")
- task_desc_parts.append(
- "- **Description**: New data is appended to the target table; existing data is kept\n"
- )
- else:
- task_desc_parts.append(
- "- **Mode**: Full Refresh"
- )
- task_desc_parts.append(
- "- **Description**: The target table is truncated and rewritten in full\n"
- )
- # Add the request content (rule)
- if request_content_str:
- task_desc_parts.append("## Request Content")
- task_desc_parts.append(f"{request_content_str}\n")
- # Add the implementation steps (tailored to the task type)
- task_desc_parts.append("## Implementation Steps")
- # Is this a remote-data-source import task?
- if data_source_info:
- # Simplified steps for importing from a remote data source
- task_desc_parts.append(
- "1. Create an n8n workflow to execute the "
- "data import task"
- )
- task_desc_parts.append(
- "2. Configure the workflow to call "
- "`import_resource_data.py` Python script"
- )
- task_desc_parts.append(
- "3. Pass the following parameters to the "
- "Python execution node:"
- )
- task_desc_parts.append(
- " - `--source-config`: JSON configuration "
- "for the remote data source"
- )
- task_desc_parts.append(
- " - `--target-table`: Target table name "
- "(data resource English name)"
- )
- task_desc_parts.append(
- f" - `--update-mode`: {update_mode}"
- )
- task_desc_parts.append(
- "4. The Python script will automatically:"
- )
- task_desc_parts.append(
- " - Connect to the remote data source"
- )
- task_desc_parts.append(
- " - Extract data from the source table"
- )
- task_desc_parts.append(
- f" - Write data to target table using "
- f"{update_mode} mode"
- )
- else:
- # Full steps for a data transformation task
- task_desc_parts.append(
- "1. Extract data from source tables as "
- "specified in the DDL"
- )
- task_desc_parts.append(
- "2. Apply transformation logic according to the rule:"
- )
- if request_content_str:
- task_desc_parts.append(
- f" - Rule: {request_content_str}"
- )
- task_desc_parts.append(
- "3. Generate Python program to implement the "
- "data transformation logic"
- )
- task_desc_parts.append(
- f"4. Write transformed data to target table "
- f"using {update_mode} mode"
- )
- task_desc_parts.append(
- "5. Create an n8n workflow to schedule and "
- "execute the Python program"
- )
- task_description_md = "\n".join(task_desc_parts)
- except Exception as parse_e:
- logger.warning(
- f"Failed to build the detailed task description, using the raw description: {str(parse_e)}"
- )
- task_description_md = script_requirement
- # Decide the task type and set code_path and code_name.
- # Remote-source import tasks reuse the generic import_resource_data.py script
- if data_source_info:
- # Remote data source import task
- code_path = "datafactory/scripts"
- code_name = "import_resource_data.py"
- logger.info(
- f"Remote data source import task detected, using the generic script: "
- f"{code_path}/{code_name}"
- )
- else:
- # Data transformation task: a dedicated script must be generated
- code_path = "datafactory/scripts"
- code_name = script_name
- task_insert_sql = text(
- "INSERT INTO public.task_list\n"
- "(task_name, task_description, status, code_name, "
- "code_path, create_by, create_time, update_time)\n"
- "VALUES\n"
- "(:task_name, :task_description, :status, :code_name, "
- ":code_path, :create_by, :create_time, :update_time)"
- )
- task_params = {
- "task_name": script_name,
- "task_description": task_description_md,
- "status": "pending",
- "code_name": code_name,
- "code_path": code_path,
- "create_by": "cursor",
- "create_time": current_time,
- "update_time": current_time,
- }
- db.session.execute(task_insert_sql, task_params)
- db.session.commit()
- logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")
- # 自动生成 n8n 工作流 JSON 文件
- try:
- DataFlowService._generate_n8n_workflow(
- script_name=script_name,
- code_name=code_name,
- code_path=code_path,
- update_mode=update_mode,
- is_import_task=bool(data_source_info),
- )
- except Exception as wf_error:
- logger.warning(f"生成n8n工作流文件失败: {str(wf_error)}")
- # 不影响主流程
- except Exception as task_error:
- db.session.rollback()
- logger.error(f"写入task_list表失败: {str(task_error)}")
- raise task_error
- except Exception as e:
- db.session.rollback()
- logger.error(f"Failed to save to the PG database: {str(e)}")
- raise
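- # Shape of the generated task_description Markdown (abridged; section
- # order mirrors the code above, and some sections are conditional):
- #
- #     # Task: <script_name>
- #     ## DataFlow Configuration
- #     ## Data Source              (remote-source import tasks only)
- #     ## Source Tables (DDL)
- #     ## Target Tables (DDL)
- #     ## Update Mode
- #     ## Request Content          (only when a rule is present)
- #     ## Implementation Steps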
- @staticmethod
- def _generate_n8n_workflow(
- script_name: str,
- code_name: str,
- code_path: str,
- update_mode: str = "append",
- is_import_task: bool = False,
- ) -> Optional[str]:
- """
- 自动生成 n8n 工作流 JSON 文件
- Args:
- script_name: 脚本/任务名称
- code_name: 代码文件名
- code_path: 代码路径
- update_mode: 更新模式
- is_import_task: 是否为数据导入任务
- Returns:
- 生成的工作流文件路径,失败返回 None
- """
- try:
- # Ensure the workflows directory exists
- workflows_dir = PROJECT_ROOT / "datafactory" / "workflows"
- workflows_dir.mkdir(parents=True, exist_ok=True)
- # Generate the workflow file name
- workflow_filename = f"{script_name}_workflow.json"
- workflow_path = workflows_dir / workflow_filename
- # Generate unique node IDs
- def gen_id():
- return str(uuid.uuid4())
- # Build the full SSH command, including venv activation.
- # Note: the n8n server is separate from the app server, so an SSH node is required
- ssh_command = (
- f"cd /opt/dataops-platform && source venv/bin/activate && "
- f"python {code_path}/{code_name}"
- )
- workflow_json = {
- "name": f"{script_name}_工作流",
- "nodes": [
- {
- "parameters": {},
- "id": gen_id(),
- "name": "Manual Trigger",
- "type": "n8n-nodes-base.manualTrigger",
- "typeVersion": 1,
- "position": [250, 300],
- },
- {
- "parameters": {
- "resource": "command",
- "operation": "execute",
- "command": ssh_command,
- "cwd": "/opt/dataops-platform",
- },
- "id": gen_id(),
- "name": "Execute Script",
- "type": "n8n-nodes-base.ssh",
- "typeVersion": 1,
- "position": [450, 300],
- "credentials": {
- "sshPassword": {
- "id": "pYTwwuyC15caQe6y",
- "name": "SSH Password account",
- }
- },
- },
- {
- "parameters": {
- "conditions": {
- "options": {
- "caseSensitive": True,
- "leftValue": "",
- "typeValidation": "strict",
- },
- "conditions": [
- {
- "id": "condition-success",
- "leftValue": "={{ $json.code }}",
- "rightValue": 0,
- "operator": {
- "type": "number",
- "operation": "equals",
- },
- }
- ],
- "combinator": "and",
- }
- },
- "id": gen_id(),
- "name": "Check Result",
- "type": "n8n-nodes-base.if",
- "typeVersion": 2,
- "position": [650, 300],
- },
- {
- "parameters": {
- "assignments": {
- "assignments": [
- {
- "id": "result-success",
- "name": "status",
- "value": "success",
- "type": "string",
- },
- {
- "id": "result-message",
- "name": "message",
- "value": f"{script_name} 执行成功",
- "type": "string",
- },
- {
- "id": "result-output",
- "name": "output",
- "value": "={{ $json.stdout }}",
- "type": "string",
- },
- {
- "id": "result-time",
- "name": "executionTime",
- "value": "={{ $now.toISO() }}",
- "type": "string",
- },
- ]
- }
- },
- "id": gen_id(),
- "name": "Success Response",
- "type": "n8n-nodes-base.set",
- "typeVersion": 3.4,
- "position": [850, 200],
- },
- {
- "parameters": {
- "assignments": {
- "assignments": [
- {
- "id": "error-status",
- "name": "status",
- "value": "error",
- "type": "string",
- },
- {
- "id": "error-message",
- "name": "message",
- "value": f"{script_name} 执行失败",
- "type": "string",
- },
- {
- "id": "error-output",
- "name": "error",
- "value": "={{ $json.stderr }}",
- "type": "string",
- },
- {
- "id": "error-code",
- "name": "exitCode",
- "value": "={{ $json.code }}",
- "type": "number",
- },
- {
- "id": "error-time",
- "name": "executionTime",
- "value": "={{ $now.toISO() }}",
- "type": "string",
- },
- ]
- }
- },
- "id": gen_id(),
- "name": "Error Response",
- "type": "n8n-nodes-base.set",
- "typeVersion": 3.4,
- "position": [850, 400],
- },
- ],
- "connections": {
- "Manual Trigger": {
- "main": [
- [
- {
- "node": "Execute Script",
- "type": "main",
- "index": 0,
- }
- ]
- ]
- },
- "Execute Script": {
- "main": [
- [
- {
- "node": "Check Result",
- "type": "main",
- "index": 0,
- }
- ]
- ]
- },
- "Check Result": {
- "main": [
- [
- {
- "node": "Success Response",
- "type": "main",
- "index": 0,
- }
- ],
- [
- {
- "node": "Error Response",
- "type": "main",
- "index": 0,
- }
- ],
- ]
- },
- },
- "active": False,
- "settings": {"executionOrder": "v1"},
- "versionId": "1",
- "meta": {
- "templateCredsSetupCompleted": False,
- "instanceId": "dataops-platform",
- },
- "tags": [
- {
- "createdAt": datetime.utcnow().isoformat() + "Z",
- "updatedAt": datetime.utcnow().isoformat() + "Z",
- "id": "1",
- "name": "data-import" if is_import_task else "dataflow",
- }
- ],
- }
- # Write the file
- with open(workflow_path, "w", encoding="utf-8") as f:
- json.dump(workflow_json, f, ensure_ascii=False, indent=2)
- logger.info(f"Generated n8n workflow file: {workflow_path}")
- return str(workflow_path)
- except Exception as e:
- logger.error(f"Failed to generate the n8n workflow: {str(e)}")
- return None
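- # Sanity-check sketch for a generated workflow file (hypothetical file
- # name; the real name depends on script_name):
- #
- #     wf = json.loads(
- #         (PROJECT_ROOT / "datafactory/workflows/demo_workflow.json").read_text(encoding="utf-8")
- #     )
- #     names = {n["name"] for n in wf["nodes"]}
- #     assert {"Manual Trigger", "Execute Script", "Check Result"} <= names
- #     assert all(src in names for src in wf["connections"])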
- @staticmethod
- def _handle_children_relationships(dataflow_node, children_ids):
- """处理子节点关系"""
- logger.debug(
- "处理子节点关系,原始children_ids: %s, 类型: %s",
- children_ids,
- type(children_ids),
- )
- # 确保children_ids是列表格式
- if not isinstance(children_ids, (list, tuple)):
- if children_ids is not None:
- children_ids = [children_ids] # 如果是单个值,转换为列表
- logger.debug(f"将单个值转换为列表: {children_ids}")
- else:
- children_ids = [] # 如果是None,转换为空列表
- logger.debug("将None转换为空列表")
- for child_id in children_ids:
- try:
- # Look up the child node
- query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
- with connect_graph().session() as session:
- result = session.run(query, child_id=child_id).data()
- if result:
- # Get the ID of dataflow_node
- dataflow_id = getattr(dataflow_node, "identity", None)
- if dataflow_id is None:
- # No identity attribute: look up the ID by name
- query_id = (
- "MATCH (n:DataFlow) WHERE n.name_zh = "
- "$name_zh RETURN id(n) as node_id"
- )
- id_result = session.run(
- query_id,
- name_zh=dataflow_node.get("name_zh"),
- ).single()
- dataflow_id = id_result["node_id"] if id_result else None
- # Create the relationship, checking existence by ID
- if dataflow_id and not relationship_exists(
- dataflow_id, "child", child_id
- ):
- session.run(
- "MATCH (a), (b) WHERE id(a) = $dataflow_id "
- "AND id(b) = $child_id "
- "CREATE (a)-[:child]->(b)",
- dataflow_id=dataflow_id,
- child_id=child_id,
- )
- logger.info(f"Created child relationship: {dataflow_id} -> {child_id}")
- except Exception as e:
- logger.warning(f"Failed to create child relationship {child_id}: {str(e)}")
- @staticmethod
- def _handle_tag_relationships(dataflow_id, tag_list):
- """
- 处理多标签关系
- Args:
- dataflow_id: 数据流节点ID
- tag_list: 标签列表,可以是ID数组或包含id字段的对象数组
- """
- # 确保tag_list是列表格式
- if not isinstance(tag_list, list):
- tag_list = [tag_list] if tag_list else []
- for tag_item in tag_list:
- tag_id = None
- if isinstance(tag_item, dict) and "id" in tag_item:
- tag_id = int(tag_item["id"])
- elif isinstance(tag_item, (int, str)):
- with contextlib.suppress(ValueError, TypeError):
- tag_id = int(tag_item)
- if tag_id:
- DataFlowService._handle_single_tag_relationship(dataflow_id, tag_id)
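- # The normalization above accepts several tag shapes; all of these
- # resolve to plain integer IDs (df_id is hypothetical):
- #
- #     DataFlowService._handle_tag_relationships(df_id, [3, "4", {"id": 5}])
- #     DataFlowService._handle_tag_relationships(df_id, 3)       # single value
- #     DataFlowService._handle_tag_relationships(df_id, "oops")  # silently skipped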
- @staticmethod
- def _handle_single_tag_relationship(dataflow_id, tag_id):
- """处理单个标签关系"""
- try:
- # Look up the label node
- query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
- with connect_graph().session() as session:
- result = session.run(query, tag_id=tag_id).data()
- # Create the relationship, checking existence by ID
- if (
- result
- and dataflow_id
- and not relationship_exists(dataflow_id, "LABEL", tag_id)
- ):
- session.run(
- "MATCH (a), (b) WHERE id(a) = $dataflow_id "
- "AND id(b) = $tag_id "
- "CREATE (a)-[:LABEL]->(b)",
- dataflow_id=dataflow_id,
- tag_id=tag_id,
- )
- logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
- except Exception as e:
- logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
- @staticmethod
- def update_dataflow_script_path(
- dataflow_name: str,
- script_path: str,
- ) -> bool:
- """
- 更新 DataFlow 节点的脚本路径
- 当任务完成后,将创建的 Python 脚本路径更新到 DataFlow 节点
- Args:
- dataflow_name: 数据流名称(中文名)
- script_path: Python 脚本的完整路径
- Returns:
- 是否更新成功
- """
- try:
- query = """
- MATCH (n:DataFlow {name_zh: $name_zh})
- SET n.script_path = $script_path, n.updated_at = $updated_at
- RETURN n
- """
- with connect_graph().session() as session:
- result = session.run(
- query,
- name_zh=dataflow_name,
- script_path=script_path,
- updated_at=get_formatted_time(),
- ).single()
- if result:
- logger.info(
- f"Updated DataFlow script path: {dataflow_name} -> {script_path}"
- )
- return True
- else:
- logger.warning(f"DataFlow node not found: {dataflow_name}")
- return False
- except Exception as e:
- logger.error(f"Failed to update the DataFlow script path: {str(e)}")
- return False
- @staticmethod
- def get_script_content(dataflow_id: int) -> Dict[str, Any]:
- """
- 根据 DataFlow ID 获取关联的脚本内容
- Args:
- dataflow_id: 数据流ID
- Returns:
- 包含脚本内容和元信息的字典:
- - script_path: 脚本路径
- - script_content: 脚本内容
- - script_type: 脚本类型(如 python)
- - dataflow_name: 数据流名称
- Raises:
- ValueError: 当 DataFlow 不存在或脚本路径为空时
- FileNotFoundError: 当脚本文件不存在时
- """
- try:
- # Fetch the DataFlow node from Neo4j
- query = """
- MATCH (n:DataFlow)
- WHERE id(n) = $dataflow_id
- RETURN n, id(n) as node_id
- """
- with connect_graph().session() as session:
- result = session.run(query, dataflow_id=dataflow_id).single()
- if not result:
- raise ValueError(f"DataFlow node with ID {dataflow_id} not found")
- node = result["n"]
- node_props = dict(node)
- # Get the script path
- script_path = node_props.get("script_path", "")
- if not script_path:
- raise ValueError(
- f"script_path is empty for DataFlow (ID: {dataflow_id})"
- )
- # Resolve the full path of the script file;
- # script_path may be relative or absolute
- script_file = Path(script_path)
- # Relative paths are resolved against the module-level PROJECT_ROOT
- if not script_file.is_absolute():
- script_file = PROJECT_ROOT / script_path
- # Check that the file exists
- if not script_file.exists():
- raise FileNotFoundError(f"Script file does not exist: {script_file}")
- # Read the script content
- with script_file.open("r", encoding="utf-8") as f:
- script_content = f.read()
- # Determine the script type from the file suffix
- suffix = script_file.suffix.lower()
- script_type_map = {
- ".py": "python",
- ".js": "javascript",
- ".ts": "typescript",
- ".sql": "sql",
- ".sh": "shell",
- }
- script_type = script_type_map.get(suffix, "text")
- logger.info(
- f"Read script content: DataFlow ID={dataflow_id}, "
- f"path={script_path}, type={script_type}"
- )
- return {
- "script_path": script_path,
- "script_content": script_content,
- "script_type": script_type,
- "dataflow_id": dataflow_id,
- "dataflow_name": node_props.get("name_zh", ""),
- "dataflow_name_en": node_props.get("name_en", ""),
- }
- except (ValueError, FileNotFoundError):
- raise
- except Exception as e:
- logger.error(f"Failed to get the script content: {str(e)}")
- raise
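- # Usage sketch (hypothetical ID; assumes the node stores a script_path
- # relative to the project root):
- #
- #     info = DataFlowService.get_script_content(42)
- #     info["script_type"]          # "python" for a .py suffix, "text" otherwise
- #     info["script_content"][:80]  # first 80 characters of the file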
- @staticmethod
- def update_dataflow(
- dataflow_id: int,
- data: Dict[str, Any],
- ) -> Optional[Dict[str, Any]]:
- """
- 更新数据流
- Args:
- dataflow_id: 数据流ID
- data: 更新的数据
- Returns:
- 更新后的数据流信息,如果不存在则返回None
- """
- try:
- # Extract the tag array (not stored as a node property)
- tag_list = data.pop("tag", None)
- # Look up the node
- query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
- with connect_graph().session() as session:
- result = session.run(query, dataflow_id=dataflow_id).data()
- if not result:
- return None
- # Update the node properties
- update_fields = []
- params: Dict[str, Any] = {"dataflow_id": dataflow_id}
- for key, value in data.items():
- if key not in ["id", "created_at"]: # protected fields
- # Serialize complex objects to JSON strings
- if key in ["config", "script_requirement"] and isinstance(
- value, dict
- ):
- value = json.dumps(value, ensure_ascii=False)
- update_fields.append(f"n.{key} = ${key}")
- params[key] = value
- if update_fields:
- params["updated_at"] = get_formatted_time()
- update_fields.append("n.updated_at = $updated_at")
- update_query = f"""
- MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
- SET {", ".join(update_fields)}
- RETURN n, id(n) as node_id
- """
- result = session.run(update_query, params).data()
- # Handle tag relationships (supports an array of tags)
- if tag_list is not None:
- # Ensure list format
- if not isinstance(tag_list, list):
- tag_list = [tag_list] if tag_list else []
- # Delete the existing LABEL relationships first
- delete_query = """
- MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel)
- WHERE id(n) = $dataflow_id
- DELETE r
- """
- session.run(delete_query, dataflow_id=dataflow_id)
- logger.info(f"Deleted existing label relationships for dataflow {dataflow_id}")
- # Create a new LABEL relationship for each tag
- for tag_item in tag_list:
- tag_id = None
- if isinstance(tag_item, dict) and "id" in tag_item:
- tag_id = int(tag_item["id"])
- elif isinstance(tag_item, (int, str)):
- with contextlib.suppress(ValueError, TypeError):
- tag_id = int(tag_item)
- if tag_id:
- DataFlowService._handle_single_tag_relationship(
- dataflow_id, tag_id
- )
- if result:
- node = result[0]["n"]
- updated_dataflow = dict(node)
- # Use the node_id returned by the query
- updated_dataflow["id"] = result[0]["node_id"]
- # Query the tag array and add it to the returned data
- tags_query = """
- MATCH (n:DataFlow)
- WHERE id(n) = $dataflow_id
- OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
- RETURN collect({
- id: id(label),
- name_zh: label.name_zh,
- name_en: label.name_en
- }) as tags
- """
- tags_result = session.run(
- tags_query, dataflow_id=dataflow_id
- ).single()
- if tags_result:
- tags = tags_result.get("tags", [])
- updated_dataflow["tag"] = [
- tag for tag in tags if tag.get("id") is not None
- ]
- else:
- updated_dataflow["tag"] = []
- logger.info(f"更新数据流成功: ID={dataflow_id}")
- return updated_dataflow
- return None
- except Exception as e:
- logger.error(f"更新数据流失败: {str(e)}")
- raise e
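- # update_dataflow skips protected fields and serializes complex values
- # (illustrative call; the ID is hypothetical):
- #
- #     updated = DataFlowService.update_dataflow(42, {
- #         "status": "active",
- #         "created_at": "2020-01-01",            # ignored (protected field)
- #         "script_requirement": {"rule": "r1"},  # stored as a JSON string
- #         "tag": [7, 8],                         # replaces the LABEL relationships
- #     })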
- @staticmethod
- def delete_dataflow(dataflow_id: int) -> bool:
- """
- 删除数据流
- Args:
- dataflow_id: 数据流ID
- Returns:
- 删除是否成功
- """
- try:
- # Delete the node and its relationships
- query = """
- MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
- DETACH DELETE n
- RETURN count(n) as deleted_count
- """
- with connect_graph().session() as session:
- delete_result = session.run(query, dataflow_id=dataflow_id).single()
- result = delete_result["deleted_count"] if delete_result else 0
- if result and result > 0:
- logger.info(f"删除数据流成功: ID={dataflow_id}")
- return True
- return False
- except Exception as e:
- logger.error(f"删除数据流失败: {str(e)}")
- raise e
- @staticmethod
- def execute_dataflow(
- dataflow_id: int,
- params: Optional[Dict[str, Any]] = None,
- ) -> Dict[str, Any]:
- """
- 执行数据流
- Args:
- dataflow_id: 数据流ID
- params: 执行参数
- Returns:
- 执行结果信息
- """
- try:
- # Check that the dataflow exists
- query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
- with connect_graph().session() as session:
- result = session.run(query, dataflow_id=dataflow_id).data()
- if not result:
- raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")
- execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
- # TODO: actually execute the dataflow here.
- # For now, return a mock result
- result = {
- "execution_id": execution_id,
- "dataflow_id": dataflow_id,
- "status": "running",
- "started_at": datetime.now().isoformat(),
- "params": params or {},
- "progress": 0,
- }
- logger.info(
- "Started dataflow execution: ID=%s, execution_id=%s",
- dataflow_id,
- execution_id,
- )
- return result
- except Exception as e:
- logger.error(f"Failed to execute the dataflow: {str(e)}")
- raise
- @staticmethod
- def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
- """
- 获取数据流执行状态
- Args:
- dataflow_id: 数据流ID
- Returns:
- 执行状态信息
- """
- try:
- # TODO: query the real execution status here.
- # For now, return a mock status
- query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
- with connect_graph().session() as session:
- result = session.run(query, dataflow_id=dataflow_id).data()
- if not result:
- raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")
- status = ["running", "completed", "failed", "pending"][dataflow_id % 4]
- return {
- "dataflow_id": dataflow_id,
- "status": status,
- "progress": (
- 100 if status == "completed" else (dataflow_id * 10) % 100
- ),
- "started_at": datetime.now().isoformat(),
- "completed_at": (
- datetime.now().isoformat() if status == "completed" else None
- ),
- "error_message": ("An error occurred during execution" if status == "failed" else None),
- }
- except Exception as e:
- logger.error(f"Failed to get the dataflow status: {str(e)}")
- raise
- @staticmethod
- def get_dataflow_logs(
- dataflow_id: int,
- page: int = 1,
- page_size: int = 50,
- ) -> Dict[str, Any]:
- """
- 获取数据流执行日志
- Args:
- dataflow_id: 数据流ID
- page: 页码
- page_size: 每页大小
- Returns:
- 执行日志列表和分页信息
- """
- try:
- # TODO: query the real execution logs here.
- # For now, return mock logs
- query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
- with connect_graph().session() as session:
- result = session.run(query, dataflow_id=dataflow_id).data()
- if not result:
- raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")
- mock_logs = [
- {
- "id": i,
- "timestamp": datetime.now().isoformat(),
- "level": ["INFO", "WARNING", "ERROR"][i % 3],
- "message": f"Dataflow execution log message {i}",
- "component": ["source", "transform", "target"][i % 3],
- }
- for i in range(1, 101)
- ]
- # Paginate
- total = len(mock_logs)
- start = (page - 1) * page_size
- end = start + page_size
- logs = mock_logs[start:end]
- return {
- "logs": logs,
- "pagination": {
- "page": page,
- "page_size": page_size,
- "total": total,
- "total_pages": (total + page_size - 1) // page_size,
- },
- }
- except Exception as e:
- logger.error(f"Failed to get the dataflow logs: {str(e)}")
- raise
- @staticmethod
- def _generate_businessdomain_ddl(
- session,
- bd_id: int,
- is_target: bool = False,
- update_mode: str = "append",
- ) -> Optional[Dict[str, Any]]:
- """
- 根据BusinessDomain节点ID生成DDL
- Args:
- session: Neo4j session对象
- bd_id: BusinessDomain节点ID
- is_target: 是否为目标表(目标表需要添加create_time字段)
- update_mode: 更新模式(append或full)
- Returns:
- 包含ddl和data_source信息的字典,如果节点不存在则返回None
- """
- try:
- # Query the BusinessDomain node, its metadata, label relationships, and data source relationship
- cypher = """
- MATCH (bd:BusinessDomain)
- WHERE id(bd) = $bd_id
- OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
- OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
- OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
- RETURN bd,
- collect(DISTINCT m) as metadata,
- collect(DISTINCT {
- id: id(label),
- name_zh: label.name_zh,
- name_en: label.name_en
- }) as labels,
- ds.type as ds_type,
- ds.host as ds_host,
- ds.port as ds_port,
- ds.database as ds_database
- """
- result = session.run(cypher, bd_id=bd_id).single()
- if not result or not result["bd"]:
- logger.warning(f"未找到ID为 {bd_id} 的BusinessDomain节点")
- return None
- node = result["bd"]
- metadata = result["metadata"]
- # 处理标签数组,获取第一个有效标签名称用于判断
- labels = result.get("labels", [])
- valid_labels = [label for label in labels if label.get("id") is not None]
- label_name = valid_labels[0].get("name_zh") if valid_labels else None
- # 生成DDL
- node_props = dict(node)
- table_name = node_props.get("name_en", f"table_{bd_id}")
- ddl_lines = []
- ddl_lines.append(f"CREATE TABLE {table_name} (")
- column_definitions = []
- # Add the metadata columns
- if metadata:
- for meta in metadata:
- if meta:
- meta_props = dict(meta)
- column_name = meta_props.get(
- "name_en",
- meta_props.get("name_zh", "unknown_column"),
- )
- data_type = meta_props.get("data_type", "VARCHAR(255)")
- comment = meta_props.get("name_zh", "")
- column_def = f" {column_name} {data_type}"
- if comment:
- column_def += f" COMMENT '{comment}'"
- column_definitions.append(column_def)
- # No metadata: add a default primary key
- if not column_definitions:
- column_definitions.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
- # 5. Target tables get a create_time column by default
- if is_target:
- column_definitions.append(
- " create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
- "COMMENT '数据创建时间'"
- )
- ddl_lines.append(",\n".join(column_definitions))
- ddl_lines.append(");")
- # Add the table comment
- table_comment = node_props.get(
- "name_zh", node_props.get("describe", table_name)
- )
- if table_comment and table_comment != table_name:
- ddl_lines.append(f"COMMENT ON TABLE {table_name} IS '{table_comment}';")
- ddl_content = "\n".join(ddl_lines)
- # 3. If the BELONGS_TO relationship points to the "数据资源" (data resource) label, return the data source info
- data_source = None
- if label_name == "数据资源" and result["ds_type"]:
- data_source = {
- "type": result["ds_type"],
- "host": result["ds_host"],
- "port": result["ds_port"],
- "database": result["ds_database"],
- }
- logger.info(f"获取到数据源信息: {data_source}")
- logger.debug(
- f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}"
- )
- return {
- "ddl": ddl_content,
- "table_name": table_name,
- "data_source": data_source,
- }
- except Exception as e:
- logger.error(f"Failed to generate the BusinessDomain DDL, ID={bd_id}: {str(e)}")
- return None
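- # Example of the DDL emitted for a target table with one metadata column
- # (table and column names are hypothetical):
- #
- #     CREATE TABLE daily_sales (
- #       amount DECIMAL(18,2) COMMENT '金额',
- #       create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'
- #     );
- #     COMMENT ON TABLE daily_sales IS '每日销售';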
- @staticmethod
- def _handle_script_relationships(
- data: Dict[str, Any],
- dataflow_name: str,
- name_en: str,
- ):
- """
- 处理脚本关系,在Neo4j图数据库中创建从source BusinessDomain到DataFlow的
- INPUT关系,以及从DataFlow到target BusinessDomain的OUTPUT关系。
- 关系模型:
- - (source:BusinessDomain)-[:INPUT]->(dataflow:DataFlow)
- - (dataflow:DataFlow)-[:OUTPUT]->(target:BusinessDomain)
- Args:
- data: 包含脚本信息的数据字典,应包含script_name, script_type,
- schedule_status, source_table, target_table, update_mode
- """
- try:
- # Read key/value pairs from data
- source_table_full = data.get("source_table", "")
- target_table_full = data.get("target_table", "")
- # Parse the source_table/target_table format:
- # either "label:name" or a bare "name"
- source_table = (
- source_table_full.split(":")[-1]
- if ":" in source_table_full
- else source_table_full
- )
- target_table = (
- target_table_full.split(":")[-1]
- if ":" in target_table_full
- else target_table_full
- )
- source_label = (
- source_table_full.split(":")[0]
- if ":" in source_table_full
- else "BusinessDomain"
- )
- target_label = (
- target_table_full.split(":")[0]
- if ":" in target_table_full
- else "BusinessDomain"
- )
- # Validate required fields
- if not source_table or not target_table:
- logger.warning(
- "source_table or target_table is empty, skipping relationship creation: "
- "source_table=%s, target_table=%s",
- source_table,
- target_table,
- )
- return
- logger.info(
- "Creating INPUT/OUTPUT relationships: %s -[INPUT]-> %s -[OUTPUT]-> %s",
- source_table,
- dataflow_name,
- target_table,
- )
- with connect_graph().session() as session:
- # Step 1: get the DataFlow node ID
- dataflow_query = """
- MATCH (df:DataFlow {name_zh: $dataflow_name})
- RETURN id(df) as dataflow_id
- """
- df_result = session.run(
- dataflow_query, # type: ignore[arg-type]
- {"dataflow_name": dataflow_name},
- ).single()
- if not df_result:
- logger.error(f"DataFlow node not found: {dataflow_name}")
- return
- dataflow_id = df_result["dataflow_id"]
- # Step 2: get or create the source node.
- # Match on name_en first, then on name
- source_query = f"""
- MATCH (source:{source_label})
- WHERE source.name_en = $source_table OR source.name = $source_table
- RETURN id(source) as source_id
- LIMIT 1
- """
- source_result = session.run(
- source_query, # type: ignore[arg-type]
- {"source_table": source_table},
- ).single()
- if not source_result:
- logger.warning(
- "Source node not found: %s, creating a new node",
- source_table,
- )
- # Create the source node
- create_source_query = f"""
- CREATE (source:{source_label} {{
- name: $source_table,
- name_en: $source_table,
- created_at: $created_at,
- type: 'source'
- }})
- RETURN id(source) as source_id
- """
- source_result = session.run(
- create_source_query, # type: ignore[arg-type]
- {
- "source_table": source_table,
- "created_at": get_formatted_time(),
- },
- ).single()
- source_id = source_result["source_id"] if source_result else None
- # Step 3: get or create the target node
- target_query = f"""
- MATCH (target:{target_label})
- WHERE target.name_en = $target_table OR target.name = $target_table
- RETURN id(target) as target_id
- LIMIT 1
- """
- target_result = session.run(
- target_query, # type: ignore[arg-type]
- {"target_table": target_table},
- ).single()
- if not target_result:
- logger.warning(
- "Target node not found: %s, creating a new node",
- target_table,
- )
- # Create the target node
- create_target_query = f"""
- CREATE (target:{target_label} {{
- name: $target_table,
- name_en: $target_table,
- created_at: $created_at,
- type: 'target'
- }})
- RETURN id(target) as target_id
- """
- target_result = session.run(
- create_target_query, # type: ignore[arg-type]
- {
- "target_table": target_table,
- "created_at": get_formatted_time(),
- },
- ).single()
- target_id = target_result["target_id"] if target_result else None
- if not source_id or not target_id:
- logger.error(
- "Could not resolve the source or target node ID: source_id=%s, target_id=%s",
- source_id,
- target_id,
- )
- return
- # Step 4: create the INPUT relationship (source)-[:INPUT]->(dataflow)
- create_input_query = """
- MATCH (source), (dataflow:DataFlow)
- WHERE id(source) = $source_id AND id(dataflow) = $dataflow_id
- MERGE (source)-[r:INPUT]->(dataflow)
- ON CREATE SET r.created_at = $created_at
- ON MATCH SET r.updated_at = $created_at
- RETURN r
- """
- input_result = session.run(
- create_input_query, # type: ignore[arg-type]
- {
- "source_id": source_id,
- "dataflow_id": dataflow_id,
- "created_at": get_formatted_time(),
- },
- ).single()
- if input_result:
- logger.info(
- "Created INPUT relationship: %s -> %s",
- source_table,
- dataflow_name,
- )
- else:
- logger.warning(
- "INPUT relationship creation failed or already exists: %s -> %s",
- source_table,
- dataflow_name,
- )
- # Step 5: create the OUTPUT relationship (dataflow)-[:OUTPUT]->(target)
- create_output_query = """
- MATCH (dataflow:DataFlow), (target)
- WHERE id(dataflow) = $dataflow_id AND id(target) = $target_id
- MERGE (dataflow)-[r:OUTPUT]->(target)
- ON CREATE SET r.created_at = $created_at
- ON MATCH SET r.updated_at = $created_at
- RETURN r
- """
- output_result = session.run(
- create_output_query, # type: ignore[arg-type]
- {
- "dataflow_id": dataflow_id,
- "target_id": target_id,
- "created_at": get_formatted_time(),
- },
- ).single()
- if output_result:
- logger.info(
- "Created OUTPUT relationship: %s -> %s",
- dataflow_name,
- target_table,
- )
- else:
- logger.warning(
- "OUTPUT relationship creation failed or already exists: %s -> %s",
- dataflow_name,
- target_table,
- )
- logger.info(
- "Lineage creation complete: %s -[INPUT]-> %s -[OUTPUT]-> %s",
- source_table,
- dataflow_name,
- target_table,
- )
- except Exception as e:
- logger.error(f"Failed to handle script relationships: {str(e)}")
- raise
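- # How the "label:name" convention above is parsed (pure string handling):
- #
- #     full = "DataResource:orders_raw"   # hypothetical value
- #     name = full.split(":")[-1] if ":" in full else full          # "orders_raw"
- #     label = full.split(":")[0] if ":" in full else "BusinessDomain"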
- @staticmethod
- def get_business_domain_list() -> List[Dict[str, Any]]:
- """
- 获取BusinessDomain节点列表
- Returns:
- BusinessDomain节点列表,每个节点包含 id, name_zh, name_en, tag
- """
- try:
- logger.info("Querying the BusinessDomain node list")
- with connect_graph().session() as session:
- # Query all BusinessDomain nodes and their LABEL-linked tags (multiple tags supported)
- query = """
- MATCH (bd:BusinessDomain)
- OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
- RETURN id(bd) as id,
- bd.name_zh as name_zh,
- bd.name_en as name_en,
- bd.create_time as create_time,
- collect({
- id: id(label),
- name_zh: label.name_zh,
- name_en: label.name_en
- }) as tags
- ORDER BY create_time DESC
- """
- result = session.run(query)
- bd_list = []
- for record in result:
- # Process the tag array, filtering out empty tags
- tags = record.get("tags", [])
- tag_list = [tag for tag in tags if tag.get("id") is not None]
- bd_item = {
- "id": record["id"],
- "name_zh": record.get("name_zh", "") or "",
- "name_en": record.get("name_en", "") or "",
- "tag": tag_list,
- }
- bd_list.append(bd_item)
- logger.info(f"成功查询到 {len(bd_list)} 个BusinessDomain节点")
- return bd_list
- except Exception as e:
- logger.error(f"查询BusinessDomain节点列表失败: {str(e)}")
- raise e
- @staticmethod
- def _register_data_product(
- data: Dict[str, Any],
- dataflow_name: str,
- name_en: str,
- dataflow_id: Optional[int] = None,
- ) -> None:
- """
- 注册数据产品到数据服务
- 当数据流创建成功后,自动将其注册为数据产品,
- 以便在数据服务模块中展示和管理。
- 从 script_requirement.target_table 中获取 BusinessDomain ID,
- 然后查询 Neo4j 获取对应节点的 name_zh 和 name_en 作为数据产品名称。
- Args:
- data: 数据流配置数据
- dataflow_name: 数据流名称(中文)
- name_en: 数据流英文名
- dataflow_id: 数据流ID(Neo4j节点ID)
- """
- try:
- # Get target_table (a list of BusinessDomain IDs) from script_requirement
- script_requirement = data.get("script_requirement")
- description = data.get("describe", "")
- # Parse script_requirement
- req_json: Optional[Dict[str, Any]] = None
- if script_requirement:
- if isinstance(script_requirement, dict):
- req_json = script_requirement
- elif isinstance(script_requirement, str):
- try:
- parsed = json.loads(script_requirement)
- if isinstance(parsed, dict):
- req_json = parsed
- except (json.JSONDecodeError, TypeError):
- pass
- # Get the BusinessDomain ID list from target_table
- target_bd_ids: List[int] = []
- if req_json:
- target_table_ids = req_json.get("target_table", [])
- if isinstance(target_table_ids, list):
- target_bd_ids = [
- int(bid) for bid in target_table_ids if bid is not None
- ]
- elif target_table_ids is not None:
- target_bd_ids = [int(target_table_ids)]
- # If a rule field exists and there is no description yet, use it
- rule = req_json.get("rule", "")
- if rule and not description:
- description = rule
- # No target_table IDs: skip data product registration
- if not target_bd_ids:
- logger.warning(
- f"Dataflow {dataflow_name} has no target_table, skipping data product registration"
- )
- return
- # Query name_zh and name_en of each BusinessDomain node from Neo4j
- with connect_graph().session() as session:
- for bd_id in target_bd_ids:
- try:
- query = """
- MATCH (bd:BusinessDomain)
- WHERE id(bd) = $bd_id
- RETURN bd.name_zh as name_zh,
- bd.name_en as name_en,
- bd.describe as describe
- """
- result = session.run(query, bd_id=bd_id).single()
- if not result:
- logger.warning(
- f"BusinessDomain node with ID {bd_id} not found, skipping"
- )
- continue
- # Use the BusinessDomain node's name_zh and name_en
- product_name = result.get("name_zh") or ""
- product_name_en = result.get("name_en") or ""
- # Fall back to name_en when name_zh is missing
- if not product_name:
- product_name = product_name_en
- # Derive name_en from name_zh when missing
- if not product_name_en:
- product_name_en = product_name.lower().replace(" ", "_")
- # The target table name is the BusinessDomain's name_en
- target_table = product_name_en
- # Use the BusinessDomain's describe if the description is still empty
- bd_describe = result.get("describe") or ""
- if bd_describe and not description:
- description = bd_describe
- # Resolve the target schema (defaults to public)
- target_schema = "public"
- # Register via the data product service
- DataProductService.register_data_product(
- product_name=product_name,
- product_name_en=product_name_en,
- target_table=target_table,
- target_schema=target_schema,
- description=description,
- source_dataflow_id=dataflow_id,
- source_dataflow_name=dataflow_name,
- created_by=data.get("created_by", "dataflow"),
- )
- logger.info(
- f"Data product registered: {product_name} -> "
- f"{target_schema}.{target_table}"
- )
- except Exception as bd_error:
- logger.error(
- f"Failed to process BusinessDomain {bd_id}: {str(bd_error)}"
- )
- # Continue with the next one
- except Exception as e:
- logger.error(f"Failed to register the data product: {str(e)}")
- raise
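- # Both script_requirement shapes below resolve to BusinessDomain ID 202
- # for registration (hypothetical values):
- #
- #     {"target_table": [202], "rule": "..."}
- #     {"target_table": 202, "rule": "..."}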