| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
777177817791780178117821783 |
- import json
- import logging
- from datetime import datetime
- from typing import Any, Dict, List, Optional, Union
- from sqlalchemy import text
- from app import db
- from app.core.data_service.data_product_service import DataProductService
- from app.core.graph.graph_operations import (
- connect_graph,
- create_or_get_node,
- get_node,
- relationship_exists,
- )
- from app.core.llm.llm_service import llm_sql
- from app.core.meta_data import get_formatted_time, translate_and_parse
# Module-level logger used by every DataFlowService method below.
logger = logging.getLogger(__name__)
- class DataFlowService:
- """数据流服务类,处理数据流相关的业务逻辑"""
@staticmethod
def get_dataflows(
    page: int = 1,
    page_size: int = 10,
    search: str = "",
) -> Dict[str, Any]:
    """
    Return one page of DataFlow nodes together with their DataLabel tags.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Items per page; values below 1 are treated as 1.
        search: Optional substring matched against ``name_zh``,
            ``describe`` and ``description``.

    Returns:
        ``{"list": [...], "pagination": {...}}``. Each list item holds the
        node's properties plus ``id`` (internal node id) and ``tag`` (list
        of ``{"id", "name_zh", "name_en"}`` dicts).

    Raises:
        Exception: Graph-driver errors are logged and re-raised.
    """
    try:
        # Clamp so SKIP is never negative (Cypher error) and page_size is
        # never 0 (ZeroDivisionError when computing total_pages).
        page = max(int(page), 1)
        page_size = max(int(page_size), 1)

        params: Dict[str, Union[int, str]] = {
            "skip": (page - 1) * page_size,
            "limit": page_size,
        }
        where_clause = ""
        if search:
            # create_dataflow stores the description under ``describe``;
            # ``description`` is still matched for nodes written elsewhere.
            where_clause = (
                "WHERE n.name_zh CONTAINS $search "
                "OR n.describe CONTAINS $search "
                "OR n.description CONTAINS $search"
            )
            params["search"] = search

        # Paginate first (WITH ... SKIP/LIMIT), then aggregate labels, so
        # the label OPTIONAL MATCH cannot shift the page boundaries.
        list_query = f"""
        MATCH (n:DataFlow)
        {where_clause}
        WITH n
        ORDER BY n.created_at DESC
        SKIP $skip
        LIMIT $limit
        OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
        RETURN n, id(n) as node_id,
               n.created_at as created_at,
               collect({{
                   id: id(label),
                   name_zh: label.name_zh,
                   name_en: label.name_en
               }}) as tags
        """
        count_query = f"""
        MATCH (n:DataFlow)
        {where_clause}
        RETURN count(n) as total
        """
        count_params = {"search": search} if search else {}

        # Only the session is closed here; the driver returned by
        # connect_graph() may be shared, so it is deliberately left open.
        try:
            with connect_graph().session() as session:
                list_result = session.run(list_query, params).data()
                count_record = session.run(count_query, count_params).single()
        except Exception as e:
            logger.error("查询数据流失败: %s", e)
            raise

        total = count_record["total"] if count_record else 0

        dataflows = []
        for record in list_result:
            dataflow = dict(record["n"])
            dataflow["id"] = record["node_id"]
            # A node without labels yields one all-null map from the
            # OPTIONAL MATCH; drop such placeholder entries.
            dataflow["tag"] = [
                tag
                for tag in record.get("tags") or []
                if tag.get("id") is not None
            ]
            dataflows.append(dataflow)

        return {
            "list": dataflows,
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total": total,
                "total_pages": (total + page_size - 1) // page_size,
            },
        }
    except Exception as e:
        logger.error("获取数据流列表失败: %s", e)
        raise
@staticmethod
def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
    """
    Fetch one DataFlow node, with its DataLabel tags, by internal node id.

    Args:
        dataflow_id: Internal Neo4j id of the DataFlow node.

    Returns:
        The node's properties plus ``id`` and ``tag`` (list of label dicts),
        or ``None`` when no such node exists. ``script_requirement`` is
        decoded from JSON when possible, left as the raw value when decoding
        fails, and set to ``None`` when empty.

    Raises:
        Exception: Driver errors are logged and re-raised.
    """
    cypher = """
    MATCH (n:DataFlow)
    WHERE id(n) = $dataflow_id
    OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
    RETURN n, id(n) as node_id,
           collect({
               id: id(label),
               name_zh: label.name_zh,
               name_en: label.name_en
           }) as tags
    """
    try:
        with connect_graph().session() as session:
            rows = session.run(cypher, dataflow_id=dataflow_id).data()

        if not rows:
            logger.warning(f"未找到ID为 {dataflow_id} 的DataFlow节点")
            return None

        first = rows[0]
        dataflow = dict(first["n"])
        dataflow["id"] = first["node_id"]
        # Nodes without labels produce a single null-id placeholder entry.
        dataflow["tag"] = [
            label for label in first.get("tags", []) if label.get("id") is not None
        ]

        raw_requirement = dataflow.get("script_requirement", "")
        if not raw_requirement:
            dataflow["script_requirement"] = None
        else:
            try:
                decoded = json.loads(raw_requirement)
            except (json.JSONDecodeError, TypeError) as e:
                logger.warning(f"script_requirement解析失败,保持原值: {e}")
                dataflow["script_requirement"] = raw_requirement
            else:
                dataflow["script_requirement"] = decoded
                logger.debug(
                    "成功解析script_requirement: %s",
                    decoded,
                )

        logger.info(
            "成功获取DataFlow详情,ID: %s, 名称: %s",
            dataflow_id,
            dataflow.get("name_zh"),
        )
        return dataflow
    except Exception as e:
        logger.error(f"获取数据流详情失败: {str(e)}")
        raise e
