# dataflows.py

import logging
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
import json

from app.core.llm.llm_service import llm_client, llm_sql
from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node, relationship_exists
from app.core.meta_data import translate_and_parse, get_formatted_time
from py2neo import Relationship
from app import db
from sqlalchemy import text

logger = logging.getLogger(__name__)

class DataFlowService:
    """Data flow service class, handling data-flow related business logic"""
    @staticmethod
    def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]:
        """
        Get the list of data flows

        Args:
            page: page number
            page_size: page size
            search: search keyword

        Returns:
            Dict containing the data flow list and pagination info
        """
        try:
            # Query the data flow list from the graph database
            skip_count = (page - 1) * page_size

            # Build the search condition
            where_clause = ""
            params: Dict[str, Union[int, str]] = {'skip': skip_count, 'limit': page_size}
            if search:
                where_clause = "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search"
                params['search'] = search

            # Query the data flow list
            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            RETURN n, id(n) as node_id
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            """

            # Get the Neo4j driver (connect_graph raises a ConnectionError if the connection fails)
            try:
                with connect_graph().session() as session:
                    list_result = session.run(query, params).data()

                    # Query the total count
                    count_query = f"""
                    MATCH (n:DataFlow)
                    {where_clause}
                    RETURN count(n) as total
                    """
                    count_params = {'search': search} if search else {}
                    count_result = session.run(count_query, count_params).single()
                    total = count_result['total'] if count_result else 0
            except Exception as e:
                # connect_graph() returns a shared driver instance, so it is not closed here;
                # closing it prematurely would break other callers.
                logger.error(f"查询数据流失败: {str(e)}")
                raise e

            # Format the results
            dataflows = []
            for record in list_result:
                node = record['n']
                dataflow = dict(node)
                dataflow['id'] = record['node_id']  # use the node_id returned by the query
                dataflows.append(dataflow)

            return {
                'list': dataflows,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"获取数据流列表失败: {str(e)}")
            raise e
    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        Get data flow details by ID

        Args:
            dataflow_id: data flow ID

        Returns:
            Data flow details dict, or None if it does not exist
        """
        try:
            # Fetch all properties of the DataFlow node from Neo4j
            neo4j_query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            RETURN n, id(n) as node_id
            """
            with connect_graph().session() as session:
                neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data()
                if not neo4j_result:
                    logger.warning(f"未找到ID为 {dataflow_id} 的DataFlow节点")
                    return None

                record = neo4j_result[0]
                node = record['n']

                # Convert the node properties to a dict
                dataflow = dict(node)
                dataflow['id'] = record['node_id']

                # Handle script_requirement: if it is a JSON string, parse it into an object
                script_requirement_str = dataflow.get('script_requirement', '')
                if script_requirement_str:
                    try:
                        # Try to parse the JSON string
                        script_requirement_obj = json.loads(script_requirement_str)
                        dataflow['script_requirement'] = script_requirement_obj
                        logger.debug(f"成功解析script_requirement: {script_requirement_obj}")
                    except (json.JSONDecodeError, TypeError) as e:
                        logger.warning(f"script_requirement解析失败,保持原值: {e}")
                        # Keep the original value (string)
                        dataflow['script_requirement'] = script_requirement_str
                else:
                    # If empty, set it to None
                    dataflow['script_requirement'] = None

                logger.info(f"成功获取DataFlow详情,ID: {dataflow_id}, 名称: {dataflow.get('name_zh')}")
                return dataflow
        except Exception as e:
            logger.error(f"获取数据流详情失败: {str(e)}")
            raise e
    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new data flow

        Args:
            data: data flow configuration data

        Returns:
            Information about the created data flow
        """
        try:
            # Validate required fields
            required_fields = ['name_zh', 'describe']
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"缺少必填字段: {field}")

            dataflow_name = data['name_zh']

            # Use the LLM to translate the name into an English name
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
            except Exception as e:
                logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
                name_en = dataflow_name.lower().replace(' ', '_')

            # Normalize script_requirement to a JSON string
            script_requirement = data.get('script_requirement', None)
            if script_requirement is not None:
                # Dicts and lists are serialized to a JSON string
                if isinstance(script_requirement, (dict, list)):
                    script_requirement_str = json.dumps(script_requirement, ensure_ascii=False)
                else:
                    # Already a string, use it directly
                    script_requirement_str = str(script_requirement)
            else:
                script_requirement_str = ''

            # Prepare the node data
            node_data = {
                'name_zh': dataflow_name,
                'name_en': name_en,
                'category': data.get('category', ''),
                'organization': data.get('organization', ''),
                'leader': data.get('leader', ''),
                'frequency': data.get('frequency', ''),
                'tag': data.get('tag', ''),
                'describe': data.get('describe', ''),
                'status': data.get('status', 'inactive'),
                'update_mode': data.get('update_mode', 'append'),
                'script_type': data.get('script_type', 'python'),
                'script_requirement': script_requirement_str,
                'created_at': get_formatted_time(),
                'updated_at': get_formatted_time()
            }

            # Create or get the data flow node
            dataflow_id = get_node('DataFlow', name=dataflow_name)
            if dataflow_id:
                raise ValueError(f"数据流 '{dataflow_name}' 已存在")
            dataflow_id = create_or_get_node('DataFlow', **node_data)

            # Handle the tag relationship
            tag_id = data.get('tag')
            if tag_id is not None:
                try:
                    DataFlowService._handle_tag_relationship(dataflow_id, tag_id)
                except Exception as e:
                    logger.warning(f"处理标签关系时出错: {str(e)}")

            # After the graph node is created, write the record to the PG database
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")

                # After the PG record is written, create the script relationships in Neo4j
                try:
                    DataFlowService._handle_script_relationships(data, dataflow_name, name_en)
                    logger.info(f"脚本关系创建成功: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"创建脚本关系失败: {str(script_error)}")
            except Exception as pg_error:
                logger.error(f"写入PG数据库失败: {str(pg_error)}")
                # Note: the graph operation could be rolled back here, but for now the graph data
                # is kept; a real deployment may need a distributed transaction.

            # Return the created data flow: query the created node for its full properties
            query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id"
            with connect_graph().session() as session:
                id_result = session.run(query, name_zh=dataflow_name).single()
                if id_result:
                    dataflow_node = id_result['n']
                    node_id = id_result['node_id']

                    # Convert the node properties to a dict
                    result = dict(dataflow_node)
                    result['id'] = node_id
                else:
                    # If the query fails, return basic info
                    result = {
                        'id': dataflow_id if isinstance(dataflow_id, int) else None,
                        'name_zh': dataflow_name,
                        'name_en': name_en,
                        'created_at': get_formatted_time()
                    }

            logger.info(f"创建数据流成功: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"创建数据流失败: {str(e)}")
            raise e
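    # A minimal sketch of the payload create_dataflow expects (all values below are hypothetical;
    # only name_zh and describe are required, everything else falls back to the defaults above):
    #
    #   DataFlowService.create_dataflow({
    #       "name_zh": "订单汇总",
    #       "describe": "按天汇总订单金额",
    #       "update_mode": "append",
    #       "script_type": "sql",
    #       "source_table": "BusinessDomain:ods_orders",
    #       "target_table": "BusinessDomain:dws_order_summary",
    #       "script_requirement": {"rule": "按天聚合", "source_table": [101], "target_table": [202]},
    #   })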
    @staticmethod
    def _save_to_pg_database(data: Dict[str, Any], script_name: str, name_en: str):
        """
        Save the script information to the PG database

        Args:
            data: data containing the script information
            script_name: script name
            name_en: English name
        """
        try:
            # Extract the script-related information.
            # Normalize script_requirement so it is stored as a JSON string.
            script_requirement_raw = data.get('script_requirement', None)
            rule_from_requirement = ''  # rule extracted from script_requirement

            if script_requirement_raw is not None:
                # If it is a dict, extract the rule field
                if isinstance(script_requirement_raw, dict):
                    rule_from_requirement = script_requirement_raw.get('rule', '')
                    script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False)
                elif isinstance(script_requirement_raw, list):
                    script_requirement = json.dumps(script_requirement_raw, ensure_ascii=False)
                else:
                    # Already a string: try to parse it to extract the rule
                    script_requirement = str(script_requirement_raw)
                    try:
                        parsed_req = json.loads(script_requirement)
                        if isinstance(parsed_req, dict):
                            rule_from_requirement = parsed_req.get('rule', '')
                    except (json.JSONDecodeError, TypeError):
                        pass
            else:
                script_requirement = ''

            # script_content: prefer the value sent by the frontend; if it is empty,
            # fall back to the rule extracted from script_requirement
            script_content = data.get('script_content', '')
            if not script_content and rule_from_requirement:
                script_content = rule_from_requirement
                logger.info(f"script_content为空,使用从script_requirement提取的rule: {rule_from_requirement}")

            # Handle source_table and target_table defensively (avoid 'in' errors on None)
            source_table_raw = data.get('source_table') or ''
            source_table = source_table_raw.split(':')[-1] if ':' in source_table_raw else source_table_raw
            target_table_raw = data.get('target_table') or ''
            target_table = target_table_raw.split(':')[-1] if ':' in target_table_raw else (target_table_raw or name_en)

            script_type = data.get('script_type', 'python')
            user_name = data.get('created_by', 'system')
            target_dt_column = data.get('target_dt_column', '')

            # Validate required fields
            if not target_table:
                target_table = name_en
            if not script_name:
                raise ValueError("script_name不能为空")

            # Build the upsert SQL
            insert_sql = text("""
                INSERT INTO dags.data_transform_scripts
                (source_table, target_table, script_name, script_type, script_requirement,
                 script_content, user_name, create_time, update_time, target_dt_column)
                VALUES
                (:source_table, :target_table, :script_name, :script_type, :script_requirement,
                 :script_content, :user_name, :create_time, :update_time, :target_dt_column)
                ON CONFLICT (target_table, script_name)
                DO UPDATE SET
                    source_table = EXCLUDED.source_table,
                    script_type = EXCLUDED.script_type,
                    script_requirement = EXCLUDED.script_requirement,
                    script_content = EXCLUDED.script_content,
                    user_name = EXCLUDED.user_name,
                    update_time = EXCLUDED.update_time,
                    target_dt_column = EXCLUDED.target_dt_column
            """)

            # Prepare the parameters
            current_time = datetime.now()
            params = {
                'source_table': source_table,
                'target_table': target_table,
                'script_name': script_name,
                'script_type': script_type,
                'script_requirement': script_requirement,
                'script_content': script_content,
                'user_name': user_name,
                'create_time': current_time,
                'update_time': current_time,
                'target_dt_column': target_dt_column
            }

            # Execute the insert
            db.session.execute(insert_sql, params)

            # Additionally: save to the task_list table
            try:
                # 1. Parse script_requirement and build a detailed task description
                task_description_md = script_requirement
                try:
                    # Try to parse the JSON
                    try:
                        req_json = json.loads(script_requirement)
                    except (json.JSONDecodeError, TypeError):
                        req_json = None

                    if isinstance(req_json, dict):
                        # 1. Extract the rule field from script_requirement as request_content_str
                        request_content_str = req_json.get('rule', '')

                        # 2. Extract source_table and target_table info from script_requirement
                        source_table_ids = req_json.get('source_table', [])
                        target_table_ids = req_json.get('target_table', [])

                        # Make sure both are lists
                        if not isinstance(source_table_ids, list):
                            source_table_ids = [source_table_ids] if source_table_ids else []
                        if not isinstance(target_table_ids, list):
                            target_table_ids = [target_table_ids] if target_table_ids else []

                        # Merge all BusinessDomain IDs
                        all_bd_ids = source_table_ids + target_table_ids

                        # 4. Extract update_mode from the data argument
                        update_mode = data.get('update_mode', 'append')

                        # Generate the BusinessDomain DDLs
                        source_ddls = []
                        target_ddls = []
                        data_source_info = None

                        if all_bd_ids:
                            try:
                                with connect_graph().session() as session:
                                    # Process the source tables
                                    for bd_id in source_table_ids:
                                        ddl_info = DataFlowService._generate_businessdomain_ddl(
                                            session, bd_id, is_target=False
                                        )
                                        if ddl_info:
                                            source_ddls.append(ddl_info['ddl'])
                                            # 3. If the BELONGS_TO relation points to "数据资源", capture the data source info
                                            if ddl_info.get('data_source') and not data_source_info:
                                                data_source_info = ddl_info['data_source']

                                    # Process the target tables (5. target tables get a default create_time column)
                                    for bd_id in target_table_ids:
                                        ddl_info = DataFlowService._generate_businessdomain_ddl(
                                            session, bd_id, is_target=True, update_mode=update_mode
                                        )
                                        if ddl_info:
                                            target_ddls.append(ddl_info['ddl'])
                                            # Same BELONGS_TO check to capture the data source info
                                            if ddl_info.get('data_source') and not data_source_info:
                                                data_source_info = ddl_info['data_source']
                            except Exception as neo_e:
                                logger.error(f"获取BusinessDomain DDL失败: {str(neo_e)}")

                        # Build the Markdown task description
                        task_desc_parts = [f"# Task: {script_name}\n"]

                        # Add the data source info
                        if data_source_info:
                            task_desc_parts.append("## Data Source")
                            task_desc_parts.append(f"- **Type**: {data_source_info.get('type', 'N/A')}")
                            task_desc_parts.append(f"- **Host**: {data_source_info.get('host', 'N/A')}")
                            task_desc_parts.append(f"- **Port**: {data_source_info.get('port', 'N/A')}")
                            task_desc_parts.append(f"- **Database**: {data_source_info.get('database', 'N/A')}\n")

                        # Add the source table DDLs
                        if source_ddls:
                            task_desc_parts.append("## Source Tables (DDL)")
                            for ddl in source_ddls:
                                task_desc_parts.append(f"```sql\n{ddl}\n```\n")

                        # Add the target table DDLs
                        if target_ddls:
                            task_desc_parts.append("## Target Tables (DDL)")
                            for ddl in target_ddls:
                                task_desc_parts.append(f"```sql\n{ddl}\n```\n")

                        # Describe the update mode
                        task_desc_parts.append("## Update Mode")
                        if update_mode == 'append':
                            task_desc_parts.append("- **Mode**: Append (追加模式)")
                            task_desc_parts.append("- **Description**: 新数据将追加到目标表,不删除现有数据\n")
                        else:
                            task_desc_parts.append("- **Mode**: Full Refresh (全量更新)")
                            task_desc_parts.append("- **Description**: 目标表将被清空后重新写入数据\n")

                        # Add the request content (rule)
                        if request_content_str:
                            task_desc_parts.append("## Request Content")
                            task_desc_parts.append(f"{request_content_str}\n")

                        # Add the implementation steps (tailored to the task type)
                        task_desc_parts.append("## Implementation Steps")
                        # Decide whether this is a remote-data-source import task
                        if data_source_info:
                            # Simplified steps for importing data from a remote data source
                            task_desc_parts.append("1. Create an n8n workflow to execute the data import task")
                            task_desc_parts.append("2. Configure the workflow to call `import_resource_data.py` Python script")
                            task_desc_parts.append("3. Pass the following parameters to the Python execution node:")
                            task_desc_parts.append(" - `--source-config`: JSON configuration for the remote data source")
                            task_desc_parts.append(" - `--target-table`: Target table name (data resource English name)")
                            task_desc_parts.append(f" - `--update-mode`: {update_mode}")
                            task_desc_parts.append("4. The Python script will automatically:")
                            task_desc_parts.append(" - Connect to the remote data source")
                            task_desc_parts.append(" - Extract data from the source table")
                            task_desc_parts.append(f" - Write data to target table using {update_mode} mode")
                        else:
                            # Full steps for a data transformation task
                            task_desc_parts.append("1. Extract data from source tables as specified in the DDL")
                            task_desc_parts.append("2. Apply transformation logic according to the rule:")
                            if request_content_str:
                                task_desc_parts.append(f" - Rule: {request_content_str}")
                            task_desc_parts.append("3. Generate Python program to implement the data transformation logic")
                            task_desc_parts.append(f"4. Write transformed data to target table using {update_mode} mode")
                            task_desc_parts.append("5. Create an n8n workflow to schedule and execute the Python program")

                        task_description_md = "\n".join(task_desc_parts)
                except Exception as parse_e:
                    logger.warning(f"解析任务描述详情失败,使用原始描述: {str(parse_e)}")
                    task_description_md = script_requirement

                # Assume the working directory is the project root; dataflows.py lives in app/core/data_flow/
                code_path = 'app/core/data_flow'

                task_insert_sql = text("""
                    INSERT INTO public.task_list
                    (task_name, task_description, status, code_name, code_path, create_by, create_time, update_time)
                    VALUES
                    (:task_name, :task_description, :status, :code_name, :code_path, :create_by, :create_time, :update_time)
                """)
                task_params = {
                    'task_name': script_name,
                    'task_description': task_description_md,
                    'status': 'pending',
                    'code_name': script_name,
                    'code_path': code_path,
                    'create_by': 'cursor',
                    'create_time': current_time,
                    'update_time': current_time
                }

                # Use a nested transaction so that a task_list failure does not break the main flow
                with db.session.begin_nested():
                    db.session.execute(task_insert_sql, task_params)
                logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")
            except Exception as task_error:
                # Log the error but do not interrupt the main flow
                logger.error(f"写入task_list表失败: {str(task_error)}")
                # If the task_list write must succeed, raise task_error here instead
                # raise task_error

            db.session.commit()
            logger.info(f"成功将脚本信息写入PG数据库: target_table={target_table}, script_name={script_name}")
        except Exception as e:
            db.session.rollback()
            logger.error(f"写入PG数据库失败: {str(e)}")
            raise e
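    # Sketch of the script_requirement shape assumed by the task_list description builder above
    # (hypothetical values): a JSON object whose "rule" holds the transformation requirement and
    # whose "source_table"/"target_table" hold Neo4j BusinessDomain node IDs:
    #
    #   {"rule": "按天汇总订单金额", "source_table": [101, 102], "target_table": [202]}
    #
    # The ON CONFLICT upsert above also assumes dags.data_transform_scripts has a unique
    # constraint on (target_table, script_name).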
    @staticmethod
    def _handle_children_relationships(dataflow_node, children_ids):
        """Handle child-node relationships"""
        logger.debug(f"处理子节点关系,原始children_ids: {children_ids}, 类型: {type(children_ids)}")

        # Make sure children_ids is a list
        if not isinstance(children_ids, (list, tuple)):
            if children_ids is not None:
                children_ids = [children_ids]  # a single value becomes a one-element list
                logger.debug(f"将单个值转换为列表: {children_ids}")
            else:
                children_ids = []  # None becomes an empty list
                logger.debug("将None转换为空列表")

        for child_id in children_ids:
            try:
                # Look up the child node
                query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
                with connect_graph().session() as session:
                    result = session.run(query, child_id=child_id).data()
                    if result:
                        child_node = result[0]['n']

                        # Get the ID of dataflow_node
                        dataflow_id = getattr(dataflow_node, 'identity', None)
                        if dataflow_id is None:
                            # No identity attribute: look the ID up by name
                            query_id = "MATCH (n:DataFlow) WHERE n.name_zh = $name_zh RETURN id(n) as node_id"
                            id_result = session.run(query_id, name_zh=dataflow_node.get('name_zh')).single()
                            dataflow_id = id_result['node_id'] if id_result else None

                        # Create the relationship - call relationship_exists with IDs
                        if dataflow_id and not relationship_exists(dataflow_id, 'child', child_id):
                            session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $child_id CREATE (a)-[:child]->(b)",
                                        dataflow_id=dataflow_id, child_id=child_id)
                            logger.info(f"创建子节点关系: {dataflow_id} -> {child_id}")
            except Exception as e:
                logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
    @staticmethod
    def _handle_tag_relationship(dataflow_id, tag_id):
        """Handle the tag relationship"""
        try:
            # Look up the tag node
            query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, tag_id=tag_id).data()
                if result:
                    tag_node = result[0]['n']
                    # Create the relationship - call relationship_exists with IDs
                    if dataflow_id and not relationship_exists(dataflow_id, 'LABEL', tag_id):
                        session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $tag_id CREATE (a)-[:LABEL]->(b)",
                                    dataflow_id=dataflow_id, tag_id=tag_id)
                        logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
        except Exception as e:
            logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
    @staticmethod
    def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Update a data flow

        Args:
            dataflow_id: data flow ID
            data: fields to update

        Returns:
            The updated data flow, or None if it does not exist
        """
        try:
            # Look up the node
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    return None

                # Update the node properties
                update_fields = []
                params: Dict[str, Any] = {'dataflow_id': dataflow_id}
                for key, value in data.items():
                    if key not in ['id', 'created_at']:  # protected fields
                        if key == 'config' and isinstance(value, dict):
                            value = json.dumps(value, ensure_ascii=False)
                        update_fields.append(f"n.{key} = ${key}")
                        params[key] = value

                if update_fields:
                    params['updated_at'] = get_formatted_time()
                    update_fields.append("n.updated_at = $updated_at")
                    update_query = f"""
                    MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                    SET {', '.join(update_fields)}
                    RETURN n, id(n) as node_id
                    """
                    result = session.run(update_query, params).data()
                    if result:
                        node = result[0]['n']
                        updated_dataflow = dict(node)
                        updated_dataflow['id'] = result[0]['node_id']  # use the node_id returned by the query
                        logger.info(f"更新数据流成功: ID={dataflow_id}")
                        return updated_dataflow
                return None
        except Exception as e:
            logger.error(f"更新数据流失败: {str(e)}")
            raise e
    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """
        Delete a data flow

        Args:
            dataflow_id: data flow ID

        Returns:
            Whether the delete succeeded
        """
        try:
            # Delete the node together with its relationships
            query = """
            MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            with connect_graph().session() as session:
                delete_result = session.run(query, dataflow_id=dataflow_id).single()
                result = delete_result['deleted_count'] if delete_result else 0
                if result and result > 0:
                    logger.info(f"删除数据流成功: ID={dataflow_id}")
                    return True
                return False
        except Exception as e:
            logger.error(f"删除数据流失败: {str(e)}")
            raise e
    @staticmethod
    def execute_dataflow(dataflow_id: int, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Execute a data flow

        Args:
            dataflow_id: data flow ID
            params: execution parameters

        Returns:
            Execution result information
        """
        try:
            # Check that the data flow exists
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"数据流不存在: ID={dataflow_id}")

            execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"

            # TODO: actually execute the data flow here
            # For now, return a mocked result
            result = {
                'execution_id': execution_id,
                'dataflow_id': dataflow_id,
                'status': 'running',
                'started_at': datetime.now().isoformat(),
                'params': params or {},
                'progress': 0
            }

            logger.info(f"开始执行数据流: ID={dataflow_id}, execution_id={execution_id}")
            return result
        except Exception as e:
            logger.error(f"执行数据流失败: {str(e)}")
            raise e
    @staticmethod
    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
        """
        Get the execution status of a data flow

        Args:
            dataflow_id: data flow ID

        Returns:
            Execution status information
        """
        try:
            # TODO: query the real execution status here
            # For now, return a mocked status
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"数据流不存在: ID={dataflow_id}")

            status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]
            return {
                'dataflow_id': dataflow_id,
                'status': status,
                'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100,
                'started_at': datetime.now().isoformat(),
                'completed_at': datetime.now().isoformat() if status == 'completed' else None,
                'error_message': '执行过程中发生错误' if status == 'failed' else None
            }
        except Exception as e:
            logger.error(f"获取数据流状态失败: {str(e)}")
            raise e
    @staticmethod
    def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]:
        """
        Get the execution logs of a data flow

        Args:
            dataflow_id: data flow ID
            page: page number
            page_size: page size

        Returns:
            Execution log list and pagination info
        """
        try:
            # TODO: query the real execution logs here
            # For now, return mocked logs
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"数据流不存在: ID={dataflow_id}")

            mock_logs = [
                {
                    'id': i,
                    'timestamp': datetime.now().isoformat(),
                    'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
                    'message': f'数据流执行日志消息 {i}',
                    'component': ['source', 'transform', 'target'][i % 3]
                }
                for i in range(1, 101)
            ]

            # Pagination
            total = len(mock_logs)
            start = (page - 1) * page_size
            end = start + page_size
            logs = mock_logs[start:end]

            return {
                'logs': logs,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"获取数据流日志失败: {str(e)}")
            raise e
    @staticmethod
    def create_script(request_data: Union[Dict[str, Any], str]) -> str:
        """
        Generate a SQL script with the Deepseek model

        Args:
            request_data: request dict containing input, output and the request content, or a JSON string

        Returns:
            The generated SQL script content
        """
        try:
            logger.info(f"开始处理脚本生成请求: {request_data}")
            logger.info(f"request_data类型: {type(request_data)}")

            # Type checking and normalization
            if isinstance(request_data, str):
                logger.warning(f"request_data是字符串,尝试解析为JSON: {request_data}")
                try:
                    request_data = json.loads(request_data)
                except json.JSONDecodeError as e:
                    raise ValueError(f"无法解析request_data为JSON: {str(e)}")

            if not isinstance(request_data, dict):
                raise ValueError(f"request_data必须是字典类型,实际类型: {type(request_data)}")

            # 1. Parse the input, output and request content from request_data
            input_data = request_data.get('input', '')
            output_data = request_data.get('output', '')
            request_content = request_data.get('request_data', '')

            # If the request content is HTML, strip the tags to get plain text
            if request_content and (request_content.startswith('<p>') or '<' in request_content):
                # Simple HTML tag cleanup
                import re
                request_content = re.sub(r'<[^>]+>', '', request_content).strip()

            if not input_data or not output_data or not request_content:
                raise ValueError(f"缺少必要参数:input='{input_data}', output='{output_data}', request_content='{request_content[:100] if request_content else ''}' 不能为空")

            logger.info(f"解析得到 - input: {input_data}, output: {output_data}, request_content: {request_content}")

            # 2. Parse the tables listed in input and generate the source table DDLs
            source_tables_ddl = []
            input_tables = []
            if input_data:
                tables = [table.strip() for table in input_data.split(',') if table.strip()]
                for table in tables:
                    ddl = DataFlowService._parse_table_and_get_ddl(table, 'input')
                    if ddl:
                        input_tables.append(table)
                        source_tables_ddl.append(ddl)
                    else:
                        logger.warning(f"无法获取输入表 {table} 的DDL结构")

            # 3. Parse the table in output and generate the target table DDL
            target_table_ddl = ""
            if output_data:
                target_table_ddl = DataFlowService._parse_table_and_get_ddl(output_data.strip(), 'output')
                if not target_table_ddl:
                    logger.warning(f"无法获取输出表 {output_data} 的DDL结构")

            # 4. Build the prompt following the Deepseek-prompt.txt framework
            prompt_parts = []

            # Opening - role definition
            prompt_parts.append("你是一名数据库工程师,正在构建一个PostgreSQL数据中的汇总逻辑。请为以下需求生成一段标准的 PostgreSQL SQL 脚本:")

            # Dynamically generate the source table sections (item 1)
            for i, (table, ddl) in enumerate(zip(input_tables, source_tables_ddl), 1):
                table_name = table.split(':')[-1] if ':' in table else table
                prompt_parts.append(f"{i}.有一个源表: {table_name},它的定义语句如下:")
                prompt_parts.append(ddl)
                prompt_parts.append("")  # blank line separator

            # Dynamically generate the target table section (item 2)
            if target_table_ddl:
                target_table_name = output_data.split(':')[-1] if ':' in output_data else output_data
                next_index = len(input_tables) + 1
                prompt_parts.append(f"{next_index}.有一个目标表:{target_table_name},它的定义语句如下:")
                prompt_parts.append(target_table_ddl)
                prompt_parts.append("")  # blank line separator

            # Dynamically generate the processing logic section (item 3)
            next_index = len(input_tables) + 2 if target_table_ddl else len(input_tables) + 1
            prompt_parts.append(f"{next_index}.处理逻辑为:{request_content}")
            prompt_parts.append("")  # blank line separator

            # Fixed technical requirements (items 4-8)
            tech_requirements = [
                f"{next_index + 1}.脚本应使用标准的 PostgreSQL 语法,适合在 Airflow、Python 脚本、或调度系统中调用;",
                f"{next_index + 2}.无需使用 UPSERT 或 ON CONFLICT",
                f"{next_index + 3}.请直接输出SQL,无需进行解释。",
                f"{next_index + 4}.请给这段sql起个英文名,不少于三个英文单词,使用\"_\"分隔,采用蛇形命名法。把sql的名字作为注释写在返回的sql中。",
                f"{next_index + 5}.生成的sql在向目标表插入数据的时候,向create_time字段写入当前日期时间now(),不用处理update_time字段"
            ]
            prompt_parts.extend(tech_requirements)

            # Assemble the full prompt
            full_prompt = "\n".join(prompt_parts)

            logger.info(f"构建的完整提示语长度: {len(full_prompt)}")
            logger.info(f"完整提示语内容: {full_prompt}")

            # 5. Call the LLM to generate the SQL script
            logger.info("开始调用Deepseek模型生成SQL脚本")
            script_content = llm_sql(full_prompt)

            if not script_content:
                raise ValueError("Deepseek模型返回空内容")

            # Make sure a string is returned
            if not isinstance(script_content, str):
                script_content = str(script_content)

            logger.info(f"SQL脚本生成成功,内容长度: {len(script_content)}")
            return script_content
        except Exception as e:
            logger.error(f"生成SQL脚本失败: {str(e)}")
            raise e
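    # A minimal, hypothetical request for create_script: "input" is a comma-separated list of
    # "label:name_en" table specs, "output" is a single spec, and the requirement text (read from
    # the "request_data" key above) may be HTML:
    #
    #   DataFlowService.create_script({
    #       "input": "BusinessDomain:ods_orders,BusinessDomain:ods_users",
    #       "output": "BusinessDomain:dws_order_summary",
    #       "request_data": "<p>按用户和天汇总订单金额</p>",
    #   })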
    @staticmethod
    def _parse_table_and_get_ddl(table_str: str, table_type: str) -> str:
        """
        Parse a table spec ("A:B") and build a DDL string from the metadata stored in Neo4j

        Args:
            table_str: table spec in the form "label:name_en"
            table_type: table type, used only for logging (input/output)

        Returns:
            DDL-style table definition string
        """
        try:
            # Parse the "A:B" format
            if ':' not in table_str:
                logger.error(f"表格式错误,应为'label:name_en'格式: {table_str}")
                return ""

            parts = table_str.split(':', 1)
            if len(parts) != 2:
                logger.error(f"表格式解析失败: {table_str}")
                return ""

            label = parts[0].strip()
            name_en = parts[1].strip()
            if not label or not name_en:
                logger.error(f"标签或英文名为空: label={label}, name_en={name_en}")
                return ""

            logger.info(f"开始查询{table_type}表: label={label}, name_en={name_en}")

            # Query the node and its associated metadata from Neo4j
            with connect_graph().session() as session:
                cypher = f"""
                MATCH (n:{label} {{name_en: $name_en}})
                OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
                RETURN n, collect(m) as metadata
                """
                result = session.run(cypher, {'name_en': name_en})  # type: ignore[arg-type]
                record = result.single()

                if not record:
                    logger.error(f"未找到节点: label={label}, name_en={name_en}")
                    return ""

                node = record['n']
                metadata = record['metadata']
                logger.info(f"找到节点,关联元数据数量: {len(metadata)}")

                # Build the DDL-style table definition
                ddl_lines = []
                ddl_lines.append(f"CREATE TABLE {name_en} (")

                if metadata:
                    column_definitions = []
                    for meta in metadata:
                        if meta:  # make sure meta is not empty
                            meta_props = dict(meta)
                            column_name = meta_props.get('name_en', meta_props.get('name_zh', 'unknown_column'))
                            data_type = meta_props.get('data_type', 'VARCHAR(255)')
                            comment = meta_props.get('name_zh', '')

                            # Build the column definition
                            column_def = f" {column_name} {data_type}"
                            if comment:
                                column_def += f" COMMENT '{comment}'"
                            column_definitions.append(column_def)

                    if column_definitions:
                        ddl_lines.append(",\n".join(column_definitions))
                    else:
                        ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
                else:
                    # No metadata: add a default column
                    ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")

                ddl_lines.append(");")

                # Add the table comment
                node_props = dict(node)
                table_comment = node_props.get('name_zh', node_props.get('describe', name_en))
                if table_comment and table_comment != name_en:
                    ddl_lines.append(f"COMMENT ON TABLE {name_en} IS '{table_comment}';")

                ddl_content = "\n".join(ddl_lines)
                logger.info(f"{table_type}表DDL生成成功: {name_en}")
                logger.debug(f"生成的DDL: {ddl_content}")
                return ddl_content
        except Exception as e:
            logger.error(f"解析表格式和生成DDL失败: {str(e)}")
            return ""
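    # For example (hypothetical data), _parse_table_and_get_ddl("BusinessDomain:ods_orders", "input")
    # looks up the BusinessDomain node whose name_en is "ods_orders", collects its INCLUDES->DataMeta
    # columns, and returns a "CREATE TABLE ods_orders (...)" string used only for prompt building.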
    @staticmethod
    def _generate_businessdomain_ddl(session, bd_id: int, is_target: bool = False, update_mode: str = 'append') -> Optional[Dict[str, Any]]:
        """
        Generate DDL from a BusinessDomain node ID

        Args:
            session: Neo4j session object
            bd_id: BusinessDomain node ID
            is_target: whether this is a target table (target tables get an extra create_time column)
            update_mode: update mode (append or full)

        Returns:
            Dict containing the ddl and data_source info, or None if the node does not exist
        """
        try:
            # Query the BusinessDomain node, its metadata, its label relation and its data source relation
            cypher = """
            MATCH (bd:BusinessDomain)
            WHERE id(bd) = $bd_id
            OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
            OPTIONAL MATCH (bd)-[:BELONGS_TO]->(label:DataLabel)
            OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
            RETURN bd,
                   collect(DISTINCT m) as metadata,
                   label.name_zh as label_name,
                   ds.type as ds_type,
                   ds.host as ds_host,
                   ds.port as ds_port,
                   ds.database as ds_database
            """
            result = session.run(cypher, bd_id=bd_id).single()
            if not result or not result['bd']:
                logger.warning(f"未找到ID为 {bd_id} 的BusinessDomain节点")
                return None

            node = result['bd']
            metadata = result['metadata']
            label_name = result['label_name']

            # Generate the DDL
            node_props = dict(node)
            table_name = node_props.get('name_en', f'table_{bd_id}')

            ddl_lines = []
            ddl_lines.append(f"CREATE TABLE {table_name} (")

            column_definitions = []

            # Add the metadata columns
            if metadata:
                for meta in metadata:
                    if meta:
                        meta_props = dict(meta)
                        column_name = meta_props.get('name_en', meta_props.get('name_zh', 'unknown_column'))
                        data_type = meta_props.get('data_type', 'VARCHAR(255)')
                        comment = meta_props.get('name_zh', '')
                        column_def = f" {column_name} {data_type}"
                        if comment:
                            column_def += f" COMMENT '{comment}'"
                        column_definitions.append(column_def)

            # No metadata: add a default primary key
            if not column_definitions:
                column_definitions.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")

            # 5. Target tables get an extra create_time column
            if is_target:
                column_definitions.append(" create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'")

            ddl_lines.append(",\n".join(column_definitions))
            ddl_lines.append(");")

            # Add the table comment
            table_comment = node_props.get('name_zh', node_props.get('describe', table_name))
            if table_comment and table_comment != table_name:
                ddl_lines.append(f"COMMENT ON TABLE {table_name} IS '{table_comment}';")

            ddl_content = "\n".join(ddl_lines)

            # 3. If the BELONGS_TO relation points to "数据资源", also return the data source info
            data_source = None
            if label_name == '数据资源' and result['ds_type']:
                data_source = {
                    'type': result['ds_type'],
                    'host': result['ds_host'],
                    'port': result['ds_port'],
                    'database': result['ds_database']
                }
                logger.info(f"获取到数据源信息: {data_source}")

            logger.debug(f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}")
            return {
                'ddl': ddl_content,
                'table_name': table_name,
                'data_source': data_source
            }
        except Exception as e:
            logger.error(f"生成BusinessDomain DDL失败,ID={bd_id}: {str(e)}")
            return None
    @staticmethod
    def _handle_script_relationships(data: Dict[str, Any], dataflow_name: str, name_en: str):
        """
        Handle script relationships: create a DERIVED_FROM relationship between the source_table
        and target_table nodes in the Neo4j graph database

        Args:
            data: dict with the script info; should contain script_type, status, source_table,
                  target_table and update_mode
            dataflow_name: data flow name, used as the script name on the relationship
            name_en: English name of the data flow
        """
        try:
            # Read the key-value pairs from data
            script_name = dataflow_name
            script_type = data.get('script_type', 'sql')
            schedule_status = data.get('status', 'inactive')
            source_table_full = data.get('source_table', '')
            target_table_full = data.get('target_table', '')
            update_mode = data.get('update_mode', 'full')

            # Split source_table and target_table into label and table name
            source_table = source_table_full.split(':')[-1] if ':' in source_table_full else source_table_full
            target_table = target_table_full.split(':')[-1] if ':' in target_table_full else target_table_full
            source_label = source_table_full.split(':')[0] if ':' in source_table_full else source_table_full
            target_label = target_table_full.split(':')[0] if ':' in target_table_full else target_table_full

            # Validate required fields
            if not source_table or not target_table:
                logger.warning(f"source_table或target_table为空,跳过关系创建: source_table={source_table}, target_table={target_table}")
                return

            logger.info(f"开始创建脚本关系: {source_table} -> {target_table}")

            with connect_graph().session() as session:
                # Create or get the source and target nodes
                create_nodes_query = f"""
                MERGE (source:{source_label} {{name: $source_table}})
                ON CREATE SET source.created_at = $created_at,
                              source.type = 'source'
                WITH source
                MERGE (target:{target_label} {{name: $target_table}})
                ON CREATE SET target.created_at = $created_at,
                              target.type = 'target'
                RETURN source, target, id(source) as source_id, id(target) as target_id
                """

                # Execute the node creation query
                result = session.run(create_nodes_query, {  # type: ignore[arg-type]
                    'source_table': source_table,
                    'target_table': target_table,
                    'created_at': get_formatted_time()
                }).single()

                if result:
                    source_id = result['source_id']
                    target_id = result['target_id']

                    # Check for and create the relationship
                    create_relationship_query = f"""
                    MATCH (source:{source_label}), (target:{target_label})
                    WHERE id(source) = $source_id AND id(target) = $target_id
                    AND NOT EXISTS((target)-[:DERIVED_FROM]->(source))
                    CREATE (target)-[r:DERIVED_FROM]->(source)
                    SET r.script_name = $script_name,
                        r.script_type = $script_type,
                        r.schedule_status = $schedule_status,
                        r.update_mode = $update_mode,
                        r.created_at = $created_at,
                        r.updated_at = $created_at
                    RETURN r
                    """
                    relationship_result = session.run(create_relationship_query, {  # type: ignore[arg-type]
                        'source_id': source_id,
                        'target_id': target_id,
                        'script_name': script_name,
                        'script_type': script_type,
                        'schedule_status': schedule_status,
                        'update_mode': update_mode,
                        'created_at': get_formatted_time()
                    }).single()

                    if relationship_result:
                        logger.info(f"成功创建DERIVED_FROM关系: {target_table} -> {source_table} (script: {script_name})")
                    else:
                        logger.info(f"DERIVED_FROM关系已存在: {target_table} -> {source_table}")
                else:
                    logger.error(f"创建表节点失败: source_table={source_table}, target_table={target_table}")
        except Exception as e:
            logger.error(f"处理脚本关系失败: {str(e)}")
            raise e
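    # Note: the lineage relationship above is created as (target)-[:DERIVED_FROM]->(source),
    # i.e. it points from the derived table back to the table it was derived from, and it carries
    # script_name, script_type, schedule_status and update_mode as relationship properties.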
    @staticmethod
    def get_business_domain_list() -> List[Dict[str, Any]]:
        """
        Get the list of BusinessDomain nodes

        Returns:
            List of BusinessDomain nodes, each containing id, name_zh, name_en and tag
        """
        try:
            logger.info("开始查询BusinessDomain节点列表")

            with connect_graph().session() as session:
                # Query all BusinessDomain nodes and the label each one BELONGS_TO
                query = """
                MATCH (bd:BusinessDomain)
                OPTIONAL MATCH (bd)-[:BELONGS_TO]->(label:DataLabel)
                RETURN id(bd) as id,
                       bd.name_zh as name_zh,
                       bd.name_en as name_en,
                       label.name_zh as tag
                ORDER BY bd.create_time DESC
                """
                result = session.run(query)

                bd_list = []
                for record in result:
                    bd_item = {
                        "id": record["id"],
                        "name_zh": record["name_zh"] if record["name_zh"] else "",
                        "name_en": record["name_en"] if record["name_en"] else "",
                        "tag": record["tag"] if record["tag"] else ""
                    }
                    bd_list.append(bd_item)

                logger.info(f"成功查询到 {len(bd_list)} 个BusinessDomain节点")
                return bd_list
        except Exception as e:
            logger.error(f"查询BusinessDomain节点列表失败: {str(e)}")
            raise e