# dataflows.py

import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from sqlalchemy import text

from app import db
from app.core.data_service.data_product_service import DataProductService
from app.core.graph.graph_operations import (
    connect_graph,
    create_or_get_node,
    get_node,
    relationship_exists,
)
from app.core.llm.llm_service import llm_sql
from app.core.meta_data import get_formatted_time, translate_and_parse

logger = logging.getLogger(__name__)


class DataFlowService:
    """Data flow service: business logic for managing data flows."""
    @staticmethod
    def get_dataflows(
        page: int = 1,
        page_size: int = 10,
        search: str = "",
    ) -> Dict[str, Any]:
        """
        Get a paginated list of data flows.

        Args:
            page: page number
            page_size: page size
            search: search keyword

        Returns:
            Dict containing the data flow list and pagination info.
        """
        try:
            # Query the data flow list from the graph database
            skip_count = (page - 1) * page_size

            # Build the search condition
            where_clause = ""
            params: Dict[str, Union[int, str]] = {
                "skip": skip_count,
                "limit": page_size,
            }
            if search:
                where_clause = (
                    "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search"
                )
                params["search"] = search

            # Query the data flow list (including the tag array).
            # Paginate first via WITH, then aggregate tags, so pagination stays accurate.
            query = f"""
                MATCH (n:DataFlow)
                {where_clause}
                WITH n
                ORDER BY n.created_at DESC
                SKIP $skip
                LIMIT $limit
                OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
                RETURN n, id(n) as node_id,
                       n.created_at as created_at,
                       collect({{
                           id: id(label),
                           name_zh: label.name_zh,
                           name_en: label.name_en
                       }}) as tags
            """

            # Get the Neo4j driver (connect_graph raises ConnectionError if the connection fails)
            try:
                with connect_graph().session() as session:
                    list_result = session.run(query, params).data()

                    # Query the total count
                    count_query = f"""
                        MATCH (n:DataFlow)
                        {where_clause}
                        RETURN count(n) as total
                    """
                    count_params = {"search": search} if search else {}
                    count_result = session.run(count_query, count_params).single()
                    total = count_result["total"] if count_result else 0
            except Exception as e:
                # Do not close the driver here: connect_graph() may return a shared
                # instance, and closing it prematurely would break other sessions.
                # The session context manager already releases the session.
                logger.error(f"查询数据流失败: {str(e)}")
                raise e

            # Format the results
            dataflows = []
            for record in list_result:
                node = record["n"]
                dataflow = dict(node)
                dataflow["id"] = record["node_id"]  # use the node_id returned by the query

                # Process the tag array, filtering out empty tags
                tags = record.get("tags", [])
                dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]
                dataflows.append(dataflow)

            return {
                "list": dataflows,
                "pagination": {
                    "page": page,
                    "page_size": page_size,
                    "total": total,
                    "total_pages": (total + page_size - 1) // page_size,
                },
            }
        except Exception as e:
            logger.error(f"获取数据流列表失败: {str(e)}")
            raise e
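    # Illustrative usage sketch (not part of the service itself); it assumes the
    # Flask app context and Neo4j connection are already configured:
    #
    #     result = DataFlowService.get_dataflows(page=1, page_size=20, search="销售")
    #     for df in result["list"]:
    #         print(df["id"], df["name_zh"], [t["name_zh"] for t in df["tag"]])
    #     print(result["pagination"]["total_pages"])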
    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        Get data flow details by ID.

        Args:
            dataflow_id: data flow ID

        Returns:
            Dict with the data flow details, or None if it does not exist.
        """
        try:
            # Fetch all properties of the DataFlow node from Neo4j (including the tag array)
            neo4j_query = """
                MATCH (n:DataFlow)
                WHERE id(n) = $dataflow_id
                OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
                RETURN n, id(n) as node_id,
                       collect({
                           id: id(label),
                           name_zh: label.name_zh,
                           name_en: label.name_en
                       }) as tags
            """
            with connect_graph().session() as session:
                neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data()
                if not neo4j_result:
                    logger.warning(f"未找到ID为 {dataflow_id} 的DataFlow节点")
                    return None

                record = neo4j_result[0]
                node = record["n"]

                # Convert the node properties to a dict
                dataflow = dict(node)
                dataflow["id"] = record["node_id"]

                # Process the tag array, filtering out empty tags
                tags = record.get("tags", [])
                dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]

                # script_requirement: if it is a JSON string, parse it into an object
                script_requirement_str = dataflow.get("script_requirement", "")
                if script_requirement_str:
                    try:
                        script_requirement_obj = json.loads(script_requirement_str)
                        dataflow["script_requirement"] = script_requirement_obj
                        logger.debug(
                            "成功解析script_requirement: %s",
                            script_requirement_obj,
                        )
                    except (json.JSONDecodeError, TypeError) as e:
                        logger.warning(f"script_requirement解析失败,保持原值: {e}")
                        # Keep the original string value
                        dataflow["script_requirement"] = script_requirement_str
                else:
                    # Empty value: set to None
                    dataflow["script_requirement"] = None

                logger.info(
                    "成功获取DataFlow详情,ID: %s, 名称: %s",
                    dataflow_id,
                    dataflow.get("name_zh"),
                )
                return dataflow
        except Exception as e:
            logger.error(f"获取数据流详情失败: {str(e)}")
            raise e
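    # Illustrative sketch: script_requirement comes back as a dict when the stored
    # value is valid JSON, otherwise as the original string (or None when empty).
    # The node id below is hypothetical.
    #
    #     df = DataFlowService.get_dataflow_by_id(123)
    #     if df and isinstance(df.get("script_requirement"), dict):
    #         rule = df["script_requirement"].get("rule", "")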
  172. @staticmethod
  173. def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
  174. """
  175. 创建新的数据流
  176. Args:
  177. data: 数据流配置数据
  178. Returns:
  179. 创建的数据流信息
  180. """
  181. try:
  182. # 验证必填字段
  183. required_fields = ["name_zh", "describe"]
  184. for field in required_fields:
  185. if field not in data:
  186. raise ValueError(f"缺少必填字段: {field}")
  187. dataflow_name = data["name_zh"]
  188. # 使用LLM翻译名称生成英文名
  189. try:
  190. result_list = translate_and_parse(dataflow_name)
  191. name_en = (
  192. result_list[0]
  193. if result_list
  194. else dataflow_name.lower().replace(" ", "_")
  195. )
  196. except Exception as e:
  197. logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
  198. name_en = dataflow_name.lower().replace(" ", "_")
  199. # 处理 script_requirement,将其转换为 JSON 字符串
  200. script_requirement = data.get("script_requirement")
  201. if script_requirement is not None:
  202. # 如果是字典或列表,转换为 JSON 字符串
  203. if isinstance(script_requirement, (dict, list)):
  204. script_requirement_str = json.dumps(
  205. script_requirement, ensure_ascii=False
  206. )
  207. else:
  208. # 如果已经是字符串,直接使用
  209. script_requirement_str = str(script_requirement)
  210. else:
  211. script_requirement_str = ""
  212. # 准备节点数据(tag不作为节点属性存储,而是通过LABEL关系关联)
  213. node_data = {
  214. "name_zh": dataflow_name,
  215. "name_en": name_en,
  216. "category": data.get("category", ""),
  217. "organization": data.get("organization", ""),
  218. "leader": data.get("leader", ""),
  219. "frequency": data.get("frequency", ""),
  220. "describe": data.get("describe", ""),
  221. "status": data.get("status", "inactive"),
  222. "update_mode": data.get("update_mode", "append"),
  223. "script_type": data.get("script_type", "python"),
  224. "script_requirement": script_requirement_str,
  225. "created_at": get_formatted_time(),
  226. "updated_at": get_formatted_time(),
  227. }
  228. # 创建或获取数据流节点
  229. dataflow_id = get_node("DataFlow", name=dataflow_name)
  230. if dataflow_id:
  231. raise ValueError(f"数据流 '{dataflow_name}' 已存在")
  232. dataflow_id = create_or_get_node("DataFlow", **node_data)
  233. # 处理标签关系(支持多标签数组)
  234. tag_list = data.get("tag", [])
  235. if tag_list:
  236. try:
  237. DataFlowService._handle_tag_relationships(dataflow_id, tag_list)
  238. except Exception as e:
  239. logger.warning(f"处理标签关系时出错: {str(e)}")
  240. # 成功创建图数据库节点后,写入PG数据库
  241. try:
  242. DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
  243. logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")
  244. # PG数据库记录成功写入后,在neo4j图数据库中创建script关系
  245. try:
  246. DataFlowService._handle_script_relationships(
  247. data, dataflow_name, name_en
  248. )
  249. logger.info(f"脚本关系创建成功: {dataflow_name}")
  250. except Exception as script_error:
  251. logger.warning(f"创建脚本关系失败: {str(script_error)}")
  252. except Exception as pg_error:
  253. logger.error(f"写入PG数据库失败: {str(pg_error)}")
  254. # 注意:这里可以选择回滚图数据库操作,但目前保持图数据库数据
  255. # 在实际应用中,可能需要考虑分布式事务
  256. # 返回创建的数据流信息
  257. # 查询创建的节点获取完整信息
  258. query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id"
  259. with connect_graph().session() as session:
  260. id_result = session.run(query, name_zh=dataflow_name).single()
  261. if id_result:
  262. dataflow_node = id_result["n"]
  263. node_id = id_result["node_id"]
  264. # 将节点属性转换为字典
  265. result = dict(dataflow_node)
  266. result["id"] = node_id
  267. else:
  268. # 如果查询失败,返回基本信息
  269. result = {
  270. "id": (dataflow_id if isinstance(dataflow_id, int) else None),
  271. "name_zh": dataflow_name,
  272. "name_en": name_en,
  273. "created_at": get_formatted_time(),
  274. }
  275. # 注册数据产品到数据服务
  276. try:
  277. DataFlowService._register_data_product(
  278. data=data,
  279. dataflow_name=dataflow_name,
  280. name_en=name_en,
  281. dataflow_id=result.get("id"),
  282. )
  283. logger.info(f"数据产品注册成功: {dataflow_name}")
  284. except Exception as product_error:
  285. logger.warning(f"注册数据产品失败: {str(product_error)}")
  286. # 不影响主流程,仅记录警告
  287. logger.info(f"创建数据流成功: {dataflow_name}")
  288. return result
  289. except Exception as e:
  290. logger.error(f"创建数据流失败: {str(e)}")
  291. raise e
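    # Minimal example payload for create_dataflow (illustrative values only; the
    # field names follow the keys read above, everything else is an assumption).
    # source_table/target_table use the "label:name_en" convention handled later.
    #
    #     payload = {
    #         "name_zh": "销售日汇总",
    #         "describe": "按天汇总销售明细",
    #         "script_type": "sql",
    #         "update_mode": "append",
    #         "source_table": "BusinessDomain:sales_detail",
    #         "target_table": "BusinessDomain:sales_daily",
    #         "script_requirement": {"rule": "按 sale_date 聚合金额"},
    #         "tag": [{"id": 42}],
    #     }
    #     created = DataFlowService.create_dataflow(payload)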
  292. @staticmethod
  293. def _save_to_pg_database(
  294. data: Dict[str, Any],
  295. script_name: str,
  296. name_en: str,
  297. ):
  298. """
  299. 将脚本信息保存到PG数据库
  300. Args:
  301. data: 包含脚本信息的数据
  302. script_name: 脚本名称
  303. name_en: 英文名称
  304. """
  305. try:
  306. # 提取脚本相关信息
  307. # 处理 script_requirement,确保保存为 JSON 字符串
  308. script_requirement_raw = data.get("script_requirement")
  309. # 用于保存从 script_requirement 中提取的 rule
  310. rule_from_requirement = ""
  311. if script_requirement_raw is not None:
  312. # 如果是字典,提取 rule 字段
  313. if isinstance(script_requirement_raw, dict):
  314. rule_from_requirement = script_requirement_raw.get("rule", "")
  315. script_requirement = json.dumps(
  316. script_requirement_raw, ensure_ascii=False
  317. )
  318. elif isinstance(script_requirement_raw, list):
  319. script_requirement = json.dumps(
  320. script_requirement_raw, ensure_ascii=False
  321. )
  322. else:
  323. # 如果已经是字符串,尝试解析以提取 rule
  324. script_requirement = str(script_requirement_raw)
  325. try:
  326. parsed_req = json.loads(script_requirement)
  327. if isinstance(parsed_req, dict):
  328. rule_from_requirement = parsed_req.get("rule", "")
  329. except (json.JSONDecodeError, TypeError):
  330. pass
  331. else:
  332. script_requirement = ""
  333. # 处理 script_content:优先使用前端传入的值,如果为空则使用从 script_requirement 提取的 rule
  334. script_content = data.get("script_content", "")
  335. if not script_content and rule_from_requirement:
  336. script_content = rule_from_requirement
  337. logger.info(
  338. "script_content为空,使用从script_requirement提取的rule: %s",
  339. rule_from_requirement,
  340. )
  341. # 安全处理 source_table 和 target_table(避免 None 值导致的 'in' 操作错误)
  342. source_table_raw = data.get("source_table") or ""
  343. source_table = (
  344. source_table_raw.split(":")[-1]
  345. if ":" in source_table_raw
  346. else source_table_raw
  347. )
  348. target_table_raw = data.get("target_table") or ""
  349. target_table = (
  350. target_table_raw.split(":")[-1]
  351. if ":" in target_table_raw
  352. else (target_table_raw or name_en)
  353. )
  354. script_type = data.get("script_type", "python")
  355. user_name = data.get("created_by", "system")
  356. target_dt_column = data.get("target_dt_column", "")
  357. # 验证必需字段
  358. if not target_table:
  359. target_table = name_en
  360. if not script_name:
  361. raise ValueError("script_name不能为空")
  362. # 构建插入SQL
  363. insert_sql = text(
  364. """
  365. INSERT INTO dags.data_transform_scripts
  366. (source_table, target_table, script_name, script_type,
  367. script_requirement, script_content, user_name, create_time,
  368. update_time, target_dt_column)
  369. VALUES
  370. (:source_table, :target_table, :script_name, :script_type,
  371. :script_requirement, :script_content, :user_name,
  372. :create_time, :update_time, :target_dt_column)
  373. ON CONFLICT (target_table, script_name)
  374. DO UPDATE SET
  375. source_table = EXCLUDED.source_table,
  376. script_type = EXCLUDED.script_type,
  377. script_requirement = EXCLUDED.script_requirement,
  378. script_content = EXCLUDED.script_content,
  379. user_name = EXCLUDED.user_name,
  380. update_time = EXCLUDED.update_time,
  381. target_dt_column = EXCLUDED.target_dt_column
  382. """
  383. )
  384. # 准备参数
  385. current_time = datetime.now()
  386. params = {
  387. "source_table": source_table,
  388. "target_table": target_table,
  389. "script_name": script_name,
  390. "script_type": script_type,
  391. "script_requirement": script_requirement,
  392. "script_content": script_content,
  393. "user_name": user_name,
  394. "create_time": current_time,
  395. "update_time": current_time,
  396. "target_dt_column": target_dt_column,
  397. }
  398. # 执行插入操作
  399. db.session.execute(insert_sql, params)
  400. # 新增:保存到task_list表
  401. try:
  402. # 1. 解析script_requirement并构建详细的任务描述
  403. task_description_md = script_requirement
  404. try:
  405. # 尝试解析JSON
  406. try:
  407. req_json = json.loads(script_requirement)
  408. except (json.JSONDecodeError, TypeError):
  409. req_json = None
  410. if isinstance(req_json, dict):
  411. # 1. 从script_requirement中提取rule字段作为request_content_str
  412. request_content_str = req_json.get("rule", "")
  413. # 2. 从script_requirement中提取source_table和
  414. # target_table字段信息
  415. source_table_ids = req_json.get("source_table", [])
  416. target_table_ids = req_json.get("target_table", [])
  417. # 确保是列表格式
  418. if not isinstance(source_table_ids, list):
  419. source_table_ids = (
  420. [source_table_ids] if source_table_ids else []
  421. )
  422. if not isinstance(target_table_ids, list):
  423. target_table_ids = (
  424. [target_table_ids] if target_table_ids else []
  425. )
  426. # 合并所有BusinessDomain ID
  427. all_bd_ids = source_table_ids + target_table_ids
  428. # 4. 从data参数中提取update_mode
  429. update_mode = data.get("update_mode", "append")
  430. # 生成Business Domain DDLs
  431. source_ddls = []
  432. target_ddls = []
  433. data_source_info = None
  434. if all_bd_ids:
  435. try:
  436. with connect_graph().session() as session:
  437. # 处理source tables
  438. for bd_id in source_table_ids:
  439. ddl_info = DataFlowService._generate_businessdomain_ddl(
  440. session,
  441. bd_id,
  442. is_target=False,
  443. )
  444. if ddl_info:
  445. source_ddls.append(ddl_info["ddl"])
  446. # 3. 如果BELONGS_TO关系连接的是
  447. # "数据资源",获取数据源信息
  448. if (
  449. ddl_info.get("data_source")
  450. and not data_source_info
  451. ):
  452. data_source_info = ddl_info[
  453. "data_source"
  454. ]
  455. # 处理target tables(5. 目标表缺省要有create_time字段)
  456. for bd_id in target_table_ids:
  457. ddl_info = DataFlowService._generate_businessdomain_ddl(
  458. session,
  459. bd_id,
  460. is_target=True,
  461. update_mode=update_mode,
  462. )
  463. if ddl_info:
  464. target_ddls.append(ddl_info["ddl"])
  465. # 同样检查BELONGS_TO关系,获取数据源信息
  466. if (
  467. ddl_info.get("data_source")
  468. and not data_source_info
  469. ):
  470. data_source_info = ddl_info[
  471. "data_source"
  472. ]
  473. except Exception as neo_e:
  474. logger.error(
  475. f"获取BusinessDomain DDL失败: {str(neo_e)}"
  476. )
  477. # 构建Markdown格式的任务描述
  478. task_desc_parts = [f"# Task: {script_name}\n"]
  479. # 添加数据源信息
  480. if data_source_info:
  481. task_desc_parts.append("## Data Source")
  482. task_desc_parts.append(
  483. f"- **Type**: {data_source_info.get('type', 'N/A')}"
  484. )
  485. task_desc_parts.append(
  486. f"- **Host**: {data_source_info.get('host', 'N/A')}"
  487. )
  488. task_desc_parts.append(
  489. f"- **Port**: {data_source_info.get('port', 'N/A')}"
  490. )
  491. task_desc_parts.append(
  492. f"- **Database**: "
  493. f"{data_source_info.get('database', 'N/A')}\n"
  494. )
  495. # 添加源表DDL
  496. if source_ddls:
  497. task_desc_parts.append("## Source Tables (DDL)")
  498. for ddl in source_ddls:
  499. task_desc_parts.append(f"```sql\n{ddl}\n```\n")
  500. # 添加目标表DDL
  501. if target_ddls:
  502. task_desc_parts.append("## Target Tables (DDL)")
  503. for ddl in target_ddls:
  504. task_desc_parts.append(f"```sql\n{ddl}\n```\n")
  505. # 添加更新模式说明
  506. task_desc_parts.append("## Update Mode")
  507. if update_mode == "append":
  508. task_desc_parts.append("- **Mode**: Append (追加模式)")
  509. task_desc_parts.append(
  510. "- **Description**: 新数据将追加到目标表,不删除现有数据\n"
  511. )
  512. else:
  513. task_desc_parts.append(
  514. "- **Mode**: Full Refresh (全量更新)"
  515. )
  516. task_desc_parts.append(
  517. "- **Description**: 目标表将被清空后重新写入数据\n"
  518. )
  519. # 添加请求内容(rule)
  520. if request_content_str:
  521. task_desc_parts.append("## Request Content")
  522. task_desc_parts.append(f"{request_content_str}\n")
  523. # 添加实施步骤(根据任务类型优化)
  524. task_desc_parts.append("## Implementation Steps")
  525. # 判断是否为远程数据源导入任务
  526. if data_source_info:
  527. # 从远程数据源导入数据的简化步骤
  528. task_desc_parts.append(
  529. "1. Create an n8n workflow to execute the "
  530. "data import task"
  531. )
  532. task_desc_parts.append(
  533. "2. Configure the workflow to call "
  534. "`import_resource_data.py` Python script"
  535. )
  536. task_desc_parts.append(
  537. "3. Pass the following parameters to the "
  538. "Python execution node:"
  539. )
  540. task_desc_parts.append(
  541. " - `--source-config`: JSON configuration "
  542. "for the remote data source"
  543. )
  544. task_desc_parts.append(
  545. " - `--target-table`: Target table name "
  546. "(data resource English name)"
  547. )
  548. task_desc_parts.append(
  549. f" - `--update-mode`: {update_mode}"
  550. )
  551. task_desc_parts.append(
  552. "4. The Python script will automatically:"
  553. )
  554. task_desc_parts.append(
  555. " - Connect to the remote data source"
  556. )
  557. task_desc_parts.append(
  558. " - Extract data from the source table"
  559. )
  560. task_desc_parts.append(
  561. f" - Write data to target table using "
  562. f"{update_mode} mode"
  563. )
  564. else:
  565. # 数据转换任务的完整步骤
  566. task_desc_parts.append(
  567. "1. Extract data from source tables as "
  568. "specified in the DDL"
  569. )
  570. task_desc_parts.append(
  571. "2. Apply transformation logic according to the rule:"
  572. )
  573. if request_content_str:
  574. task_desc_parts.append(
  575. f" - Rule: {request_content_str}"
  576. )
  577. task_desc_parts.append(
  578. "3. Generate Python program to implement the "
  579. "data transformation logic"
  580. )
  581. task_desc_parts.append(
  582. f"4. Write transformed data to target table "
  583. f"using {update_mode} mode"
  584. )
  585. task_desc_parts.append(
  586. "5. Create an n8n workflow to schedule and "
  587. "execute the Python program"
  588. )
  589. task_description_md = "\n".join(task_desc_parts)
  590. except Exception as parse_e:
  591. logger.warning(
  592. f"解析任务描述详情失败,使用原始描述: {str(parse_e)}"
  593. )
  594. task_description_md = script_requirement
  595. # 假设运行根目录为项目根目录,dataflows.py在app/core/data_flow/
  596. code_path = "app/core/data_flow"
  597. task_insert_sql = text(
  598. "INSERT INTO public.task_list\n"
  599. "(task_name, task_description, status, code_name, "
  600. "code_path, create_by, create_time, update_time)\n"
  601. "VALUES\n"
  602. "(:task_name, :task_description, :status, :code_name, "
  603. ":code_path, :create_by, :create_time, :update_time)"
  604. )
  605. task_params = {
  606. "task_name": script_name,
  607. "task_description": task_description_md,
  608. "status": "pending",
  609. "code_name": script_name,
  610. "code_path": code_path,
  611. "create_by": "cursor",
  612. "create_time": current_time,
  613. "update_time": current_time,
  614. }
  615. # 使用嵌套事务,确保task_list插入失败不影响主流程
  616. with db.session.begin_nested():
  617. db.session.execute(task_insert_sql, task_params)
  618. logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")
  619. except Exception as task_error:
  620. # 记录错误但不中断主流程
  621. logger.error(f"写入task_list表失败: {str(task_error)}")
  622. # 如果要求必须成功写入任务列表,则这里应该raise task_error
  623. # raise task_error
  624. db.session.commit()
  625. logger.info(
  626. "成功将脚本信息写入PG数据库: target_table=%s, script_name=%s",
  627. target_table,
  628. script_name,
  629. )
  630. except Exception as e:
  631. db.session.rollback()
  632. logger.error(f"写入PG数据库失败: {str(e)}")
  633. raise e
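    # Sketch of a script_requirement value this method knows how to unpack.
    # The "rule"/"source_table"/"target_table" keys are the ones read above;
    # the ids are hypothetical Neo4j node ids of BusinessDomain nodes.
    #
    #     {
    #         "rule": "按 sale_date 聚合金额",
    #         "source_table": [101, 102],
    #         "target_table": [201],
    #     }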
  634. @staticmethod
  635. def _handle_children_relationships(dataflow_node, children_ids):
  636. """处理子节点关系"""
  637. logger.debug(
  638. "处理子节点关系,原始children_ids: %s, 类型: %s",
  639. children_ids,
  640. type(children_ids),
  641. )
  642. # 确保children_ids是列表格式
  643. if not isinstance(children_ids, (list, tuple)):
  644. if children_ids is not None:
  645. children_ids = [children_ids] # 如果是单个值,转换为列表
  646. logger.debug(f"将单个值转换为列表: {children_ids}")
  647. else:
  648. children_ids = [] # 如果是None,转换为空列表
  649. logger.debug("将None转换为空列表")
  650. for child_id in children_ids:
  651. try:
  652. # 查找子节点
  653. query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
  654. with connect_graph().session() as session:
  655. result = session.run(query, child_id=child_id).data()
  656. if result:
  657. # 获取dataflow_node的ID
  658. dataflow_id = getattr(dataflow_node, "identity", None)
  659. if dataflow_id is None:
  660. # 如果没有identity属性,从名称查询ID
  661. query_id = (
  662. "MATCH (n:DataFlow) WHERE n.name_zh = "
  663. "$name_zh RETURN id(n) as node_id"
  664. )
  665. id_result = session.run(
  666. query_id,
  667. name_zh=dataflow_node.get("name_zh"),
  668. ).single()
  669. dataflow_id = id_result["node_id"] if id_result else None
  670. # 创建关系 - 使用ID调用relationship_exists
  671. if dataflow_id and not relationship_exists(
  672. dataflow_id, "child", child_id
  673. ):
  674. session.run(
  675. "MATCH (a), (b) WHERE id(a) = $dataflow_id "
  676. "AND id(b) = $child_id "
  677. "CREATE (a)-[:child]->(b)",
  678. dataflow_id=dataflow_id,
  679. child_id=child_id,
  680. )
  681. logger.info(f"创建子节点关系: {dataflow_id} -> {child_id}")
  682. except Exception as e:
  683. logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
    @staticmethod
    def _handle_tag_relationships(dataflow_id, tag_list):
        """
        Handle multi-tag relationships.

        Args:
            dataflow_id: data flow node ID
            tag_list: tag list; either an array of IDs or of objects with an "id" field
        """
        # Make sure tag_list is a list
        if not isinstance(tag_list, list):
            tag_list = [tag_list] if tag_list else []

        for tag_item in tag_list:
            tag_id = None
            if isinstance(tag_item, dict) and "id" in tag_item:
                tag_id = int(tag_item["id"])
            elif isinstance(tag_item, (int, str)):
                try:
                    tag_id = int(tag_item)
                except (ValueError, TypeError):
                    pass
            if tag_id:
                DataFlowService._handle_single_tag_relationship(dataflow_id, tag_id)

    @staticmethod
    def _handle_single_tag_relationship(dataflow_id, tag_id):
        """Handle a single tag relationship."""
        try:
            # Look up the tag node
            query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, tag_id=tag_id).data()
                if result:
                    # Create the relationship, calling relationship_exists with IDs
                    if dataflow_id and not relationship_exists(
                        dataflow_id, "LABEL", tag_id
                    ):
                        session.run(
                            "MATCH (a), (b) WHERE id(a) = $dataflow_id "
                            "AND id(b) = $tag_id "
                            "CREATE (a)-[:LABEL]->(b)",
                            dataflow_id=dataflow_id,
                            tag_id=tag_id,
                        )
                        logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
        except Exception as e:
            logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
  729. @staticmethod
  730. def update_dataflow(
  731. dataflow_id: int,
  732. data: Dict[str, Any],
  733. ) -> Optional[Dict[str, Any]]:
  734. """
  735. 更新数据流
  736. Args:
  737. dataflow_id: 数据流ID
  738. data: 更新的数据
  739. Returns:
  740. 更新后的数据流信息,如果不存在则返回None
  741. """
  742. try:
  743. # 提取 tag 数组(不作为节点属性存储)
  744. tag_list = data.pop("tag", None)
  745. # 查找节点
  746. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  747. with connect_graph().session() as session:
  748. result = session.run(query, dataflow_id=dataflow_id).data()
  749. if not result:
  750. return None
  751. # 更新节点属性
  752. update_fields = []
  753. params: Dict[str, Any] = {"dataflow_id": dataflow_id}
  754. for key, value in data.items():
  755. if key not in ["id", "created_at"]: # 保护字段
  756. # 复杂对象序列化为 JSON 字符串
  757. if key in ["config", "script_requirement"]:
  758. if isinstance(value, dict):
  759. value = json.dumps(value, ensure_ascii=False)
  760. update_fields.append(f"n.{key} = ${key}")
  761. params[key] = value
  762. if update_fields:
  763. params["updated_at"] = get_formatted_time()
  764. update_fields.append("n.updated_at = $updated_at")
  765. update_query = f"""
  766. MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
  767. SET {", ".join(update_fields)}
  768. RETURN n, id(n) as node_id
  769. """
  770. result = session.run(update_query, params).data()
  771. # 处理 tag 关系(支持多标签数组)
  772. if tag_list is not None:
  773. # 确保是列表格式
  774. if not isinstance(tag_list, list):
  775. tag_list = [tag_list] if tag_list else []
  776. # 先删除现有的 LABEL 关系
  777. delete_query = """
  778. MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel)
  779. WHERE id(n) = $dataflow_id
  780. DELETE r
  781. """
  782. session.run(delete_query, dataflow_id=dataflow_id)
  783. logger.info(f"删除数据流 {dataflow_id} 的现有标签关系")
  784. # 为每个 tag 创建新的 LABEL 关系
  785. for tag_item in tag_list:
  786. tag_id = None
  787. if isinstance(tag_item, dict) and "id" in tag_item:
  788. tag_id = int(tag_item["id"])
  789. elif isinstance(tag_item, (int, str)):
  790. try:
  791. tag_id = int(tag_item)
  792. except (ValueError, TypeError):
  793. pass
  794. if tag_id:
  795. DataFlowService._handle_single_tag_relationship(
  796. dataflow_id, tag_id
  797. )
  798. if result:
  799. node = result[0]["n"]
  800. updated_dataflow = dict(node)
  801. # 使用查询返回的node_id
  802. updated_dataflow["id"] = result[0]["node_id"]
  803. # 查询并添加标签数组到返回数据
  804. tags_query = """
  805. MATCH (n:DataFlow)
  806. WHERE id(n) = $dataflow_id
  807. OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
  808. RETURN collect({
  809. id: id(label),
  810. name_zh: label.name_zh,
  811. name_en: label.name_en
  812. }) as tags
  813. """
  814. tags_result = session.run(
  815. tags_query, dataflow_id=dataflow_id
  816. ).single()
  817. if tags_result:
  818. tags = tags_result.get("tags", [])
  819. updated_dataflow["tag"] = [
  820. tag for tag in tags if tag.get("id") is not None
  821. ]
  822. else:
  823. updated_dataflow["tag"] = []
  824. logger.info(f"更新数据流成功: ID={dataflow_id}")
  825. return updated_dataflow
  826. return None
  827. except Exception as e:
  828. logger.error(f"更新数据流失败: {str(e)}")
  829. raise e
    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """
        Delete a data flow.

        Args:
            dataflow_id: data flow ID

        Returns:
            True if the deletion succeeded.
        """
        try:
            # Delete the node and all of its relationships
            query = """
                MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                DETACH DELETE n
                RETURN count(n) as deleted_count
            """
            with connect_graph().session() as session:
                delete_result = session.run(query, dataflow_id=dataflow_id).single()
                result = delete_result["deleted_count"] if delete_result else 0
                if result and result > 0:
                    logger.info(f"删除数据流成功: ID={dataflow_id}")
                    return True
                return False
        except Exception as e:
            logger.error(f"删除数据流失败: {str(e)}")
            raise e
  856. @staticmethod
  857. def execute_dataflow(
  858. dataflow_id: int,
  859. params: Optional[Dict[str, Any]] = None,
  860. ) -> Dict[str, Any]:
  861. """
  862. 执行数据流
  863. Args:
  864. dataflow_id: 数据流ID
  865. params: 执行参数
  866. Returns:
  867. 执行结果信息
  868. """
  869. try:
  870. # 检查数据流是否存在
  871. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  872. with connect_graph().session() as session:
  873. result = session.run(query, dataflow_id=dataflow_id).data()
  874. if not result:
  875. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  876. execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
  877. # TODO: 这里应该实际执行数据流
  878. # 目前返回模拟结果
  879. result = {
  880. "execution_id": execution_id,
  881. "dataflow_id": dataflow_id,
  882. "status": "running",
  883. "started_at": datetime.now().isoformat(),
  884. "params": params or {},
  885. "progress": 0,
  886. }
  887. logger.info(
  888. "开始执行数据流: ID=%s, execution_id=%s",
  889. dataflow_id,
  890. execution_id,
  891. )
  892. return result
  893. except Exception as e:
  894. logger.error(f"执行数据流失败: {str(e)}")
  895. raise e
  896. @staticmethod
  897. def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
  898. """
  899. 获取数据流执行状态
  900. Args:
  901. dataflow_id: 数据流ID
  902. Returns:
  903. 执行状态信息
  904. """
  905. try:
  906. # TODO: 这里应该查询实际的执行状态
  907. # 目前返回模拟状态
  908. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  909. with connect_graph().session() as session:
  910. result = session.run(query, dataflow_id=dataflow_id).data()
  911. if not result:
  912. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  913. status = ["running", "completed", "failed", "pending"][dataflow_id % 4]
  914. return {
  915. "dataflow_id": dataflow_id,
  916. "status": status,
  917. "progress": (
  918. 100 if status == "completed" else (dataflow_id * 10) % 100
  919. ),
  920. "started_at": datetime.now().isoformat(),
  921. "completed_at": (
  922. datetime.now().isoformat() if status == "completed" else None
  923. ),
  924. "error_message": ("执行过程中发生错误" if status == "failed" else None),
  925. }
  926. except Exception as e:
  927. logger.error(f"获取数据流状态失败: {str(e)}")
  928. raise e
  929. @staticmethod
  930. def get_dataflow_logs(
  931. dataflow_id: int,
  932. page: int = 1,
  933. page_size: int = 50,
  934. ) -> Dict[str, Any]:
  935. """
  936. 获取数据流执行日志
  937. Args:
  938. dataflow_id: 数据流ID
  939. page: 页码
  940. page_size: 每页大小
  941. Returns:
  942. 执行日志列表和分页信息
  943. """
  944. try:
  945. # TODO: 这里应该查询实际的执行日志
  946. # 目前返回模拟日志
  947. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  948. with connect_graph().session() as session:
  949. result = session.run(query, dataflow_id=dataflow_id).data()
  950. if not result:
  951. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  952. mock_logs = [
  953. {
  954. "id": i,
  955. "timestamp": datetime.now().isoformat(),
  956. "level": ["INFO", "WARNING", "ERROR"][i % 3],
  957. "message": f"数据流执行日志消息 {i}",
  958. "component": ["source", "transform", "target"][i % 3],
  959. }
  960. for i in range(1, 101)
  961. ]
  962. # 分页处理
  963. total = len(mock_logs)
  964. start = (page - 1) * page_size
  965. end = start + page_size
  966. logs = mock_logs[start:end]
  967. return {
  968. "logs": logs,
  969. "pagination": {
  970. "page": page,
  971. "page_size": page_size,
  972. "total": total,
  973. "total_pages": (total + page_size - 1) // page_size,
  974. },
  975. }
  976. except Exception as e:
  977. logger.error(f"获取数据流日志失败: {str(e)}")
  978. raise e
  979. @staticmethod
  980. def create_script(request_data: Union[Dict[str, Any], str]) -> str:
  981. """
  982. 使用Deepseek模型生成SQL脚本
  983. Args:
  984. request_data: 包含input, output, request_content的请求数据字典,或JSON字符串
  985. Returns:
  986. 生成的SQL脚本内容
  987. """
  988. try:
  989. logger.info(f"开始处理脚本生成请求: {request_data}")
  990. logger.info(f"request_data类型: {type(request_data)}")
  991. # 类型检查和处理
            if isinstance(request_data, str):
                logger.warning(f"request_data是字符串,尝试解析为JSON: {request_data}")
                try:
                    # json is already imported at module level
                    request_data = json.loads(request_data)
                except json.JSONDecodeError as e:
                    raise ValueError(f"无法解析request_data为JSON: {str(e)}")
  999. if not isinstance(request_data, dict):
  1000. raise ValueError(
  1001. f"request_data必须是字典类型,实际类型: {type(request_data)}"
  1002. )
            # 1. Parse input, output, and request_content from the incoming request_data
            input_data = request_data.get("input", "")
            output_data = request_data.get("output", "")
            request_content = request_data.get("request_content", "")
  1007. # 如果request_content是HTML格式,提取纯文本
  1008. if request_content and (
  1009. request_content.startswith("<p>") or "<" in request_content
  1010. ):
  1011. # 简单的HTML标签清理
  1012. import re
  1013. request_content = re.sub(r"<[^>]+>", "", request_content).strip()
  1014. if not input_data or not output_data or not request_content:
  1015. raise ValueError(
  1016. "缺少必要参数:input='{}', output='{}', "
  1017. "request_content='{}' 不能为空".format(
  1018. input_data,
  1019. output_data,
  1020. request_content[:100] if request_content else "",
  1021. )
  1022. )
  1023. logger.info(
  1024. "解析得到 - input: %s, output: %s, request_content: %s",
  1025. input_data,
  1026. output_data,
  1027. request_content,
  1028. )
  1029. # 2. 解析input中的多个数据表并生成源表DDL
  1030. source_tables_ddl = []
  1031. input_tables = []
  1032. if input_data:
  1033. tables = [
  1034. table.strip() for table in input_data.split(",") if table.strip()
  1035. ]
  1036. for table in tables:
  1037. ddl = DataFlowService._parse_table_and_get_ddl(table, "input")
  1038. if ddl:
  1039. input_tables.append(table)
  1040. source_tables_ddl.append(ddl)
  1041. else:
  1042. logger.warning(f"无法获取输入表 {table} 的DDL结构")
  1043. # 3. 解析output中的数据表并生成目标表DDL
  1044. target_table_ddl = ""
  1045. if output_data:
  1046. target_table_ddl = DataFlowService._parse_table_and_get_ddl(
  1047. output_data.strip(), "output"
  1048. )
  1049. if not target_table_ddl:
  1050. logger.warning(f"无法获取输出表 {output_data} 的DDL结构")
  1051. # 4. 按照Deepseek-prompt.txt的框架构建提示语
  1052. prompt_parts = []
  1053. # 开场白 - 角色定义
  1054. prompt_parts.append(
  1055. "你是一名数据库工程师,正在构建一个PostgreSQL数据中的汇总逻辑。"
  1056. "请为以下需求生成一段标准的 PostgreSQL SQL 脚本:"
  1057. )
  1058. # 动态生成源表部分(第1点)
  1059. for i, (table, ddl) in enumerate(zip(input_tables, source_tables_ddl), 1):
  1060. table_name = table.split(":")[-1] if ":" in table else table
  1061. prompt_parts.append(f"{i}.有一个源表: {table_name},它的定义语句如下:")
  1062. prompt_parts.append(ddl)
  1063. prompt_parts.append("") # 添加空行分隔
  1064. # 动态生成目标表部分(第2点)
  1065. if target_table_ddl:
  1066. target_table_name = (
  1067. output_data.split(":")[-1] if ":" in output_data else output_data
  1068. )
  1069. next_index = len(input_tables) + 1
  1070. prompt_parts.append(
  1071. f"{next_index}.有一个目标表:{target_table_name},它的定义语句如下:"
  1072. )
  1073. prompt_parts.append(target_table_ddl)
  1074. prompt_parts.append("") # 添加空行分隔
  1075. # 动态生成处理逻辑部分(第3点)
  1076. next_index = (
  1077. len(input_tables) + 2 if target_table_ddl else len(input_tables) + 1
  1078. )
  1079. prompt_parts.append(f"{next_index}.处理逻辑为:{request_content}")
  1080. prompt_parts.append("") # 添加空行分隔
  1081. # 固定的技术要求部分(第4-8点)
  1082. tech_requirements = [
  1083. (
  1084. f"{next_index + 1}.脚本应使用标准的 PostgreSQL 语法,"
  1085. "适合在 Airflow、Python 脚本、或调度系统中调用;"
  1086. ),
  1087. f"{next_index + 2}.无需使用 UPSERT 或 ON CONFLICT",
  1088. f"{next_index + 3}.请直接输出SQL,无需进行解释。",
  1089. (
  1090. f'{next_index + 4}.请给这段sql起个英文名,不少于三个英文单词,使用"_"分隔,'
  1091. "采用蛇形命名法。把sql的名字作为注释写在返回的sql中。"
  1092. ),
  1093. (
  1094. f"{next_index + 5}.生成的sql在向目标表插入数据的时候,向create_time字段写入当前日期"
  1095. "时间now(),不用处理update_time字段"
  1096. ),
  1097. ]
  1098. prompt_parts.extend(tech_requirements)
  1099. # 组合完整的提示语
  1100. full_prompt = "\n".join(prompt_parts)
  1101. logger.info(f"构建的完整提示语长度: {len(full_prompt)}")
  1102. logger.info(f"完整提示语内容: {full_prompt}")
  1103. # 5. 调用LLM生成SQL脚本
  1104. logger.info("开始调用Deepseek模型生成SQL脚本")
  1105. script_content = llm_sql(full_prompt)
  1106. if not script_content:
  1107. raise ValueError("Deepseek模型返回空内容")
  1108. # 确保返回的是文本格式
  1109. if not isinstance(script_content, str):
  1110. script_content = str(script_content)
  1111. logger.info(f"SQL脚本生成成功,内容长度: {len(script_content)}")
  1112. return script_content
  1113. except Exception as e:
  1114. logger.error(f"生成SQL脚本失败: {str(e)}")
  1115. raise e
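    # Illustrative request_data for create_script; tables follow the "label:name_en"
    # convention parsed by _parse_table_and_get_ddl (labels and table names here are
    # assumptions, not fixed values):
    #
    #     request_data = {
    #         "input": "BusinessDomain:sales_detail, BusinessDomain:store_dim",
    #         "output": "BusinessDomain:sales_daily",
    #         "request_content": "按门店和日期汇总销售金额",
    #     }
    #     sql_script = DataFlowService.create_script(request_data)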
  1116. @staticmethod
  1117. def _parse_table_and_get_ddl(table_str: str, table_type: str) -> str:
  1118. """
  1119. 解析表格式(A:B)并从Neo4j查询元数据生成DDL
  1120. Args:
  1121. table_str: 表格式字符串,格式为"label:name_en"
  1122. table_type: 表类型,用于日志记录(input/output)
  1123. Returns:
  1124. DDL格式的表结构字符串
  1125. """
  1126. try:
  1127. # 解析A:B格式
  1128. if ":" not in table_str:
  1129. logger.error(f"表格式错误,应为'label:name_en'格式: {table_str}")
  1130. return ""
  1131. parts = table_str.split(":", 1)
  1132. if len(parts) != 2:
  1133. logger.error(f"表格式解析失败: {table_str}")
  1134. return ""
  1135. label = parts[0].strip()
  1136. name_en = parts[1].strip()
  1137. if not label or not name_en:
  1138. logger.error(f"标签或英文名为空: label={label}, name_en={name_en}")
  1139. return ""
  1140. logger.info(f"开始查询{table_type}表: label={label}, name_en={name_en}")
  1141. # 从Neo4j查询节点及其关联的元数据
  1142. with connect_graph().session() as session:
  1143. # 查询节点及其关联的元数据
  1144. cypher = f"""
  1145. MATCH (n:{label} {{name_en: $name_en}})
  1146. OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
  1147. RETURN n, collect(m) as metadata
  1148. """
  1149. result = session.run(
  1150. cypher, # type: ignore[arg-type]
  1151. {"name_en": name_en},
  1152. )
  1153. record = result.single()
  1154. if not record:
  1155. logger.error(f"未找到节点: label={label}, name_en={name_en}")
  1156. return ""
  1157. node = record["n"]
  1158. metadata = record["metadata"]
  1159. logger.info(f"找到节点,关联元数据数量: {len(metadata)}")
  1160. # 生成DDL格式的表结构
  1161. ddl_lines = []
  1162. ddl_lines.append(f"CREATE TABLE {name_en} (")
  1163. if metadata:
  1164. column_definitions = []
  1165. for meta in metadata:
  1166. if meta: # 确保meta不为空
  1167. meta_props = dict(meta)
  1168. column_name = meta_props.get(
  1169. "name_en",
  1170. meta_props.get("name_zh", "unknown_column"),
  1171. )
  1172. data_type = meta_props.get("data_type", "VARCHAR(255)")
  1173. comment = meta_props.get("name_zh", "")
  1174. # 构建列定义
  1175. column_def = f" {column_name} {data_type}"
  1176. if comment:
  1177. column_def += f" COMMENT '{comment}'"
  1178. column_definitions.append(column_def)
  1179. if column_definitions:
  1180. ddl_lines.append(",\n".join(column_definitions))
  1181. else:
  1182. ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
  1183. else:
  1184. # 如果没有元数据,添加默认列
  1185. ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
  1186. ddl_lines.append(");")
  1187. # 添加表注释
  1188. node_props = dict(node)
  1189. table_comment = node_props.get(
  1190. "name_zh", node_props.get("describe", name_en)
  1191. )
  1192. if table_comment and table_comment != name_en:
  1193. ddl_lines.append(
  1194. f"COMMENT ON TABLE {name_en} IS '{table_comment}';"
  1195. )
  1196. ddl_content = "\n".join(ddl_lines)
  1197. logger.info(f"{table_type}表DDL生成成功: {name_en}")
  1198. logger.debug(f"生成的DDL: {ddl_content}")
  1199. return ddl_content
  1200. except Exception as e:
  1201. logger.error(f"解析表格式和生成DDL失败: {str(e)}")
  1202. return ""
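    # Example of the DDL text this helper builds for a hypothetical node
    # "BusinessDomain:sales_daily" with two DataMeta columns (all names assumed):
    #
    #     CREATE TABLE sales_daily (
    #       sale_date DATE COMMENT '销售日期',
    #       amount NUMERIC(18,2) COMMENT '销售金额'
    #     );
    #     COMMENT ON TABLE sales_daily IS '销售日汇总';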
  1203. @staticmethod
  1204. def _generate_businessdomain_ddl(
  1205. session,
  1206. bd_id: int,
  1207. is_target: bool = False,
  1208. update_mode: str = "append",
  1209. ) -> Optional[Dict[str, Any]]:
  1210. """
  1211. 根据BusinessDomain节点ID生成DDL
  1212. Args:
  1213. session: Neo4j session对象
  1214. bd_id: BusinessDomain节点ID
  1215. is_target: 是否为目标表(目标表需要添加create_time字段)
  1216. update_mode: 更新模式(append或full)
  1217. Returns:
  1218. 包含ddl和data_source信息的字典,如果节点不存在则返回None
  1219. """
  1220. try:
  1221. # 查询BusinessDomain节点、元数据、标签关系和数据源关系
  1222. cypher = """
  1223. MATCH (bd:BusinessDomain)
  1224. WHERE id(bd) = $bd_id
  1225. OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
  1226. OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
  1227. OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
  1228. RETURN bd,
  1229. collect(DISTINCT m) as metadata,
  1230. collect(DISTINCT {
  1231. id: id(label),
  1232. name_zh: label.name_zh,
  1233. name_en: label.name_en
  1234. }) as labels,
  1235. ds.type as ds_type,
  1236. ds.host as ds_host,
  1237. ds.port as ds_port,
  1238. ds.database as ds_database
  1239. """
  1240. result = session.run(cypher, bd_id=bd_id).single()
  1241. if not result or not result["bd"]:
  1242. logger.warning(f"未找到ID为 {bd_id} 的BusinessDomain节点")
  1243. return None
  1244. node = result["bd"]
  1245. metadata = result["metadata"]
  1246. # 处理标签数组,获取第一个有效标签名称用于判断
  1247. labels = result.get("labels", [])
  1248. valid_labels = [label for label in labels if label.get("id") is not None]
  1249. label_name = valid_labels[0].get("name_zh") if valid_labels else None
  1250. # 生成DDL
  1251. node_props = dict(node)
  1252. table_name = node_props.get("name_en", f"table_{bd_id}")
  1253. ddl_lines = []
  1254. ddl_lines.append(f"CREATE TABLE {table_name} (")
  1255. column_definitions = []
  1256. # 添加元数据列
  1257. if metadata:
  1258. for meta in metadata:
  1259. if meta:
  1260. meta_props = dict(meta)
  1261. column_name = meta_props.get(
  1262. "name_en",
  1263. meta_props.get("name_zh", "unknown_column"),
  1264. )
  1265. data_type = meta_props.get("data_type", "VARCHAR(255)")
  1266. comment = meta_props.get("name_zh", "")
  1267. column_def = f" {column_name} {data_type}"
  1268. if comment:
  1269. column_def += f" COMMENT '{comment}'"
  1270. column_definitions.append(column_def)
  1271. # 如果没有元数据,添加默认主键
  1272. if not column_definitions:
  1273. column_definitions.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
  1274. # 5. 如果是目标表,添加create_time字段
  1275. if is_target:
  1276. column_definitions.append(
  1277. " create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
  1278. "COMMENT '数据创建时间'"
  1279. )
  1280. ddl_lines.append(",\n".join(column_definitions))
  1281. ddl_lines.append(");")
  1282. # 添加表注释
  1283. table_comment = node_props.get(
  1284. "name_zh", node_props.get("describe", table_name)
  1285. )
  1286. if table_comment and table_comment != table_name:
  1287. ddl_lines.append(f"COMMENT ON TABLE {table_name} IS '{table_comment}';")
  1288. ddl_content = "\n".join(ddl_lines)
  1289. # 3. 检查BELONGS_TO关系是否连接"数据资源",如果是则返回数据源信息
  1290. data_source = None
  1291. if label_name == "数据资源" and result["ds_type"]:
  1292. data_source = {
  1293. "type": result["ds_type"],
  1294. "host": result["ds_host"],
  1295. "port": result["ds_port"],
  1296. "database": result["ds_database"],
  1297. }
  1298. logger.info(f"获取到数据源信息: {data_source}")
  1299. logger.debug(
  1300. f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}"
  1301. )
  1302. return {
  1303. "ddl": ddl_content,
  1304. "table_name": table_name,
  1305. "data_source": data_source,
  1306. }
  1307. except Exception as e:
  1308. logger.error(f"生成BusinessDomain DDL失败,ID={bd_id}: {str(e)}")
  1309. return None
  1310. @staticmethod
  1311. def _handle_script_relationships(
  1312. data: Dict[str, Any],
  1313. dataflow_name: str,
  1314. name_en: str,
  1315. ):
  1316. """
  1317. 处理脚本关系,在Neo4j图数据库中创建从source_table到target_table之间的
  1318. DERIVED_FROM关系
  1319. Args:
  1320. data: 包含脚本信息的数据字典,应包含script_name, script_type,
  1321. schedule_status, source_table, target_table, update_mode
  1322. """
  1323. try:
            # Read the key/value pairs from data; the script name is the dataflow's Chinese name
            script_name = dataflow_name
            script_type = data.get("script_type", "sql")
            schedule_status = data.get("status", "inactive")
            source_table_full = data.get("source_table", "")
            target_table_full = data.get("target_table", "")
            update_mode = data.get("update_mode", "full")
  1331. # 处理source_table和target_table的格式
  1332. source_table = (
  1333. source_table_full.split(":")[-1]
  1334. if ":" in source_table_full
  1335. else source_table_full
  1336. )
  1337. target_table = (
  1338. target_table_full.split(":")[-1]
  1339. if ":" in target_table_full
  1340. else target_table_full
  1341. )
  1342. source_label = (
  1343. source_table_full.split(":")[0]
  1344. if ":" in source_table_full
  1345. else source_table_full
  1346. )
  1347. target_label = (
  1348. target_table_full.split(":")[0]
  1349. if ":" in target_table_full
  1350. else target_table_full
  1351. )
  1352. # 验证必要字段
  1353. if not source_table or not target_table:
  1354. logger.warning(
  1355. "source_table或target_table为空,跳过关系创建: "
  1356. "source_table=%s, target_table=%s",
  1357. source_table,
  1358. target_table,
  1359. )
  1360. return
  1361. logger.info(f"开始创建脚本关系: {source_table} -> {target_table}")
  1362. with connect_graph().session() as session:
  1363. # 创建或获取source和target节点
  1364. create_nodes_query = f"""
  1365. MERGE (source:{source_label} {{name: $source_table}})
  1366. ON CREATE SET source.created_at = $created_at,
  1367. source.type = 'source'
  1368. WITH source
  1369. MERGE (target:{target_label} {{name: $target_table}})
  1370. ON CREATE SET target.created_at = $created_at,
  1371. target.type = 'target'
  1372. RETURN source, target, id(source) as source_id,
  1373. id(target) as target_id
  1374. """
  1375. # 执行创建节点的查询
  1376. result = session.run(
  1377. create_nodes_query, # type: ignore[arg-type]
  1378. {
  1379. "source_table": source_table,
  1380. "target_table": target_table,
  1381. "created_at": get_formatted_time(),
  1382. },
  1383. ).single()
  1384. if result:
  1385. source_id = result["source_id"]
  1386. target_id = result["target_id"]
  1387. # 检查并创建关系
  1388. create_relationship_query = f"""
  1389. MATCH (source:{source_label}), (target:{target_label})
  1390. WHERE id(source) = $source_id AND id(target) = $target_id
  1391. AND NOT EXISTS((target)-[:DERIVED_FROM]->(source))
  1392. CREATE (target)-[r:DERIVED_FROM]->(source)
  1393. SET r.script_name = $script_name,
  1394. r.script_type = $script_type,
  1395. r.schedule_status = $schedule_status,
  1396. r.update_mode = $update_mode,
  1397. r.created_at = $created_at,
  1398. r.updated_at = $created_at
  1399. RETURN r
  1400. """
  1401. relationship_result = session.run(
  1402. create_relationship_query, # type: ignore[arg-type]
  1403. {
  1404. "source_id": source_id,
  1405. "target_id": target_id,
  1406. "script_name": script_name,
  1407. "script_type": script_type,
  1408. "schedule_status": schedule_status,
  1409. "update_mode": update_mode,
  1410. "created_at": get_formatted_time(),
  1411. },
  1412. ).single()
  1413. if relationship_result:
  1414. logger.info(
  1415. "成功创建DERIVED_FROM关系: %s -> %s (script: %s)",
  1416. target_table,
  1417. source_table,
  1418. script_name,
  1419. )
  1420. else:
  1421. logger.info(
  1422. "DERIVED_FROM关系已存在: %s -> %s",
  1423. target_table,
  1424. source_table,
  1425. )
  1426. else:
  1427. logger.error(
  1428. "创建表节点失败: source_table=%s, target_table=%s",
  1429. source_table,
  1430. target_table,
  1431. )
  1432. except Exception as e:
  1433. logger.error(f"处理脚本关系失败: {str(e)}")
  1434. raise e
  1435. @staticmethod
  1436. def get_business_domain_list() -> List[Dict[str, Any]]:
  1437. """
  1438. 获取BusinessDomain节点列表
  1439. Returns:
  1440. BusinessDomain节点列表,每个节点包含 id, name_zh, name_en, tag
  1441. """
  1442. try:
  1443. logger.info("开始查询BusinessDomain节点列表")
  1444. with connect_graph().session() as session:
  1445. # 查询所有BusinessDomain节点及其LABEL关系指向的标签(支持多标签)
  1446. query = """
  1447. MATCH (bd:BusinessDomain)
  1448. OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
  1449. RETURN id(bd) as id,
  1450. bd.name_zh as name_zh,
  1451. bd.name_en as name_en,
  1452. bd.create_time as create_time,
  1453. collect({
  1454. id: id(label),
  1455. name_zh: label.name_zh,
  1456. name_en: label.name_en
  1457. }) as tags
  1458. ORDER BY create_time DESC
  1459. """
  1460. result = session.run(query)
  1461. bd_list = []
  1462. for record in result:
  1463. # 处理标签数组,过滤掉空标签
  1464. tags = record.get("tags", [])
  1465. tag_list = [tag for tag in tags if tag.get("id") is not None]
  1466. bd_item = {
  1467. "id": record["id"],
  1468. "name_zh": record.get("name_zh", "") or "",
  1469. "name_en": record.get("name_en", "") or "",
  1470. "tag": tag_list,
  1471. }
  1472. bd_list.append(bd_item)
  1473. logger.info(f"成功查询到 {len(bd_list)} 个BusinessDomain节点")
  1474. return bd_list
  1475. except Exception as e:
  1476. logger.error(f"查询BusinessDomain节点列表失败: {str(e)}")
  1477. raise e
  1478. @staticmethod
  1479. def _register_data_product(
  1480. data: Dict[str, Any],
  1481. dataflow_name: str,
  1482. name_en: str,
  1483. dataflow_id: Optional[int] = None,
  1484. ) -> None:
  1485. """
  1486. 注册数据产品到数据服务
  1487. 当数据流创建成功后,自动将其注册为数据产品,
  1488. 以便在数据服务模块中展示和管理。
  1489. Args:
  1490. data: 数据流配置数据
  1491. dataflow_name: 数据流名称(中文)
  1492. name_en: 数据流英文名
  1493. dataflow_id: 数据流ID(Neo4j节点ID)
  1494. """
  1495. try:
  1496. # 解析目标表信息
  1497. target_table_raw = data.get("target_table") or ""
  1498. target_table = (
  1499. target_table_raw.split(":")[-1]
  1500. if ":" in target_table_raw
  1501. else target_table_raw
  1502. )
  1503. # 如果没有指定目标表,使用英文名作为目标表名
  1504. if not target_table:
  1505. target_table = name_en
  1506. # 解析目标schema(默认为public)
  1507. target_schema = "public"
  1508. # 从script_requirement中尝试获取更多信息
  1509. script_requirement = data.get("script_requirement")
  1510. description = data.get("describe", "")
  1511. if script_requirement:
  1512. if isinstance(script_requirement, dict):
  1513. # 如果有rule字段,添加到描述中
  1514. rule = script_requirement.get("rule", "")
  1515. if rule and not description:
  1516. description = rule
  1517. elif isinstance(script_requirement, str):
  1518. try:
  1519. req_json = json.loads(script_requirement)
  1520. if isinstance(req_json, dict):
  1521. rule = req_json.get("rule", "")
  1522. if rule and not description:
  1523. description = rule
  1524. except (json.JSONDecodeError, TypeError):
  1525. pass
  1526. # 调用数据产品服务进行注册
  1527. DataProductService.register_data_product(
  1528. product_name=dataflow_name,
  1529. product_name_en=name_en,
  1530. target_table=target_table,
  1531. target_schema=target_schema,
  1532. description=description,
  1533. source_dataflow_id=dataflow_id,
  1534. source_dataflow_name=dataflow_name,
  1535. created_by=data.get("created_by", "dataflow"),
  1536. )
  1537. logger.info(
  1538. f"数据产品注册成功: {dataflow_name} -> {target_schema}.{target_table}"
  1539. )
  1540. except Exception as e:
  1541. logger.error(f"注册数据产品失败: {str(e)}")
  1542. raise