@staticmethod
def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Create a new DataFlow node plus its tag links and side records.

    Flow: validate input, derive ``name_en`` (LLM translation, falling back
    to a lower-cased/underscored name), reject a duplicate ``name_zh``,
    create the node, link DataLabel tags, upsert the PostgreSQL script row,
    link scripts, and register a data product. Every step after node
    creation is best effort: failures are logged and do not abort the call.

    Args:
        data: Request payload. ``name_zh`` and ``describe`` are required;
            ``tag``, ``script_requirement``, ``category``, ``organization``,
            ``leader``, ``frequency``, ``status``, ``update_mode`` and
            ``script_type`` are optional.

    Returns:
        The stored node's properties plus its internal ``id``, or a minimal
        dict if the node cannot be read back.

    Raises:
        ValueError: A required field is missing or the name already exists.
        Exception: Graph errors are logged and re-raised.
    """
    try:
        for field in ("name_zh", "describe"):
            if field not in data:
                raise ValueError(f"缺少必填字段: {field}")

        dataflow_name = data["name_zh"]

        try:
            result_list = translate_and_parse(dataflow_name)
            name_en = (
                result_list[0]
                if result_list
                else dataflow_name.lower().replace(" ", "_")
            )
        except Exception as e:
            logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
            name_en = dataflow_name.lower().replace(" ", "_")

        # Stored as a string: dicts/lists as JSON, other values via str(),
        # a missing value as "".
        script_requirement = data.get("script_requirement")
        if script_requirement is None:
            script_requirement_str = ""
        elif isinstance(script_requirement, (dict, list)):
            script_requirement_str = json.dumps(
                script_requirement, ensure_ascii=False
            )
        else:
            script_requirement_str = str(script_requirement)

        # Duplicate check keyed on ``name_zh``, the property written in
        # node_data below and used by the read-back query at the end. (The
        # former get_node(..., name=...) check keyed on ``name``, which
        # node_data never sets.)
        with connect_graph().session() as session:
            existing = session.run(
                "MATCH (n:DataFlow {name_zh: $name_zh}) "
                "RETURN id(n) AS node_id LIMIT 1",
                name_zh=dataflow_name,
            ).single()
        if existing is not None:
            raise ValueError(f"数据流 '{dataflow_name}' 已存在")

        # A single timestamp so created_at and updated_at agree exactly.
        now = get_formatted_time()
        # Tags are not node properties; they become LABEL relationships.
        node_data = {
            "name_zh": dataflow_name,
            "name_en": name_en,
            "category": data.get("category", ""),
            "organization": data.get("organization", ""),
            "leader": data.get("leader", ""),
            "frequency": data.get("frequency", ""),
            "describe": data.get("describe", ""),
            "status": data.get("status", "inactive"),
            "update_mode": data.get("update_mode", "append"),
            "script_type": data.get("script_type", "python"),
            "script_requirement": script_requirement_str,
            "created_at": now,
            "updated_at": now,
        }
        dataflow_id = create_or_get_node("DataFlow", **node_data)

        tag_list = data.get("tag", [])
        if tag_list:
            try:
                DataFlowService._handle_tag_relationships(dataflow_id, tag_list)
            except Exception as e:
                logger.warning(f"处理标签关系时出错: {str(e)}")

        # Best effort: the graph node is kept even if the PostgreSQL write
        # fails (no distributed transaction spans both stores). Script
        # relationships are only created after a successful PG write.
        try:
            DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
            logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")
            try:
                DataFlowService._handle_script_relationships(
                    data, dataflow_name, name_en
                )
                logger.info(f"脚本关系创建成功: {dataflow_name}")
            except Exception as script_error:
                logger.warning(f"创建脚本关系失败: {str(script_error)}")
        except Exception as pg_error:
            logger.error(f"写入PG数据库失败: {str(pg_error)}")

        # Re-read the node so the response reflects what was stored.
        with connect_graph().session() as session:
            id_result = session.run(
                "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id",
                name_zh=dataflow_name,
            ).single()
        if id_result:
            result = dict(id_result["n"])
            result["id"] = id_result["node_id"]
        else:
            result = {
                "id": dataflow_id if isinstance(dataflow_id, int) else None,
                "name_zh": dataflow_name,
                "name_en": name_en,
                "created_at": now,
            }

        # Data-product registration is best effort as well.
        try:
            DataFlowService._register_data_product(
                data=data,
                dataflow_name=dataflow_name,
                name_en=name_en,
                dataflow_id=result.get("id"),
            )
            logger.info(f"数据产品注册成功: {dataflow_name}")
        except Exception as product_error:
            logger.warning(f"注册数据产品失败: {str(product_error)}")

        logger.info(f"创建数据流成功: {dataflow_name}")
        return result
    except Exception as e:
        logger.error(f"创建数据流失败: {str(e)}")
        raise
@staticmethod
def _save_to_pg_database(
    data: Dict[str, Any],
    script_name: str,
    name_en: str,
):
    """
    Upsert the script row into ``dags.data_transform_scripts`` and queue a task.

    The script row is upserted on ``(target_table, script_name)``. A row is
    also inserted into ``public.task_list`` inside a SAVEPOINT
    (``begin_nested``), so a failure there is logged without undoing the
    script upsert. Everything is committed at the end.

    Args:
        data: Request payload. Keys read: ``script_requirement``,
            ``script_content``, ``source_table``, ``target_table``,
            ``script_type``, ``created_by``, ``target_dt_column`` and
            ``update_mode``.
        script_name: Script name; must be non-empty.
        name_en: English name, used as the target table when none is given.

    Raises:
        ValueError: ``script_name`` is empty.
        Exception: Database errors roll the session back and are re-raised.
    """
    try:
        script_requirement, rule_from_requirement = (
            DataFlowService._serialize_script_requirement(
                data.get("script_requirement")
            )
        )

        # Prefer caller-supplied script_content; otherwise fall back to the
        # ``rule`` embedded in script_requirement.
        script_content = data.get("script_content", "")
        if not script_content and rule_from_requirement:
            script_content = rule_from_requirement
            logger.info(
                "script_content为空,使用从script_requirement提取的rule: %s",
                rule_from_requirement,
            )

        # Table names may arrive as "<prefix>:<table>"; keep only the table.
        source_table = DataFlowService._strip_table_prefix(data.get("source_table"))
        target_table = (
            DataFlowService._strip_table_prefix(data.get("target_table")) or name_en
        )
        script_type = data.get("script_type", "python")
        user_name = data.get("created_by", "system")
        target_dt_column = data.get("target_dt_column", "")

        if not script_name:
            raise ValueError("script_name不能为空")

        insert_sql = text(
            """
            INSERT INTO dags.data_transform_scripts
            (source_table, target_table, script_name, script_type,
             script_requirement, script_content, user_name, create_time,
             update_time, target_dt_column)
            VALUES
            (:source_table, :target_table, :script_name, :script_type,
             :script_requirement, :script_content, :user_name,
             :create_time, :update_time, :target_dt_column)
            ON CONFLICT (target_table, script_name)
            DO UPDATE SET
                source_table = EXCLUDED.source_table,
                script_type = EXCLUDED.script_type,
                script_requirement = EXCLUDED.script_requirement,
                script_content = EXCLUDED.script_content,
                user_name = EXCLUDED.user_name,
                update_time = EXCLUDED.update_time,
                target_dt_column = EXCLUDED.target_dt_column
            """
        )
        current_time = datetime.now()
        db.session.execute(
            insert_sql,
            {
                "source_table": source_table,
                "target_table": target_table,
                "script_name": script_name,
                "script_type": script_type,
                "script_requirement": script_requirement,
                "script_content": script_content,
                "user_name": user_name,
                "create_time": current_time,
                "update_time": current_time,
                "target_dt_column": target_dt_column,
            },
        )

        # Best effort: a task_list failure is logged, not raised.
        try:
            task_description_md = DataFlowService._build_task_description(
                script_name, script_requirement, data
            )
            task_insert_sql = text(
                "INSERT INTO public.task_list\n"
                "(task_name, task_description, status, code_name, "
                "code_path, create_by, create_time, update_time)\n"
                "VALUES\n"
                "(:task_name, :task_description, :status, :code_name, "
                ":code_path, :create_by, :create_time, :update_time)"
            )
            task_params = {
                "task_name": script_name,
                "task_description": task_description_md,
                "status": "pending",
                "code_name": script_name,
                # Relative to the project root (assumes the process runs
                # from the repository root).
                "code_path": "app/core/data_flow",
                "create_by": "cursor",
                "create_time": current_time,
                "update_time": current_time,
            }
            # SAVEPOINT: a failed task insert rolls back only itself.
            with db.session.begin_nested():
                db.session.execute(task_insert_sql, task_params)
            logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")
        except Exception as task_error:
            logger.error(f"写入task_list表失败: {str(task_error)}")

        db.session.commit()
        logger.info(
            "成功将脚本信息写入PG数据库: target_table=%s, script_name=%s",
            target_table,
            script_name,
        )
    except Exception as e:
        db.session.rollback()
        logger.error(f"写入PG数据库失败: {str(e)}")
        raise

@staticmethod
def _serialize_script_requirement(raw: Any):
    """
    Serialize ``script_requirement`` for storage and extract its ``rule``.

    Returns:
        ``(serialized, rule)``.
        - ``serialized``: dicts/lists are JSON-encoded, other values are
          ``str()``-ed, and ``None`` becomes ``""``.
        - ``rule``: the ``"rule"`` entry when the value is (or decodes to)
          a JSON object, otherwise ``""``.
    """
    if raw is None:
        return "", ""
    if isinstance(raw, (dict, list)):
        serialized = json.dumps(raw, ensure_ascii=False)
        parsed = raw
    else:
        serialized = str(raw)
        try:
            parsed = json.loads(serialized)
        except (json.JSONDecodeError, TypeError):
            parsed = None
    rule = parsed.get("rule", "") if isinstance(parsed, dict) else ""
    return serialized, rule

@staticmethod
def _strip_table_prefix(raw: Any) -> str:
    """Return the text after the last ``:`` of ``raw`` (``""`` for falsy input)."""
    if not raw:
        return ""
    return str(raw).split(":")[-1]

@staticmethod
def _build_task_description(
    script_name: str,
    script_requirement: str,
    data: Dict[str, Any],
) -> str:
    """
    Build the Markdown task description stored in ``public.task_list``.

    The output is driven by ``script_requirement``'s ``rule``,
    ``source_table`` and ``target_table`` entries; the table entries are
    BusinessDomain node ids. ``script_requirement`` itself is returned
    unchanged in three cases:
    - it is not a JSON object;
    - building the description fails (the error is logged as a warning);
    - it is the fallback for any other unexpected error.
    """

    def _as_list(value):
        # Accept a single id or a list of ids; treat falsy as empty.
        return value if isinstance(value, list) else ([value] if value else [])

    try:
        try:
            req_json = json.loads(script_requirement)
        except (json.JSONDecodeError, TypeError):
            req_json = None
        if not isinstance(req_json, dict):
            return script_requirement

        request_content_str = req_json.get("rule", "")
        source_table_ids = _as_list(req_json.get("source_table", []))
        target_table_ids = _as_list(req_json.get("target_table", []))
        update_mode = data.get("update_mode", "append")

        source_ddls: List[str] = []
        target_ddls: List[str] = []
        # The first data source reported by any table wins.
        data_source_info = None
        if source_table_ids or target_table_ids:
            try:
                with connect_graph().session() as session:
                    for bd_id in source_table_ids:
                        ddl_info = DataFlowService._generate_businessdomain_ddl(
                            session,
                            bd_id,
                            is_target=False,
                        )
                        if ddl_info:
                            source_ddls.append(ddl_info["ddl"])
                            if ddl_info.get("data_source") and not data_source_info:
                                data_source_info = ddl_info["data_source"]
                    # NOTE(review): per the original note, target DDLs are
                    # expected to include a default create_time column;
                    # confirm in _generate_businessdomain_ddl.
                    for bd_id in target_table_ids:
                        ddl_info = DataFlowService._generate_businessdomain_ddl(
                            session,
                            bd_id,
                            is_target=True,
                            update_mode=update_mode,
                        )
                        if ddl_info:
                            target_ddls.append(ddl_info["ddl"])
                            if ddl_info.get("data_source") and not data_source_info:
                                data_source_info = ddl_info["data_source"]
            except Exception as neo_e:
                # Keep whatever DDLs were collected before the failure.
                logger.error(f"获取BusinessDomain DDL失败: {str(neo_e)}")

        return DataFlowService._render_task_markdown(
            script_name,
            data_source_info,
            source_ddls,
            target_ddls,
            update_mode,
            request_content_str,
        )
    except Exception as parse_e:
        logger.warning(f"解析任务描述详情失败,使用原始描述: {str(parse_e)}")
        return script_requirement

@staticmethod
def _render_task_markdown(
    script_name: str,
    data_source_info: Optional[Dict[str, Any]],
    source_ddls: List[str],
    target_ddls: List[str],
    update_mode: str,
    request_content_str: str,
) -> str:
    """Render the task sections (source, DDLs, mode, rule, steps) as Markdown."""
    parts = [f"# Task: {script_name}\n"]

    if data_source_info:
        parts.extend(
            [
                "## Data Source",
                f"- **Type**: {data_source_info.get('type', 'N/A')}",
                f"- **Host**: {data_source_info.get('host', 'N/A')}",
                f"- **Port**: {data_source_info.get('port', 'N/A')}",
                f"- **Database**: {data_source_info.get('database', 'N/A')}\n",
            ]
        )

    for heading, ddls in (
        ("## Source Tables (DDL)", source_ddls),
        ("## Target Tables (DDL)", target_ddls),
    ):
        if ddls:
            parts.append(heading)
            parts.extend(f"```sql\n{ddl}\n```\n" for ddl in ddls)

    parts.append("## Update Mode")
    if update_mode == "append":
        parts.extend(
            [
                "- **Mode**: Append (追加模式)",
                "- **Description**: 新数据将追加到目标表,不删除现有数据\n",
            ]
        )
    else:
        parts.extend(
            [
                "- **Mode**: Full Refresh (全量更新)",
                "- **Description**: 目标表将被清空后重新写入数据\n",
            ]
        )

    if request_content_str:
        parts.extend(["## Request Content", f"{request_content_str}\n"])

    parts.append("## Implementation Steps")
    if data_source_info:
        # Remote data-source import: delegate to import_resource_data.py.
        parts.extend(
            [
                "1. Create an n8n workflow to execute the data import task",
                "2. Configure the workflow to call "
                "`import_resource_data.py` Python script",
                "3. Pass the following parameters to the "
                "Python execution node:",
                " - `--source-config`: JSON configuration "
                "for the remote data source",
                " - `--target-table`: Target table name "
                "(data resource English name)",
                f" - `--update-mode`: {update_mode}",
                "4. The Python script will automatically:",
                " - Connect to the remote data source",
                " - Extract data from the source table",
                f" - Write data to target table using {update_mode} mode",
            ]
        )
    else:
        # Data transformation task.
        parts.append("1. Extract data from source tables as specified in the DDL")
        parts.append("2. Apply transformation logic according to the rule:")
        if request_content_str:
            parts.append(f" - Rule: {request_content_str}")
        parts.extend(
            [
                "3. Generate Python program to implement the "
                "data transformation logic",
                f"4. Write transformed data to target table using {update_mode} mode",
                "5. Create an n8n workflow to schedule and "
                "execute the Python program",
            ]
        )
    return "\n".join(parts)
@staticmethod
def _handle_children_relationships(dataflow_node, children_ids):
    """
    Create ``child`` relationships from a DataFlow node to existing nodes.

    Best effort: every failure is logged as a warning and nothing is raised.
    Missing child nodes and already-existing relationships are skipped.

    Args:
        dataflow_node: Parent node. Its id is read from an ``identity``
            attribute when present; otherwise it is looked up by
            ``dataflow_node.get("name_zh")``.
        children_ids: A single node id, a list/tuple of ids, or ``None``.
    """
    logger.debug(
        "处理子节点关系,原始children_ids: %s, 类型: %s",
        children_ids,
        type(children_ids),
    )
    if not isinstance(children_ids, (list, tuple)):
        if children_ids is not None:
            children_ids = [children_ids]
            logger.debug(f"将单个值转换为列表: {children_ids}")
        else:
            children_ids = []
            logger.debug("将None转换为空列表")
    if not children_ids:
        return

    try:
        with connect_graph().session() as session:
            # The parent id is loop-invariant: resolve it once.
            dataflow_id = getattr(dataflow_node, "identity", None)
            if dataflow_id is None:
                id_record = session.run(
                    "MATCH (n:DataFlow) WHERE n.name_zh = "
                    "$name_zh RETURN id(n) as node_id",
                    name_zh=dataflow_node.get("name_zh"),
                ).single()
                dataflow_id = id_record["node_id"] if id_record else None
            # Compare against None: 0 is a valid Neo4j internal node id.
            if dataflow_id is None:
                logger.warning("无法确定数据流节点ID,跳过子节点关系创建")
                return

            for child_id in children_ids:
                try:
                    child = session.run(
                        "MATCH (n) WHERE id(n) = $child_id RETURN n",
                        child_id=child_id,
                    ).data()
                    if not child or relationship_exists(
                        dataflow_id, "child", child_id
                    ):
                        continue
                    session.run(
                        "MATCH (a), (b) WHERE id(a) = $dataflow_id "
                        "AND id(b) = $child_id "
                        "CREATE (a)-[:child]->(b)",
                        dataflow_id=dataflow_id,
                        child_id=child_id,
                    )
                    logger.info(f"创建子节点关系: {dataflow_id} -> {child_id}")
                except Exception as e:
                    logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
    except Exception as e:
        logger.warning(f"创建子节点关系失败 {children_ids}: {str(e)}")
@staticmethod
def _handle_tag_relationships(dataflow_id, tag_list):
    """
    Link a DataFlow node to every DataLabel referenced in ``tag_list``.

    Args:
        dataflow_id: Internal node id of the DataFlow.
        tag_list: A single tag or a list of tags. Each tag is either an id
            (int or numeric string) or a dict with an ``id`` key. Entries
            whose id cannot be converted to ``int`` are skipped with a
            warning instead of aborting the remaining tags.
    """
    if not isinstance(tag_list, list):
        tag_list = [tag_list] if tag_list else []

    for tag_item in tag_list:
        if isinstance(tag_item, dict):
            raw_id = tag_item.get("id")
        elif isinstance(tag_item, (int, str)):
            raw_id = tag_item
        else:
            continue
        if raw_id is None:
            continue
        try:
            tag_id = int(raw_id)
        except (ValueError, TypeError):
            logger.warning(f"无效的标签ID,已跳过: {raw_id!r}")
            continue
        # 0 is a valid Neo4j internal node id, so no truthiness check here.
        DataFlowService._handle_single_tag_relationship(dataflow_id, tag_id)
@staticmethod
def _handle_single_tag_relationship(dataflow_id, tag_id):
    """
    Create a ``LABEL`` relationship from a DataFlow node to one DataLabel.

    Does nothing when ``dataflow_id`` is ``None``, when the DataLabel does
    not exist, or when the relationship already exists. Errors are logged
    as warnings and never raised.

    Args:
        dataflow_id: Internal node id of the DataFlow (0 is a valid id).
        tag_id: Internal node id of the DataLabel.
    """
    if dataflow_id is None:
        return
    try:
        with connect_graph().session() as session:
            label = session.run(
                "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n",
                tag_id=tag_id,
            ).data()
            if not label or relationship_exists(dataflow_id, "LABEL", tag_id):
                return
            session.run(
                "MATCH (a), (b) WHERE id(a) = $dataflow_id "
                "AND id(b) = $tag_id "
                "CREATE (a)-[:LABEL]->(b)",
                dataflow_id=dataflow_id,
                tag_id=tag_id,
            )
            logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
    except Exception as e:
        logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
@staticmethod
def update_dataflow(
    dataflow_id: int,
    data: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
    """
    Update a DataFlow node's properties and, optionally, its tags.

    Args:
        dataflow_id: Internal node id of the DataFlow.
        data: Properties to set. The caller's dict is not modified.
            - ``tag`` (single value or list) replaces all LABEL relationships.
            - ``id``, ``created_at`` and ``updated_at`` are ignored;
              ``updated_at`` is set automatically whenever a property
              changes.
            - ``config`` and ``script_requirement`` dicts/lists are stored
              as JSON strings.

    Returns:
        The updated node's properties plus ``id`` and ``tag``, or ``None``
        if the node does not exist.

    Raises:
        ValueError: A property name is not a plain identifier. Property keys
            are spliced into the Cypher text and cannot be parameterized,
            so they are validated to prevent query injection.
        Exception: Driver errors are logged and re-raised.
    """
    try:
        # Shallow copy: popping ``tag`` must not mutate the caller's dict.
        data = dict(data)
        tag_list = data.pop("tag", None)

        with connect_graph().session() as session:
            # Return node_id up front so a tag-only update (no SET below)
            # still has it when building the response.
            result = session.run(
                "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id "
                "RETURN n, id(n) as node_id",
                dataflow_id=dataflow_id,
            ).data()
            if not result:
                return None

            update_fields = []
            params: Dict[str, Any] = {"dataflow_id": dataflow_id}
            for key, value in data.items():
                if key in ("id", "created_at", "updated_at"):
                    continue
                if not (isinstance(key, str) and key.isidentifier()):
                    raise ValueError(f"非法的字段名: {key!r}")
                # Store complex values as JSON strings, as create_dataflow does.
                if key in ("config", "script_requirement") and isinstance(
                    value, (dict, list)
                ):
                    value = json.dumps(value, ensure_ascii=False)
                # Indexed parameter names cannot collide with $dataflow_id
                # or $updated_at.
                param_name = f"v{len(update_fields)}"
                update_fields.append(f"n.{key} = ${param_name}")
                params[param_name] = value

            if update_fields:
                params["updated_at"] = get_formatted_time()
                update_fields.append("n.updated_at = $updated_at")
                update_query = f"""
                MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                SET {", ".join(update_fields)}
                RETURN n, id(n) as node_id
                """
                result = session.run(update_query, params).data()

            if tag_list is not None:
                # Replace semantics: drop every existing LABEL relationship,
                # then link the requested tags.
                delete_query = """
                MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel)
                WHERE id(n) = $dataflow_id
                DELETE r
                """
                session.run(delete_query, dataflow_id=dataflow_id)
                logger.info(f"删除数据流 {dataflow_id} 的现有标签关系")
                DataFlowService._handle_tag_relationships(dataflow_id, tag_list)

            if not result:
                return None

            updated_dataflow = dict(result[0]["n"])
            updated_dataflow["id"] = result[0]["node_id"]

            tags_query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
            RETURN collect({
                id: id(label),
                name_zh: label.name_zh,
                name_en: label.name_en
            }) as tags
            """
            tags_record = session.run(tags_query, dataflow_id=dataflow_id).single()
            tags = (tags_record.get("tags", []) if tags_record else []) or []
            # Nodes without labels produce one null-id placeholder entry.
            updated_dataflow["tag"] = [
                tag for tag in tags if tag.get("id") is not None
            ]

            logger.info(f"更新数据流成功: ID={dataflow_id}")
            return updated_dataflow
    except Exception as e:
        logger.error(f"更新数据流失败: {str(e)}")
        raise
