# dataflows.py
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
import json

from app.core.llm.llm_service import llm_client
from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node, relationship_exists
from app.core.meta_data import translate_and_parse, get_formatted_time
from py2neo import Relationship
from app import db
from sqlalchemy import text

logger = logging.getLogger(__name__)


class DataFlowService:
    """Dataflow service: handles dataflow-related business logic."""

    @staticmethod
    def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]:
        """
        Get the list of dataflows.

        Args:
            page: page number
            page_size: page size
            search: search keyword

        Returns:
            Dict containing the dataflow list and pagination info
        """
        try:
            # Query the dataflow list from the graph database
            skip_count = (page - 1) * page_size

            # Build the search condition
            where_clause = ""
            params = {'skip': skip_count, 'limit': page_size}
            if search:
                where_clause = "WHERE n.name CONTAINS $search OR n.description CONTAINS $search"
                params['search'] = search

            # Query the dataflow list
            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            RETURN n, id(n) as node_id
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            """

            with connect_graph().session() as session:
                list_result = session.run(query, **params).data()

                # Query the total count
                count_query = f"""
                MATCH (n:DataFlow)
                {where_clause}
                RETURN count(n) as total
                """
                count_params = {'search': search} if search else {}
                count_result = session.run(count_query, **count_params).single()
                total = count_result['total'] if count_result else 0

            # Format the results
            dataflows = []
            for record in list_result:
                node = record['n']
                dataflow = dict(node)
                dataflow['id'] = record['node_id']  # use the node_id returned by the query
                dataflows.append(dataflow)

            return {
                'list': dataflows,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"Failed to get dataflow list: {str(e)}")
            raise

    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        Get dataflow details by ID.

        Args:
            dataflow_id: dataflow ID

        Returns:
            Dict with the dataflow details, or None if it does not exist
        """
        try:
            # Fetch the basic info from Neo4j
            neo4j_query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:label]-(la:DataLabel)
            RETURN n, id(n) as node_id,
                   collect(DISTINCT {id: id(la), name: la.name}) as tags
            """
            with connect_graph().session() as session:
                neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data()
            if not neo4j_result:
                return None

            record = neo4j_result[0]
            node = record['n']
            dataflow = dict(node)
            dataflow['id'] = record['node_id']
            dataflow['tags'] = record['tags']

            # Fetch the extra info from PostgreSQL
            pg_query = """
            SELECT
                source_table,
                target_table,
                script_name,
                script_type,
                script_requirement,
                script_content,
                user_name,
                create_time,
                update_time,
                target_dt_column
            FROM dags.data_transform_scripts
            WHERE script_name = :script_name
            """
            with db.engine.connect() as conn:
                pg_result = conn.execute(text(pg_query), {"script_name": dataflow.get('name_zh')}).fetchone()

            if pg_result:
                # Merge the PostgreSQL data into the result
                dataflow.update({
                    'source_table': pg_result.source_table,
                    'target_table': pg_result.target_table,
                    'script_type': pg_result.script_type,
                    'script_requirement': pg_result.script_requirement,
                    'script_content': pg_result.script_content,
                    'created_by': pg_result.user_name,
                    'pg_created_at': pg_result.create_time,
                    'pg_updated_at': pg_result.update_time,
                    'target_dt_column': pg_result.target_dt_column
                })

            return dataflow
        except Exception as e:
            logger.error(f"Failed to get dataflow details: {str(e)}")
            raise

    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new dataflow.

        Args:
            data: dataflow configuration data

        Returns:
            Info about the created dataflow
        """
        try:
            # Validate required fields
            required_fields = ['name_zh', 'describe']
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"Missing required field: {field}")

            dataflow_name = data['name_zh']

            # Translate the name with the LLM to generate the English name
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
            except Exception as e:
                logger.warning(f"Translation failed, falling back to a default English name: {str(e)}")
                name_en = dataflow_name.lower().replace(' ', '_')

            # Prepare the node data
            node_data = {
                'name_zh': dataflow_name,
                'name_en': name_en,
                'category': data.get('category', ''),
                'organization': data.get('organization', ''),
                'leader': data.get('leader', ''),
                'frequency': data.get('frequency', ''),
                'tag': data.get('tag', ''),
                'describe': data.get('describe', ''),
                'status': data.get('status', 'inactive'),
                'update_mode': data.get('update_mode', 'append'),
                'created_at': get_formatted_time(),
                'updated_at': get_formatted_time()
            }

            # Create the dataflow node, failing if it already exists
            dataflow_id = get_node('DataFlow', name=dataflow_name)
            if dataflow_id:
                raise ValueError(f"Dataflow '{dataflow_name}' already exists")
            dataflow_id = create_or_get_node('DataFlow', **node_data)

            # Handle the tag relationship
            tag_id = data.get('tag')
            if tag_id is not None:
                try:
                    DataFlowService._handle_tag_relationship(dataflow_id, tag_id)
                except Exception as e:
                    logger.warning(f"Error while handling tag relationship: {str(e)}")

            # After the graph node is created, write to the PG database
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                logger.info(f"Dataflow info written to the PG database: {dataflow_name}")

                # Once the PG record is written, create the script relationships in Neo4j
                try:
                    DataFlowService._handle_script_relationships(data, dataflow_name, name_en)
                    logger.info(f"Script relationships created: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"Failed to create script relationships: {str(script_error)}")
            except Exception as pg_error:
                logger.error(f"Failed to write to the PG database: {str(pg_error)}")
                # Note: the graph operation could be rolled back here, but for now the
                # graph data is kept. In production a distributed transaction may be needed.

            # Query the created node to return its full info
            query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id"
            with connect_graph().session() as session:
                id_result = session.run(query, name_zh=dataflow_name).single()
                if id_result:
                    dataflow_node = id_result['n']
                    node_id = id_result['node_id']
                    # Convert the node properties to a dict
                    result = dict(dataflow_node)
                    result['id'] = node_id
                else:
                    # Fall back to basic info if the query fails
                    result = {
                        'id': dataflow_id if isinstance(dataflow_id, int) else None,
                        'name_zh': dataflow_name,
                        'name_en': name_en,
                        'created_at': get_formatted_time()
                    }

            logger.info(f"Dataflow created: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"Failed to create dataflow: {str(e)}")
            raise

    @staticmethod
    def _save_to_pg_database(data: Dict[str, Any], script_name: str, name_en: str):
        """
        Save the script info to the PG database.

        Args:
            data: data containing the script info
            script_name: script name
            name_en: English name
        """
        try:
            # Extract the script-related fields
            script_requirement = data.get('script_requirement', '')
            script_content = data.get('script_content', '')
            source_table = data.get('source_table', '').split(':')[-1] if ':' in data.get('source_table', '') else data.get('source_table', '')
            # Fall back to the English name if no target table was given
            target_table = data.get('target_table', '').split(':')[-1] if ':' in data.get('target_table', '') else data.get('target_table', name_en)
            script_type = data.get('script_type', 'python')
            user_name = data.get('created_by', 'system')
            target_dt_column = data.get('target_dt_column', '')

            # Validate required fields
            if not target_table:
                target_table = name_en
            if not script_name:
                raise ValueError("script_name must not be empty")

            # Build the upsert SQL
            insert_sql = text("""
                INSERT INTO dags.data_transform_scripts
                (source_table, target_table, script_name, script_type, script_requirement,
                 script_content, user_name, create_time, update_time, target_dt_column)
                VALUES
                (:source_table, :target_table, :script_name, :script_type, :script_requirement,
                 :script_content, :user_name, :create_time, :update_time, :target_dt_column)
                ON CONFLICT (target_table, script_name)
                DO UPDATE SET
                    source_table = EXCLUDED.source_table,
                    script_type = EXCLUDED.script_type,
                    script_requirement = EXCLUDED.script_requirement,
                    script_content = EXCLUDED.script_content,
                    user_name = EXCLUDED.user_name,
                    update_time = EXCLUDED.update_time,
                    target_dt_column = EXCLUDED.target_dt_column
            """)

            # Prepare the parameters
            current_time = datetime.now()
            params = {
                'source_table': source_table,
                'target_table': target_table,
                'script_name': script_name,
                'script_type': script_type,
                'script_requirement': script_requirement,
                'script_content': script_content,
                'user_name': user_name,
                'create_time': current_time,
                'update_time': current_time,
                'target_dt_column': target_dt_column
            }

            # Execute the upsert
            db.session.execute(insert_sql, params)
            db.session.commit()
            logger.info(f"Script info written to the PG database: target_table={target_table}, script_name={script_name}")
        except Exception as e:
            db.session.rollback()
            logger.error(f"Failed to write to the PG database: {str(e)}")
            raise
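
    # The ON CONFLICT clause in _save_to_pg_database relies on a unique
    # constraint over (target_table, script_name). A minimal sketch of that
    # DDL, as an assumption -- the actual schema is not part of this file:
    #
    #   ALTER TABLE dags.data_transform_scripts
    #       ADD CONSTRAINT uq_data_transform_scripts_target_script
    #       UNIQUE (target_table, script_name);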

    @staticmethod
    def _handle_children_relationships(dataflow_node, children_ids):
        """Handle child-node relationships."""
        logger.debug(f"Handling child relationships, raw children_ids: {children_ids}, type: {type(children_ids)}")

        # Normalize children_ids to a list
        if not isinstance(children_ids, (list, tuple)):
            if children_ids is not None:
                children_ids = [children_ids]  # wrap a single value in a list
                logger.debug(f"Wrapped single value in a list: {children_ids}")
            else:
                children_ids = []  # treat None as an empty list
                logger.debug("Converted None to an empty list")

        for child_id in children_ids:
            try:
                # Look up the child node
                query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
                with connect_graph().session() as session:
                    result = session.run(query, child_id=child_id).data()
                    if result:
                        child_node = result[0]['n']

                        # Get the dataflow node's ID
                        dataflow_id = getattr(dataflow_node, 'identity', None)
                        if dataflow_id is None:
                            # No identity attribute: look the ID up by name
                            query_id = "MATCH (n:DataFlow) WHERE n.name = $name RETURN id(n) as node_id"
                            id_result = session.run(query_id, name=dataflow_node.get('name')).single()
                            dataflow_id = id_result['node_id'] if id_result else None

                        # Create the relationship, checking existence by ID
                        if dataflow_id and not relationship_exists(dataflow_id, 'child', child_id):
                            session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $child_id CREATE (a)-[:child]->(b)",
                                        dataflow_id=dataflow_id, child_id=child_id)
                            logger.info(f"Created child relationship: {dataflow_id} -> {child_id}")
            except Exception as e:
                logger.warning(f"Failed to create child relationship {child_id}: {str(e)}")

    @staticmethod
    def _handle_tag_relationship(dataflow_id, tag_id):
        """Handle the tag relationship."""
        try:
            # Look up the tag node
            query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, tag_id=tag_id).data()
                if result:
                    # Create the relationship, checking existence by ID
                    if dataflow_id and not relationship_exists(dataflow_id, 'label', tag_id):
                        session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $tag_id CREATE (a)-[:label]->(b)",
                                    dataflow_id=dataflow_id, tag_id=tag_id)
                        logger.info(f"Created tag relationship: {dataflow_id} -> {tag_id}")
        except Exception as e:
            logger.warning(f"Failed to create tag relationship {tag_id}: {str(e)}")

    @staticmethod
    def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Update a dataflow.

        Args:
            dataflow_id: dataflow ID
            data: fields to update

        Returns:
            Updated dataflow info, or None if it does not exist
        """
        try:
            # Look up the node
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    return None

                # Update the node properties
                update_fields = []
                params = {'dataflow_id': dataflow_id}
                for key, value in data.items():
                    if key not in ['id', 'created_at']:  # protected fields
                        if key == 'config' and isinstance(value, dict):
                            value = json.dumps(value, ensure_ascii=False)
                        update_fields.append(f"n.{key} = ${key}")
                        params[key] = value

                if update_fields:
                    params['updated_at'] = get_formatted_time()
                    update_fields.append("n.updated_at = $updated_at")
                    update_query = f"""
                    MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                    SET {', '.join(update_fields)}
                    RETURN n, id(n) as node_id
                    """
                    result = session.run(update_query, **params).data()
                    if result:
                        node = result[0]['n']
                        updated_dataflow = dict(node)
                        updated_dataflow['id'] = result[0]['node_id']  # use the node_id returned by the query
                        logger.info(f"Dataflow updated: ID={dataflow_id}")
                        return updated_dataflow

                return None
        except Exception as e:
            logger.error(f"Failed to update dataflow: {str(e)}")
            raise

    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """
        Delete a dataflow.

        Args:
            dataflow_id: dataflow ID

        Returns:
            Whether the deletion succeeded
        """
        try:
            # Delete the node along with its relationships
            query = """
            MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            with connect_graph().session() as session:
                delete_result = session.run(query, dataflow_id=dataflow_id).single()
                result = delete_result['deleted_count'] if delete_result else 0

            if result and result > 0:
                logger.info(f"Dataflow deleted: ID={dataflow_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Failed to delete dataflow: {str(e)}")
            raise

    @staticmethod
    def execute_dataflow(dataflow_id: int, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Execute a dataflow.

        Args:
            dataflow_id: dataflow ID
            params: execution parameters

        Returns:
            Execution result info
        """
        try:
            # Check that the dataflow exists
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
            if not result:
                raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")

            execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"

            # TODO: actually execute the dataflow here.
            # For now, return a mock result.
            result = {
                'execution_id': execution_id,
                'dataflow_id': dataflow_id,
                'status': 'running',
                'started_at': datetime.now().isoformat(),
                'params': params or {},
                'progress': 0
            }
            logger.info(f"Started dataflow execution: ID={dataflow_id}, execution_id={execution_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to execute dataflow: {str(e)}")
            raise
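
    # Usage sketch (illustrative ID; execution is currently mocked, see the
    # TODO above):
    #
    #   run = DataFlowService.execute_dataflow(42, params={'dt': '2024-01-01'})
    #   print(run['execution_id'], run['status'])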

    @staticmethod
    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
        """
        Get a dataflow's execution status.

        Args:
            dataflow_id: dataflow ID

        Returns:
            Execution status info
        """
        try:
            # TODO: query the actual execution status here.
            # For now, return a mock status.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
            if not result:
                raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")

            status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]
            return {
                'dataflow_id': dataflow_id,
                'status': status,
                'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100,
                'started_at': datetime.now().isoformat(),
                'completed_at': datetime.now().isoformat() if status == 'completed' else None,
                'error_message': 'An error occurred during execution' if status == 'failed' else None
            }
        except Exception as e:
            logger.error(f"Failed to get dataflow status: {str(e)}")
            raise

    @staticmethod
    def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]:
        """
        Get a dataflow's execution logs.

        Args:
            dataflow_id: dataflow ID
            page: page number
            page_size: page size

        Returns:
            Execution log list and pagination info
        """
        try:
            # TODO: query the actual execution logs here.
            # For now, return mock logs.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
            if not result:
                raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")

            mock_logs = [
                {
                    'id': i,
                    'timestamp': datetime.now().isoformat(),
                    'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
                    'message': f'Dataflow execution log message {i}',
                    'component': ['source', 'transform', 'target'][i % 3]
                }
                for i in range(1, 101)
            ]

            # Paginate
            total = len(mock_logs)
            start = (page - 1) * page_size
            end = start + page_size
            logs = mock_logs[start:end]

            return {
                'logs': logs,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"Failed to get dataflow logs: {str(e)}")
            raise

    @staticmethod
    def create_script(request_data: str) -> str:
        """
        Generate a script with the Deepseek model.

        Args:
            request_data: request data, a free-text description of the user's requirement

        Returns:
            The generated script content (plain text)
        """
        try:
            # Build the prompt
            prompt_parts = []

            # Add the system instruction
            prompt_parts.append("Please generate a data processing script for the following requirement:")

            # Append request_data directly as the textual description
            prompt_parts.append(request_data)

            # Add the format requirements
            prompt_parts.append("\nPlease generate complete, executable script code with the necessary comments and error handling.")

            # Assemble the prompt
            full_prompt = "\n\n".join(prompt_parts)

            logger.info(f"Calling the Deepseek model to generate a script, prompt length: {len(full_prompt)}")

            # Call the LLM service
            script_content = llm_client(full_prompt)
            if not script_content:
                raise ValueError("The Deepseek model returned empty content")

            # Ensure the return value is plain text
            if not isinstance(script_content, str):
                script_content = str(script_content)

            logger.info(f"Script generated, content length: {len(script_content)}")
            return script_content
        except Exception as e:
            logger.error(f"Failed to generate script: {str(e)}")
            raise
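
    # Usage sketch for create_script; the requirement text is illustrative and
    # assumes llm_client is configured with valid Deepseek credentials:
    #
    #   script = DataFlowService.create_script(
    #       "Aggregate daily orders from ods_orders into dwd_order_daily"
    #   )
    #   print(script)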

    @staticmethod
    def _handle_script_relationships(data: Dict[str, Any], dataflow_name: str, name_en: str):
        """
        Handle script relationships: create a DERIVED_FROM relationship from
        target_table back to source_table in the Neo4j graph database.

        Args:
            data: dict with the script info; expected keys include script_name, script_type,
                  schedule_status, source_table, target_table, update_mode
            dataflow_name: the dataflow name, used as the script name
            name_en: the dataflow's English name
        """
        try:
            # Read the key-value pairs from data
            script_name = dataflow_name
            script_type = data.get('script_type', 'sql')
            schedule_status = data.get('status', 'inactive')
            source_table_full = data.get('source_table', '')
            target_table_full = data.get('target_table', '')
            update_mode = data.get('update_mode', 'full')

            # Parse the "label:table" format of source_table and target_table
            source_table = source_table_full.split(':')[-1] if ':' in source_table_full else source_table_full
            target_table = target_table_full.split(':')[-1] if ':' in target_table_full else target_table_full
            source_label = source_table_full.split(':')[0] if ':' in source_table_full else source_table_full
            target_label = target_table_full.split(':')[0] if ':' in target_table_full else target_table_full

            # Validate the required fields
            if not source_table or not target_table:
                logger.warning(f"source_table or target_table is empty, skipping relationship creation: source_table={source_table}, target_table={target_table}")
                return

            logger.info(f"Creating script relationship: {source_table} -> {target_table}")

            with connect_graph().session() as session:
                # Create or fetch the source and target nodes
                create_nodes_query = f"""
                MERGE (source:{source_label} {{name: $source_table}})
                ON CREATE SET source.created_at = $created_at,
                              source.type = 'source'
                WITH source
                MERGE (target:{target_label} {{name: $target_table}})
                ON CREATE SET target.created_at = $created_at,
                              target.type = 'target'
                RETURN source, target, id(source) as source_id, id(target) as target_id
                """

                # Run the node-creation query
                result = session.run(create_nodes_query,
                                     source_table=source_table,
                                     target_table=target_table,
                                     created_at=get_formatted_time()).single()

                if result:
                    source_id = result['source_id']
                    target_id = result['target_id']

                    # Create the relationship if it does not already exist
                    create_relationship_query = f"""
                    MATCH (source:{source_label}), (target:{target_label})
                    WHERE id(source) = $source_id AND id(target) = $target_id
                    AND NOT EXISTS((target)-[:DERIVED_FROM]->(source))
                    CREATE (target)-[r:DERIVED_FROM]->(source)
                    SET r.script_name = $script_name,
                        r.script_type = $script_type,
                        r.schedule_status = $schedule_status,
                        r.update_mode = $update_mode,
                        r.created_at = $created_at,
                        r.updated_at = $created_at
                    RETURN r
                    """

                    relationship_result = session.run(create_relationship_query,
                                                      source_id=source_id,
                                                      target_id=target_id,
                                                      script_name=script_name,
                                                      script_type=script_type,
                                                      schedule_status=schedule_status,
                                                      update_mode=update_mode,
                                                      created_at=get_formatted_time()).single()

                    if relationship_result:
                        logger.info(f"Created DERIVED_FROM relationship: {target_table} -> {source_table} (script: {script_name})")
                    else:
                        logger.info(f"DERIVED_FROM relationship already exists: {target_table} -> {source_table}")
                else:
                    logger.error(f"Failed to create table nodes: source_table={source_table}, target_table={target_table}")
        except Exception as e:
            logger.error(f"Failed to handle script relationships: {str(e)}")
            raise
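
# Minimal usage sketch (assumption: the surrounding app package exposes a
# create_app() factory -- hypothetical here -- and Neo4j/PostgreSQL are
# reachable):
#
#   from app import create_app
#
#   app = create_app()
#   with app.app_context():
#       page = DataFlowService.get_dataflows(page=1, page_size=10, search='')
#       print(page['pagination'], len(page['list']))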