# dataflows.py

import logging
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
import json

from app.core.llm.llm_service import llm_sql
from app.core.graph.graph_operations import (
    connect_graph,
    create_or_get_node,
    get_node,
    relationship_exists,
)
from app.core.meta_data import translate_and_parse, get_formatted_time
from app import db
from sqlalchemy import text

logger = logging.getLogger(__name__)
class DataFlowService:
    """Data flow service: business logic for data flows."""

    @staticmethod
    def get_dataflows(
        page: int = 1,
        page_size: int = 10,
        search: str = '',
    ) -> Dict[str, Any]:
        """
        Get the paginated list of data flows.

        Args:
            page: page number
            page_size: page size
            search: search keyword

        Returns:
            Dictionary containing the data flow list and pagination info
        """
        try:
            # Query the data flow list from the graph database
            skip_count = (page - 1) * page_size

            # Build the search condition
            where_clause = ""
            params: Dict[str, Union[int, str]] = {
                'skip': skip_count,
                'limit': page_size,
            }
            if search:
                where_clause = (
                    "WHERE n.name_zh CONTAINS $search OR "
                    "n.description CONTAINS $search"
                )
                params['search'] = search

            # Query the data flow list (including the tag array).
            # Paginate first in a WITH clause, then aggregate tags,
            # so the aggregation does not distort the pagination.
            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            WITH n
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
            RETURN n, id(n) as node_id,
                   n.created_at as created_at,
                   collect({{
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }}) as tags
            """

            # Get the Neo4j driver (connect_graph raises ConnectionError
            # if the connection fails)
            try:
                with connect_graph().session() as session:
                    list_result = session.run(query, params).data()

                    # Query the total count
                    count_query = f"""
                    MATCH (n:DataFlow)
                    {where_clause}
                    RETURN count(n) as total
                    """
                    count_params = {'search': search} if search else {}
                    count_result = session.run(
                        count_query, count_params).single()
                    total = count_result['total'] if count_result else 0
            except Exception as e:
                # Do not close the driver here: connect_graph() may return a
                # shared driver instance, and closing it prematurely would
                # break other callers. Only the session is used.
                logger.error(f"查询数据流失败: {str(e)}")
                raise e

            # Format the results
            dataflows = []
            for record in list_result:
                node = record['n']
                dataflow = dict(node)
                dataflow['id'] = record['node_id']  # use node_id from the query

                # Process the tag array, filtering out empty tags
                tags = record.get('tags', [])
                dataflow['tag'] = [
                    tag for tag in tags
                    if tag.get('id') is not None
                ]
                dataflows.append(dataflow)

            return {
                'list': dataflows,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"获取数据流列表失败: {str(e)}")
            raise e
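    # Illustrative usage of get_dataflows (a hedged sketch; the values shown
    # are hypothetical, not taken from this codebase):
    #
    #   result = DataFlowService.get_dataflows(page=1, page_size=10, search='销售')
    #   result['list'][0]     # -> {'id': 123, 'name_zh': '...', 'tag': [...], ...}
    #   result['pagination']  # -> {'page': 1, 'page_size': 10, 'total': 42, 'total_pages': 5}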
    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        Get data flow details by ID.

        Args:
            dataflow_id: data flow ID

        Returns:
            Data flow detail dictionary, or None if it does not exist
        """
        try:
            # Fetch all properties of the DataFlow node from Neo4j
            # (including the tag array)
            neo4j_query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
            RETURN n, id(n) as node_id,
                   collect({
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }) as tags
            """
            with connect_graph().session() as session:
                neo4j_result = session.run(
                    neo4j_query, dataflow_id=dataflow_id).data()
                if not neo4j_result:
                    logger.warning(f"未找到ID为 {dataflow_id} 的DataFlow节点")
                    return None

                record = neo4j_result[0]
                node = record['n']

                # Convert node properties to a dictionary
                dataflow = dict(node)
                dataflow['id'] = record['node_id']

                # Process the tag array, filtering out empty tags
                tags = record.get('tags', [])
                dataflow['tag'] = [
                    tag for tag in tags
                    if tag.get('id') is not None
                ]

                # If script_requirement is a JSON string, parse it into an object
                script_requirement_str = dataflow.get('script_requirement', '')
                if script_requirement_str:
                    try:
                        # Try to parse the JSON string
                        script_requirement_obj = json.loads(
                            script_requirement_str)
                        dataflow['script_requirement'] = script_requirement_obj
                        logger.debug(
                            "成功解析script_requirement: %s",
                            script_requirement_obj,
                        )
                    except (json.JSONDecodeError, TypeError) as e:
                        logger.warning(f"script_requirement解析失败,保持原值: {e}")
                        # Keep the original value (string)
                        dataflow['script_requirement'] = script_requirement_str
                else:
                    # If empty, set to None
                    dataflow['script_requirement'] = None

                logger.info(
                    "成功获取DataFlow详情,ID: %s, 名称: %s",
                    dataflow_id,
                    dataflow.get('name_zh'),
                )
                return dataflow
        except Exception as e:
            logger.error(f"获取数据流详情失败: {str(e)}")
            raise e
    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new data flow.

        Args:
            data: data flow configuration data

        Returns:
            Information about the created data flow
        """
        try:
            # Validate required fields
            required_fields = ['name_zh', 'describe']
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"缺少必填字段: {field}")

            dataflow_name = data['name_zh']

            # Use the LLM to translate the name into an English name
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = (
                    result_list[0]
                    if result_list
                    else dataflow_name.lower().replace(' ', '_')
                )
            except Exception as e:
                logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
                name_en = dataflow_name.lower().replace(' ', '_')

            # Convert script_requirement to a JSON string
            script_requirement = data.get('script_requirement', None)
            if script_requirement is not None:
                # If it is a dict or list, serialize it to a JSON string
                if isinstance(script_requirement, (dict, list)):
                    script_requirement_str = json.dumps(
                        script_requirement, ensure_ascii=False)
                else:
                    # If it is already a string, use it directly
                    script_requirement_str = str(script_requirement)
            else:
                script_requirement_str = ''

            # Prepare node data (tags are not stored as node properties;
            # they are linked through LABEL relationships)
            node_data = {
                'name_zh': dataflow_name,
                'name_en': name_en,
                'category': data.get('category', ''),
                'organization': data.get('organization', ''),
                'leader': data.get('leader', ''),
                'frequency': data.get('frequency', ''),
                'describe': data.get('describe', ''),
                'status': data.get('status', 'inactive'),
                'update_mode': data.get('update_mode', 'append'),
                'script_type': data.get('script_type', 'python'),
                'script_requirement': script_requirement_str,
                'created_at': get_formatted_time(),
                'updated_at': get_formatted_time()
            }

            # Create or get the data flow node
            dataflow_id = get_node('DataFlow', name=dataflow_name)
            if dataflow_id:
                raise ValueError(f"数据流 '{dataflow_name}' 已存在")
            dataflow_id = create_or_get_node('DataFlow', **node_data)

            # Handle tag relationships (supports an array of multiple tags)
            tag_list = data.get('tag', [])
            if tag_list:
                try:
                    DataFlowService._handle_tag_relationships(
                        dataflow_id, tag_list)
                except Exception as e:
                    logger.warning(f"处理标签关系时出错: {str(e)}")

            # After the graph node is created, write to the PG database
            try:
                DataFlowService._save_to_pg_database(
                    data, dataflow_name, name_en)
                logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")

                # Once the PG record is written, create the script
                # relationships in the Neo4j graph
                try:
                    DataFlowService._handle_script_relationships(
                        data, dataflow_name, name_en)
                    logger.info(f"脚本关系创建成功: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"创建脚本关系失败: {str(script_error)}")
            except Exception as pg_error:
                logger.error(f"写入PG数据库失败: {str(pg_error)}")
                # Note: the graph data is kept even if the PG write fails;
                # a real deployment may need a distributed transaction here.

            # Return the created data flow info:
            # query the created node to get its full properties
            query = (
                "MATCH (n:DataFlow {name_zh: $name_zh}) "
                "RETURN n, id(n) as node_id"
            )
            with connect_graph().session() as session:
                id_result = session.run(query, name_zh=dataflow_name).single()
                if id_result:
                    dataflow_node = id_result['n']
                    node_id = id_result['node_id']

                    # Convert node properties to a dictionary
                    result = dict(dataflow_node)
                    result['id'] = node_id
                else:
                    # If the query fails, return basic info
                    result = {
                        'id': (
                            dataflow_id
                            if isinstance(dataflow_id, int)
                            else None
                        ),
                        'name_zh': dataflow_name,
                        'name_en': name_en,
                        'created_at': get_formatted_time(),
                    }

            logger.info(f"创建数据流成功: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"创建数据流失败: {str(e)}")
            raise e
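    # Example payload for create_dataflow (a hedged sketch; all field values
    # are hypothetical, only keys referenced by the method are shown):
    #
    #   DataFlowService.create_dataflow({
    #       'name_zh': '销售数据汇总',
    #       'describe': '按天汇总销售明细',
    #       'script_type': 'sql',
    #       'update_mode': 'append',
    #       'source_table': 'BusinessDomain:ods_sales',
    #       'target_table': 'BusinessDomain:dws_sales_daily',
    #       'tag': [{'id': 1}],
    #       'script_requirement': {'rule': '按 sale_date 分组求和',
    #                              'source_table': [11], 'target_table': [22]},
    #   })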
    @staticmethod
    def _save_to_pg_database(
        data: Dict[str, Any],
        script_name: str,
        name_en: str,
    ):
        """
        Save the script information to the PG database.

        Args:
            data: data containing the script information
            script_name: script name
            name_en: English name
        """
        try:
            # Extract script-related information.
            # Normalize script_requirement so it is stored as a JSON string.
            script_requirement_raw = data.get('script_requirement', None)
            # Holds the rule extracted from script_requirement
            rule_from_requirement = ''
            if script_requirement_raw is not None:
                # If it is a dict, extract the rule field
                if isinstance(script_requirement_raw, dict):
                    rule_from_requirement = script_requirement_raw.get(
                        'rule', '')
                    script_requirement = json.dumps(
                        script_requirement_raw, ensure_ascii=False)
                elif isinstance(script_requirement_raw, list):
                    script_requirement = json.dumps(
                        script_requirement_raw, ensure_ascii=False)
                else:
                    # If it is already a string, try to parse it to extract the rule
                    script_requirement = str(script_requirement_raw)
                    try:
                        parsed_req = json.loads(script_requirement)
                        if isinstance(parsed_req, dict):
                            rule_from_requirement = parsed_req.get('rule', '')
                    except (json.JSONDecodeError, TypeError):
                        pass
            else:
                script_requirement = ''

            # script_content: prefer the value sent by the frontend; if empty,
            # fall back to the rule extracted from script_requirement
            script_content = data.get('script_content', '')
            if not script_content and rule_from_requirement:
                script_content = rule_from_requirement
                logger.info(
                    "script_content为空,使用从script_requirement提取的rule: %s",
                    rule_from_requirement,
                )

            # Handle source_table and target_table defensively
            # (avoid 'in' errors when the value is None)
            source_table_raw = data.get('source_table') or ''
            source_table = (
                source_table_raw.split(':')[-1]
                if ':' in source_table_raw
                else source_table_raw
            )
            target_table_raw = data.get('target_table') or ''
            target_table = (
                target_table_raw.split(':')[-1]
                if ':' in target_table_raw
                else (target_table_raw or name_en)
            )
            script_type = data.get('script_type', 'python')
            user_name = data.get('created_by', 'system')
            target_dt_column = data.get('target_dt_column', '')

            # Validate required fields
            if not target_table:
                target_table = name_en
            if not script_name:
                raise ValueError("script_name不能为空")

            # Build the insert SQL
            insert_sql = text(
                """
                INSERT INTO dags.data_transform_scripts
                (source_table, target_table, script_name, script_type,
                 script_requirement, script_content, user_name, create_time,
                 update_time, target_dt_column)
                VALUES
                (:source_table, :target_table, :script_name, :script_type,
                 :script_requirement, :script_content, :user_name,
                 :create_time, :update_time, :target_dt_column)
                ON CONFLICT (target_table, script_name)
                DO UPDATE SET
                    source_table = EXCLUDED.source_table,
                    script_type = EXCLUDED.script_type,
                    script_requirement = EXCLUDED.script_requirement,
                    script_content = EXCLUDED.script_content,
                    user_name = EXCLUDED.user_name,
                    update_time = EXCLUDED.update_time,
                    target_dt_column = EXCLUDED.target_dt_column
                """
            )

            # Prepare parameters
            current_time = datetime.now()
            params = {
                'source_table': source_table,
                'target_table': target_table,
                'script_name': script_name,
                'script_type': script_type,
                'script_requirement': script_requirement,
                'script_content': script_content,
                'user_name': user_name,
                'create_time': current_time,
                'update_time': current_time,
                'target_dt_column': target_dt_column
            }

            # Execute the insert
            db.session.execute(insert_sql, params)

            # Also save a record to the task_list table
            try:
                # 1. Parse script_requirement and build a detailed task description
                task_description_md = script_requirement
                try:
                    # Try to parse the JSON
                    try:
                        req_json = json.loads(script_requirement)
                    except (json.JSONDecodeError, TypeError):
                        req_json = None

                    if isinstance(req_json, dict):
                        # 1. Extract the rule field from script_requirement
                        #    as request_content_str
                        request_content_str = req_json.get('rule', '')

                        # 2. Extract the source_table and target_table
                        #    fields from script_requirement
                        source_table_ids = req_json.get('source_table', [])
                        target_table_ids = req_json.get('target_table', [])

                        # Make sure both are lists
                        if not isinstance(source_table_ids, list):
                            source_table_ids = [
                                source_table_ids] if source_table_ids else []
                        if not isinstance(target_table_ids, list):
                            target_table_ids = [
                                target_table_ids] if target_table_ids else []

                        # Merge all BusinessDomain IDs
                        all_bd_ids = source_table_ids + target_table_ids

                        # 4. Extract update_mode from the data argument
                        update_mode = data.get('update_mode', 'append')

                        # Generate BusinessDomain DDLs
                        source_ddls = []
                        target_ddls = []
                        data_source_info = None
                        if all_bd_ids:
                            try:
                                with connect_graph().session() as session:
                                    # Process source tables
                                    for bd_id in source_table_ids:
                                        ddl_info = (
                                            DataFlowService
                                            ._generate_businessdomain_ddl(
                                                session,
                                                bd_id,
                                                is_target=False,
                                            )
                                        )
                                        if ddl_info:
                                            source_ddls.append(ddl_info['ddl'])
                                            # 3. If the BELONGS_TO relationship
                                            #    points to "数据资源", capture the
                                            #    data source info
                                            if (
                                                ddl_info.get('data_source')
                                                and not data_source_info
                                            ):
                                                data_source_info = ddl_info[
                                                    'data_source'
                                                ]

                                    # Process target tables (5. target tables get
                                    # a create_time column by default)
                                    for bd_id in target_table_ids:
                                        ddl_info = (
                                            DataFlowService
                                            ._generate_businessdomain_ddl(
                                                session,
                                                bd_id,
                                                is_target=True,
                                                update_mode=update_mode,
                                            )
                                        )
                                        if ddl_info:
                                            target_ddls.append(ddl_info['ddl'])
                                            # Same BELONGS_TO check for the
                                            # data source info
                                            if (
                                                ddl_info.get('data_source')
                                                and not data_source_info
                                            ):
                                                data_source_info = ddl_info[
                                                    'data_source'
                                                ]
                            except Exception as neo_e:
                                logger.error(
                                    f"获取BusinessDomain DDL失败: {str(neo_e)}")

                        # Build the Markdown task description
                        task_desc_parts = [f"# Task: {script_name}\n"]

                        # Add data source info
                        if data_source_info:
                            task_desc_parts.append("## Data Source")
                            task_desc_parts.append(
                                f"- **Type**: "
                                f"{data_source_info.get('type', 'N/A')}"
                            )
                            task_desc_parts.append(
                                f"- **Host**: "
                                f"{data_source_info.get('host', 'N/A')}"
                            )
                            task_desc_parts.append(
                                f"- **Port**: "
                                f"{data_source_info.get('port', 'N/A')}"
                            )
                            task_desc_parts.append(
                                f"- **Database**: "
                                f"{data_source_info.get('database', 'N/A')}\n"
                            )

                        # Add source table DDLs
                        if source_ddls:
                            task_desc_parts.append("## Source Tables (DDL)")
                            for ddl in source_ddls:
                                task_desc_parts.append(f"```sql\n{ddl}\n```\n")

                        # Add target table DDLs
                        if target_ddls:
                            task_desc_parts.append("## Target Tables (DDL)")
                            for ddl in target_ddls:
                                task_desc_parts.append(f"```sql\n{ddl}\n```\n")

                        # Add the update mode description
                        task_desc_parts.append("## Update Mode")
                        if update_mode == 'append':
                            task_desc_parts.append("- **Mode**: Append (追加模式)")
                            task_desc_parts.append(
                                "- **Description**: 新数据将追加到目标表,不删除现有数据\n")
                        else:
                            task_desc_parts.append(
                                "- **Mode**: Full Refresh (全量更新)")
                            task_desc_parts.append(
                                "- **Description**: 目标表将被清空后重新写入数据\n")

                        # Add the request content (rule)
                        if request_content_str:
                            task_desc_parts.append("## Request Content")
                            task_desc_parts.append(f"{request_content_str}\n")

                        # Add implementation steps (tailored to the task type)
                        task_desc_parts.append("## Implementation Steps")

                        # Check whether this is a remote data source import task
                        if data_source_info:
                            # Simplified steps for importing from a remote source
                            task_desc_parts.append(
                                "1. Create an n8n workflow to execute the "
                                "data import task"
                            )
                            task_desc_parts.append(
                                "2. Configure the workflow to call "
                                "`import_resource_data.py` Python script"
                            )
                            task_desc_parts.append(
                                "3. Pass the following parameters to the "
                                "Python execution node:"
                            )
                            task_desc_parts.append(
                                " - `--source-config`: JSON configuration "
                                "for the remote data source"
                            )
                            task_desc_parts.append(
                                " - `--target-table`: Target table name "
                                "(data resource English name)"
                            )
                            task_desc_parts.append(
                                f" - `--update-mode`: {update_mode}")
                            task_desc_parts.append(
                                "4. The Python script will automatically:")
                            task_desc_parts.append(
                                " - Connect to the remote data source")
                            task_desc_parts.append(
                                " - Extract data from the source table")
                            task_desc_parts.append(
                                f" - Write data to target table using "
                                f"{update_mode} mode"
                            )
                        else:
                            # Full steps for a data transformation task
                            task_desc_parts.append(
                                "1. Extract data from source tables as "
                                "specified in the DDL"
                            )
                            task_desc_parts.append(
                                "2. Apply transformation logic according "
                                "to the rule:"
                            )
                            if request_content_str:
                                task_desc_parts.append(
                                    f" - Rule: {request_content_str}")
                            task_desc_parts.append(
                                "3. Generate Python program to implement the "
                                "data transformation logic"
                            )
                            task_desc_parts.append(
                                f"4. Write transformed data to target table "
                                f"using {update_mode} mode"
                            )
                            task_desc_parts.append(
                                "5. Create an n8n workflow to schedule and "
                                "execute the Python program"
                            )

                        task_description_md = "\n".join(task_desc_parts)
                except Exception as parse_e:
                    logger.warning(f"解析任务描述详情失败,使用原始描述: {str(parse_e)}")
                    task_description_md = script_requirement

                # Assume the working directory is the project root;
                # dataflows.py lives in app/core/data_flow/
                code_path = 'app/core/data_flow'

                task_insert_sql = text(
                    "INSERT INTO public.task_list\n"
                    "(task_name, task_description, status, code_name, "
                    "code_path, create_by, create_time, update_time)\n"
                    "VALUES\n"
                    "(:task_name, :task_description, :status, :code_name, "
                    ":code_path, :create_by, :create_time, :update_time)"
                )
                task_params = {
                    'task_name': script_name,
                    'task_description': task_description_md,
                    'status': 'pending',
                    'code_name': script_name,
                    'code_path': code_path,
                    'create_by': 'cursor',
                    'create_time': current_time,
                    'update_time': current_time
                }

                # Use a nested transaction so that a task_list failure does
                # not break the main flow
                with db.session.begin_nested():
                    db.session.execute(task_insert_sql, task_params)
                logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")
            except Exception as task_error:
                # Log the error but do not interrupt the main flow.
                # If the task_list write must succeed, re-raise here:
                # raise task_error
                logger.error(f"写入task_list表失败: {str(task_error)}")

            db.session.commit()
            logger.info(
                "成功将脚本信息写入PG数据库: target_table=%s, script_name=%s",
                target_table,
                script_name,
            )
        except Exception as e:
            db.session.rollback()
            logger.error(f"写入PG数据库失败: {str(e)}")
            raise e
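    # Shape of the script_requirement dict that drives the task_list
    # description above (a hedged sketch; the IDs and rule text are
    # hypothetical Neo4j node ids / content):
    #
    #   {
    #       "rule": "聚合订单明细到日粒度",
    #       "source_table": [101, 102],   # BusinessDomain node ids used as sources
    #       "target_table": [201]         # BusinessDomain node ids used as targets
    #   }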
    @staticmethod
    def _handle_children_relationships(dataflow_node, children_ids):
        """Handle child-node relationships."""
        logger.debug(
            "处理子节点关系,原始children_ids: %s, 类型: %s",
            children_ids,
            type(children_ids),
        )

        # Make sure children_ids is a list
        if not isinstance(children_ids, (list, tuple)):
            if children_ids is not None:
                children_ids = [children_ids]  # wrap a single value in a list
                logger.debug(f"将单个值转换为列表: {children_ids}")
            else:
                children_ids = []  # None becomes an empty list
                logger.debug("将None转换为空列表")

        for child_id in children_ids:
            try:
                # Look up the child node
                query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
                with connect_graph().session() as session:
                    result = session.run(query, child_id=child_id).data()
                    if result:
                        # Get the ID of dataflow_node
                        dataflow_id = getattr(dataflow_node, 'identity', None)
                        if dataflow_id is None:
                            # No identity attribute: look up the ID by name
                            query_id = (
                                "MATCH (n:DataFlow) WHERE n.name_zh = "
                                "$name_zh RETURN id(n) as node_id"
                            )
                            id_result = session.run(
                                query_id,
                                name_zh=dataflow_node.get('name_zh'),
                            ).single()
                            dataflow_id = (
                                id_result['node_id'] if id_result else None
                            )

                        # Create the relationship, calling relationship_exists with IDs
                        if dataflow_id and not relationship_exists(
                            dataflow_id, 'child', child_id
                        ):
                            session.run(
                                "MATCH (a), (b) WHERE id(a) = $dataflow_id "
                                "AND id(b) = $child_id "
                                "CREATE (a)-[:child]->(b)",
                                dataflow_id=dataflow_id,
                                child_id=child_id,
                            )
                            logger.info(
                                f"创建子节点关系: {dataflow_id} -> {child_id}")
            except Exception as e:
                logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
    @staticmethod
    def _handle_tag_relationships(dataflow_id, tag_list):
        """
        Handle multiple tag relationships.

        Args:
            dataflow_id: data flow node ID
            tag_list: tag list; either an array of IDs or an array of
                objects with an id field
        """
        # Make sure tag_list is a list
        if not isinstance(tag_list, list):
            tag_list = [tag_list] if tag_list else []

        for tag_item in tag_list:
            tag_id = None
            if isinstance(tag_item, dict) and 'id' in tag_item:
                tag_id = int(tag_item['id'])
            elif isinstance(tag_item, (int, str)):
                try:
                    tag_id = int(tag_item)
                except (ValueError, TypeError):
                    pass
            if tag_id:
                DataFlowService._handle_single_tag_relationship(
                    dataflow_id, tag_id)
    @staticmethod
    def _handle_single_tag_relationship(dataflow_id, tag_id):
        """Handle a single tag relationship."""
        try:
            # Look up the tag node
            query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, tag_id=tag_id).data()
                if result:
                    # Create the relationship, calling relationship_exists with IDs
                    if dataflow_id and not relationship_exists(
                        dataflow_id, 'LABEL', tag_id
                    ):
                        session.run(
                            "MATCH (a), (b) WHERE id(a) = $dataflow_id "
                            "AND id(b) = $tag_id "
                            "CREATE (a)-[:LABEL]->(b)",
                            dataflow_id=dataflow_id,
                            tag_id=tag_id,
                        )
                        logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
        except Exception as e:
            logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
    @staticmethod
    def update_dataflow(
        dataflow_id: int,
        data: Dict[str, Any],
    ) -> Optional[Dict[str, Any]]:
        """
        Update a data flow.

        Args:
            dataflow_id: data flow ID
            data: data to update

        Returns:
            Updated data flow info, or None if it does not exist
        """
        try:
            # Extract the tag array (not stored as a node property)
            tag_list = data.pop('tag', None)

            # Look up the node
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    return None

                # Update node properties
                update_fields = []
                params: Dict[str, Any] = {'dataflow_id': dataflow_id}
                for key, value in data.items():
                    if key not in ['id', 'created_at']:  # protected fields
                        # Serialize complex objects to JSON strings
                        if key in ['config', 'script_requirement']:
                            if isinstance(value, dict):
                                value = json.dumps(value, ensure_ascii=False)
                        update_fields.append(f"n.{key} = ${key}")
                        params[key] = value

                if update_fields:
                    params['updated_at'] = get_formatted_time()
                    update_fields.append("n.updated_at = $updated_at")
                    update_query = f"""
                    MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                    SET {', '.join(update_fields)}
                    RETURN n, id(n) as node_id
                    """
                    result = session.run(update_query, params).data()

                # Handle tag relationships (supports an array of multiple tags)
                if tag_list is not None:
                    # Make sure it is a list
                    if not isinstance(tag_list, list):
                        tag_list = [tag_list] if tag_list else []

                    # First delete the existing LABEL relationships
                    delete_query = """
                    MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel)
                    WHERE id(n) = $dataflow_id
                    DELETE r
                    """
                    session.run(delete_query, dataflow_id=dataflow_id)
                    logger.info(f"删除数据流 {dataflow_id} 的现有标签关系")

                    # Create a new LABEL relationship for each tag
                    for tag_item in tag_list:
                        tag_id = None
                        if isinstance(tag_item, dict) and 'id' in tag_item:
                            tag_id = int(tag_item['id'])
                        elif isinstance(tag_item, (int, str)):
                            try:
                                tag_id = int(tag_item)
                            except (ValueError, TypeError):
                                pass
                        if tag_id:
                            DataFlowService._handle_single_tag_relationship(
                                dataflow_id, tag_id
                            )

                if result:
                    node = result[0]['n']
                    updated_dataflow = dict(node)
                    # Use the node_id returned by the query
                    updated_dataflow['id'] = result[0]['node_id']

                    # Query the tag array and add it to the returned data
                    tags_query = """
                    MATCH (n:DataFlow)
                    WHERE id(n) = $dataflow_id
                    OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
                    RETURN collect({
                        id: id(label),
                        name_zh: label.name_zh,
                        name_en: label.name_en
                    }) as tags
                    """
                    tags_result = session.run(
                        tags_query, dataflow_id=dataflow_id
                    ).single()
                    if tags_result:
                        tags = tags_result.get('tags', [])
                        updated_dataflow['tag'] = [
                            tag for tag in tags
                            if tag.get('id') is not None
                        ]
                    else:
                        updated_dataflow['tag'] = []

                    logger.info(f"更新数据流成功: ID={dataflow_id}")
                    return updated_dataflow
                return None
        except Exception as e:
            logger.error(f"更新数据流失败: {str(e)}")
            raise e
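    # Illustrative call (a hedged sketch; values are hypothetical). Passing a
    # 'tag' array replaces all existing LABEL relationships of the node:
    #
    #   DataFlowService.update_dataflow(123, {'status': 'active', 'tag': [5, 8]})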
    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """
        Delete a data flow.

        Args:
            dataflow_id: data flow ID

        Returns:
            Whether the deletion succeeded
        """
        try:
            # Delete the node and its relationships
            query = """
            MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            with connect_graph().session() as session:
                delete_result = session.run(
                    query, dataflow_id=dataflow_id).single()
                result = delete_result['deleted_count'] if delete_result else 0

            if result and result > 0:
                logger.info(f"删除数据流成功: ID={dataflow_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"删除数据流失败: {str(e)}")
            raise e
    @staticmethod
    def execute_dataflow(
        dataflow_id: int,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Execute a data flow.

        Args:
            dataflow_id: data flow ID
            params: execution parameters

        Returns:
            Execution result info
        """
        try:
            # Check that the data flow exists
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"数据流不存在: ID={dataflow_id}")

            execution_id = (
                f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
            )

            # TODO: actually execute the data flow here.
            # For now, return a mock result.
            result = {
                'execution_id': execution_id,
                'dataflow_id': dataflow_id,
                'status': 'running',
                'started_at': datetime.now().isoformat(),
                'params': params or {},
                'progress': 0
            }
            logger.info(
                "开始执行数据流: ID=%s, execution_id=%s",
                dataflow_id,
                execution_id,
            )
            return result
        except Exception as e:
            logger.error(f"执行数据流失败: {str(e)}")
            raise e
    @staticmethod
    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
        """
        Get the execution status of a data flow.

        Args:
            dataflow_id: data flow ID

        Returns:
            Execution status info
        """
        try:
            # TODO: query the real execution status here.
            # For now, return a mock status.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"数据流不存在: ID={dataflow_id}")

            status = ['running', 'completed', 'failed', 'pending'][
                dataflow_id % 4
            ]
            return {
                'dataflow_id': dataflow_id,
                'status': status,
                'progress': (
                    100
                    if status == 'completed'
                    else (dataflow_id * 10) % 100
                ),
                'started_at': datetime.now().isoformat(),
                'completed_at': (
                    datetime.now().isoformat()
                    if status == 'completed'
                    else None
                ),
                'error_message': (
                    '执行过程中发生错误' if status == 'failed' else None
                ),
            }
        except Exception as e:
            logger.error(f"获取数据流状态失败: {str(e)}")
            raise e
    @staticmethod
    def get_dataflow_logs(
        dataflow_id: int,
        page: int = 1,
        page_size: int = 50,
    ) -> Dict[str, Any]:
        """
        Get the execution logs of a data flow.

        Args:
            dataflow_id: data flow ID
            page: page number
            page_size: page size

        Returns:
            Execution log list and pagination info
        """
        try:
            # TODO: query the real execution logs here.
            # For now, return mock logs.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"数据流不存在: ID={dataflow_id}")

            mock_logs = [
                {
                    'id': i,
                    'timestamp': datetime.now().isoformat(),
                    'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
                    'message': f'数据流执行日志消息 {i}',
                    'component': ['source', 'transform', 'target'][i % 3]
                }
                for i in range(1, 101)
            ]

            # Paginate
            total = len(mock_logs)
            start = (page - 1) * page_size
            end = start + page_size
            logs = mock_logs[start:end]

            return {
                'logs': logs,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"获取数据流日志失败: {str(e)}")
            raise e
    @staticmethod
    def create_script(request_data: Union[Dict[str, Any], str]) -> str:
        """
        Generate a SQL script with the Deepseek model.

        Args:
            request_data: request dict containing input, output and
                request_content, or a JSON string

        Returns:
            The generated SQL script content
        """
        try:
            logger.info(f"开始处理脚本生成请求: {request_data}")
            logger.info(f"request_data类型: {type(request_data)}")

            # Type check and normalization
            if isinstance(request_data, str):
                logger.warning(f"request_data是字符串,尝试解析为JSON: {request_data}")
                try:
                    request_data = json.loads(request_data)
                except json.JSONDecodeError as e:
                    raise ValueError(f"无法解析request_data为JSON: {str(e)}")
            if not isinstance(request_data, dict):
                raise ValueError(
                    f"request_data必须是字典类型,实际类型: {type(request_data)}")

            # 1. Parse input, output and request_content from request_data
            input_data = request_data.get('input', '')
            output_data = request_data.get('output', '')
            request_content = request_data.get('request_data', '')

            # If request_content is HTML, extract the plain text
            if request_content and (
                request_content.startswith('<p>') or '<' in request_content
            ):
                # Simple HTML tag stripping
                import re
                request_content = re.sub(
                    r'<[^>]+>', '', request_content).strip()

            if not input_data or not output_data or not request_content:
                raise ValueError(
                    "缺少必要参数:input='{}', output='{}', "
                    "request_content='{}' 不能为空".format(
                        input_data,
                        output_data,
                        request_content[:100] if request_content else '',
                    )
                )

            logger.info(
                "解析得到 - input: %s, output: %s, request_content: %s",
                input_data,
                output_data,
                request_content,
            )

            # 2. Parse the tables in input and generate the source table DDLs
            source_tables_ddl = []
            input_tables = []
            if input_data:
                tables = [table.strip()
                          for table in input_data.split(',') if table.strip()]
                for table in tables:
                    ddl = DataFlowService._parse_table_and_get_ddl(
                        table, 'input')
                    if ddl:
                        input_tables.append(table)
                        source_tables_ddl.append(ddl)
                    else:
                        logger.warning(f"无法获取输入表 {table} 的DDL结构")

            # 3. Parse the table in output and generate the target table DDL
            target_table_ddl = ""
            if output_data:
                target_table_ddl = DataFlowService._parse_table_and_get_ddl(
                    output_data.strip(), 'output')
                if not target_table_ddl:
                    logger.warning(f"无法获取输出表 {output_data} 的DDL结构")

            # 4. Build the prompt following the Deepseek-prompt.txt framework
            prompt_parts = []

            # Opening: role definition
            prompt_parts.append(
                "你是一名数据库工程师,正在构建一个PostgreSQL数据中的汇总逻辑。"
                "请为以下需求生成一段标准的 PostgreSQL SQL 脚本:"
            )

            # Dynamically generate the source table section (item 1)
            for i, (table, ddl) in enumerate(
                zip(input_tables, source_tables_ddl), 1
            ):
                table_name = table.split(':')[-1] if ':' in table else table
                prompt_parts.append(f"{i}.有一个源表: {table_name},它的定义语句如下:")
                prompt_parts.append(ddl)
                prompt_parts.append("")  # blank line separator

            # Dynamically generate the target table section (item 2)
            if target_table_ddl:
                target_table_name = output_data.split(
                    ':')[-1] if ':' in output_data else output_data
                next_index = len(input_tables) + 1
                prompt_parts.append(
                    f"{next_index}.有一个目标表:{target_table_name},它的定义语句如下:")
                prompt_parts.append(target_table_ddl)
                prompt_parts.append("")  # blank line separator

            # Dynamically generate the processing logic section (item 3)
            next_index = (
                len(input_tables) + 2
                if target_table_ddl
                else len(input_tables) + 1
            )
            prompt_parts.append(f"{next_index}.处理逻辑为:{request_content}")
            prompt_parts.append("")  # blank line separator

            # Fixed technical requirements (items 4-8)
            tech_requirements = [
                (
                    f"{next_index + 1}.脚本应使用标准的 PostgreSQL 语法,"
                    "适合在 Airflow、Python 脚本、或调度系统中调用;"
                ),
                f"{next_index + 2}.无需使用 UPSERT 或 ON CONFLICT",
                f"{next_index + 3}.请直接输出SQL,无需进行解释。",
                (
                    f"{next_index + 4}.请给这段sql起个英文名,不少于三个英文单词,使用\"_\"分隔,"
                    "采用蛇形命名法。把sql的名字作为注释写在返回的sql中。"
                ),
                (
                    f"{next_index + 5}.生成的sql在向目标表插入数据的时候,向create_time字段写入当前日期"
                    "时间now(),不用处理update_time字段"
                ),
            ]
            prompt_parts.extend(tech_requirements)

            # Combine the full prompt
            full_prompt = "\n".join(prompt_parts)

            logger.info(f"构建的完整提示语长度: {len(full_prompt)}")
            logger.info(f"完整提示语内容: {full_prompt}")

            # 5. Call the LLM to generate the SQL script
            logger.info("开始调用Deepseek模型生成SQL脚本")
            script_content = llm_sql(full_prompt)
            if not script_content:
                raise ValueError("Deepseek模型返回空内容")

            # Make sure the returned value is text
            if not isinstance(script_content, str):
                script_content = str(script_content)

            logger.info(f"SQL脚本生成成功,内容长度: {len(script_content)}")
            return script_content
        except Exception as e:
            logger.error(f"生成SQL脚本失败: {str(e)}")
            raise e
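    # Example request_data for create_script (a hedged sketch; table names are
    # hypothetical, and the "label:name_en" format matches
    # _parse_table_and_get_ddl below):
    #
    #   sql = DataFlowService.create_script({
    #       'input': 'BusinessDomain:ods_orders,BusinessDomain:ods_users',
    #       'output': 'BusinessDomain:dws_order_summary',
    #       'request_data': '<p>按用户统计订单总金额</p>',
    #   })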
    @staticmethod
    def _parse_table_and_get_ddl(table_str: str, table_type: str) -> str:
        """
        Parse a table in "A:B" format and build its DDL from Neo4j metadata.

        Args:
            table_str: table string in "label:name_en" format
            table_type: table type, used for logging (input/output)

        Returns:
            DDL string describing the table structure
        """
        try:
            # Parse the A:B format
            if ':' not in table_str:
                logger.error(f"表格式错误,应为'label:name_en'格式: {table_str}")
                return ""

            parts = table_str.split(':', 1)
            if len(parts) != 2:
                logger.error(f"表格式解析失败: {table_str}")
                return ""

            label = parts[0].strip()
            name_en = parts[1].strip()
            if not label or not name_en:
                logger.error(f"标签或英文名为空: label={label}, name_en={name_en}")
                return ""

            logger.info(f"开始查询{table_type}表: label={label}, name_en={name_en}")

            # Query the node and its associated metadata from Neo4j
            with connect_graph().session() as session:
                cypher = f"""
                MATCH (n:{label} {{name_en: $name_en}})
                OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
                RETURN n, collect(m) as metadata
                """
                result = session.run(
                    cypher,  # type: ignore[arg-type]
                    {'name_en': name_en},
                )
                record = result.single()
                if not record:
                    logger.error(f"未找到节点: label={label}, name_en={name_en}")
                    return ""

                node = record['n']
                metadata = record['metadata']
                logger.info(f"找到节点,关联元数据数量: {len(metadata)}")

                # Generate the DDL-style table structure
                ddl_lines = []
                ddl_lines.append(f"CREATE TABLE {name_en} (")

                if metadata:
                    column_definitions = []
                    for meta in metadata:
                        if meta:  # make sure meta is not empty
                            meta_props = dict(meta)
                            column_name = meta_props.get(
                                'name_en',
                                meta_props.get('name_zh', 'unknown_column'),
                            )
                            data_type = meta_props.get(
                                'data_type', 'VARCHAR(255)')
                            comment = meta_props.get('name_zh', '')

                            # Build the column definition
                            column_def = f" {column_name} {data_type}"
                            if comment:
                                column_def += f" COMMENT '{comment}'"
                            column_definitions.append(column_def)

                    if column_definitions:
                        ddl_lines.append(",\n".join(column_definitions))
                    else:
                        ddl_lines.append(
                            " id BIGINT PRIMARY KEY COMMENT '主键ID'")
                else:
                    # If there is no metadata, add a default column
                    ddl_lines.append(
                        " id BIGINT PRIMARY KEY COMMENT '主键ID'")

                ddl_lines.append(");")

                # Add the table comment
                node_props = dict(node)
                table_comment = node_props.get(
                    'name_zh', node_props.get('describe', name_en))
                if table_comment and table_comment != name_en:
                    ddl_lines.append(
                        f"COMMENT ON TABLE {name_en} IS '{table_comment}';")

                ddl_content = "\n".join(ddl_lines)
                logger.info(f"{table_type}表DDL生成成功: {name_en}")
                logger.debug(f"生成的DDL: {ddl_content}")
                return ddl_content
        except Exception as e:
            logger.error(f"解析表格式和生成DDL失败: {str(e)}")
            return ""
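    # For reference, "BusinessDomain:ods_orders" would yield a DDL roughly like
    # the following (a hedged sketch; columns depend entirely on the DataMeta
    # nodes linked via INCLUDES, and these names are hypothetical):
    #
    #   CREATE TABLE ods_orders (
    #    order_id BIGINT COMMENT '订单ID',
    #    amount NUMERIC(18,2) COMMENT '金额'
    #   );
    #   COMMENT ON TABLE ods_orders IS '订单明细';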
    @staticmethod
    def _generate_businessdomain_ddl(
        session,
        bd_id: int,
        is_target: bool = False,
        update_mode: str = 'append',
    ) -> Optional[Dict[str, Any]]:
        """
        Generate the DDL for a BusinessDomain node by its ID.

        Args:
            session: Neo4j session object
            bd_id: BusinessDomain node ID
            is_target: whether this is a target table (target tables get a
                create_time column)
            update_mode: update mode (append or full)

        Returns:
            Dict containing ddl and data_source info, or None if the node
            does not exist
        """
        try:
            # Query the BusinessDomain node, its metadata, tag relationships
            # and data source relationship
            cypher = """
            MATCH (bd:BusinessDomain)
            WHERE id(bd) = $bd_id
            OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
            OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
            OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
            RETURN bd,
                   collect(DISTINCT m) as metadata,
                   collect(DISTINCT {
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }) as labels,
                   ds.type as ds_type,
                   ds.host as ds_host,
                   ds.port as ds_port,
                   ds.database as ds_database
            """
            result = session.run(cypher, bd_id=bd_id).single()
            if not result or not result['bd']:
                logger.warning(f"未找到ID为 {bd_id} 的BusinessDomain节点")
                return None

            node = result['bd']
            metadata = result['metadata']

            # Process the label array; take the first valid label name
            labels = result.get('labels', [])
            valid_labels = [
                label for label in labels if label.get('id') is not None
            ]
            label_name = (
                valid_labels[0].get('name_zh') if valid_labels else None
            )

            # Generate the DDL
            node_props = dict(node)
            table_name = node_props.get('name_en', f'table_{bd_id}')
            ddl_lines = []
            ddl_lines.append(f"CREATE TABLE {table_name} (")

            column_definitions = []

            # Add metadata columns
            if metadata:
                for meta in metadata:
                    if meta:
                        meta_props = dict(meta)
                        column_name = meta_props.get(
                            'name_en',
                            meta_props.get('name_zh', 'unknown_column'),
                        )
                        data_type = meta_props.get('data_type', 'VARCHAR(255)')
                        comment = meta_props.get('name_zh', '')
                        column_def = f" {column_name} {data_type}"
                        if comment:
                            column_def += f" COMMENT '{comment}'"
                        column_definitions.append(column_def)

            # If there is no metadata, add a default primary key
            if not column_definitions:
                column_definitions.append(
                    " id BIGINT PRIMARY KEY COMMENT '主键ID'")

            # 5. Target tables get a create_time column
            if is_target:
                column_definitions.append(
                    " create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
                    "COMMENT '数据创建时间'"
                )

            ddl_lines.append(",\n".join(column_definitions))
            ddl_lines.append(");")

            # Add the table comment
            table_comment = node_props.get(
                'name_zh', node_props.get('describe', table_name))
            if table_comment and table_comment != table_name:
                ddl_lines.append(
                    f"COMMENT ON TABLE {table_name} IS '{table_comment}';")

            ddl_content = "\n".join(ddl_lines)

            # 3. If the BELONGS_TO relationship points to "数据资源" (a data
            #    resource), also return the data source info
            data_source = None
            if label_name == '数据资源' and result['ds_type']:
                data_source = {
                    'type': result['ds_type'],
                    'host': result['ds_host'],
                    'port': result['ds_port'],
                    'database': result['ds_database']
                }
                logger.info(f"获取到数据源信息: {data_source}")

            logger.debug(
                f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}")
            return {
                'ddl': ddl_content,
                'table_name': table_name,
                'data_source': data_source
            }
        except Exception as e:
            logger.error(f"生成BusinessDomain DDL失败,ID={bd_id}: {str(e)}")
            return None
    @staticmethod
    def _handle_script_relationships(
        data: Dict[str, Any],
        dataflow_name: str,
        name_en: str,
    ):
        """
        Handle script relationships: create the DERIVED_FROM relationship
        between the source_table and target_table nodes in the Neo4j graph.

        Args:
            data: dict with script info; expected to contain script_name,
                script_type, schedule_status, source_table, target_table,
                update_mode
            dataflow_name: data flow name, used as the relationship's script_name
            name_en: English name of the data flow
        """
        try:
            # Read key-value pairs from data
            script_name = dataflow_name
            script_type = data.get('script_type', 'sql')
            schedule_status = data.get('status', 'inactive')
            source_table_full = data.get('source_table', '')
            target_table_full = data.get('target_table', '')
            update_mode = data.get('update_mode', 'full')

            # Split source_table and target_table from the "label:name" format
            source_table = source_table_full.split(
                ':')[-1] if ':' in source_table_full else source_table_full
            target_table = target_table_full.split(
                ':')[-1] if ':' in target_table_full else target_table_full
            source_label = source_table_full.split(
                ':')[0] if ':' in source_table_full else source_table_full
            target_label = target_table_full.split(
                ':')[0] if ':' in target_table_full else target_table_full

            # Validate required fields
            if not source_table or not target_table:
                logger.warning(
                    "source_table或target_table为空,跳过关系创建: "
                    "source_table=%s, target_table=%s",
                    source_table,
                    target_table,
                )
                return

            logger.info(f"开始创建脚本关系: {source_table} -> {target_table}")

            with connect_graph().session() as session:
                # Create or get the source and target nodes
                create_nodes_query = f"""
                MERGE (source:{source_label} {{name: $source_table}})
                ON CREATE SET source.created_at = $created_at,
                              source.type = 'source'
                WITH source
                MERGE (target:{target_label} {{name: $target_table}})
                ON CREATE SET target.created_at = $created_at,
                              target.type = 'target'
                RETURN source, target, id(source) as source_id,
                       id(target) as target_id
                """

                # Run the node creation query
                result = session.run(
                    create_nodes_query,  # type: ignore[arg-type]
                    {
                        'source_table': source_table,
                        'target_table': target_table,
                        'created_at': get_formatted_time(),
                    },
                ).single()

                if result:
                    source_id = result['source_id']
                    target_id = result['target_id']

                    # Check for and create the relationship
                    create_relationship_query = f"""
                    MATCH (source:{source_label}), (target:{target_label})
                    WHERE id(source) = $source_id AND id(target) = $target_id
                    AND NOT EXISTS((target)-[:DERIVED_FROM]->(source))
                    CREATE (target)-[r:DERIVED_FROM]->(source)
                    SET r.script_name = $script_name,
                        r.script_type = $script_type,
                        r.schedule_status = $schedule_status,
                        r.update_mode = $update_mode,
                        r.created_at = $created_at,
                        r.updated_at = $created_at
                    RETURN r
                    """
                    relationship_result = session.run(
                        create_relationship_query,  # type: ignore[arg-type]
                        {
                            'source_id': source_id,
                            'target_id': target_id,
                            'script_name': script_name,
                            'script_type': script_type,
                            'schedule_status': schedule_status,
                            'update_mode': update_mode,
                            'created_at': get_formatted_time(),
                        },
                    ).single()

                    if relationship_result:
                        logger.info(
                            "成功创建DERIVED_FROM关系: %s -> %s (script: %s)",
                            target_table,
                            source_table,
                            script_name,
                        )
                    else:
                        logger.info(
                            "DERIVED_FROM关系已存在: %s -> %s",
                            target_table,
                            source_table,
                        )
                else:
                    logger.error(
                        "创建表节点失败: source_table=%s, target_table=%s",
                        source_table,
                        target_table,
                    )
        except Exception as e:
            logger.error(f"处理脚本关系失败: {str(e)}")
            raise e
    @staticmethod
    def get_business_domain_list() -> List[Dict[str, Any]]:
        """
        Get the list of BusinessDomain nodes.

        Returns:
            List of BusinessDomain nodes, each containing id, name_zh,
            name_en and tag
        """
        try:
            logger.info("开始查询BusinessDomain节点列表")
            with connect_graph().session() as session:
                # Query all BusinessDomain nodes and the labels they point to
                # through LABEL relationships (multiple labels supported)
                query = """
                MATCH (bd:BusinessDomain)
                OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
                RETURN id(bd) as id,
                       bd.name_zh as name_zh,
                       bd.name_en as name_en,
                       bd.create_time as create_time,
                       collect({
                           id: id(label),
                           name_zh: label.name_zh,
                           name_en: label.name_en
                       }) as tags
                ORDER BY create_time DESC
                """
                result = session.run(query)

                bd_list = []
                for record in result:
                    # Process the tag array, filtering out empty tags
                    tags = record.get("tags", [])
                    tag_list = [
                        tag for tag in tags
                        if tag.get('id') is not None
                    ]
                    bd_item = {
                        "id": record["id"],
                        "name_zh": record.get("name_zh", "") or "",
                        "name_en": record.get("name_en", "") or "",
                        "tag": tag_list,
                    }
                    bd_list.append(bd_item)

                logger.info(f"成功查询到 {len(bd_list)} 个BusinessDomain节点")
                return bd_list
        except Exception as e:
            logger.error(f"查询BusinessDomain节点列表失败: {str(e)}")
            raise e
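    # Illustrative return value of get_business_domain_list (a hedged sketch;
    # ids and names are hypothetical):
    #
    #   [
    #       {'id': 101, 'name_zh': '订单域', 'name_en': 'order_domain',
    #        'tag': [{'id': 3, 'name_zh': '数据资源', 'name_en': 'data_resource'}]},
    #       ...
    #   ]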