# dataflows.py
  1. import contextlib
  2. import json
  3. import logging
  4. from datetime import datetime
  5. from typing import Any, Dict, List, Optional, Union
  6. from sqlalchemy import text
  7. from app import db
  8. from app.core.data_service.data_product_service import DataProductService
  9. from app.core.graph.graph_operations import (
  10. connect_graph,
  11. create_or_get_node,
  12. get_node,
  13. relationship_exists,
  14. )
  15. from app.core.llm.llm_service import llm_sql
  16. from app.core.meta_data import get_formatted_time, translate_and_parse
  17. logger = logging.getLogger(__name__)
  18. class DataFlowService:
  19. """数据流服务类,处理数据流相关的业务逻辑"""
  20. @staticmethod
  21. def get_dataflows(
  22. page: int = 1,
  23. page_size: int = 10,
  24. search: str = "",
  25. ) -> Dict[str, Any]:
  26. """
  27. 获取数据流列表
  28. Args:
  29. page: 页码
  30. page_size: 每页大小
  31. search: 搜索关键词
  32. Returns:
  33. 包含数据流列表和分页信息的字典
  34. """
  35. try:
  36. # 从图数据库查询数据流列表
  37. skip_count = (page - 1) * page_size
  38. # 构建搜索条件
  39. where_clause = ""
  40. params: Dict[str, Union[int, str]] = {
  41. "skip": skip_count,
  42. "limit": page_size,
  43. }
  44. if search:
  45. where_clause = (
  46. "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search"
  47. )
  48. params["search"] = search
  49. # 查询数据流列表(包含标签数组)
  50. # 使用WITH子句先分页,再聚合标签,避免分页结果不准确
  51. query = f"""
  52. MATCH (n:DataFlow)
  53. {where_clause}
  54. WITH n
  55. ORDER BY n.created_at DESC
  56. SKIP $skip
  57. LIMIT $limit
  58. OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
  59. RETURN n, id(n) as node_id,
  60. n.created_at as created_at,
  61. collect({{
  62. id: id(label),
  63. name_zh: label.name_zh,
  64. name_en: label.name_en
  65. }}) as tags
  66. """
  67. # 获取Neo4j驱动(如果连接失败会抛出ConnectionError异常)
  68. try:
  69. with connect_graph().session() as session:
  70. list_result = session.run(query, params).data()
  71. # 查询总数
  72. count_query = f"""
  73. MATCH (n:DataFlow)
  74. {where_clause}
  75. RETURN count(n) as total
  76. """
  77. count_params = {"search": search} if search else {}
  78. count_result = session.run(count_query, count_params).single()
  79. total = count_result["total"] if count_result else 0
  80. except Exception as e:
  81. # 确保 driver 被正确关闭,避免资源泄漏 - 这里不再需要手动关闭
  82. # driver,因为connect_graph可能返回单例或新实例。如果是新实例,
  83. # 我们没有引用它去关闭;若connect_graph每次返回新实例且需要关闭,
  84. # 之前的代码是对的。如果connect_graph返回单例,则不应关闭。
  85. # 用户反馈:The driver.close() call prematurely closes a shared
  86. # driver instance,所以直接使用 session,并不关闭 driver。
  87. logger.error(f"查询数据流失败: {str(e)}")
  88. raise e
  89. # 格式化结果
  90. dataflows = []
  91. for record in list_result:
  92. node = record["n"]
  93. dataflow = dict(node)
  94. dataflow["id"] = record["node_id"] # 使用查询返回的node_id
  95. # 处理标签数组,过滤掉空标签
  96. tags = record.get("tags", [])
  97. dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]
  98. dataflows.append(dataflow)
  99. return {
  100. "list": dataflows,
  101. "pagination": {
  102. "page": page,
  103. "page_size": page_size,
  104. "total": total,
  105. "total_pages": (total + page_size - 1) // page_size,
  106. },
  107. }
  108. except Exception as e:
  109. logger.error(f"获取数据流列表失败: {str(e)}")
  110. raise e
  111. @staticmethod
  112. def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
  113. """
  114. 根据ID获取数据流详情
  115. Args:
  116. dataflow_id: 数据流ID
  117. Returns:
  118. 数据流详情字典,如果不存在则返回None
  119. """
  120. try:
  121. # 从Neo4j获取DataFlow节点的所有属性(包含标签数组)
  122. neo4j_query = """
  123. MATCH (n:DataFlow)
  124. WHERE id(n) = $dataflow_id
  125. OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
  126. RETURN n, id(n) as node_id,
  127. collect({
  128. id: id(label),
  129. name_zh: label.name_zh,
  130. name_en: label.name_en
  131. }) as tags
  132. """
  133. with connect_graph().session() as session:
  134. neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data()
  135. if not neo4j_result:
  136. logger.warning(f"未找到ID为 {dataflow_id} 的DataFlow节点")
  137. return None
  138. record = neo4j_result[0]
  139. node = record["n"]
  140. # 将节点属性转换为字典
  141. dataflow = dict(node)
  142. dataflow["id"] = record["node_id"]
  143. # 处理标签数组,过滤掉空标签
  144. tags = record.get("tags", [])
  145. dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]
  146. # 处理 script_requirement:如果是JSON字符串,解析为对象
  147. script_requirement_str = dataflow.get("script_requirement", "")
  148. if script_requirement_str:
  149. try:
  150. # 尝试解析JSON字符串
  151. script_requirement_obj = json.loads(script_requirement_str)
  152. dataflow["script_requirement"] = script_requirement_obj
  153. logger.debug(
  154. "成功解析script_requirement: %s",
  155. script_requirement_obj,
  156. )
  157. except (json.JSONDecodeError, TypeError) as e:
  158. logger.warning(f"script_requirement解析失败,保持原值: {e}")
  159. # 保持原值(字符串)
  160. dataflow["script_requirement"] = script_requirement_str
  161. else:
  162. # 如果为空,设置为None
  163. dataflow["script_requirement"] = None
  164. logger.info(
  165. "成功获取DataFlow详情,ID: %s, 名称: %s",
  166. dataflow_id,
  167. dataflow.get("name_zh"),
  168. )
  169. return dataflow
  170. except Exception as e:
  171. logger.error(f"获取数据流详情失败: {str(e)}")
  172. raise e
    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new dataflow.

        Side-effect order matters: graph node first, then LABEL relations,
        then the PG script record, then script relations, finally data-product
        registration. PG/script/product failures are logged but deliberately
        do not roll back the already-created graph node.

        Args:
            data: dataflow configuration; must contain "name_zh" and "describe".

        Returns:
            Dict describing the created dataflow (node properties plus "id").

        Raises:
            ValueError: on missing required fields or a duplicate name.
            Exception: re-raised after logging on any other failure.
        """
        try:
            # Validate required fields.
            required_fields = ["name_zh", "describe"]
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"缺少必填字段: {field}")
            dataflow_name = data["name_zh"]
            # Translate the Chinese name via the LLM to derive an English
            # name; fall back to snake_casing the original on any failure.
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = (
                    result_list[0]
                    if result_list
                    else dataflow_name.lower().replace(" ", "_")
                )
            except Exception as e:
                logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
                name_en = dataflow_name.lower().replace(" ", "_")
            # Normalize script_requirement to a JSON string for node storage.
            script_requirement = data.get("script_requirement")
            if script_requirement is not None:
                # Dicts/lists are serialized to JSON.
                if isinstance(script_requirement, (dict, list)):
                    script_requirement_str = json.dumps(
                        script_requirement, ensure_ascii=False
                    )
                else:
                    # Already a string (or scalar): use it as-is.
                    script_requirement_str = str(script_requirement)
            else:
                script_requirement_str = ""
            # Node payload; tags are NOT stored as properties but linked via
            # LABEL relationships instead.
            node_data = {
                "name_zh": dataflow_name,
                "name_en": name_en,
                "category": data.get("category", ""),
                "organization": data.get("organization", ""),
                "leader": data.get("leader", ""),
                "frequency": data.get("frequency", ""),
                "describe": data.get("describe", ""),
                "status": data.get("status", "inactive"),
                "update_mode": data.get("update_mode", "append"),
                "script_type": data.get("script_type", "python"),
                "script_requirement": script_requirement_str,
                "created_at": get_formatted_time(),
                "updated_at": get_formatted_time(),
            }
            # Duplicate check before creating the node.
            # NOTE(review): this looks up a `name` property while the node is
            # stored with `name_zh` — confirm get_node() maps it, otherwise
            # this duplicate check can never fire.
            dataflow_id = get_node("DataFlow", name=dataflow_name)
            if dataflow_id:
                raise ValueError(f"数据流 '{dataflow_name}' 已存在")
            dataflow_id = create_or_get_node("DataFlow", **node_data)
            # Attach label relations (multi-tag array supported).
            tag_list = data.get("tag", [])
            if tag_list:
                try:
                    DataFlowService._handle_tag_relationships(dataflow_id, tag_list)
                except Exception as e:
                    logger.warning(f"处理标签关系时出错: {str(e)}")
            # After the graph node exists, persist the script record to PG.
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")
                # Only after the PG write succeeds, create the script
                # relationships in the graph.
                try:
                    DataFlowService._handle_script_relationships(
                        data, dataflow_name, name_en
                    )
                    logger.info(f"脚本关系创建成功: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"创建脚本关系失败: {str(script_error)}")
            except Exception as pg_error:
                logger.error(f"写入PG数据库失败: {str(pg_error)}")
                # Deliberately no graph rollback here; a distributed
                # transaction would be needed for true atomicity.
            # Re-query the freshly created node for the full response payload.
            query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id"
            with connect_graph().session() as session:
                id_result = session.run(query, name_zh=dataflow_name).single()
                if id_result:
                    dataflow_node = id_result["n"]
                    node_id = id_result["node_id"]
                    # Convert node properties into a plain dict.
                    result = dict(dataflow_node)
                    result["id"] = node_id
                else:
                    # Fallback: minimal info when the re-query finds nothing.
                    result = {
                        "id": (dataflow_id if isinstance(dataflow_id, int) else None),
                        "name_zh": dataflow_name,
                        "name_en": name_en,
                        "created_at": get_formatted_time(),
                    }
            # Register the data product with the data service; failures only
            # produce a warning and never abort the main flow.
            try:
                DataFlowService._register_data_product(
                    data=data,
                    dataflow_name=dataflow_name,
                    name_en=name_en,
                    dataflow_id=result.get("id"),
                )
                logger.info(f"数据产品注册成功: {dataflow_name}")
            except Exception as product_error:
                logger.warning(f"注册数据产品失败: {str(product_error)}")
            logger.info(f"创建数据流成功: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"创建数据流失败: {str(e)}")
            raise e
  293. @staticmethod
  294. def _save_to_pg_database(
  295. data: Dict[str, Any],
  296. script_name: str,
  297. name_en: str,
  298. ):
  299. """
  300. 将脚本信息保存到PG数据库
  301. Args:
  302. data: 包含脚本信息的数据
  303. script_name: 脚本名称
  304. name_en: 英文名称
  305. """
  306. try:
  307. # 提取脚本相关信息
  308. # 处理 script_requirement,确保保存为 JSON 字符串
  309. script_requirement_raw = data.get("script_requirement")
  310. # 用于保存从 script_requirement 中提取的 rule
  311. rule_from_requirement = ""
  312. if script_requirement_raw is not None:
  313. # 如果是字典,提取 rule 字段
  314. if isinstance(script_requirement_raw, dict):
  315. rule_from_requirement = script_requirement_raw.get("rule", "")
  316. script_requirement = json.dumps(
  317. script_requirement_raw, ensure_ascii=False
  318. )
  319. elif isinstance(script_requirement_raw, list):
  320. script_requirement = json.dumps(
  321. script_requirement_raw, ensure_ascii=False
  322. )
  323. else:
  324. # 如果已经是字符串,尝试解析以提取 rule
  325. script_requirement = str(script_requirement_raw)
  326. try:
  327. parsed_req = json.loads(script_requirement)
  328. if isinstance(parsed_req, dict):
  329. rule_from_requirement = parsed_req.get("rule", "")
  330. except (json.JSONDecodeError, TypeError):
  331. pass
  332. else:
  333. script_requirement = ""
  334. # 处理 script_content:优先使用前端传入的值,如果为空则使用从 script_requirement 提取的 rule
  335. script_content = data.get("script_content", "")
  336. if not script_content and rule_from_requirement:
  337. script_content = rule_from_requirement
  338. logger.info(
  339. "script_content为空,使用从script_requirement提取的rule: %s",
  340. rule_from_requirement,
  341. )
  342. # 安全处理 source_table 和 target_table(避免 None 值导致的 'in' 操作错误)
  343. source_table_raw = data.get("source_table") or ""
  344. source_table = (
  345. source_table_raw.split(":")[-1]
  346. if ":" in source_table_raw
  347. else source_table_raw
  348. )
  349. target_table_raw = data.get("target_table") or ""
  350. target_table = (
  351. target_table_raw.split(":")[-1]
  352. if ":" in target_table_raw
  353. else (target_table_raw or name_en)
  354. )
  355. script_type = data.get("script_type", "python")
  356. user_name = data.get("created_by", "system")
  357. target_dt_column = data.get("target_dt_column", "")
  358. # 验证必需字段
  359. if not target_table:
  360. target_table = name_en
  361. if not script_name:
  362. raise ValueError("script_name不能为空")
  363. # 构建插入SQL
  364. insert_sql = text(
  365. """
  366. INSERT INTO dags.data_transform_scripts
  367. (source_table, target_table, script_name, script_type,
  368. script_requirement, script_content, user_name, create_time,
  369. update_time, target_dt_column)
  370. VALUES
  371. (:source_table, :target_table, :script_name, :script_type,
  372. :script_requirement, :script_content, :user_name,
  373. :create_time, :update_time, :target_dt_column)
  374. ON CONFLICT (target_table, script_name)
  375. DO UPDATE SET
  376. source_table = EXCLUDED.source_table,
  377. script_type = EXCLUDED.script_type,
  378. script_requirement = EXCLUDED.script_requirement,
  379. script_content = EXCLUDED.script_content,
  380. user_name = EXCLUDED.user_name,
  381. update_time = EXCLUDED.update_time,
  382. target_dt_column = EXCLUDED.target_dt_column
  383. """
  384. )
  385. # 准备参数
  386. current_time = datetime.now()
  387. params = {
  388. "source_table": source_table,
  389. "target_table": target_table,
  390. "script_name": script_name,
  391. "script_type": script_type,
  392. "script_requirement": script_requirement,
  393. "script_content": script_content,
  394. "user_name": user_name,
  395. "create_time": current_time,
  396. "update_time": current_time,
  397. "target_dt_column": target_dt_column,
  398. }
  399. # 执行插入操作
  400. db.session.execute(insert_sql, params)
  401. # 新增:保存到task_list表
  402. try:
  403. # 1. 解析script_requirement并构建详细的任务描述
  404. task_description_md = script_requirement
  405. try:
  406. # 尝试解析JSON
  407. try:
  408. req_json = json.loads(script_requirement)
  409. except (json.JSONDecodeError, TypeError):
  410. req_json = None
  411. if isinstance(req_json, dict):
  412. # 1. 从script_requirement中提取rule字段作为request_content_str
  413. request_content_str = req_json.get("rule", "")
  414. # 2. 从script_requirement中提取source_table和
  415. # target_table字段信息
  416. source_table_ids = req_json.get("source_table", [])
  417. target_table_ids = req_json.get("target_table", [])
  418. # 确保是列表格式
  419. if not isinstance(source_table_ids, list):
  420. source_table_ids = (
  421. [source_table_ids] if source_table_ids else []
  422. )
  423. if not isinstance(target_table_ids, list):
  424. target_table_ids = (
  425. [target_table_ids] if target_table_ids else []
  426. )
  427. # 合并所有BusinessDomain ID
  428. all_bd_ids = source_table_ids + target_table_ids
  429. # 4. 从data参数中提取update_mode
  430. update_mode = data.get("update_mode", "append")
  431. # 生成Business Domain DDLs
  432. source_ddls = []
  433. target_ddls = []
  434. data_source_info = None
  435. if all_bd_ids:
  436. try:
  437. with connect_graph().session() as session:
  438. # 处理source tables
  439. for bd_id in source_table_ids:
  440. ddl_info = DataFlowService._generate_businessdomain_ddl(
  441. session,
  442. bd_id,
  443. is_target=False,
  444. )
  445. if ddl_info:
  446. source_ddls.append(ddl_info["ddl"])
  447. # 3. 如果BELONGS_TO关系连接的是
  448. # "数据资源",获取数据源信息
  449. if (
  450. ddl_info.get("data_source")
  451. and not data_source_info
  452. ):
  453. data_source_info = ddl_info[
  454. "data_source"
  455. ]
  456. # 处理target tables(5. 目标表缺省要有create_time字段)
  457. for bd_id in target_table_ids:
  458. ddl_info = DataFlowService._generate_businessdomain_ddl(
  459. session,
  460. bd_id,
  461. is_target=True,
  462. update_mode=update_mode,
  463. )
  464. if ddl_info:
  465. target_ddls.append(ddl_info["ddl"])
  466. # 同样检查BELONGS_TO关系,获取数据源信息
  467. if (
  468. ddl_info.get("data_source")
  469. and not data_source_info
  470. ):
  471. data_source_info = ddl_info[
  472. "data_source"
  473. ]
  474. except Exception as neo_e:
  475. logger.error(
  476. f"获取BusinessDomain DDL失败: {str(neo_e)}"
  477. )
  478. # 构建Markdown格式的任务描述
  479. task_desc_parts = [f"# Task: {script_name}\n"]
  480. # 添加数据源信息
  481. if data_source_info:
  482. task_desc_parts.append("## Data Source")
  483. task_desc_parts.append(
  484. f"- **Type**: {data_source_info.get('type', 'N/A')}"
  485. )
  486. task_desc_parts.append(
  487. f"- **Host**: {data_source_info.get('host', 'N/A')}"
  488. )
  489. task_desc_parts.append(
  490. f"- **Port**: {data_source_info.get('port', 'N/A')}"
  491. )
  492. task_desc_parts.append(
  493. f"- **Database**: "
  494. f"{data_source_info.get('database', 'N/A')}\n"
  495. )
  496. # 添加源表DDL
  497. if source_ddls:
  498. task_desc_parts.append("## Source Tables (DDL)")
  499. for ddl in source_ddls:
  500. task_desc_parts.append(f"```sql\n{ddl}\n```\n")
  501. # 添加目标表DDL
  502. if target_ddls:
  503. task_desc_parts.append("## Target Tables (DDL)")
  504. for ddl in target_ddls:
  505. task_desc_parts.append(f"```sql\n{ddl}\n```\n")
  506. # 添加更新模式说明
  507. task_desc_parts.append("## Update Mode")
  508. if update_mode == "append":
  509. task_desc_parts.append("- **Mode**: Append (追加模式)")
  510. task_desc_parts.append(
  511. "- **Description**: 新数据将追加到目标表,不删除现有数据\n"
  512. )
  513. else:
  514. task_desc_parts.append(
  515. "- **Mode**: Full Refresh (全量更新)"
  516. )
  517. task_desc_parts.append(
  518. "- **Description**: 目标表将被清空后重新写入数据\n"
  519. )
  520. # 添加请求内容(rule)
  521. if request_content_str:
  522. task_desc_parts.append("## Request Content")
  523. task_desc_parts.append(f"{request_content_str}\n")
  524. # 添加实施步骤(根据任务类型优化)
  525. task_desc_parts.append("## Implementation Steps")
  526. # 判断是否为远程数据源导入任务
  527. if data_source_info:
  528. # 从远程数据源导入数据的简化步骤
  529. task_desc_parts.append(
  530. "1. Create an n8n workflow to execute the "
  531. "data import task"
  532. )
  533. task_desc_parts.append(
  534. "2. Configure the workflow to call "
  535. "`import_resource_data.py` Python script"
  536. )
  537. task_desc_parts.append(
  538. "3. Pass the following parameters to the "
  539. "Python execution node:"
  540. )
  541. task_desc_parts.append(
  542. " - `--source-config`: JSON configuration "
  543. "for the remote data source"
  544. )
  545. task_desc_parts.append(
  546. " - `--target-table`: Target table name "
  547. "(data resource English name)"
  548. )
  549. task_desc_parts.append(
  550. f" - `--update-mode`: {update_mode}"
  551. )
  552. task_desc_parts.append(
  553. "4. The Python script will automatically:"
  554. )
  555. task_desc_parts.append(
  556. " - Connect to the remote data source"
  557. )
  558. task_desc_parts.append(
  559. " - Extract data from the source table"
  560. )
  561. task_desc_parts.append(
  562. f" - Write data to target table using "
  563. f"{update_mode} mode"
  564. )
  565. else:
  566. # 数据转换任务的完整步骤
  567. task_desc_parts.append(
  568. "1. Extract data from source tables as "
  569. "specified in the DDL"
  570. )
  571. task_desc_parts.append(
  572. "2. Apply transformation logic according to the rule:"
  573. )
  574. if request_content_str:
  575. task_desc_parts.append(
  576. f" - Rule: {request_content_str}"
  577. )
  578. task_desc_parts.append(
  579. "3. Generate Python program to implement the "
  580. "data transformation logic"
  581. )
  582. task_desc_parts.append(
  583. f"4. Write transformed data to target table "
  584. f"using {update_mode} mode"
  585. )
  586. task_desc_parts.append(
  587. "5. Create an n8n workflow to schedule and "
  588. "execute the Python program"
  589. )
  590. task_description_md = "\n".join(task_desc_parts)
  591. except Exception as parse_e:
  592. logger.warning(
  593. f"解析任务描述详情失败,使用原始描述: {str(parse_e)}"
  594. )
  595. task_description_md = script_requirement
  596. # 假设运行根目录为项目根目录,dataflows.py在app/core/data_flow/
  597. code_path = "app/core/data_flow"
  598. task_insert_sql = text(
  599. "INSERT INTO public.task_list\n"
  600. "(task_name, task_description, status, code_name, "
  601. "code_path, create_by, create_time, update_time)\n"
  602. "VALUES\n"
  603. "(:task_name, :task_description, :status, :code_name, "
  604. ":code_path, :create_by, :create_time, :update_time)"
  605. )
  606. task_params = {
  607. "task_name": script_name,
  608. "task_description": task_description_md,
  609. "status": "pending",
  610. "code_name": script_name,
  611. "code_path": code_path,
  612. "create_by": "cursor",
  613. "create_time": current_time,
  614. "update_time": current_time,
  615. }
  616. # 使用嵌套事务,确保task_list插入失败不影响主流程
  617. with db.session.begin_nested():
  618. db.session.execute(task_insert_sql, task_params)
  619. logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")
  620. except Exception as task_error:
  621. # 记录错误但不中断主流程
  622. logger.error(f"写入task_list表失败: {str(task_error)}")
  623. # 如果要求必须成功写入任务列表,则这里应该raise task_error
  624. # raise task_error
  625. db.session.commit()
  626. logger.info(
  627. "成功将脚本信息写入PG数据库: target_table=%s, script_name=%s",
  628. target_table,
  629. script_name,
  630. )
  631. except Exception as e:
  632. db.session.rollback()
  633. logger.error(f"写入PG数据库失败: {str(e)}")
  634. raise e
  635. @staticmethod
  636. def _handle_children_relationships(dataflow_node, children_ids):
  637. """处理子节点关系"""
  638. logger.debug(
  639. "处理子节点关系,原始children_ids: %s, 类型: %s",
  640. children_ids,
  641. type(children_ids),
  642. )
  643. # 确保children_ids是列表格式
  644. if not isinstance(children_ids, (list, tuple)):
  645. if children_ids is not None:
  646. children_ids = [children_ids] # 如果是单个值,转换为列表
  647. logger.debug(f"将单个值转换为列表: {children_ids}")
  648. else:
  649. children_ids = [] # 如果是None,转换为空列表
  650. logger.debug("将None转换为空列表")
  651. for child_id in children_ids:
  652. try:
  653. # 查找子节点
  654. query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
  655. with connect_graph().session() as session:
  656. result = session.run(query, child_id=child_id).data()
  657. if result:
  658. # 获取dataflow_node的ID
  659. dataflow_id = getattr(dataflow_node, "identity", None)
  660. if dataflow_id is None:
  661. # 如果没有identity属性,从名称查询ID
  662. query_id = (
  663. "MATCH (n:DataFlow) WHERE n.name_zh = "
  664. "$name_zh RETURN id(n) as node_id"
  665. )
  666. id_result = session.run(
  667. query_id,
  668. name_zh=dataflow_node.get("name_zh"),
  669. ).single()
  670. dataflow_id = id_result["node_id"] if id_result else None
  671. # 创建关系 - 使用ID调用relationship_exists
  672. if dataflow_id and not relationship_exists(
  673. dataflow_id, "child", child_id
  674. ):
  675. session.run(
  676. "MATCH (a), (b) WHERE id(a) = $dataflow_id "
  677. "AND id(b) = $child_id "
  678. "CREATE (a)-[:child]->(b)",
  679. dataflow_id=dataflow_id,
  680. child_id=child_id,
  681. )
  682. logger.info(f"创建子节点关系: {dataflow_id} -> {child_id}")
  683. except Exception as e:
  684. logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
  685. @staticmethod
  686. def _handle_tag_relationships(dataflow_id, tag_list):
  687. """
  688. 处理多标签关系
  689. Args:
  690. dataflow_id: 数据流节点ID
  691. tag_list: 标签列表,可以是ID数组或包含id字段的对象数组
  692. """
  693. # 确保tag_list是列表格式
  694. if not isinstance(tag_list, list):
  695. tag_list = [tag_list] if tag_list else []
  696. for tag_item in tag_list:
  697. tag_id = None
  698. if isinstance(tag_item, dict) and "id" in tag_item:
  699. tag_id = int(tag_item["id"])
  700. elif isinstance(tag_item, (int, str)):
  701. with contextlib.suppress(ValueError, TypeError):
  702. tag_id = int(tag_item)
  703. if tag_id:
  704. DataFlowService._handle_single_tag_relationship(dataflow_id, tag_id)
  705. @staticmethod
  706. def _handle_single_tag_relationship(dataflow_id, tag_id):
  707. """处理单个标签关系"""
  708. try:
  709. # 查找标签节点
  710. query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
  711. with connect_graph().session() as session:
  712. result = session.run(query, tag_id=tag_id).data()
  713. # 创建关系 - 使用ID调用relationship_exists
  714. if (
  715. result
  716. and dataflow_id
  717. and not relationship_exists(dataflow_id, "LABEL", tag_id)
  718. ):
  719. session.run(
  720. "MATCH (a), (b) WHERE id(a) = $dataflow_id "
  721. "AND id(b) = $tag_id "
  722. "CREATE (a)-[:LABEL]->(b)",
  723. dataflow_id=dataflow_id,
  724. tag_id=tag_id,
  725. )
  726. logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
  727. except Exception as e:
  728. logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
  729. @staticmethod
  730. def update_dataflow(
  731. dataflow_id: int,
  732. data: Dict[str, Any],
  733. ) -> Optional[Dict[str, Any]]:
  734. """
  735. 更新数据流
  736. Args:
  737. dataflow_id: 数据流ID
  738. data: 更新的数据
  739. Returns:
  740. 更新后的数据流信息,如果不存在则返回None
  741. """
  742. try:
  743. # 提取 tag 数组(不作为节点属性存储)
  744. tag_list = data.pop("tag", None)
  745. # 查找节点
  746. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  747. with connect_graph().session() as session:
  748. result = session.run(query, dataflow_id=dataflow_id).data()
  749. if not result:
  750. return None
  751. # 更新节点属性
  752. update_fields = []
  753. params: Dict[str, Any] = {"dataflow_id": dataflow_id}
  754. for key, value in data.items():
  755. if key not in ["id", "created_at"]: # 保护字段
  756. # 复杂对象序列化为 JSON 字符串
  757. if key in ["config", "script_requirement"] and isinstance(
  758. value, dict
  759. ):
  760. value = json.dumps(value, ensure_ascii=False)
  761. update_fields.append(f"n.{key} = ${key}")
  762. params[key] = value
  763. if update_fields:
  764. params["updated_at"] = get_formatted_time()
  765. update_fields.append("n.updated_at = $updated_at")
  766. update_query = f"""
  767. MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
  768. SET {", ".join(update_fields)}
  769. RETURN n, id(n) as node_id
  770. """
  771. result = session.run(update_query, params).data()
  772. # 处理 tag 关系(支持多标签数组)
  773. if tag_list is not None:
  774. # 确保是列表格式
  775. if not isinstance(tag_list, list):
  776. tag_list = [tag_list] if tag_list else []
  777. # 先删除现有的 LABEL 关系
  778. delete_query = """
  779. MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel)
  780. WHERE id(n) = $dataflow_id
  781. DELETE r
  782. """
  783. session.run(delete_query, dataflow_id=dataflow_id)
  784. logger.info(f"删除数据流 {dataflow_id} 的现有标签关系")
  785. # 为每个 tag 创建新的 LABEL 关系
  786. for tag_item in tag_list:
  787. tag_id = None
  788. if isinstance(tag_item, dict) and "id" in tag_item:
  789. tag_id = int(tag_item["id"])
  790. elif isinstance(tag_item, (int, str)):
  791. with contextlib.suppress(ValueError, TypeError):
  792. tag_id = int(tag_item)
  793. if tag_id:
  794. DataFlowService._handle_single_tag_relationship(
  795. dataflow_id, tag_id
  796. )
  797. if result:
  798. node = result[0]["n"]
  799. updated_dataflow = dict(node)
  800. # 使用查询返回的node_id
  801. updated_dataflow["id"] = result[0]["node_id"]
  802. # 查询并添加标签数组到返回数据
  803. tags_query = """
  804. MATCH (n:DataFlow)
  805. WHERE id(n) = $dataflow_id
  806. OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
  807. RETURN collect({
  808. id: id(label),
  809. name_zh: label.name_zh,
  810. name_en: label.name_en
  811. }) as tags
  812. """
  813. tags_result = session.run(
  814. tags_query, dataflow_id=dataflow_id
  815. ).single()
  816. if tags_result:
  817. tags = tags_result.get("tags", [])
  818. updated_dataflow["tag"] = [
  819. tag for tag in tags if tag.get("id") is not None
  820. ]
  821. else:
  822. updated_dataflow["tag"] = []
  823. logger.info(f"更新数据流成功: ID={dataflow_id}")
  824. return updated_dataflow
  825. return None
  826. except Exception as e:
  827. logger.error(f"更新数据流失败: {str(e)}")
  828. raise e
  829. @staticmethod
  830. def delete_dataflow(dataflow_id: int) -> bool:
  831. """
  832. 删除数据流
  833. Args:
  834. dataflow_id: 数据流ID
  835. Returns:
  836. 删除是否成功
  837. """
  838. try:
  839. # 删除节点及其关系
  840. query = """
  841. MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
  842. DETACH DELETE n
  843. RETURN count(n) as deleted_count
  844. """
  845. with connect_graph().session() as session:
  846. delete_result = session.run(query, dataflow_id=dataflow_id).single()
  847. result = delete_result["deleted_count"] if delete_result else 0
  848. if result and result > 0:
  849. logger.info(f"删除数据流成功: ID={dataflow_id}")
  850. return True
  851. return False
  852. except Exception as e:
  853. logger.error(f"删除数据流失败: {str(e)}")
  854. raise e
  855. @staticmethod
  856. def execute_dataflow(
  857. dataflow_id: int,
  858. params: Optional[Dict[str, Any]] = None,
  859. ) -> Dict[str, Any]:
  860. """
  861. 执行数据流
  862. Args:
  863. dataflow_id: 数据流ID
  864. params: 执行参数
  865. Returns:
  866. 执行结果信息
  867. """
  868. try:
  869. # 检查数据流是否存在
  870. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  871. with connect_graph().session() as session:
  872. result = session.run(query, dataflow_id=dataflow_id).data()
  873. if not result:
  874. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  875. execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
  876. # TODO: 这里应该实际执行数据流
  877. # 目前返回模拟结果
  878. result = {
  879. "execution_id": execution_id,
  880. "dataflow_id": dataflow_id,
  881. "status": "running",
  882. "started_at": datetime.now().isoformat(),
  883. "params": params or {},
  884. "progress": 0,
  885. }
  886. logger.info(
  887. "开始执行数据流: ID=%s, execution_id=%s",
  888. dataflow_id,
  889. execution_id,
  890. )
  891. return result
  892. except Exception as e:
  893. logger.error(f"执行数据流失败: {str(e)}")
  894. raise e
  895. @staticmethod
  896. def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
  897. """
  898. 获取数据流执行状态
  899. Args:
  900. dataflow_id: 数据流ID
  901. Returns:
  902. 执行状态信息
  903. """
  904. try:
  905. # TODO: 这里应该查询实际的执行状态
  906. # 目前返回模拟状态
  907. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  908. with connect_graph().session() as session:
  909. result = session.run(query, dataflow_id=dataflow_id).data()
  910. if not result:
  911. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  912. status = ["running", "completed", "failed", "pending"][dataflow_id % 4]
  913. return {
  914. "dataflow_id": dataflow_id,
  915. "status": status,
  916. "progress": (
  917. 100 if status == "completed" else (dataflow_id * 10) % 100
  918. ),
  919. "started_at": datetime.now().isoformat(),
  920. "completed_at": (
  921. datetime.now().isoformat() if status == "completed" else None
  922. ),
  923. "error_message": ("执行过程中发生错误" if status == "failed" else None),
  924. }
  925. except Exception as e:
  926. logger.error(f"获取数据流状态失败: {str(e)}")
  927. raise e
  928. @staticmethod
  929. def get_dataflow_logs(
  930. dataflow_id: int,
  931. page: int = 1,
  932. page_size: int = 50,
  933. ) -> Dict[str, Any]:
  934. """
  935. 获取数据流执行日志
  936. Args:
  937. dataflow_id: 数据流ID
  938. page: 页码
  939. page_size: 每页大小
  940. Returns:
  941. 执行日志列表和分页信息
  942. """
  943. try:
  944. # TODO: 这里应该查询实际的执行日志
  945. # 目前返回模拟日志
  946. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  947. with connect_graph().session() as session:
  948. result = session.run(query, dataflow_id=dataflow_id).data()
  949. if not result:
  950. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  951. mock_logs = [
  952. {
  953. "id": i,
  954. "timestamp": datetime.now().isoformat(),
  955. "level": ["INFO", "WARNING", "ERROR"][i % 3],
  956. "message": f"数据流执行日志消息 {i}",
  957. "component": ["source", "transform", "target"][i % 3],
  958. }
  959. for i in range(1, 101)
  960. ]
  961. # 分页处理
  962. total = len(mock_logs)
  963. start = (page - 1) * page_size
  964. end = start + page_size
  965. logs = mock_logs[start:end]
  966. return {
  967. "logs": logs,
  968. "pagination": {
  969. "page": page,
  970. "page_size": page_size,
  971. "total": total,
  972. "total_pages": (total + page_size - 1) // page_size,
  973. },
  974. }
  975. except Exception as e:
  976. logger.error(f"获取数据流日志失败: {str(e)}")
  977. raise e
  978. @staticmethod
  979. def create_script(request_data: Union[Dict[str, Any], str]) -> str:
  980. """
  981. 使用Deepseek模型生成SQL脚本
  982. Args:
  983. request_data: 包含input, output, request_content的请求数据字典,或JSON字符串
  984. Returns:
  985. 生成的SQL脚本内容
  986. """
  987. try:
  988. logger.info(f"开始处理脚本生成请求: {request_data}")
  989. logger.info(f"request_data类型: {type(request_data)}")
  990. # 类型检查和处理
  991. if isinstance(request_data, str):
  992. logger.warning(f"request_data是字符串,尝试解析为JSON: {request_data}")
  993. try:
  994. import json
  995. request_data = json.loads(request_data)
  996. except json.JSONDecodeError as e:
  997. raise ValueError(f"无法解析request_data为JSON: {str(e)}") from e
  998. if not isinstance(request_data, dict):
  999. raise ValueError(
  1000. f"request_data必须是字典类型,实际类型: {type(request_data)}"
  1001. )
  1002. # 1. 从传入的request_data中解析input, output, request_content内容
  1003. input_data = request_data.get("input", "")
  1004. output_data = request_data.get("output", "")
  1005. request_content = request_data.get("request_data", "")
  1006. # 如果request_content是HTML格式,提取纯文本
  1007. if request_content and (
  1008. request_content.startswith("<p>") or "<" in request_content
  1009. ):
  1010. # 简单的HTML标签清理
  1011. import re
  1012. request_content = re.sub(r"<[^>]+>", "", request_content).strip()
  1013. if not input_data or not output_data or not request_content:
  1014. raise ValueError(
  1015. "缺少必要参数:input='{}', output='{}', "
  1016. "request_content='{}' 不能为空".format(
  1017. input_data,
  1018. output_data,
  1019. request_content[:100] if request_content else "",
  1020. )
  1021. )
  1022. logger.info(
  1023. "解析得到 - input: %s, output: %s, request_content: %s",
  1024. input_data,
  1025. output_data,
  1026. request_content,
  1027. )
  1028. # 2. 解析input中的多个数据表并生成源表DDL
  1029. source_tables_ddl = []
  1030. input_tables = []
  1031. if input_data:
  1032. tables = [
  1033. table.strip() for table in input_data.split(",") if table.strip()
  1034. ]
  1035. for table in tables:
  1036. ddl = DataFlowService._parse_table_and_get_ddl(table, "input")
  1037. if ddl:
  1038. input_tables.append(table)
  1039. source_tables_ddl.append(ddl)
  1040. else:
  1041. logger.warning(f"无法获取输入表 {table} 的DDL结构")
  1042. # 3. 解析output中的数据表并生成目标表DDL
  1043. target_table_ddl = ""
  1044. if output_data:
  1045. target_table_ddl = DataFlowService._parse_table_and_get_ddl(
  1046. output_data.strip(), "output"
  1047. )
  1048. if not target_table_ddl:
  1049. logger.warning(f"无法获取输出表 {output_data} 的DDL结构")
  1050. # 4. 按照Deepseek-prompt.txt的框架构建提示语
  1051. prompt_parts = []
  1052. # 开场白 - 角色定义
  1053. prompt_parts.append(
  1054. "你是一名数据库工程师,正在构建一个PostgreSQL数据中的汇总逻辑。"
  1055. "请为以下需求生成一段标准的 PostgreSQL SQL 脚本:"
  1056. )
  1057. # 动态生成源表部分(第1点)
  1058. for i, (table, ddl) in enumerate(zip(input_tables, source_tables_ddl), 1):
  1059. table_name = table.split(":")[-1] if ":" in table else table
  1060. prompt_parts.append(f"{i}.有一个源表: {table_name},它的定义语句如下:")
  1061. prompt_parts.append(ddl)
  1062. prompt_parts.append("") # 添加空行分隔
  1063. # 动态生成目标表部分(第2点)
  1064. if target_table_ddl:
  1065. target_table_name = (
  1066. output_data.split(":")[-1] if ":" in output_data else output_data
  1067. )
  1068. next_index = len(input_tables) + 1
  1069. prompt_parts.append(
  1070. f"{next_index}.有一个目标表:{target_table_name},它的定义语句如下:"
  1071. )
  1072. prompt_parts.append(target_table_ddl)
  1073. prompt_parts.append("") # 添加空行分隔
  1074. # 动态生成处理逻辑部分(第3点)
  1075. next_index = (
  1076. len(input_tables) + 2 if target_table_ddl else len(input_tables) + 1
  1077. )
  1078. prompt_parts.append(f"{next_index}.处理逻辑为:{request_content}")
  1079. prompt_parts.append("") # 添加空行分隔
  1080. # 固定的技术要求部分(第4-8点)
  1081. tech_requirements = [
  1082. (
  1083. f"{next_index + 1}.脚本应使用标准的 PostgreSQL 语法,"
  1084. "适合在 Airflow、Python 脚本、或调度系统中调用;"
  1085. ),
  1086. f"{next_index + 2}.无需使用 UPSERT 或 ON CONFLICT",
  1087. f"{next_index + 3}.请直接输出SQL,无需进行解释。",
  1088. (
  1089. f'{next_index + 4}.请给这段sql起个英文名,不少于三个英文单词,使用"_"分隔,'
  1090. "采用蛇形命名法。把sql的名字作为注释写在返回的sql中。"
  1091. ),
  1092. (
  1093. f"{next_index + 5}.生成的sql在向目标表插入数据的时候,向create_time字段写入当前日期"
  1094. "时间now(),不用处理update_time字段"
  1095. ),
  1096. ]
  1097. prompt_parts.extend(tech_requirements)
  1098. # 组合完整的提示语
  1099. full_prompt = "\n".join(prompt_parts)
  1100. logger.info(f"构建的完整提示语长度: {len(full_prompt)}")
  1101. logger.info(f"完整提示语内容: {full_prompt}")
  1102. # 5. 调用LLM生成SQL脚本
  1103. logger.info("开始调用Deepseek模型生成SQL脚本")
  1104. script_content = llm_sql(full_prompt)
  1105. if not script_content:
  1106. raise ValueError("Deepseek模型返回空内容")
  1107. # 确保返回的是文本格式
  1108. if not isinstance(script_content, str):
  1109. script_content = str(script_content)
  1110. logger.info(f"SQL脚本生成成功,内容长度: {len(script_content)}")
  1111. return script_content
  1112. except Exception as e:
  1113. logger.error(f"生成SQL脚本失败: {str(e)}")
  1114. raise e
  1115. @staticmethod
  1116. def _parse_table_and_get_ddl(table_str: str, table_type: str) -> str:
  1117. """
  1118. 解析表格式(A:B)并从Neo4j查询元数据生成DDL
  1119. Args:
  1120. table_str: 表格式字符串,格式为"label:name_en"
  1121. table_type: 表类型,用于日志记录(input/output)
  1122. Returns:
  1123. DDL格式的表结构字符串
  1124. """
  1125. try:
  1126. # 解析A:B格式
  1127. if ":" not in table_str:
  1128. logger.error(f"表格式错误,应为'label:name_en'格式: {table_str}")
  1129. return ""
  1130. parts = table_str.split(":", 1)
  1131. if len(parts) != 2:
  1132. logger.error(f"表格式解析失败: {table_str}")
  1133. return ""
  1134. label = parts[0].strip()
  1135. name_en = parts[1].strip()
  1136. if not label or not name_en:
  1137. logger.error(f"标签或英文名为空: label={label}, name_en={name_en}")
  1138. return ""
  1139. logger.info(f"开始查询{table_type}表: label={label}, name_en={name_en}")
  1140. # 从Neo4j查询节点及其关联的元数据
  1141. with connect_graph().session() as session:
  1142. # 查询节点及其关联的元数据
  1143. cypher = f"""
  1144. MATCH (n:{label} {{name_en: $name_en}})
  1145. OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
  1146. RETURN n, collect(m) as metadata
  1147. """
  1148. result = session.run(
  1149. cypher, # type: ignore[arg-type]
  1150. {"name_en": name_en},
  1151. )
  1152. record = result.single()
  1153. if not record:
  1154. logger.error(f"未找到节点: label={label}, name_en={name_en}")
  1155. return ""
  1156. node = record["n"]
  1157. metadata = record["metadata"]
  1158. logger.info(f"找到节点,关联元数据数量: {len(metadata)}")
  1159. # 生成DDL格式的表结构
  1160. ddl_lines = []
  1161. ddl_lines.append(f"CREATE TABLE {name_en} (")
  1162. if metadata:
  1163. column_definitions = []
  1164. for meta in metadata:
  1165. if meta: # 确保meta不为空
  1166. meta_props = dict(meta)
  1167. column_name = meta_props.get(
  1168. "name_en",
  1169. meta_props.get("name_zh", "unknown_column"),
  1170. )
  1171. data_type = meta_props.get("data_type", "VARCHAR(255)")
  1172. comment = meta_props.get("name_zh", "")
  1173. # 构建列定义
  1174. column_def = f" {column_name} {data_type}"
  1175. if comment:
  1176. column_def += f" COMMENT '{comment}'"
  1177. column_definitions.append(column_def)
  1178. if column_definitions:
  1179. ddl_lines.append(",\n".join(column_definitions))
  1180. else:
  1181. ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
  1182. else:
  1183. # 如果没有元数据,添加默认列
  1184. ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
  1185. ddl_lines.append(");")
  1186. # 添加表注释
  1187. node_props = dict(node)
  1188. table_comment = node_props.get(
  1189. "name_zh", node_props.get("describe", name_en)
  1190. )
  1191. if table_comment and table_comment != name_en:
  1192. ddl_lines.append(
  1193. f"COMMENT ON TABLE {name_en} IS '{table_comment}';"
  1194. )
  1195. ddl_content = "\n".join(ddl_lines)
  1196. logger.info(f"{table_type}表DDL生成成功: {name_en}")
  1197. logger.debug(f"生成的DDL: {ddl_content}")
  1198. return ddl_content
  1199. except Exception as e:
  1200. logger.error(f"解析表格式和生成DDL失败: {str(e)}")
  1201. return ""
  1202. @staticmethod
  1203. def _generate_businessdomain_ddl(
  1204. session,
  1205. bd_id: int,
  1206. is_target: bool = False,
  1207. update_mode: str = "append",
  1208. ) -> Optional[Dict[str, Any]]:
  1209. """
  1210. 根据BusinessDomain节点ID生成DDL
  1211. Args:
  1212. session: Neo4j session对象
  1213. bd_id: BusinessDomain节点ID
  1214. is_target: 是否为目标表(目标表需要添加create_time字段)
  1215. update_mode: 更新模式(append或full)
  1216. Returns:
  1217. 包含ddl和data_source信息的字典,如果节点不存在则返回None
  1218. """
  1219. try:
  1220. # 查询BusinessDomain节点、元数据、标签关系和数据源关系
  1221. cypher = """
  1222. MATCH (bd:BusinessDomain)
  1223. WHERE id(bd) = $bd_id
  1224. OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
  1225. OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
  1226. OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
  1227. RETURN bd,
  1228. collect(DISTINCT m) as metadata,
  1229. collect(DISTINCT {
  1230. id: id(label),
  1231. name_zh: label.name_zh,
  1232. name_en: label.name_en
  1233. }) as labels,
  1234. ds.type as ds_type,
  1235. ds.host as ds_host,
  1236. ds.port as ds_port,
  1237. ds.database as ds_database
  1238. """
  1239. result = session.run(cypher, bd_id=bd_id).single()
  1240. if not result or not result["bd"]:
  1241. logger.warning(f"未找到ID为 {bd_id} 的BusinessDomain节点")
  1242. return None
  1243. node = result["bd"]
  1244. metadata = result["metadata"]
  1245. # 处理标签数组,获取第一个有效标签名称用于判断
  1246. labels = result.get("labels", [])
  1247. valid_labels = [label for label in labels if label.get("id") is not None]
  1248. label_name = valid_labels[0].get("name_zh") if valid_labels else None
  1249. # 生成DDL
  1250. node_props = dict(node)
  1251. table_name = node_props.get("name_en", f"table_{bd_id}")
  1252. ddl_lines = []
  1253. ddl_lines.append(f"CREATE TABLE {table_name} (")
  1254. column_definitions = []
  1255. # 添加元数据列
  1256. if metadata:
  1257. for meta in metadata:
  1258. if meta:
  1259. meta_props = dict(meta)
  1260. column_name = meta_props.get(
  1261. "name_en",
  1262. meta_props.get("name_zh", "unknown_column"),
  1263. )
  1264. data_type = meta_props.get("data_type", "VARCHAR(255)")
  1265. comment = meta_props.get("name_zh", "")
  1266. column_def = f" {column_name} {data_type}"
  1267. if comment:
  1268. column_def += f" COMMENT '{comment}'"
  1269. column_definitions.append(column_def)
  1270. # 如果没有元数据,添加默认主键
  1271. if not column_definitions:
  1272. column_definitions.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
  1273. # 5. 如果是目标表,添加create_time字段
  1274. if is_target:
  1275. column_definitions.append(
  1276. " create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
  1277. "COMMENT '数据创建时间'"
  1278. )
  1279. ddl_lines.append(",\n".join(column_definitions))
  1280. ddl_lines.append(");")
  1281. # 添加表注释
  1282. table_comment = node_props.get(
  1283. "name_zh", node_props.get("describe", table_name)
  1284. )
  1285. if table_comment and table_comment != table_name:
  1286. ddl_lines.append(f"COMMENT ON TABLE {table_name} IS '{table_comment}';")
  1287. ddl_content = "\n".join(ddl_lines)
  1288. # 3. 检查BELONGS_TO关系是否连接"数据资源",如果是则返回数据源信息
  1289. data_source = None
  1290. if label_name == "数据资源" and result["ds_type"]:
  1291. data_source = {
  1292. "type": result["ds_type"],
  1293. "host": result["ds_host"],
  1294. "port": result["ds_port"],
  1295. "database": result["ds_database"],
  1296. }
  1297. logger.info(f"获取到数据源信息: {data_source}")
  1298. logger.debug(
  1299. f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}"
  1300. )
  1301. return {
  1302. "ddl": ddl_content,
  1303. "table_name": table_name,
  1304. "data_source": data_source,
  1305. }
  1306. except Exception as e:
  1307. logger.error(f"生成BusinessDomain DDL失败,ID={bd_id}: {str(e)}")
  1308. return None
  1309. @staticmethod
  1310. def _handle_script_relationships(
  1311. data: Dict[str, Any],
  1312. dataflow_name: str,
  1313. name_en: str,
  1314. ):
  1315. """
  1316. 处理脚本关系,在Neo4j图数据库中创建从source BusinessDomain到DataFlow的
  1317. INPUT关系,以及从DataFlow到target BusinessDomain的OUTPUT关系。
  1318. 关系模型:
  1319. - (source:BusinessDomain)-[:INPUT]->(dataflow:DataFlow)
  1320. - (dataflow:DataFlow)-[:OUTPUT]->(target:BusinessDomain)
  1321. Args:
  1322. data: 包含脚本信息的数据字典,应包含script_name, script_type,
  1323. schedule_status, source_table, target_table, update_mode
  1324. """
  1325. try:
  1326. # 从data中读取键值对
  1327. source_table_full = data.get("source_table", "")
  1328. target_table_full = data.get("target_table", "")
  1329. # 处理source_table和target_table的格式
  1330. # 格式: "label:name" 或 直接 "name"
  1331. source_table = (
  1332. source_table_full.split(":")[-1]
  1333. if ":" in source_table_full
  1334. else source_table_full
  1335. )
  1336. target_table = (
  1337. target_table_full.split(":")[-1]
  1338. if ":" in target_table_full
  1339. else target_table_full
  1340. )
  1341. source_label = (
  1342. source_table_full.split(":")[0]
  1343. if ":" in source_table_full
  1344. else "BusinessDomain"
  1345. )
  1346. target_label = (
  1347. target_table_full.split(":")[0]
  1348. if ":" in target_table_full
  1349. else "BusinessDomain"
  1350. )
  1351. # 验证必要字段
  1352. if not source_table or not target_table:
  1353. logger.warning(
  1354. "source_table或target_table为空,跳过关系创建: "
  1355. "source_table=%s, target_table=%s",
  1356. source_table,
  1357. target_table,
  1358. )
  1359. return
  1360. logger.info(
  1361. "开始创建INPUT/OUTPUT关系: %s -[INPUT]-> %s -[OUTPUT]-> %s",
  1362. source_table,
  1363. dataflow_name,
  1364. target_table,
  1365. )
  1366. with connect_graph().session() as session:
  1367. # 步骤1:获取DataFlow节点ID
  1368. dataflow_query = """
  1369. MATCH (df:DataFlow {name_zh: $dataflow_name})
  1370. RETURN id(df) as dataflow_id
  1371. """
  1372. df_result = session.run(
  1373. dataflow_query, # type: ignore[arg-type]
  1374. {"dataflow_name": dataflow_name},
  1375. ).single()
  1376. if not df_result:
  1377. logger.error(f"未找到DataFlow节点: {dataflow_name}")
  1378. return
  1379. dataflow_id = df_result["dataflow_id"]
  1380. # 步骤2:获取或创建source节点
  1381. # 优先通过name_en匹配,其次通过name匹配
  1382. source_query = f"""
  1383. MATCH (source:{source_label})
  1384. WHERE source.name_en = $source_table OR source.name = $source_table
  1385. RETURN id(source) as source_id
  1386. LIMIT 1
  1387. """
  1388. source_result = session.run(
  1389. source_query, # type: ignore[arg-type]
  1390. {"source_table": source_table},
  1391. ).single()
  1392. if not source_result:
  1393. logger.warning(
  1394. "未找到source节点: %s,将创建新节点",
  1395. source_table,
  1396. )
  1397. # 创建source节点
  1398. create_source_query = f"""
  1399. CREATE (source:{source_label} {{
  1400. name: $source_table,
  1401. name_en: $source_table,
  1402. created_at: $created_at,
  1403. type: 'source'
  1404. }})
  1405. RETURN id(source) as source_id
  1406. """
  1407. source_result = session.run(
  1408. create_source_query, # type: ignore[arg-type]
  1409. {
  1410. "source_table": source_table,
  1411. "created_at": get_formatted_time(),
  1412. },
  1413. ).single()
  1414. source_id = source_result["source_id"] if source_result else None
  1415. # 步骤3:获取或创建target节点
  1416. target_query = f"""
  1417. MATCH (target:{target_label})
  1418. WHERE target.name_en = $target_table OR target.name = $target_table
  1419. RETURN id(target) as target_id
  1420. LIMIT 1
  1421. """
  1422. target_result = session.run(
  1423. target_query, # type: ignore[arg-type]
  1424. {"target_table": target_table},
  1425. ).single()
  1426. if not target_result:
  1427. logger.warning(
  1428. "未找到target节点: %s,将创建新节点",
  1429. target_table,
  1430. )
  1431. # 创建target节点
  1432. create_target_query = f"""
  1433. CREATE (target:{target_label} {{
  1434. name: $target_table,
  1435. name_en: $target_table,
  1436. created_at: $created_at,
  1437. type: 'target'
  1438. }})
  1439. RETURN id(target) as target_id
  1440. """
  1441. target_result = session.run(
  1442. create_target_query, # type: ignore[arg-type]
  1443. {
  1444. "target_table": target_table,
  1445. "created_at": get_formatted_time(),
  1446. },
  1447. ).single()
  1448. target_id = target_result["target_id"] if target_result else None
  1449. if not source_id or not target_id:
  1450. logger.error(
  1451. "无法获取source或target节点ID: source_id=%s, target_id=%s",
  1452. source_id,
  1453. target_id,
  1454. )
  1455. return
  1456. # 步骤4:创建 INPUT 关系 (source)-[:INPUT]->(dataflow)
  1457. create_input_query = """
  1458. MATCH (source), (dataflow:DataFlow)
  1459. WHERE id(source) = $source_id AND id(dataflow) = $dataflow_id
  1460. MERGE (source)-[r:INPUT]->(dataflow)
  1461. ON CREATE SET r.created_at = $created_at
  1462. ON MATCH SET r.updated_at = $created_at
  1463. RETURN r
  1464. """
  1465. input_result = session.run(
  1466. create_input_query, # type: ignore[arg-type]
  1467. {
  1468. "source_id": source_id,
  1469. "dataflow_id": dataflow_id,
  1470. "created_at": get_formatted_time(),
  1471. },
  1472. ).single()
  1473. if input_result:
  1474. logger.info(
  1475. "成功创建INPUT关系: %s -> %s",
  1476. source_table,
  1477. dataflow_name,
  1478. )
  1479. else:
  1480. logger.warning(
  1481. "INPUT关系创建失败或已存在: %s -> %s",
  1482. source_table,
  1483. dataflow_name,
  1484. )
  1485. # 步骤5:创建 OUTPUT 关系 (dataflow)-[:OUTPUT]->(target)
  1486. create_output_query = """
  1487. MATCH (dataflow:DataFlow), (target)
  1488. WHERE id(dataflow) = $dataflow_id AND id(target) = $target_id
  1489. MERGE (dataflow)-[r:OUTPUT]->(target)
  1490. ON CREATE SET r.created_at = $created_at
  1491. ON MATCH SET r.updated_at = $created_at
  1492. RETURN r
  1493. """
  1494. output_result = session.run(
  1495. create_output_query, # type: ignore[arg-type]
  1496. {
  1497. "dataflow_id": dataflow_id,
  1498. "target_id": target_id,
  1499. "created_at": get_formatted_time(),
  1500. },
  1501. ).single()
  1502. if output_result:
  1503. logger.info(
  1504. "成功创建OUTPUT关系: %s -> %s",
  1505. dataflow_name,
  1506. target_table,
  1507. )
  1508. else:
  1509. logger.warning(
  1510. "OUTPUT关系创建失败或已存在: %s -> %s",
  1511. dataflow_name,
  1512. target_table,
  1513. )
  1514. logger.info(
  1515. "血缘关系创建完成: %s -[INPUT]-> %s -[OUTPUT]-> %s",
  1516. source_table,
  1517. dataflow_name,
  1518. target_table,
  1519. )
  1520. except Exception as e:
  1521. logger.error(f"处理脚本关系失败: {str(e)}")
  1522. raise e
  1523. @staticmethod
  1524. def get_business_domain_list() -> List[Dict[str, Any]]:
  1525. """
  1526. 获取BusinessDomain节点列表
  1527. Returns:
  1528. BusinessDomain节点列表,每个节点包含 id, name_zh, name_en, tag
  1529. """
  1530. try:
  1531. logger.info("开始查询BusinessDomain节点列表")
  1532. with connect_graph().session() as session:
  1533. # 查询所有BusinessDomain节点及其LABEL关系指向的标签(支持多标签)
  1534. query = """
  1535. MATCH (bd:BusinessDomain)
  1536. OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
  1537. RETURN id(bd) as id,
  1538. bd.name_zh as name_zh,
  1539. bd.name_en as name_en,
  1540. bd.create_time as create_time,
  1541. collect({
  1542. id: id(label),
  1543. name_zh: label.name_zh,
  1544. name_en: label.name_en
  1545. }) as tags
  1546. ORDER BY create_time DESC
  1547. """
  1548. result = session.run(query)
  1549. bd_list = []
  1550. for record in result:
  1551. # 处理标签数组,过滤掉空标签
  1552. tags = record.get("tags", [])
  1553. tag_list = [tag for tag in tags if tag.get("id") is not None]
  1554. bd_item = {
  1555. "id": record["id"],
  1556. "name_zh": record.get("name_zh", "") or "",
  1557. "name_en": record.get("name_en", "") or "",
  1558. "tag": tag_list,
  1559. }
  1560. bd_list.append(bd_item)
  1561. logger.info(f"成功查询到 {len(bd_list)} 个BusinessDomain节点")
  1562. return bd_list
  1563. except Exception as e:
  1564. logger.error(f"查询BusinessDomain节点列表失败: {str(e)}")
  1565. raise e
  1566. @staticmethod
  1567. def _register_data_product(
  1568. data: Dict[str, Any],
  1569. dataflow_name: str,
  1570. name_en: str,
  1571. dataflow_id: Optional[int] = None,
  1572. ) -> None:
  1573. """
  1574. 注册数据产品到数据服务
  1575. 当数据流创建成功后,自动将其注册为数据产品,
  1576. 以便在数据服务模块中展示和管理。
  1577. 从 script_requirement.target_table 中获取 BusinessDomain ID,
  1578. 然后查询 Neo4j 获取对应节点的 name_zh 和 name_en 作为数据产品名称。
  1579. Args:
  1580. data: 数据流配置数据
  1581. dataflow_name: 数据流名称(中文)
  1582. name_en: 数据流英文名
  1583. dataflow_id: 数据流ID(Neo4j节点ID)
  1584. """
  1585. try:
  1586. # 从script_requirement中获取target_table(BusinessDomain ID列表)
  1587. script_requirement = data.get("script_requirement")
  1588. description = data.get("describe", "")
  1589. # 解析 script_requirement
  1590. req_json: Optional[Dict[str, Any]] = None
  1591. if script_requirement:
  1592. if isinstance(script_requirement, dict):
  1593. req_json = script_requirement
  1594. elif isinstance(script_requirement, str):
  1595. try:
  1596. parsed = json.loads(script_requirement)
  1597. if isinstance(parsed, dict):
  1598. req_json = parsed
  1599. except (json.JSONDecodeError, TypeError):
  1600. pass
  1601. # 获取target_table中的BusinessDomain ID列表
  1602. target_bd_ids: List[int] = []
  1603. if req_json:
  1604. target_table_ids = req_json.get("target_table", [])
  1605. if isinstance(target_table_ids, list):
  1606. target_bd_ids = [
  1607. int(bid) for bid in target_table_ids if bid is not None
  1608. ]
  1609. elif target_table_ids is not None:
  1610. target_bd_ids = [int(target_table_ids)]
  1611. # 如果有rule字段,添加到描述中
  1612. rule = req_json.get("rule", "")
  1613. if rule and not description:
  1614. description = rule
  1615. # 如果没有target_table ID,则不注册数据产品
  1616. if not target_bd_ids:
  1617. logger.warning(
  1618. f"数据流 {dataflow_name} 没有指定target_table,跳过数据产品注册"
  1619. )
  1620. return
  1621. # 从Neo4j查询每个BusinessDomain节点的name_zh和name_en
  1622. with connect_graph().session() as session:
  1623. for bd_id in target_bd_ids:
  1624. try:
  1625. query = """
  1626. MATCH (bd:BusinessDomain)
  1627. WHERE id(bd) = $bd_id
  1628. RETURN bd.name_zh as name_zh,
  1629. bd.name_en as name_en,
  1630. bd.describe as describe
  1631. """
  1632. result = session.run(query, bd_id=bd_id).single()
  1633. if not result:
  1634. logger.warning(
  1635. f"未找到ID为 {bd_id} 的BusinessDomain节点,跳过"
  1636. )
  1637. continue
  1638. # 使用BusinessDomain节点的name_zh和name_en
  1639. product_name = result.get("name_zh") or ""
  1640. product_name_en = result.get("name_en") or ""
  1641. # 如果没有name_zh,使用name_en
  1642. if not product_name:
  1643. product_name = product_name_en
  1644. # 如果没有name_en,使用name_zh转换
  1645. if not product_name_en:
  1646. product_name_en = product_name.lower().replace(" ", "_")
  1647. # 目标表名使用BusinessDomain的name_en
  1648. target_table = product_name_en
  1649. # 如果BusinessDomain有describe且当前description为空,使用它
  1650. bd_describe = result.get("describe") or ""
  1651. if bd_describe and not description:
  1652. description = bd_describe
  1653. # 解析目标schema(默认为public)
  1654. target_schema = "public"
  1655. # 调用数据产品服务进行注册
  1656. DataProductService.register_data_product(
  1657. product_name=product_name,
  1658. product_name_en=product_name_en,
  1659. target_table=target_table,
  1660. target_schema=target_schema,
  1661. description=description,
  1662. source_dataflow_id=dataflow_id,
  1663. source_dataflow_name=dataflow_name,
  1664. created_by=data.get("created_by", "dataflow"),
  1665. )
  1666. logger.info(
  1667. f"数据产品注册成功: {product_name} -> "
  1668. f"{target_schema}.{target_table}"
  1669. )
  1670. except Exception as bd_error:
  1671. logger.error(
  1672. f"处理BusinessDomain {bd_id} 失败: {str(bd_error)}"
  1673. )
  1674. # 继续处理下一个
  1675. except Exception as e:
  1676. logger.error(f"注册数据产品失败: {str(e)}")
  1677. raise