# dataflows.py
import logging
from typing import Dict, Optional, Any
from datetime import datetime
import json

from app.core.llm.llm_service import llm_client
from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node, relationship_exists
from app.core.meta_data import translate_and_parse, get_formatted_time
from app import db
from sqlalchemy import text

logger = logging.getLogger(__name__)
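
# NOTE: connect_graph() is assumed to return a neo4j-driver-style Driver whose
# .session() context manager yields a Session supporting run(query, **params);
# every Cypher call in this module relies on that contract.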


class DataFlowService:
    """Dataflow service: business logic for dataflow management."""

    @staticmethod
    def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]:
        """
        Get a paginated list of dataflows.

        Args:
            page: Page number
            page_size: Number of items per page
            search: Search keyword

        Returns:
            Dict containing the dataflow list and pagination info
        """
        try:
            # Query the dataflow list from the graph database
            skip_count = (page - 1) * page_size

            # Build the search condition
            where_clause = ""
            params = {'skip': skip_count, 'limit': page_size}
            if search:
                where_clause = "WHERE n.name CONTAINS $search OR n.description CONTAINS $search"
                params['search'] = search

            # Query the dataflow list
            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            RETURN n, id(n) as node_id
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            """

            with connect_graph().session() as session:
                list_result = session.run(query, **params).data()

                # Query the total count
                count_query = f"""
                MATCH (n:DataFlow)
                {where_clause}
                RETURN count(n) as total
                """
                count_params = {'search': search} if search else {}
                count_result = session.run(count_query, **count_params).single()
                total = count_result['total'] if count_result else 0

            # Format the results
            dataflows = []
            for record in list_result:
                node = record['n']
                dataflow = dict(node)
                dataflow['id'] = record['node_id']  # use the node_id returned by the query
                dataflows.append(dataflow)

            return {
                'list': dataflows,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    # Ceiling division: pages needed to hold `total` items
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"Failed to get dataflow list: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        Get dataflow details by ID.

        Args:
            dataflow_id: Dataflow ID

        Returns:
            Dict with the dataflow details, or None if it does not exist
        """
        try:
            query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:label]-(la:data_label)
            OPTIONAL MATCH (n)-[:child]-(child)
            OPTIONAL MATCH (parent)-[:child]-(n)
            RETURN n, id(n) as node_id,
                   collect(DISTINCT {id: id(la), name: la.name}) as tags,
                   collect(DISTINCT {id: id(child), name: child.name}) as children,
                   collect(DISTINCT {id: id(parent), name: parent.name}) as parents
            """

            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

            if not result:
                return None

            record = result[0]
            node = record['n']
            dataflow = dict(node)
            dataflow['id'] = record['node_id']  # use the node_id returned by the query
            dataflow['tags'] = record['tags']
            dataflow['children'] = record['children']
            dataflow['parents'] = record['parents']

            return dataflow
        except Exception as e:
            logger.error(f"Failed to get dataflow details: {str(e)}")
            raise e

    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new dataflow.

        Args:
            data: Dataflow configuration data

        Returns:
            Info of the created dataflow
        """
        try:
            # Validate required fields
            required_fields = ['name', 'describe']
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"Missing required field: {field}")

            dataflow_name = data['name']

            # Translate the name via LLM to generate the English name
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
            except Exception as e:
                logger.warning(f"Translation failed, falling back to a default English name: {str(e)}")
                name_en = dataflow_name.lower().replace(' ', '_')

            # Prepare node data
            node_data = {
                'name': dataflow_name,  # kept in sync with the lookups below, which match on n.name
                'name_zh': dataflow_name,
                'name_en': name_en,
                'category': data.get('category', ''),
                'organization': data.get('organization', ''),
                'leader': data.get('leader', ''),
                'frequency': data.get('frequency', ''),
                'tag': data.get('tag', ''),
                'describe': data.get('describe', ''),
                'status': data.get('status', 'inactive'),
                'created_at': get_formatted_time(),
                'updated_at': get_formatted_time()
            }

            # Create or get the dataflow node
            dataflow_id = get_node('DataFlow', name=dataflow_name)
            if dataflow_id:
                raise ValueError(f"Dataflow '{dataflow_name}' already exists")

            dataflow_id = create_or_get_node('DataFlow', **node_data)

            # Handle the tag relationship
            tag_id = data.get('tag')
            if tag_id is not None:
                try:
                    DataFlowService._handle_tag_relationship(dataflow_id, tag_id)
                except Exception as e:
                    logger.warning(f"Error while handling tag relationship: {str(e)}")

            # After the graph node is created, write to the PG database
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                logger.info(f"Dataflow info written to PG database: {dataflow_name}")

                # Once the PG record is written, create the script relationships in Neo4j
                try:
                    DataFlowService._handle_script_relationships(data, dataflow_name, name_en)
                    logger.info(f"Script relationships created: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"Failed to create script relationships: {str(script_error)}")
            except Exception as pg_error:
                logger.error(f"Failed to write to PG database: {str(pg_error)}")
                # Note: the graph operation could be rolled back here, but for now the graph
                # data is kept. A production setup may need a distributed transaction.

            # Return the created dataflow info:
            # query the created node to get its full properties
            query = "MATCH (n:DataFlow {name: $name}) RETURN n, id(n) as node_id"
            with connect_graph().session() as session:
                id_result = session.run(query, name=dataflow_name).single()
                if id_result:
                    dataflow_node = id_result['n']
                    node_id = id_result['node_id']
                    # Convert the node properties to a dict
                    result = dict(dataflow_node)
                    result['id'] = node_id
                else:
                    # If the query fails, return basic info
                    result = {
                        'id': dataflow_id if isinstance(dataflow_id, int) else None,
                        'name': dataflow_name,
                        'name_en': name_en,
                        'created_at': get_formatted_time()
                    }

            logger.info(f"Dataflow created: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"Failed to create dataflow: {str(e)}")
            raise e

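    # Reference for the `data` payload accepted by create_dataflow, gathered from the
    # fields this class reads (everything except 'name' and 'describe' is optional;
    # parenthesized values are the defaults):
    #   name, describe (required); category, organization, leader, frequency, tag,
    #   status ('inactive'), script_requirement, script_content, source_table,
    #   target_table (falls back to name_en in the PG path), script_type ('python'
    #   in the PG path, 'sql' in the graph-relationship path), created_by ('system'),
    #   target_dt_column, update_mode ('full').
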
    @staticmethod
    def _save_to_pg_database(data: Dict[str, Any], script_name: str, name_en: str):
        """
        Save script info to the PG database.

        Args:
            data: Data containing the script info
            script_name: Script name
            name_en: English name
        """
        try:
            # Extract script-related info
            script_requirement = data.get('script_requirement', '')
            script_content = data.get('script_content', '')
            source_table = data.get('source_table', '')
            target_table = data.get('target_table', name_en)  # fall back to the English name if no target table is given
            script_type = data.get('script_type', 'python')
            user_name = data.get('created_by', 'system')
            target_dt_column = data.get('target_dt_column', '')

            # Validate required fields
            if not target_table:
                target_table = name_en
            if not script_name:
                raise ValueError("script_name must not be empty")

            # Build the upsert SQL
            insert_sql = text("""
                INSERT INTO dags.data_transform_scripts
                (source_table, target_table, script_name, script_type, script_requirement,
                 script_content, user_name, create_time, update_time, target_dt_column)
                VALUES
                (:source_table, :target_table, :script_name, :script_type, :script_requirement,
                 :script_content, :user_name, :create_time, :update_time, :target_dt_column)
                ON CONFLICT (target_table, script_name)
                DO UPDATE SET
                    source_table = EXCLUDED.source_table,
                    script_type = EXCLUDED.script_type,
                    script_requirement = EXCLUDED.script_requirement,
                    script_content = EXCLUDED.script_content,
                    user_name = EXCLUDED.user_name,
                    update_time = EXCLUDED.update_time,
                    target_dt_column = EXCLUDED.target_dt_column
            """)

            # Prepare parameters
            current_time = datetime.now()
            params = {
                'source_table': source_table,
                'target_table': target_table,
                'script_name': script_name,
                'script_type': script_type,
                'script_requirement': script_requirement,
                'script_content': script_content,
                'user_name': user_name,
                'create_time': current_time,
                'update_time': current_time,
                'target_dt_column': target_dt_column
            }

            # Execute the upsert
            db.session.execute(insert_sql, params)
            db.session.commit()

            logger.info(f"Script info written to PG database: target_table={target_table}, script_name={script_name}")
        except Exception as e:
            db.session.rollback()
            logger.error(f"Failed to write to PG database: {str(e)}")
            raise e

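    # The ON CONFLICT (target_table, script_name) clause above only works if that
    # column pair carries a unique constraint or index. A minimal sketch of the
    # assumed (hypothetical) DDL, to be adjusted to the real schema:
    #
    #   CREATE TABLE IF NOT EXISTS dags.data_transform_scripts (
    #       source_table       TEXT,
    #       target_table       TEXT NOT NULL,
    #       script_name        TEXT NOT NULL,
    #       script_type        TEXT,
    #       script_requirement TEXT,
    #       script_content     TEXT,
    #       user_name          TEXT,
    #       create_time        TIMESTAMP,
    #       update_time        TIMESTAMP,
    #       target_dt_column   TEXT,
    #       UNIQUE (target_table, script_name)
    #   );
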
    @staticmethod
    def _handle_children_relationships(dataflow_node, children_ids):
        """Handle child-node relationships."""
        logger.debug(f"Handling child relationships, raw children_ids: {children_ids}, type: {type(children_ids)}")

        # Ensure children_ids is a list
        if not isinstance(children_ids, (list, tuple)):
            if children_ids is not None:
                children_ids = [children_ids]  # wrap a single value in a list
                logger.debug(f"Wrapped single value in a list: {children_ids}")
            else:
                children_ids = []  # treat None as an empty list
                logger.debug("Converted None to an empty list")

        for child_id in children_ids:
            try:
                # Look up the child node
                query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
                with connect_graph().session() as session:
                    result = session.run(query, child_id=child_id).data()
                    if result:
                        child_node = result[0]['n']

                        # Get the ID of dataflow_node
                        dataflow_id = getattr(dataflow_node, 'identity', None)
                        if dataflow_id is None:
                            # Without an identity attribute, resolve the ID by name
                            query_id = "MATCH (n:DataFlow) WHERE n.name = $name RETURN id(n) as node_id"
                            id_result = session.run(query_id, name=dataflow_node.get('name')).single()
                            dataflow_id = id_result['node_id'] if id_result else None

                        # Create the relationship, calling relationship_exists with IDs
                        if dataflow_id and not relationship_exists(dataflow_id, 'child', child_id):
                            session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $child_id CREATE (a)-[:child]->(b)",
                                        dataflow_id=dataflow_id, child_id=child_id)
                            logger.info(f"Created child relationship: {dataflow_id} -> {child_id}")
            except Exception as e:
                logger.warning(f"Failed to create child relationship {child_id}: {str(e)}")

    @staticmethod
    def _handle_tag_relationship(dataflow_id, tag_id):
        """Handle the tag relationship."""
        try:
            # Look up the tag node
            query = "MATCH (n:data_label) WHERE id(n) = $tag_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, tag_id=tag_id).data()
                if result:
                    tag_node = result[0]['n']

                    # Create the relationship, calling relationship_exists with IDs
                    if dataflow_id and not relationship_exists(dataflow_id, 'label', tag_id):
                        session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $tag_id CREATE (a)-[:label]->(b)",
                                    dataflow_id=dataflow_id, tag_id=tag_id)
                        logger.info(f"Created tag relationship: {dataflow_id} -> {tag_id}")
        except Exception as e:
            logger.warning(f"Failed to create tag relationship {tag_id}: {str(e)}")

    @staticmethod
    def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Update a dataflow.

        Args:
            dataflow_id: Dataflow ID
            data: Data to update

        Returns:
            Updated dataflow info, or None if it does not exist
        """
        try:
            # Look up the node
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    return None

                # Update node properties
                update_fields = []
                params = {'dataflow_id': dataflow_id}
                for key, value in data.items():
                    if key not in ['id', 'created_at']:  # protected fields
                        if key == 'config' and isinstance(value, dict):
                            value = json.dumps(value, ensure_ascii=False)
                        update_fields.append(f"n.{key} = ${key}")
                        params[key] = value

                if update_fields:
                    params['updated_at'] = get_formatted_time()
                    update_fields.append("n.updated_at = $updated_at")

                    update_query = f"""
                    MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                    SET {', '.join(update_fields)}
                    RETURN n, id(n) as node_id
                    """
                    result = session.run(update_query, **params).data()

                    if result:
                        node = result[0]['n']
                        updated_dataflow = dict(node)
                        updated_dataflow['id'] = result[0]['node_id']  # use the node_id returned by the query
                        logger.info(f"Dataflow updated: ID={dataflow_id}")
                        return updated_dataflow

            return None
        except Exception as e:
            logger.error(f"Failed to update dataflow: {str(e)}")
            raise e

    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """
        Delete a dataflow.

        Args:
            dataflow_id: Dataflow ID

        Returns:
            Whether the deletion succeeded
        """
        try:
            # Delete the node and its relationships
            query = """
            MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            with connect_graph().session() as session:
                delete_result = session.run(query, dataflow_id=dataflow_id).single()
                result = delete_result['deleted_count'] if delete_result else 0

            if result and result > 0:
                logger.info(f"Dataflow deleted: ID={dataflow_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Failed to delete dataflow: {str(e)}")
            raise e

    @staticmethod
    def execute_dataflow(dataflow_id: int, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Execute a dataflow.

        Args:
            dataflow_id: Dataflow ID
            params: Execution parameters

        Returns:
            Execution result info
        """
        try:
            # Check that the dataflow exists
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

            if not result:
                raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")

            execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"

            # TODO: actually execute the dataflow here.
            # For now, return a mock result.
            result = {
                'execution_id': execution_id,
                'dataflow_id': dataflow_id,
                'status': 'running',
                'started_at': datetime.now().isoformat(),
                'params': params or {},
                'progress': 0
            }

            logger.info(f"Dataflow execution started: ID={dataflow_id}, execution_id={execution_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to execute dataflow: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
        """
        Get the execution status of a dataflow.

        Args:
            dataflow_id: Dataflow ID

        Returns:
            Execution status info
        """
        try:
            # TODO: query the real execution status here.
            # For now, return a mock status derived from the ID.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

            if not result:
                raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")

            status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]

            return {
                'dataflow_id': dataflow_id,
                'status': status,
                'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100,
                'started_at': datetime.now().isoformat(),
                'completed_at': datetime.now().isoformat() if status == 'completed' else None,
                'error_message': 'An error occurred during execution' if status == 'failed' else None
            }
        except Exception as e:
            logger.error(f"Failed to get dataflow status: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]:
        """
        Get the execution logs of a dataflow.

        Args:
            dataflow_id: Dataflow ID
            page: Page number
            page_size: Page size

        Returns:
            Execution log list and pagination info
        """
        try:
            # TODO: query the real execution logs here.
            # For now, return mock logs.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

            if not result:
                raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")

            mock_logs = [
                {
                    'id': i,
                    'timestamp': datetime.now().isoformat(),
                    'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
                    'message': f'Dataflow execution log message {i}',
                    'component': ['source', 'transform', 'target'][i % 3]
                }
                for i in range(1, 101)
            ]

            # Pagination
            total = len(mock_logs)
            start = (page - 1) * page_size
            end = start + page_size
            logs = mock_logs[start:end]

            return {
                'logs': logs,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"Failed to get dataflow logs: {str(e)}")
            raise e

    @staticmethod
    def create_script(request_data: str) -> str:
        """
        Generate a script with the Deepseek model.

        Args:
            request_data: Request data, a free-text description of the user's requirement

        Returns:
            The generated script content (plain text)
        """
        try:
            # Build the prompt (the literals below are the model-facing instructions
            # and are deliberately kept in Chinese)
            prompt_parts = []

            # Add the system prompt
            prompt_parts.append("请根据以下需求生成相应的数据处理脚本:")

            # Append request_data directly to the prompt as the requirement text
            prompt_parts.append(request_data)

            # Add the format requirement
            prompt_parts.append("\n请生成完整可执行的脚本代码,包含必要的注释和错误处理。")

            # Assemble the prompt
            full_prompt = "\n\n".join(prompt_parts)

            logger.info(f"Calling the Deepseek model to generate a script, prompt length: {len(full_prompt)}")

            # Call the LLM service
            script_content = llm_client(full_prompt)

            if not script_content:
                raise ValueError("Deepseek model returned empty content")

            # Ensure the return value is text
            if not isinstance(script_content, str):
                script_content = str(script_content)

            logger.info(f"Script generated, content length: {len(script_content)}")
            return script_content
        except Exception as e:
            logger.error(f"Failed to generate script: {str(e)}")
            raise e

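    # llm_client above is assumed to take a single prompt string and return the model
    # completion as text (or something str()-convertible), matching the guards above.
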
    @staticmethod
    def _handle_script_relationships(data: Dict[str, Any], dataflow_name: str, name_en: str):
        """
        Handle script relationships: create a DERIVED_FROM relationship between the
        source_table and target_table nodes in the Neo4j graph database.

        Args:
            data: Data dict with the script info; expected keys include script_type,
                  status, source_table, target_table and update_mode
            dataflow_name: Dataflow name, used as the script name on the relationship
            name_en: English name of the dataflow
        """
        try:
            # Read the key-value pairs from data
            script_name = dataflow_name
            script_type = data.get('script_type', 'sql')
            schedule_status = data.get('status', 'inactive')
            source_table = data.get('source_table', '')
            target_table = data.get('target_table', '')
            update_mode = data.get('update_mode', 'full')

            # Validate required fields
            if not source_table or not target_table:
                logger.warning(f"source_table or target_table is empty, skipping relationship creation: source_table={source_table}, target_table={target_table}")
                return

            logger.info(f"Creating script relationship: {source_table} -> {target_table}")

            with connect_graph().session() as session:
                # Create or get the source_table node
                source_node_query = """
                MERGE (source:table {name: $source_table})
                ON CREATE SET source.created_at = $created_at,
                              source.type = 'source'
                RETURN source, id(source) as source_id
                """

                # Create or get the target_table node
                target_node_query = """
                MERGE (target:table {name: $target_table})
                ON CREATE SET target.created_at = $created_at,
                              target.type = 'target'
                RETURN target, id(target) as target_id
                """

                current_time = get_formatted_time()

                # Run the node-creation queries
                source_result = session.run(source_node_query,
                                            source_table=source_table,
                                            created_at=current_time).single()
                target_result = session.run(target_node_query,
                                            target_table=target_table,
                                            created_at=current_time).single()

                if source_result and target_result:
                    source_id = source_result['source_id']
                    target_id = target_result['target_id']

                    # Check whether the relationship already exists
                    relationship_check_query = """
                    MATCH (source:table)-[r:DERIVED_FROM]-(target:table)
                    WHERE id(source) = $source_id AND id(target) = $target_id
                    RETURN r
                    """
                    existing_relationship = session.run(relationship_check_query,
                                                        source_id=source_id,
                                                        target_id=target_id).single()

                    if not existing_relationship:
                        # Create the DERIVED_FROM relationship, pointing from target_table
                        # back to source_table (the target is derived from the source)
                        create_relationship_query = """
                        MATCH (source:table), (target:table)
                        WHERE id(source) = $source_id AND id(target) = $target_id
                        CREATE (target)-[r:DERIVED_FROM]->(source)
                        SET r.script_name = $script_name,
                            r.script_type = $script_type,
                            r.schedule_status = $schedule_status,
                            r.update_mode = $update_mode,
                            r.created_at = $created_at,
                            r.updated_at = $created_at
                        RETURN r
                        """
                        session.run(create_relationship_query,
                                    source_id=source_id,
                                    target_id=target_id,
                                    script_name=script_name,
                                    script_type=script_type,
                                    schedule_status=schedule_status,
                                    update_mode=update_mode,
                                    created_at=current_time)
                        logger.info(f"Created DERIVED_FROM relationship: {target_table} -> {source_table} (script: {script_name})")
                    else:
                        logger.info(f"DERIVED_FROM relationship already exists: {target_table} -> {source_table}")
                else:
                    logger.error(f"Failed to create table nodes: source_table={source_table}, target_table={target_table}")
        except Exception as e:
            logger.error(f"Failed to handle script relationships: {str(e)}")
            raise e
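

# Usage sketch (illustrative only): assumes an initialized Flask app context with `db`
# bound and reachable Neo4j/PostgreSQL instances behind the imported helpers. The
# payload keys mirror what create_dataflow reads above; the values are made up.
#
#   payload = {
#       'name': 'daily order rollup',
#       'describe': 'Aggregate raw orders into a daily summary table',
#       'source_table': 'ods_orders',
#       'target_table': 'dws_orders_daily',
#       'script_type': 'sql',
#       'update_mode': 'full',
#   }
#   created = DataFlowService.create_dataflow(payload)
#   detail = DataFlowService.get_dataflow_by_id(created['id'])
#   flows = DataFlowService.get_dataflows(page=1, page_size=10, search='order')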