@staticmethod
def delete_dataflow(dataflow_id: int) -> bool:
    """
    Delete a DataFlow node together with all of its relationships.

    Args:
        dataflow_id: Internal node id of the DataFlow.

    Returns:
        ``True`` when the query reports a positive deleted count, otherwise
        ``False`` (for example, when no such node exists).

    Raises:
        Exception: Driver errors are logged and re-raised.
    """
    cypher = """
    MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
    DETACH DELETE n
    RETURN count(n) as deleted_count
    """
    try:
        with connect_graph().session() as session:
            record = session.run(cypher, dataflow_id=dataflow_id).single()
        deleted = record["deleted_count"] if record else 0
        if not deleted or deleted <= 0:
            return False
        logger.info(f"删除数据流成功: ID={dataflow_id}")
        return True
    except Exception as e:
        logger.error(f"删除数据流失败: {str(e)}")
        raise e
@staticmethod
def execute_dataflow(
    dataflow_id: int,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Start a DataFlow execution (placeholder implementation).

    NOTE: nothing is actually executed yet. After verifying that the node
    exists, a simulated "running" record is returned.

    Args:
        dataflow_id: Internal node id of the DataFlow.
        params: Optional execution parameters, echoed back (``{}`` if None).

    Returns:
        Dict with ``execution_id``, ``dataflow_id``, ``status``,
        ``started_at``, ``params`` and ``progress``.

    Raises:
        ValueError: The DataFlow does not exist.
    """
    try:
        with connect_graph().session() as session:
            matches = session.run(
                "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n",
                dataflow_id=dataflow_id,
            ).data()
        if not matches:
            raise ValueError(f"数据流不存在: ID={dataflow_id}")

        execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
        # TODO: actually run the data flow instead of returning a mock.
        execution = {
            "execution_id": execution_id,
            "dataflow_id": dataflow_id,
            "status": "running",
            "started_at": datetime.now().isoformat(),
            "params": params or {},
            "progress": 0,
        }
        logger.info(
            "开始执行数据流: ID=%s, execution_id=%s",
            dataflow_id,
            execution_id,
        )
        return execution
    except Exception as e:
        logger.error(f"执行数据流失败: {str(e)}")
        raise e
@staticmethod
def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
    """
    Return the execution status of a DataFlow (placeholder implementation).

    NOTE: the status is simulated. ``dataflow_id % 4`` selects one of
    running/completed/failed/pending, and the progress is derived from the
    id as well.

    Args:
        dataflow_id: Internal node id of the DataFlow.

    Returns:
        Dict with ``dataflow_id``, ``status``, ``progress``, ``started_at``,
        ``completed_at`` and ``error_message``.

    Raises:
        ValueError: The DataFlow does not exist.
    """
    try:
        # TODO: query the real execution status.
        with connect_graph().session() as session:
            found = session.run(
                "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n",
                dataflow_id=dataflow_id,
            ).data()
        if not found:
            raise ValueError(f"数据流不存在: ID={dataflow_id}")

        status = ("running", "completed", "failed", "pending")[dataflow_id % 4]
        is_completed = status == "completed"
        return {
            "dataflow_id": dataflow_id,
            "status": status,
            "progress": 100 if is_completed else (dataflow_id * 10) % 100,
            "started_at": datetime.now().isoformat(),
            "completed_at": datetime.now().isoformat() if is_completed else None,
            "error_message": "执行过程中发生错误" if status == "failed" else None,
        }
    except Exception as e:
        logger.error(f"获取数据流状态失败: {str(e)}")
        raise e
@staticmethod
def get_dataflow_logs(
    dataflow_id: int,
    page: int = 1,
    page_size: int = 50,
) -> Dict[str, Any]:
    """
    Return one page of execution logs for a DataFlow (placeholder).

    NOTE: the logs are 100 generated mock entries, not real records.

    Args:
        dataflow_id: Internal node id of the DataFlow.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Entries per page; values below 1 are treated as 1.

    Returns:
        ``{"logs": [...], "pagination": {...}}``.

    Raises:
        ValueError: The DataFlow does not exist.
    """
    try:
        # Clamp so page_size 0 cannot divide by zero and page 0 cannot
        # produce a negative slice start.
        page = max(int(page), 1)
        page_size = max(int(page_size), 1)

        # TODO: query the real execution logs.
        query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
        with connect_graph().session() as session:
            result = session.run(query, dataflow_id=dataflow_id).data()
        if not result:
            raise ValueError(f"数据流不存在: ID={dataflow_id}")

        mock_logs = [
            {
                "id": i,
                "timestamp": datetime.now().isoformat(),
                "level": ["INFO", "WARNING", "ERROR"][i % 3],
                "message": f"数据流执行日志消息 {i}",
                "component": ["source", "transform", "target"][i % 3],
            }
            for i in range(1, 101)
        ]

        total = len(mock_logs)
        start = (page - 1) * page_size
        logs = mock_logs[start : start + page_size]

        return {
            "logs": logs,
            "pagination": {
                "page": page,
                "page_size": page_size,
                "total": total,
                "total_pages": (total + page_size - 1) // page_size,
            },
        }
    except Exception as e:
        logger.error(f"获取数据流日志失败: {str(e)}")
        raise
@staticmethod
def create_script(request_data: Union[Dict[str, Any], str]) -> str:
    """
    Generate a PostgreSQL SQL script with the Deepseek model.

    The request names the input tables ("label:name_en", comma-separated),
    one output table ("label:name_en") and a free-text processing
    requirement. DDL for each table is built via
    ``DataFlowService._parse_table_and_get_ddl`` and embedded, together with
    fixed technical requirements, in the prompt passed to ``llm_sql``.

    Args:
        request_data: Dict (or JSON string of a dict) with keys ``input``,
            ``output`` and ``request_data`` (the processing requirement; the
            key ``request_content`` is accepted as a fallback). The
            requirement may be rich-text HTML, in which case tags are
            stripped and HTML entities decoded.

    Returns:
        The generated SQL script text.

    Raises:
        ValueError: if the payload is not a dict / valid JSON, a required
            field is empty, or the model returns empty content. Any other
            error is logged and re-raised.
    """
    import html
    import json
    import re

    try:
        logger.info(f"开始处理脚本生成请求: {request_data}")
        logger.info(f"request_data类型: {type(request_data)}")
        # Accept a JSON string payload as well as a dict.
        if isinstance(request_data, str):
            logger.warning(f"request_data是字符串,尝试解析为JSON: {request_data}")
            try:
                request_data = json.loads(request_data)
            except json.JSONDecodeError as e:
                raise ValueError(f"无法解析request_data为JSON: {str(e)}") from e
        if not isinstance(request_data, dict):
            raise ValueError(
                f"request_data必须是字典类型,实际类型: {type(request_data)}"
            )

        # 1. Extract input / output / requirement from the payload.
        input_data = request_data.get("input", "")
        output_data = request_data.get("output", "")
        # The requirement is read from "request_data"; the name
        # "request_content" is accepted as a fallback.
        request_content = (
            request_data.get("request_data")
            or request_data.get("request_content")
            or ""
        )
        if not isinstance(request_content, str):
            request_content = str(request_content)

        # Rich-text editors send HTML such as "<p>...</p>". Only treat the text
        # as HTML when it contains tag-shaped markup: the previous `"<" in text`
        # test also fired on plain comparisons like "amount < 100 and y > 5"
        # and the tag regex then deleted everything between < and >.
        html_tag = re.compile(r"</?[A-Za-z][^<>]*>")
        if html_tag.search(request_content):
            # Decode entities (&lt; &gt; &amp; &nbsp; ...) after removing tags
            # so escaped operators in the requirement are preserved.
            request_content = html.unescape(
                html_tag.sub("", request_content)
            ).strip()

        if not input_data or not output_data or not request_content:
            raise ValueError(
                "缺少必要参数:input='{}', output='{}', "
                "request_content='{}' 不能为空".format(
                    input_data,
                    output_data,
                    request_content[:100] if request_content else "",
                )
            )
        logger.info(
            "解析得到 - input: %s, output: %s, request_content: %s",
            input_data,
            output_data,
            request_content,
        )

        # 2. Build DDL for each comma-separated input table; tables whose DDL
        # cannot be built are skipped (with a warning).
        source_tables_ddl = []
        input_tables = []
        tables = [table.strip() for table in input_data.split(",") if table.strip()]
        for table in tables:
            ddl = DataFlowService._parse_table_and_get_ddl(table, "input")
            if ddl:
                input_tables.append(table)
                source_tables_ddl.append(ddl)
            else:
                logger.warning(f"无法获取输入表 {table} 的DDL结构")

        # 3. Build DDL for the output table.
        output_table = output_data.strip()
        target_table_ddl = DataFlowService._parse_table_and_get_ddl(
            output_table, "output"
        )
        if not target_table_ddl:
            logger.warning(f"无法获取输出表 {output_data} 的DDL结构")

        # 4. Assemble the prompt: role, numbered source/target tables, the
        # processing logic, then fixed technical requirements.
        prompt_parts = [
            "你是一名数据库工程师,正在构建一个PostgreSQL数据中的汇总逻辑。"
            "请为以下需求生成一段标准的 PostgreSQL SQL 脚本:"
        ]
        for i, (table, ddl) in enumerate(zip(input_tables, source_tables_ddl), 1):
            # "label:name_en" -> "name_en" (a bare name is kept as is).
            table_name = table.split(":")[-1]
            prompt_parts.append(f"{i}.有一个源表: {table_name},它的定义语句如下:")
            prompt_parts.append(ddl)
            prompt_parts.append("")
        if target_table_ddl:
            target_table_name = output_table.split(":")[-1]
            next_index = len(input_tables) + 1
            prompt_parts.append(
                f"{next_index}.有一个目标表:{target_table_name},它的定义语句如下:"
            )
            prompt_parts.append(target_table_ddl)
            prompt_parts.append("")
        next_index = (
            len(input_tables) + 2 if target_table_ddl else len(input_tables) + 1
        )
        prompt_parts.append(f"{next_index}.处理逻辑为:{request_content}")
        prompt_parts.append("")
        prompt_parts.extend(
            [
                (
                    f"{next_index + 1}.脚本应使用标准的 PostgreSQL 语法,"
                    "适合在 Airflow、Python 脚本、或调度系统中调用;"
                ),
                f"{next_index + 2}.无需使用 UPSERT 或 ON CONFLICT",
                f"{next_index + 3}.请直接输出SQL,无需进行解释。",
                (
                    f'{next_index + 4}.请给这段sql起个英文名,不少于三个英文单词,使用"_"分隔,'
                    "采用蛇形命名法。把sql的名字作为注释写在返回的sql中。"
                ),
                (
                    f"{next_index + 5}.生成的sql在向目标表插入数据的时候,向create_time字段写入当前日期"
                    "时间now(),不用处理update_time字段"
                ),
            ]
        )
        full_prompt = "\n".join(prompt_parts)
        logger.info(f"构建的完整提示语长度: {len(full_prompt)}")
        logger.info(f"完整提示语内容: {full_prompt}")

        # 5. Call the LLM.
        logger.info("开始调用Deepseek模型生成SQL脚本")
        script_content = llm_sql(full_prompt)
        if not script_content:
            raise ValueError("Deepseek模型返回空内容")
        if not isinstance(script_content, str):
            script_content = str(script_content)
        logger.info(f"SQL脚本生成成功,内容长度: {len(script_content)}")
        return script_content
    except Exception as e:
        logger.error(f"生成SQL脚本失败: {str(e)}")
        raise
@staticmethod
def _parse_table_and_get_ddl(table_str: str, table_type: str) -> str:
    """
    Build CREATE TABLE text for a "label:name_en" table reference.

    The node is matched in Neo4j by label and ``name_en``; every DataMeta
    node reachable via an ``INCLUDES`` relationship becomes one column.

    Args:
        table_str: Table reference in the form "label:name_en".
        table_type: Only used in log messages (e.g. "input" / "output").

    Returns:
        The DDL text, or "" if the reference is malformed, the node is not
        found, or any error occurs (errors are logged, not raised).
    """

    def _sql_literal(text) -> str:
        # Double single quotes so names such as "O'Neil" keep the DDL valid.
        return str(text).replace("'", "''")

    try:
        if ":" not in table_str:
            logger.error(f"表格式错误,应为'label:name_en'格式: {table_str}")
            return ""
        # With ":" present, split(":", 1) always yields exactly two parts.
        label, name_en = (part.strip() for part in table_str.split(":", 1))
        if not label or not name_en:
            logger.error(f"标签或英文名为空: label={label}, name_en={name_en}")
            return ""
        logger.info(f"开始查询{table_type}表: label={label}, name_en={name_en}")

        # Labels cannot be passed as Cypher parameters, so the label is
        # interpolated. Backtick-quote it (doubling embedded backticks) so
        # caller-supplied text cannot inject Cypher.
        quoted_label = "`" + label.replace("`", "``") + "`"
        cypher = f"""
        MATCH (n:{quoted_label} {{name_en: $name_en}})
        OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
        RETURN n, collect(m) as metadata
        """
        with connect_graph().session() as session:
            record = session.run(
                cypher,  # type: ignore[arg-type]
                {"name_en": name_en},
            ).single()
            if not record:
                logger.error(f"未找到节点: label={label}, name_en={name_en}")
                return ""
            node_props = dict(record["n"])
            metadata = record["metadata"]
        logger.info(f"找到节点,关联元数据数量: {len(metadata)}")

        # NOTE(review): inline column COMMENT is MySQL syntax, not PostgreSQL;
        # acceptable if this DDL is only used as prompt context -- confirm.
        column_definitions = []
        for meta in metadata or []:
            if not meta:
                continue
            meta_props = dict(meta)
            # `or` (not .get defaults) so None/empty properties also fall back.
            column_name = (
                meta_props.get("name_en")
                or meta_props.get("name_zh")
                or "unknown_column"
            )
            data_type = meta_props.get("data_type") or "VARCHAR(255)"
            comment = meta_props.get("name_zh") or ""
            column_def = f" {column_name} {data_type}"
            if comment:
                column_def += f" COMMENT '{_sql_literal(comment)}'"
            column_definitions.append(column_def)
        if not column_definitions:
            # No usable metadata: emit a placeholder primary-key column.
            column_definitions.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")

        ddl_lines = [f"CREATE TABLE {name_en} (", ",\n".join(column_definitions), ");"]
        table_comment = (
            node_props.get("name_zh") or node_props.get("describe") or name_en
        )
        if table_comment != name_en:
            ddl_lines.append(
                f"COMMENT ON TABLE {name_en} IS '{_sql_literal(table_comment)}';"
            )

        ddl_content = "\n".join(ddl_lines)
        logger.info(f"{table_type}表DDL生成成功: {name_en}")
        logger.debug(f"生成的DDL: {ddl_content}")
        return ddl_content
    except Exception as e:
        logger.error(f"解析表格式和生成DDL失败: {str(e)}")
        return ""
@staticmethod
def _generate_businessdomain_ddl(
    session,
    bd_id: int,
    is_target: bool = False,
    update_mode: str = "append",
) -> Optional[Dict[str, Any]]:
    """
    Build CREATE TABLE text for a BusinessDomain node.

    Columns come from DataMeta nodes linked via ``INCLUDES``. When the
    node's first DataLabel (via ``LABEL``) is named "数据资源" and a
    DataSource is linked via ``COME_FROM``, that source's connection info
    is returned as well.

    Args:
        session: Open Neo4j session used to run the query.
        bd_id: Neo4j internal id of the BusinessDomain node.
        is_target: Append a ``create_time`` column (target tables only).
        update_mode: Not used by this method ("append" or "full").

    Returns:
        {"ddl", "table_name", "data_source"}, or None when the node does not
        exist or any error occurs (errors are logged, not raised).
    """

    def _sql_literal(text) -> str:
        # Double single quotes so names such as "O'Neil" keep the DDL valid.
        return str(text).replace("'", "''")

    try:
        cypher = """
        MATCH (bd:BusinessDomain)
        WHERE id(bd) = $bd_id
        OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
        OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
        OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
        RETURN bd,
               collect(DISTINCT m) as metadata,
               collect(DISTINCT {
                   id: id(label),
                   name_zh: label.name_zh,
                   name_en: label.name_en
               }) as labels,
               ds.type as ds_type,
               ds.host as ds_host,
               ds.port as ds_port,
               ds.database as ds_database
        """
        # NOTE(review): the ds.* columns are grouping keys, so a node with
        # several COME_FROM sources yields several rows and .single() keeps
        # only one of them -- confirm one data source per node is assumed.
        result = session.run(cypher, bd_id=bd_id).single()
        if not result or not result["bd"]:
            logger.warning(f"未找到ID为 {bd_id} 的BusinessDomain节点")
            return None

        node_props = dict(result["bd"])
        metadata = result["metadata"]
        # An unmatched OPTIONAL MATCH still contributes a map whose id is
        # null; drop those before picking the first label.
        labels = result.get("labels", [])
        valid_labels = [label for label in labels if label.get("id") is not None]
        label_name = valid_labels[0].get("name_zh") if valid_labels else None

        # `or` (not .get defaults) so None/empty properties also fall back.
        table_name = node_props.get("name_en") or f"table_{bd_id}"

        column_definitions = []
        for meta in metadata or []:
            if not meta:
                continue
            meta_props = dict(meta)
            column_name = (
                meta_props.get("name_en")
                or meta_props.get("name_zh")
                or "unknown_column"
            )
            data_type = meta_props.get("data_type") or "VARCHAR(255)"
            comment = meta_props.get("name_zh") or ""
            column_def = f" {column_name} {data_type}"
            if comment:
                column_def += f" COMMENT '{_sql_literal(comment)}'"
            column_definitions.append(column_def)
        if not column_definitions:
            # No usable metadata: emit a placeholder primary-key column.
            column_definitions.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
        if is_target:
            # Target tables get a creation timestamp column.
            column_definitions.append(
                " create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
                "COMMENT '数据创建时间'"
            )

        ddl_lines = [
            f"CREATE TABLE {table_name} (",
            ",\n".join(column_definitions),
            ");",
        ]
        table_comment = (
            node_props.get("name_zh") or node_props.get("describe") or table_name
        )
        if table_comment != table_name:
            ddl_lines.append(
                f"COMMENT ON TABLE {table_name} IS '{_sql_literal(table_comment)}';"
            )
        ddl_content = "\n".join(ddl_lines)

        # Return connection info only when the node's first label is
        # "数据资源" (via the LABEL relationship) and a DataSource exists.
        data_source = None
        if label_name == "数据资源" and result["ds_type"]:
            data_source = {
                "type": result["ds_type"],
                "host": result["ds_host"],
                "port": result["ds_port"],
                "database": result["ds_database"],
            }
            logger.info(f"获取到数据源信息: {data_source}")

        logger.debug(
            f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}"
        )
        return {
            "ddl": ddl_content,
            "table_name": table_name,
            "data_source": data_source,
        }
    except Exception as e:
        logger.error(f"生成BusinessDomain DDL失败,ID={bd_id}: {str(e)}")
        return None
@staticmethod
def _handle_script_relationships(
    data: Dict[str, Any],
    dataflow_name: str,
    name_en: str,
):
    """
    Create a DERIVED_FROM relationship from the target table node to the
    source table node in Neo4j, recording the script that links them.

    Source and target nodes are MERGEd by ``name`` under their labels. The
    relationship is only created when none exists yet in that direction.

    Args:
        data: Script configuration. Reads ``script_type`` (default "sql"),
            ``status`` (stored as ``schedule_status``, default "inactive"),
            ``source_table`` / ``target_table`` (each "label:name", or a bare
            name that then also serves as the label) and ``update_mode``
            (default "full").
        dataflow_name: Stored on the relationship as ``script_name``.
        name_en: Not used by this method; kept for interface compatibility.

    Raises:
        Re-raises any Neo4j error after logging it.
    """

    def _split_ref(ref: str):
        # "label:name" -> (first segment, last segment); a bare name is used
        # as both label and name.
        if ":" in ref:
            pieces = ref.split(":")
            return pieces[0], pieces[-1]
        return ref, ref

    def _quote_label(label: str) -> str:
        # Labels cannot be Cypher parameters; backtick-quote them (doubling
        # embedded backticks) so request text cannot inject Cypher.
        return "`" + label.replace("`", "``") + "`"

    try:
        # Bug fix: this was `(dataflow_name,)` -- a one-element tuple -- so the
        # relationship's script_name was stored as a list instead of a string.
        script_name = dataflow_name
        script_type = data.get("script_type", "sql")
        schedule_status = data.get("status", "inactive")
        source_table_full = data.get("source_table") or ""
        target_table_full = data.get("target_table") or ""
        update_mode = data.get("update_mode", "full")

        source_label, source_table = _split_ref(source_table_full)
        target_label, target_table = _split_ref(target_table_full)

        if not source_table or not target_table:
            logger.warning(
                "source_table或target_table为空,跳过关系创建: "
                "source_table=%s, target_table=%s",
                source_table,
                target_table,
            )
            return
        if not source_label or not target_label:
            # e.g. ":orders" -- an empty label would make the Cypher invalid.
            logger.warning(
                "source_table或target_table缺少标签,跳过关系创建: "
                "source_table=%s, target_table=%s",
                source_table_full,
                target_table_full,
            )
            return

        src_label = _quote_label(source_label)
        tgt_label = _quote_label(target_label)
        logger.info(f"开始创建脚本关系: {source_table} -> {target_table}")
        with connect_graph().session() as session:
            create_nodes_query = f"""
            MERGE (source:{src_label} {{name: $source_table}})
            ON CREATE SET source.created_at = $created_at,
                          source.type = 'source'
            WITH source
            MERGE (target:{tgt_label} {{name: $target_table}})
            ON CREATE SET target.created_at = $created_at,
                          target.type = 'target'
            RETURN source, target, id(source) as source_id,
                   id(target) as target_id
            """
            result = session.run(
                create_nodes_query,  # type: ignore[arg-type]
                {
                    "source_table": source_table,
                    "target_table": target_table,
                    "created_at": get_formatted_time(),
                },
            ).single()
            if not result:
                logger.error(
                    "创建表节点失败: source_table=%s, target_table=%s",
                    source_table,
                    target_table,
                )
                return

            # Only create the relationship if it does not already exist; an
            # existing one makes the MATCH return no row.
            create_relationship_query = f"""
            MATCH (source:{src_label}), (target:{tgt_label})
            WHERE id(source) = $source_id AND id(target) = $target_id
            AND NOT EXISTS((target)-[:DERIVED_FROM]->(source))
            CREATE (target)-[r:DERIVED_FROM]->(source)
            SET r.script_name = $script_name,
                r.script_type = $script_type,
                r.schedule_status = $schedule_status,
                r.update_mode = $update_mode,
                r.created_at = $created_at,
                r.updated_at = $created_at
            RETURN r
            """
            relationship_result = session.run(
                create_relationship_query,  # type: ignore[arg-type]
                {
                    "source_id": result["source_id"],
                    "target_id": result["target_id"],
                    "script_name": script_name,
                    "script_type": script_type,
                    "schedule_status": schedule_status,
                    "update_mode": update_mode,
                    "created_at": get_formatted_time(),
                },
            ).single()
            if relationship_result:
                logger.info(
                    "成功创建DERIVED_FROM关系: %s -> %s (script: %s)",
                    target_table,
                    source_table,
                    script_name,
                )
            else:
                logger.info(
                    "DERIVED_FROM关系已存在: %s -> %s",
                    target_table,
                    source_table,
                )
    except Exception as e:
        logger.error(f"处理脚本关系失败: {str(e)}")
        raise
@staticmethod
def get_business_domain_list() -> List[Dict[str, Any]]:
    """
    List all BusinessDomain nodes, ordered by ``create_time`` descending.

    Returns:
        One dict per node with ``id`` (Neo4j internal id), ``name_zh`` and
        ``name_en`` (None coerced to ""), and ``tag``: the DataLabel nodes
        linked via LABEL, each as {id, name_zh, name_en}.
    """
    try:
        logger.info("开始查询BusinessDomain节点列表")
        # One row per domain; its labels are aggregated into `tags`.
        cypher = """
        MATCH (bd:BusinessDomain)
        OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
        RETURN id(bd) as id,
               bd.name_zh as name_zh,
               bd.name_en as name_en,
               bd.create_time as create_time,
               collect({
                   id: id(label),
                   name_zh: label.name_zh,
                   name_en: label.name_en
               }) as tags
        ORDER BY create_time DESC
        """
        with connect_graph().session() as session:
            domains = [
                {
                    "id": rec["id"],
                    "name_zh": rec.get("name_zh", "") or "",
                    "name_en": rec.get("name_en", "") or "",
                    # Unmatched OPTIONAL MATCH rows produce a map with a null
                    # id; skip those.
                    "tag": [
                        tag for tag in rec.get("tags", []) if tag.get("id") is not None
                    ],
                }
                for rec in session.run(cypher)
            ]
        logger.info(f"成功查询到 {len(domains)} 个BusinessDomain节点")
        return domains
    except Exception as e:
        logger.error(f"查询BusinessDomain节点列表失败: {str(e)}")
        raise e
@staticmethod
def _register_data_product(
    data: Dict[str, Any],
    dataflow_name: str,
    name_en: str,
    dataflow_id: Optional[int] = None,
) -> None:
    """
    Register a freshly created data flow as a data product.

    Delegates to ``DataProductService.register_data_product`` so the flow
    shows up in the data-service module.

    Args:
        data: Data-flow configuration. Reads ``target_table`` ("label:name"
            or bare name), ``describe``, ``script_requirement`` (dict or JSON
            string; its ``rule`` backfills an empty description) and
            ``created_by`` (default "dataflow").
        dataflow_name: Display (Chinese) name of the data flow.
        name_en: English name; also the fallback target table name.
        dataflow_id: Neo4j node id of the data flow, if known.

    Raises:
        Re-raises any registration error after logging it.
    """
    try:
        raw_target = data.get("target_table") or ""
        # "label:name" -> "name"; fall back to the English name when empty.
        target_table = raw_target.split(":")[-1] if ":" in raw_target else raw_target
        target_table = target_table or name_en
        target_schema = "public"

        requirement = data.get("script_requirement")
        description = data.get("describe", "")

        # Normalise script_requirement (dict or JSON string) to a dict;
        # unparseable strings are ignored on purpose (best effort).
        parsed = None
        if requirement and isinstance(requirement, dict):
            parsed = requirement
        elif requirement and isinstance(requirement, str):
            try:
                loaded = json.loads(requirement)
            except (json.JSONDecodeError, TypeError):
                loaded = None
            if isinstance(loaded, dict):
                parsed = loaded
        if parsed is not None and not description:
            rule = parsed.get("rule", "")
            if rule:
                description = rule

        DataProductService.register_data_product(
            product_name=dataflow_name,
            product_name_en=name_en,
            target_table=target_table,
            target_schema=target_schema,
            description=description,
            source_dataflow_id=dataflow_id,
            source_dataflow_name=dataflow_name,
            created_by=data.get("created_by", "dataflow"),
        )
        logger.info(
            f"数据产品注册成功: {dataflow_name} -> {target_schema}.{target_table}"
        )
    except Exception as e:
        logger.error(f"注册数据产品失败: {str(e)}")
        raise
|