dataflows.py

import contextlib
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from sqlalchemy import text

from app import db
from app.core.data_service.data_product_service import DataProductService
from app.core.graph.graph_operations import (
    connect_graph,
    create_or_get_node,
    get_node,
    relationship_exists,
)
from app.core.meta_data import get_formatted_time, translate_and_parse

logger = logging.getLogger(__name__)

# Project root directory
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent


class DataFlowService:
    """Dataflow service class; handles dataflow-related business logic."""

    @staticmethod
    def get_dataflows(
        page: int = 1,
        page_size: int = 10,
        search: str = "",
    ) -> Dict[str, Any]:
        """
        Get the list of dataflows.

        Args:
            page: Page number
            page_size: Page size
            search: Search keyword

        Returns:
            Dict containing the dataflow list and pagination info
        """
        try:
            # Query the dataflow list from the graph database
            skip_count = (page - 1) * page_size

            # Build the search condition
            where_clause = ""
            params: Dict[str, Union[int, str]] = {
                "skip": skip_count,
                "limit": page_size,
            }
            if search:
                where_clause = (
                    "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search"
                )
                params["search"] = search

            # Query the dataflow list (including the tag array).
            # Paginate first via WITH, then aggregate tags, so pagination stays accurate.
            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            WITH n
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
            RETURN n, id(n) as node_id,
                   n.created_at as created_at,
                   collect({{
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }}) as tags
            """
            # Get the Neo4j driver (connect_graph raises ConnectionError on failure)
            try:
                with connect_graph().session() as session:
                    list_result = session.run(query, params).data()

                    # Query the total count
                    count_query = f"""
                    MATCH (n:DataFlow)
                    {where_clause}
                    RETURN count(n) as total
                    """
                    count_params = {"search": search} if search else {}
                    count_result = session.run(count_query, count_params).single()
                    total = count_result["total"] if count_result else 0
            except Exception as e:
                # Do not close the driver here: connect_graph may return a shared
                # (singleton) driver instance, and closing it prematurely would
                # break other callers. Only the session is scoped to this query.
                logger.error(f"Failed to query dataflows: {str(e)}")
                raise e

            # Format the results
            dataflows = []
            for record in list_result:
                node = record["n"]
                dataflow = dict(node)
                dataflow["id"] = record["node_id"]  # Use the node_id returned by the query

                # Process the tag array, filtering out empty tags
                tags = record.get("tags", [])
                dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]

                dataflows.append(dataflow)

            return {
                "list": dataflows,
                "pagination": {
                    "page": page,
                    "page_size": page_size,
                    "total": total,
                    "total_pages": (total + page_size - 1) // page_size,
                },
            }
        except Exception as e:
            logger.error(f"Failed to get dataflow list: {str(e)}")
            raise e
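
    # A minimal usage sketch (illustrative only, not executed here; it assumes a
    # reachable Neo4j instance behind connect_graph and at least one DataFlow node):
    #
    #     page_one = DataFlowService.get_dataflows(page=1, page_size=10, search="order")
    #     for flow in page_one["list"]:
    #         print(flow["id"], flow.get("name_zh"), [t["name_en"] for t in flow["tag"]])
    #     print(page_one["pagination"]["total_pages"])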

    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        Get dataflow details by ID.

        Args:
            dataflow_id: Dataflow ID

        Returns:
            Dict with the dataflow details, or None if it does not exist
        """
        try:
            # Fetch all properties of the DataFlow node from Neo4j
            # (including the tag array)
            neo4j_query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
            RETURN n, id(n) as node_id,
                   collect({
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }) as tags
            """
            with connect_graph().session() as session:
                neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data()

            if not neo4j_result:
                logger.warning(f"DataFlow node with ID {dataflow_id} not found")
                return None

            record = neo4j_result[0]
            node = record["n"]

            # Convert node properties to a dict
            dataflow = dict(node)
            dataflow["id"] = record["node_id"]

            # Process the tag array, filtering out empty tags
            tags = record.get("tags", [])
            dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]

            # Handle script_requirement: if it is a JSON string, parse it into an object
            script_requirement_str = dataflow.get("script_requirement", "")
            if script_requirement_str:
                try:
                    # Try to parse the JSON string
                    script_requirement_obj = json.loads(script_requirement_str)
                    dataflow["script_requirement"] = script_requirement_obj
                    logger.debug(
                        "Parsed script_requirement: %s",
                        script_requirement_obj,
                    )
                except (json.JSONDecodeError, TypeError) as e:
                    logger.warning(f"Failed to parse script_requirement, keeping the raw value: {e}")
                    # Keep the original value (string)
                    dataflow["script_requirement"] = script_requirement_str
            else:
                # If empty, set to None
                dataflow["script_requirement"] = None

            logger.info(
                "Fetched DataFlow details, ID: %s, name: %s",
                dataflow_id,
                dataflow.get("name_zh"),
            )
            return dataflow
        except Exception as e:
            logger.error(f"Failed to get dataflow details: {str(e)}")
            raise e

    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new dataflow.

        Args:
            data: Dataflow configuration data

        Returns:
            Info about the created dataflow
        """
        try:
            # Validate required fields
            required_fields = ["name_zh", "describe"]
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"Missing required field: {field}")

            dataflow_name = data["name_zh"]

            # Use the LLM to translate the name into an English name
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = (
                    result_list[0]
                    if result_list
                    else dataflow_name.lower().replace(" ", "_")
                )
            except Exception as e:
                logger.warning(f"Translation failed, using the default English name: {str(e)}")
                name_en = dataflow_name.lower().replace(" ", "_")

            # Normalize script_requirement into a JSON string
            script_requirement = data.get("script_requirement")
            if script_requirement is not None:
                # If it is a dict or list, serialize to a JSON string
                if isinstance(script_requirement, (dict, list)):
                    script_requirement_str = json.dumps(
                        script_requirement, ensure_ascii=False
                    )
                else:
                    # Already a string; use it directly
                    script_requirement_str = str(script_requirement)
            else:
                script_requirement_str = ""

            # Prepare node data (tags are not stored as node properties;
            # they are linked via LABEL relationships)
            node_data = {
                "name_zh": dataflow_name,
                "name_en": name_en,
                "category": data.get("category", ""),
                "organization": data.get("organization", ""),
                "leader": data.get("leader", ""),
                "frequency": data.get("frequency", ""),
                "describe": data.get("describe", ""),
                "status": data.get("status", "inactive"),
                "update_mode": data.get("update_mode", "append"),
                "script_type": data.get("script_type", "python"),
                "script_requirement": script_requirement_str,
                "script_path": "",  # Script path; updated once the task completes
                "created_at": get_formatted_time(),
                "updated_at": get_formatted_time(),
            }

            # Create or fetch the dataflow node
            dataflow_id = get_node("DataFlow", name=dataflow_name)
            if dataflow_id:
                raise ValueError(f"Dataflow '{dataflow_name}' already exists")
            dataflow_id = create_or_get_node("DataFlow", **node_data)

            # Handle tag relationships (supports a multi-tag array)
            tag_list = data.get("tag", [])
            if tag_list:
                try:
                    DataFlowService._handle_tag_relationships(dataflow_id, tag_list)
                except Exception as e:
                    logger.warning(f"Error while handling tag relationships: {str(e)}")

            # After the graph node is created, write to the PG database
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                logger.info(f"Dataflow info written to the PG database: {dataflow_name}")

                # Once the PG record is written, create the script relationships
                # in the Neo4j graph
                try:
                    DataFlowService._handle_script_relationships(
                        data, dataflow_name, name_en
                    )
                    logger.info(f"Script relationships created: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"Failed to create script relationships: {str(script_error)}")
            except Exception as pg_error:
                logger.error(f"Failed to write to the PG database: {str(pg_error)}")
                # Note: we could roll back the graph operation here, but for now
                # the graph data is kept. A real deployment may need a
                # distributed transaction.

            # Return info about the created dataflow:
            # query the created node to get the full record.
            query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id"
            with connect_graph().session() as session:
                id_result = session.run(query, name_zh=dataflow_name).single()
                if id_result:
                    dataflow_node = id_result["n"]
                    node_id = id_result["node_id"]

                    # Convert node properties to a dict
                    result = dict(dataflow_node)
                    result["id"] = node_id
                else:
                    # If the query fails, return basic info
                    result = {
                        "id": (dataflow_id if isinstance(dataflow_id, int) else None),
                        "name_zh": dataflow_name,
                        "name_en": name_en,
                        "created_at": get_formatted_time(),
                    }

            # Register the data product with the data service
            try:
                DataFlowService._register_data_product(
                    data=data,
                    dataflow_name=dataflow_name,
                    name_en=name_en,
                    dataflow_id=result.get("id"),
                )
                logger.info(f"Data product registered: {dataflow_name}")
            except Exception as product_error:
                logger.warning(f"Failed to register data product: {str(product_error)}")
                # Does not affect the main flow; just log a warning

            logger.info(f"Dataflow created: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"Failed to create dataflow: {str(e)}")
            raise e
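
    # A sketch of the payload create_dataflow expects (all values are made up;
    # source_table/target_table hold BusinessDomain node IDs, which
    # _save_to_pg_database and _register_data_product parse out of
    # script_requirement):
    #
    #     payload = {
    #         "name_zh": "Order rollup",           # required
    #         "describe": "Daily order rollup",    # required
    #         "update_mode": "append",
    #         "script_type": "python",
    #         "tag": [{"id": 12}],
    #         "script_requirement": {
    #             "rule": "Aggregate orders by day",
    #             "source_table": [101],
    #             "target_table": [202],
    #         },
    #     }
    #     created = DataFlowService.create_dataflow(payload)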

    @staticmethod
    def _save_to_pg_database(
        data: Dict[str, Any],
        script_name: str,
        name_en: str,
    ):
        """
        Save task info into the task_list table in the PG database.

        Args:
            data: Data containing the script info
            script_name: Script name
            name_en: English name
        """
        try:
            # Extract script-related info.
            # Normalize script_requirement so it is stored as a JSON string.
            script_requirement_raw = data.get("script_requirement")
            if script_requirement_raw is not None:
                if isinstance(script_requirement_raw, (dict, list)):
                    script_requirement = json.dumps(
                        script_requirement_raw, ensure_ascii=False
                    )
                else:
                    script_requirement = str(script_requirement_raw)
            else:
                script_requirement = ""

            # Validate required fields
            if not script_name:
                raise ValueError("script_name must not be empty")

            current_time = datetime.now()

            # Extract update_mode from data up front: it is needed both for the
            # task description and for the n8n workflow generation below, even
            # when script_requirement is not a JSON object.
            update_mode = data.get("update_mode", "append")

            # Save to the task_list table
            try:
                # 1. Parse script_requirement and build a detailed task description
                task_description_md = script_requirement
                try:
                    # Try to parse the JSON
                    try:
                        req_json = json.loads(script_requirement)
                    except (json.JSONDecodeError, TypeError):
                        req_json = None

                    if isinstance(req_json, dict):
                        # 1. Use the rule field from script_requirement as
                        #    request_content_str
                        request_content_str = req_json.get("rule", "")

                        # 2. Extract the source_table and target_table info
                        #    from script_requirement
                        source_table_ids = req_json.get("source_table", [])
                        target_table_ids = req_json.get("target_table", [])

                        # Make sure both are lists
                        if not isinstance(source_table_ids, list):
                            source_table_ids = (
                                [source_table_ids] if source_table_ids else []
                            )
                        if not isinstance(target_table_ids, list):
                            target_table_ids = (
                                [target_table_ids] if target_table_ids else []
                            )

                        # Generate the BusinessDomain DDLs and data source info
                        source_tables_info = []
                        target_tables_info = []
                        if source_table_ids or target_table_ids:
                            try:
                                with connect_graph().session() as session:
                                    # Handle source tables
                                    for bd_id in source_table_ids:
                                        ddl_info = DataFlowService._generate_businessdomain_ddl(
                                            session,
                                            bd_id,
                                            is_target=False,
                                        )
                                        if ddl_info:
                                            source_tables_info.append(ddl_info)

                                    # Handle target tables (target tables get a
                                    # create_time column by default)
                                    for bd_id in target_table_ids:
                                        ddl_info = DataFlowService._generate_businessdomain_ddl(
                                            session,
                                            bd_id,
                                            is_target=True,
                                            update_mode=update_mode,
                                        )
                                        if ddl_info:
                                            target_tables_info.append(ddl_info)
                            except Exception as neo_e:
                                logger.error(
                                    f"Failed to get BusinessDomain DDL: {str(neo_e)}"
                                )

                        # Build the Markdown task description
                        task_desc_parts = [f"# Task: {script_name}\n"]

                        # Add source table info (DDL and data source)
                        if source_tables_info:
                            task_desc_parts.append("## Source Tables")
                            for info in source_tables_info:
                                task_desc_parts.append(f"### {info['table_name']}")
                                if info.get("data_source"):
                                    ds = info["data_source"]
                                    task_desc_parts.append("**Data Source**")
                                    task_desc_parts.append(
                                        f"- **Type**: {ds.get('type', 'N/A')}"
                                    )
                                    task_desc_parts.append(
                                        f"- **Host**: {ds.get('host', 'N/A')}"
                                    )
                                    task_desc_parts.append(
                                        f"- **Port**: {ds.get('port', 'N/A')}"
                                    )
                                    task_desc_parts.append(
                                        f"- **Database**: {ds.get('database', 'N/A')}"
                                    )
                                    task_desc_parts.append(
                                        f"- **Schema**: {ds.get('schema', 'N/A')}\n"
                                    )
                                task_desc_parts.append("**DDL**")
                                task_desc_parts.append(f"```sql\n{info['ddl']}\n```\n")

                        # Add target table info (DDL and data source)
                        if target_tables_info:
                            task_desc_parts.append("## Target Tables")
                            for info in target_tables_info:
                                task_desc_parts.append(f"### {info['table_name']}")
                                if info.get("data_source"):
                                    ds = info["data_source"]
                                    task_desc_parts.append("**Data Source**")
                                    task_desc_parts.append(
                                        f"- **Type**: {ds.get('type', 'N/A')}"
                                    )
                                    task_desc_parts.append(
                                        f"- **Host**: {ds.get('host', 'N/A')}"
                                    )
                                    task_desc_parts.append(
                                        f"- **Port**: {ds.get('port', 'N/A')}"
                                    )
                                    task_desc_parts.append(
                                        f"- **Database**: {ds.get('database', 'N/A')}"
                                    )
                                    task_desc_parts.append(
                                        f"- **Schema**: {ds.get('schema', 'N/A')}\n"
                                    )
                                task_desc_parts.append("**DDL**")
                                task_desc_parts.append(f"```sql\n{info['ddl']}\n```\n")

                        # Describe the update mode
                        task_desc_parts.append("## Update Mode")
                        if update_mode == "append":
                            task_desc_parts.append("- **Mode**: Append")
                            task_desc_parts.append(
                                "- **Description**: New data is appended to the "
                                "target table; existing data is kept\n"
                            )
                        else:
                            task_desc_parts.append(
                                "- **Mode**: Full Refresh"
                            )
                            task_desc_parts.append(
                                "- **Description**: The target table is truncated "
                                "and rewritten in full\n"
                            )

                        # Add the request content (rule)
                        if request_content_str:
                            task_desc_parts.append("## Request Content")
                            task_desc_parts.append(f"{request_content_str}\n")

                        # Add implementation steps (shared data-transformation steps)
                        task_desc_parts.append("## Implementation Steps")
                        task_desc_parts.append(
                            "1. Extract data from source tables as specified in the DDL"
                        )
                        task_desc_parts.append(
                            "2. Apply transformation logic according to the rule:"
                        )
                        if request_content_str:
                            task_desc_parts.append(f"   - Rule: {request_content_str}")
                        task_desc_parts.append(
                            "3. Generate Python program to implement the "
                            "data transformation logic"
                        )
                        task_desc_parts.append(
                            f"4. Write transformed data to target table "
                            f"using {update_mode} mode"
                        )

                        task_description_md = "\n".join(task_desc_parts)
                except Exception as parse_e:
                    logger.warning(
                        f"Failed to build the detailed task description, "
                        f"falling back to the raw requirement: {str(parse_e)}"
                    )
                    task_description_md = script_requirement

                # Set code_path (without the file name).
                # code_name is generated once task_id is known.
                code_path = "datafactory/scripts"

                task_insert_sql = text(
                    "INSERT INTO public.task_list\n"
                    "(task_name, task_description, status, code_name, "
                    "code_path, create_by, create_time, update_time)\n"
                    "VALUES\n"
                    "(:task_name, :task_description, :status, :code_name, "
                    ":code_path, :create_by, :create_time, :update_time)\n"
                    "RETURNING task_id"
                )
                task_params = {
                    "task_name": script_name,
                    "task_description": task_description_md,
                    "status": "pending",
                    "code_name": "",  # Left empty for now; updated once task_id is known
                    "code_path": code_path,
                    "create_by": "cursor",
                    "create_time": current_time,
                    "update_time": current_time,
                }
                result = db.session.execute(task_insert_sql, task_params)
                row = result.fetchone()
                task_id = row[0] if row else None

                # Generate the script file name from the task_id.
                # Format: task_{task_id}_{task_name}.py (matches what
                # auto_execute_tasks generates)
                code_name = f"task_{task_id}_{script_name}.py"

                # Update the code_name column
                if task_id:
                    update_sql = text(
                        "UPDATE public.task_list SET code_name = :code_name "
                        "WHERE task_id = :task_id"
                    )
                    db.session.execute(
                        update_sql, {"code_name": code_name, "task_id": task_id}
                    )

                db.session.commit()
                logger.info(
                    f"Task info written to task_list: "
                    f"task_id={task_id}, task_name={script_name}, code_name={code_name}"
                )

                # Automatically generate the n8n workflow JSON file
                try:
                    DataFlowService._generate_n8n_workflow(
                        script_name=script_name,
                        code_name=code_name,
                        code_path=code_path,
                        update_mode=update_mode,
                        task_id=task_id,
                    )
                except Exception as wf_error:
                    logger.warning(f"Failed to generate the n8n workflow file: {str(wf_error)}")
                    # Does not affect the main flow
            except Exception as task_error:
                db.session.rollback()
                logger.error(f"Failed to write to task_list: {str(task_error)}")
                raise task_error
        except Exception as e:
            db.session.rollback()
            logger.error(f"Failed to save to the PG database: {str(e)}")
            raise e
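
    # For reference, the generated task_description is Markdown shaped roughly
    # like this (sections appear only when the corresponding data exists):
    #
    #     # Task: <task name>
    #     ## Source Tables
    #     ### <table_name>
    #     **Data Source** (- **Type** / **Host** / **Port** / **Database** / **Schema**)
    #     **DDL**  (fenced ```sql block)
    #     ## Target Tables
    #     ...
    #     ## Update Mode
    #     ## Request Content      (only when a rule is present)
    #     ## Implementation Steps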

    @staticmethod
    def _generate_n8n_workflow(
        script_name: str,
        code_name: str,
        code_path: str,
        update_mode: str = "append",
        task_id: Optional[int] = None,
    ) -> Optional[str]:
        """
        Automatically generate the n8n workflow JSON file.

        Args:
            script_name: Script/task name
            code_name: Code file name (e.g. task_42_DF_DO202601210001.py)
            code_path: Code path (e.g. datafactory/scripts)
            update_mode: Update mode
            task_id: Associated task ID

        Returns:
            Path of the generated workflow file, or None on failure
        """
        try:
            # Make sure the workflows directory exists
            workflows_dir = PROJECT_ROOT / "datafactory" / "workflows"
            workflows_dir.mkdir(parents=True, exist_ok=True)

            # Generate the workflow file name (use the task ID for easy correlation)
            if task_id:
                workflow_filename = f"task_{task_id}_{script_name}_workflow.json"
            else:
                workflow_filename = f"{script_name}_workflow.json"
            workflow_path = workflows_dir / workflow_filename

            # Generate unique node IDs
            def gen_id():
                return str(uuid.uuid4())

            # Build the full SSH command, including venv activation.
            # Note: the n8n server is separate from the application server,
            # so an SSH node is required.
            # code_name already includes the .py suffix (e.g. task_42_DF_DO202601210001.py)
            ssh_command = (
                f"cd /opt/dataops-platform && source venv/bin/activate && "
                f"python {code_path}/{code_name}"
            )

            workflow_json = {
                "name": f"{script_name}_workflow",
                "nodes": [
                    {
                        "parameters": {
                            "rule": {
                                "interval": [
                                    {
                                        "field": "days",
                                        "daysInterval": 1,
                                        "triggerAtHour": 1,
                                        "triggerAtMinute": 0,
                                    }
                                ]
                            }
                        },
                        "id": gen_id(),
                        "name": "Schedule Trigger",
                        "type": "n8n-nodes-base.scheduleTrigger",
                        "typeVersion": 1.2,
                        "position": [250, 300],
                    },
                    {
                        "parameters": {
                            "resource": "command",
                            "operation": "execute",
                            "command": ssh_command,
                            "cwd": "/opt/dataops-platform",
                        },
                        "id": gen_id(),
                        "name": "Execute Script",
                        "type": "n8n-nodes-base.ssh",
                        "typeVersion": 1,
                        "position": [450, 300],
                        "credentials": {
                            "sshPassword": {
                                "id": "pYTwwuyC15caQe6y",
                                "name": "SSH Password account",
                            }
                        },
                    },
                    {
                        "parameters": {
                            "conditions": {
                                "options": {
                                    "caseSensitive": True,
                                    "leftValue": "",
                                    "typeValidation": "strict",
                                },
                                "conditions": [
                                    {
                                        "id": "condition-success",
                                        "leftValue": "={{ $json.code }}",
                                        "rightValue": 0,
                                        "operator": {
                                            "type": "number",
                                            "operation": "equals",
                                        },
                                    }
                                ],
                                "combinator": "and",
                            }
                        },
                        "id": gen_id(),
                        "name": "Check Result",
                        "type": "n8n-nodes-base.if",
                        "typeVersion": 2,
                        "position": [650, 300],
                    },
                    {
                        "parameters": {
                            "assignments": {
                                "assignments": [
                                    {
                                        "id": "result-success",
                                        "name": "status",
                                        "value": "success",
                                        "type": "string",
                                    },
                                    {
                                        "id": "result-message",
                                        "name": "message",
                                        "value": f"{script_name} executed successfully",
                                        "type": "string",
                                    },
                                    {
                                        "id": "result-output",
                                        "name": "output",
                                        "value": "={{ $json.stdout }}",
                                        "type": "string",
                                    },
                                    {
                                        "id": "result-time",
                                        "name": "executionTime",
                                        "value": "={{ $now.toISO() }}",
                                        "type": "string",
                                    },
                                ]
                            }
                        },
                        "id": gen_id(),
                        "name": "Success Response",
                        "type": "n8n-nodes-base.set",
                        "typeVersion": 3.4,
                        "position": [850, 200],
                    },
                    {
                        "parameters": {
                            "assignments": {
                                "assignments": [
                                    {
                                        "id": "error-status",
                                        "name": "status",
                                        "value": "error",
                                        "type": "string",
                                    },
                                    {
                                        "id": "error-message",
                                        "name": "message",
                                        "value": f"{script_name} execution failed",
                                        "type": "string",
                                    },
                                    {
                                        "id": "error-output",
                                        "name": "error",
                                        "value": "={{ $json.stderr }}",
                                        "type": "string",
                                    },
                                    {
                                        "id": "error-code",
                                        "name": "exitCode",
                                        "value": "={{ $json.code }}",
                                        "type": "number",
                                    },
                                    {
                                        "id": "error-time",
                                        "name": "executionTime",
                                        "value": "={{ $now.toISO() }}",
                                        "type": "string",
                                    },
                                ]
                            }
                        },
                        "id": gen_id(),
                        "name": "Error Response",
                        "type": "n8n-nodes-base.set",
                        "typeVersion": 3.4,
                        "position": [850, 400],
                    },
                ],
                "connections": {
                    "Schedule Trigger": {
                        "main": [
                            [
                                {
                                    "node": "Execute Script",
                                    "type": "main",
                                    "index": 0,
                                }
                            ]
                        ]
                    },
                    "Execute Script": {
                        "main": [
                            [
                                {
                                    "node": "Check Result",
                                    "type": "main",
                                    "index": 0,
                                }
                            ]
                        ]
                    },
                    "Check Result": {
                        "main": [
                            [
                                {
                                    "node": "Success Response",
                                    "type": "main",
                                    "index": 0,
                                }
                            ],
                            [
                                {
                                    "node": "Error Response",
                                    "type": "main",
                                    "index": 0,
                                }
                            ],
                        ]
                    },
                },
                "active": False,
                "settings": {"executionOrder": "v1"},
                "versionId": "1",
                "meta": {
                    "templateCredsSetupCompleted": False,
                    "instanceId": "dataops-platform",
                },
                "tags": [
                    {
                        "createdAt": datetime.now().isoformat() + "Z",
                        "updatedAt": datetime.now().isoformat() + "Z",
                        "id": "1",
                        "name": "dataflow",
                    }
                ],
            }

            # Write the file
            with open(workflow_path, "w", encoding="utf-8") as f:
                json.dump(workflow_json, f, ensure_ascii=False, indent=2)

            logger.info(f"Generated n8n workflow file: {workflow_path}")
            return str(workflow_path)
        except Exception as e:
            logger.error(f"Failed to generate the n8n workflow: {str(e)}")
            return None
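
    # The generated workflow wires five nodes in a fixed chain:
    #
    #     Schedule Trigger (daily at 01:00)
    #         -> Execute Script (SSH: cd /opt/dataops-platform &&
    #            source venv/bin/activate && python <code_path>/<code_name>)
    #         -> Check Result ($json.code == 0)
    #             -> Success Response   (exit code 0)
    #             -> Error Response     (otherwise)
    #
    # For example, with task_id=42 and script_name "DF_DO202601210001" the file
    # lands at datafactory/workflows/task_42_DF_DO202601210001_workflow.json.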

    @staticmethod
    def _handle_children_relationships(dataflow_node, children_ids):
        """Handle child-node relationships."""
        logger.debug(
            "Handling child relationships, raw children_ids: %s, type: %s",
            children_ids,
            type(children_ids),
        )

        # Make sure children_ids is a list
        if not isinstance(children_ids, (list, tuple)):
            if children_ids is not None:
                children_ids = [children_ids]  # Wrap a single value in a list
                logger.debug(f"Wrapped single value in a list: {children_ids}")
            else:
                children_ids = []  # Turn None into an empty list
                logger.debug("Converted None to an empty list")

        for child_id in children_ids:
            try:
                # Look up the child node
                query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
                with connect_graph().session() as session:
                    result = session.run(query, child_id=child_id).data()
                    if result:
                        # Get the ID of dataflow_node
                        dataflow_id = getattr(dataflow_node, "identity", None)
                        if dataflow_id is None:
                            # No identity attribute; look up the ID by name
                            query_id = (
                                "MATCH (n:DataFlow) WHERE n.name_zh = "
                                "$name_zh RETURN id(n) as node_id"
                            )
                            id_result = session.run(
                                query_id,
                                name_zh=dataflow_node.get("name_zh"),
                            ).single()
                            dataflow_id = id_result["node_id"] if id_result else None

                        # Create the relationship, calling relationship_exists with IDs
                        if dataflow_id and not relationship_exists(
                            dataflow_id, "child", child_id
                        ):
                            session.run(
                                "MATCH (a), (b) WHERE id(a) = $dataflow_id "
                                "AND id(b) = $child_id "
                                "CREATE (a)-[:child]->(b)",
                                dataflow_id=dataflow_id,
                                child_id=child_id,
                            )
                            logger.info(f"Created child relationship: {dataflow_id} -> {child_id}")
            except Exception as e:
                logger.warning(f"Failed to create child relationship {child_id}: {str(e)}")

    @staticmethod
    def _handle_tag_relationships(dataflow_id, tag_list):
        """
        Handle multi-tag relationships.

        Args:
            dataflow_id: Dataflow node ID
            tag_list: Tag list; either an array of IDs or an array of objects
                with an id field
        """
        # Make sure tag_list is a list
        if not isinstance(tag_list, list):
            tag_list = [tag_list] if tag_list else []

        for tag_item in tag_list:
            tag_id = None
            if isinstance(tag_item, dict) and "id" in tag_item:
                tag_id = int(tag_item["id"])
            elif isinstance(tag_item, (int, str)):
                with contextlib.suppress(ValueError, TypeError):
                    tag_id = int(tag_item)
            if tag_id:
                DataFlowService._handle_single_tag_relationship(dataflow_id, tag_id)

    @staticmethod
    def _handle_single_tag_relationship(dataflow_id, tag_id):
        """Handle a single tag relationship."""
        try:
            # Look up the tag node
            query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, tag_id=tag_id).data()

                # Create the relationship, calling relationship_exists with IDs
                if (
                    result
                    and dataflow_id
                    and not relationship_exists(dataflow_id, "LABEL", tag_id)
                ):
                    session.run(
                        "MATCH (a), (b) WHERE id(a) = $dataflow_id "
                        "AND id(b) = $tag_id "
                        "CREATE (a)-[:LABEL]->(b)",
                        dataflow_id=dataflow_id,
                        tag_id=tag_id,
                    )
                    logger.info(f"Created tag relationship: {dataflow_id} -> {tag_id}")
        except Exception as e:
            logger.warning(f"Failed to create tag relationship {tag_id}: {str(e)}")

    @staticmethod
    def update_dataflow_script_path(
        dataflow_name: str,
        script_path: str,
    ) -> bool:
        """
        Update the script path on a DataFlow node.

        When a task completes, write the path of the generated Python script
        back onto the DataFlow node.

        Args:
            dataflow_name: Dataflow name (Chinese name)
            script_path: Full path of the Python script

        Returns:
            Whether the update succeeded
        """
        try:
            query = """
            MATCH (n:DataFlow {name_zh: $name_zh})
            SET n.script_path = $script_path, n.updated_at = $updated_at
            RETURN n
            """
            with connect_graph().session() as session:
                result = session.run(
                    query,
                    name_zh=dataflow_name,
                    script_path=script_path,
                    updated_at=get_formatted_time(),
                ).single()
                if result:
                    logger.info(
                        f"Updated DataFlow script path: {dataflow_name} -> {script_path}"
                    )
                    return True
                else:
                    logger.warning(f"DataFlow node not found: {dataflow_name}")
                    return False
        except Exception as e:
            logger.error(f"Failed to update the DataFlow script path: {str(e)}")
            return False

    @staticmethod
    def get_script_content(dataflow_id: int) -> Dict[str, Any]:
        """
        Get the content of the script associated with a DataFlow ID.

        Args:
            dataflow_id: Dataflow ID

        Returns:
            Dict with the script content and metadata:
            - script_path: Script path
            - script_content: Script content
            - script_type: Script type (e.g. python)
            - dataflow_name: Dataflow name

        Raises:
            ValueError: If the DataFlow does not exist or its script path is empty
            FileNotFoundError: If the script file does not exist
        """
        try:
            # Fetch the DataFlow node from Neo4j
            query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            RETURN n, id(n) as node_id
            """
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).single()

            if not result:
                raise ValueError(f"DataFlow node with ID {dataflow_id} not found")

            node = result["n"]
            node_props = dict(node)

            # Get the script path
            script_path = node_props.get("script_path", "")
            if not script_path:
                raise ValueError(
                    f"The script_path property of DataFlow (ID: {dataflow_id}) is empty"
                )

            # Resolve the full path of the script file.
            # script_path may be relative or absolute.
            script_file = Path(script_path)

            # Relative paths are resolved against the project root
            # (the parent of the app directory, i.e. PROJECT_ROOT)
            if not script_file.is_absolute():
                script_file = PROJECT_ROOT / script_path

            # Check that the file exists
            if not script_file.exists():
                raise FileNotFoundError(f"Script file does not exist: {script_file}")

            # Read the script content
            with script_file.open("r", encoding="utf-8") as f:
                script_content = f.read()

            # Determine the script type from the file suffix
            suffix = script_file.suffix.lower()
            script_type_map = {
                ".py": "python",
                ".js": "javascript",
                ".ts": "typescript",
                ".sql": "sql",
                ".sh": "shell",
            }
            script_type = script_type_map.get(suffix, "text")

            logger.info(
                f"Read script content: DataFlow ID={dataflow_id}, "
                f"path={script_path}, type={script_type}"
            )
            return {
                "script_path": script_path,
                "script_content": script_content,
                "script_type": script_type,
                "dataflow_id": dataflow_id,
                "dataflow_name": node_props.get("name_zh", ""),
                "dataflow_name_en": node_props.get("name_en", ""),
            }
        except (ValueError, FileNotFoundError):
            raise
        except Exception as e:
            logger.error(f"Failed to get script content: {str(e)}")
            raise
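
    # Usage sketch (illustrative; assumes the DataFlow node exists and its
    # script_path was filled in by update_dataflow_script_path):
    #
    #     try:
    #         script = DataFlowService.get_script_content(42)
    #         print(script["script_type"], len(script["script_content"]))
    #     except ValueError:         # node missing or script_path empty
    #         ...
    #     except FileNotFoundError:  # path set but file not on disk
    #         ...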

    @staticmethod
    def update_dataflow(
        dataflow_id: int,
        data: Dict[str, Any],
    ) -> Optional[Dict[str, Any]]:
        """
        Update a dataflow.

        Args:
            dataflow_id: Dataflow ID
            data: Data to update

        Returns:
            Updated dataflow info, or None if it does not exist
        """
        try:
            # Extract the tag array (not stored as a node property)
            tag_list = data.pop("tag", None)

            # Look up the node
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    return None

                # Update node properties
                update_fields = []
                params: Dict[str, Any] = {"dataflow_id": dataflow_id}
                for key, value in data.items():
                    if key not in ["id", "created_at"]:  # Protected fields
                        # Serialize complex objects to JSON strings
                        if key in ["config", "script_requirement"] and isinstance(
                            value, dict
                        ):
                            value = json.dumps(value, ensure_ascii=False)
                        update_fields.append(f"n.{key} = ${key}")
                        params[key] = value

                if update_fields:
                    params["updated_at"] = get_formatted_time()
                    update_fields.append("n.updated_at = $updated_at")
                    update_query = f"""
                    MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                    SET {", ".join(update_fields)}
                    RETURN n, id(n) as node_id
                    """
                    result = session.run(update_query, params).data()

                # Handle tag relationships (supports a multi-tag array)
                if tag_list is not None:
                    # Make sure it is a list
                    if not isinstance(tag_list, list):
                        tag_list = [tag_list] if tag_list else []

                    # First delete the existing LABEL relationships
                    delete_query = """
                    MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel)
                    WHERE id(n) = $dataflow_id
                    DELETE r
                    """
                    session.run(delete_query, dataflow_id=dataflow_id)
                    logger.info(f"Deleted existing tag relationships of dataflow {dataflow_id}")

                    # Create a new LABEL relationship for each tag
                    for tag_item in tag_list:
                        tag_id = None
                        if isinstance(tag_item, dict) and "id" in tag_item:
                            tag_id = int(tag_item["id"])
                        elif isinstance(tag_item, (int, str)):
                            with contextlib.suppress(ValueError, TypeError):
                                tag_id = int(tag_item)
                        if tag_id:
                            DataFlowService._handle_single_tag_relationship(
                                dataflow_id, tag_id
                            )

                if result:
                    node = result[0]["n"]
                    updated_dataflow = dict(node)

                    # Use the node_id returned by the query
                    updated_dataflow["id"] = result[0]["node_id"]

                    # Query the tag array and add it to the returned data
                    tags_query = """
                    MATCH (n:DataFlow)
                    WHERE id(n) = $dataflow_id
                    OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
                    RETURN collect({
                        id: id(label),
                        name_zh: label.name_zh,
                        name_en: label.name_en
                    }) as tags
                    """
                    tags_result = session.run(
                        tags_query, dataflow_id=dataflow_id
                    ).single()
                    if tags_result:
                        tags = tags_result.get("tags", [])
                        updated_dataflow["tag"] = [
                            tag for tag in tags if tag.get("id") is not None
                        ]
                    else:
                        updated_dataflow["tag"] = []

                    logger.info(f"Dataflow updated: ID={dataflow_id}")
                    return updated_dataflow
                return None
        except Exception as e:
            logger.error(f"Failed to update dataflow: {str(e)}")
            raise e

    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """
        Delete a dataflow.

        Args:
            dataflow_id: Dataflow ID

        Returns:
            Whether the deletion succeeded
        """
        try:
            # Delete the node along with its relationships
            query = """
            MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            with connect_graph().session() as session:
                delete_result = session.run(query, dataflow_id=dataflow_id).single()
                result = delete_result["deleted_count"] if delete_result else 0
                if result and result > 0:
                    logger.info(f"Dataflow deleted: ID={dataflow_id}")
                    return True
                return False
        except Exception as e:
            logger.error(f"Failed to delete dataflow: {str(e)}")
            raise e

    @staticmethod
    def execute_dataflow(
        dataflow_id: int,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Execute a dataflow.

        Args:
            dataflow_id: Dataflow ID
            params: Execution parameters

        Returns:
            Execution result info
        """
        try:
            # Check whether the dataflow exists
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")

            execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"

            # TODO: actually execute the dataflow here.
            # For now, return a mock result.
            result = {
                "execution_id": execution_id,
                "dataflow_id": dataflow_id,
                "status": "running",
                "started_at": datetime.now().isoformat(),
                "params": params or {},
                "progress": 0,
            }
            logger.info(
                "Started dataflow execution: ID=%s, execution_id=%s",
                dataflow_id,
                execution_id,
            )
            return result
        except Exception as e:
            logger.error(f"Failed to execute dataflow: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
        """
        Get the execution status of a dataflow.

        Args:
            dataflow_id: Dataflow ID

        Returns:
            Execution status info
        """
        try:
            # TODO: query the real execution status here.
            # For now, return a mock status.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")

            status = ["running", "completed", "failed", "pending"][dataflow_id % 4]
            return {
                "dataflow_id": dataflow_id,
                "status": status,
                "progress": (
                    100 if status == "completed" else (dataflow_id * 10) % 100
                ),
                "started_at": datetime.now().isoformat(),
                "completed_at": (
                    datetime.now().isoformat() if status == "completed" else None
                ),
                "error_message": (
                    "An error occurred during execution" if status == "failed" else None
                ),
            }
        except Exception as e:
            logger.error(f"Failed to get dataflow status: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_logs(
        dataflow_id: int,
        page: int = 1,
        page_size: int = 50,
    ) -> Dict[str, Any]:
        """
        Get the execution logs of a dataflow.

        Args:
            dataflow_id: Dataflow ID
            page: Page number
            page_size: Page size

        Returns:
            Execution log list and pagination info
        """
        try:
            # TODO: query the real execution logs here.
            # For now, return mock logs.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"Dataflow does not exist: ID={dataflow_id}")

            mock_logs = [
                {
                    "id": i,
                    "timestamp": datetime.now().isoformat(),
                    "level": ["INFO", "WARNING", "ERROR"][i % 3],
                    "message": f"Dataflow execution log message {i}",
                    "component": ["source", "transform", "target"][i % 3],
                }
                for i in range(1, 101)
            ]

            # Paginate
            total = len(mock_logs)
            start = (page - 1) * page_size
            end = start + page_size
            logs = mock_logs[start:end]

            return {
                "logs": logs,
                "pagination": {
                    "page": page,
                    "page_size": page_size,
                    "total": total,
                    "total_pages": (total + page_size - 1) // page_size,
                },
            }
        except Exception as e:
            logger.error(f"Failed to get dataflow logs: {str(e)}")
            raise e

    # Default production data source configuration
    DEFAULT_DATA_SOURCE = {
        "type": "postgresql",
        "host": "192.168.3.143",
        "port": 5432,
        "database": "dataops",
        "schema": "dags",
    }

    @staticmethod
    def _generate_businessdomain_ddl(
        session,
        bd_id: int,
        is_target: bool = False,
        update_mode: str = "append",
    ) -> Optional[Dict[str, Any]]:
        """
        Generate a DDL from a BusinessDomain node ID.

        Args:
            session: Neo4j session object
            bd_id: BusinessDomain node ID
            is_target: Whether this is a target table (target tables get a
                create_time column)
            update_mode: Update mode (append or full)

        Returns:
            Dict with the ddl and data_source info, or None if the node does
            not exist. data_source is always returned; if there is no
            COME_FROM relationship, the default production configuration is used.
        """
        try:
            # Query the BusinessDomain node, its metadata, tag relationships
            # and data source relationships
            cypher = """
            MATCH (bd:BusinessDomain)
            WHERE id(bd) = $bd_id
            OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
            OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
            OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
            RETURN bd,
                   collect(DISTINCT m) as metadata,
                   collect(DISTINCT {
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }) as labels,
                   ds.type as ds_type,
                   ds.host as ds_host,
                   ds.port as ds_port,
                   ds.database as ds_database,
                   ds.schema as ds_schema
            """
            result = session.run(cypher, bd_id=bd_id).single()
            if not result or not result["bd"]:
                logger.warning(f"BusinessDomain node with ID {bd_id} not found")
                return None

            node = result["bd"]
            metadata = result["metadata"]

            # Generate the DDL
            node_props = dict(node)
            table_name = node_props.get("name_en", f"table_{bd_id}")

            ddl_lines = []
            ddl_lines.append(f"CREATE TABLE {table_name} (")
            column_definitions = []

            # Add metadata columns
            if metadata:
                for meta in metadata:
                    if meta:
                        meta_props = dict(meta)
                        column_name = meta_props.get(
                            "name_en",
                            meta_props.get("name_zh", "unknown_column"),
                        )
                        data_type = meta_props.get("data_type", "VARCHAR(255)")
                        comment = meta_props.get("name_zh", "")
                        column_def = f"    {column_name} {data_type}"
                        if comment:
                            column_def += f" COMMENT '{comment}'"
                        column_definitions.append(column_def)

            # If there is no metadata, add a default primary key
            if not column_definitions:
                column_definitions.append("    id BIGINT PRIMARY KEY COMMENT 'Primary key ID'")

            # Target tables get a create_time column
            if is_target:
                column_definitions.append(
                    "    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
                    "COMMENT 'Record creation time'"
                )

            ddl_lines.append(",\n".join(column_definitions))
            ddl_lines.append(");")

            # Add the table comment
            table_comment = node_props.get(
                "name_zh", node_props.get("describe", table_name)
            )
            if table_comment and table_comment != table_name:
                ddl_lines.append(f"COMMENT ON TABLE {table_name} IS '{table_comment}';")

            ddl_content = "\n".join(ddl_lines)

            # Always return data source info:
            # use the data source found via the COME_FROM relationship when
            # present, otherwise fall back to the default production
            # database configuration.
            if result["ds_type"]:
                data_source = {
                    "type": result["ds_type"],
                    "host": result["ds_host"],
                    "port": result["ds_port"],
                    "database": result["ds_database"],
                    "schema": result["ds_schema"],
                }

                # Port validation: make sure the database type uses a sensible
                # port. PostgreSQL defaults to 5432, MySQL to 3306; 5678 is the
                # n8n service port, not a database port.
                ds_type_lower = (result["ds_type"] or "").lower()
                current_port = data_source.get("port")

                # Map database types to their default ports
                db_default_ports = {
                    "postgresql": 5432,
                    "postgres": 5432,
                    "mysql": 3306,
                    "mariadb": 3306,
                    "sqlserver": 1433,
                    "mssql": 1433,
                    "oracle": 1521,
                }

                # Common non-database ports (to be corrected)
                invalid_db_ports = {5678, 8080, 80, 443, 8000, 3000}

                if ds_type_lower in db_default_ports:
                    expected_port = db_default_ports[ds_type_lower]
                    if current_port in invalid_db_ports:
                        logger.warning(
                            f"Suspicious data source port: type={ds_type_lower}, "
                            f"port={current_port} (likely not a database port); "
                            f"corrected to the default port {expected_port}"
                        )
                        data_source["port"] = expected_port
                    elif current_port is None:
                        logger.info(f"Data source port is empty, using the default port: {expected_port}")
                        data_source["port"] = expected_port

                logger.info(f"Got data source info via the COME_FROM relationship: {data_source}")
            else:
                # Use the default production data source configuration
                data_source = DataFlowService.DEFAULT_DATA_SOURCE.copy()
                logger.info(
                    f"No COME_FROM relationship found, using the default "
                    f"production data source: {data_source}"
                )

            logger.debug(
                f"Generated BusinessDomain DDL: {table_name}, is_target={is_target}"
            )
            return {
                "ddl": ddl_content,
                "table_name": table_name,
                "data_source": data_source,
            }
        except Exception as e:
            logger.error(f"Failed to generate BusinessDomain DDL, ID={bd_id}: {str(e)}")
            return None
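
    # For a hypothetical BusinessDomain named "orders" that INCLUDES two
    # DataMeta nodes (column names and types made up for illustration), a
    # target-table call would yield roughly:
    #
    #     CREATE TABLE orders (
    #         order_id BIGINT COMMENT '...',
    #         amount VARCHAR(255) COMMENT '...',
    #         create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'Record creation time'
    #     );
    #     COMMENT ON TABLE orders IS '...';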

    @staticmethod
    def _handle_script_relationships(
        data: Dict[str, Any],
        dataflow_name: str,
        name_en: str,
    ):
        """
        Handle script relationships: in the Neo4j graph, create an INPUT
        relationship from the source BusinessDomain to the DataFlow, and an
        OUTPUT relationship from the DataFlow to the target BusinessDomain.

        Relationship model:
        - (source:BusinessDomain)-[:INPUT]->(dataflow:DataFlow)
        - (dataflow:DataFlow)-[:OUTPUT]->(target:BusinessDomain)

        Args:
            data: Dict with the script info; expected to contain script_name,
                script_type, schedule_status, source_table, target_table,
                update_mode
        """
        try:
            # Read the key/value pairs from data
            source_table_full = data.get("source_table", "")
            target_table_full = data.get("target_table", "")

            # Handle the source_table/target_table format:
            # either "label:name" or a bare "name"
            source_table = (
                source_table_full.split(":")[-1]
                if ":" in source_table_full
                else source_table_full
            )
            target_table = (
                target_table_full.split(":")[-1]
                if ":" in target_table_full
                else target_table_full
            )
            source_label = (
                source_table_full.split(":")[0]
                if ":" in source_table_full
                else "BusinessDomain"
            )
            target_label = (
                target_table_full.split(":")[0]
                if ":" in target_table_full
                else "BusinessDomain"
            )

            # Validate required fields
            if not source_table or not target_table:
                logger.warning(
                    "source_table or target_table is empty, skipping relationship "
                    "creation: source_table=%s, target_table=%s",
                    source_table,
                    target_table,
                )
                return

            logger.info(
                "Creating INPUT/OUTPUT relationships: %s -[INPUT]-> %s -[OUTPUT]-> %s",
                source_table,
                dataflow_name,
                target_table,
            )

            with connect_graph().session() as session:
                # Step 1: get the DataFlow node ID
                dataflow_query = """
                MATCH (df:DataFlow {name_zh: $dataflow_name})
                RETURN id(df) as dataflow_id
                """
                df_result = session.run(
                    dataflow_query,  # type: ignore[arg-type]
                    {"dataflow_name": dataflow_name},
                ).single()
                if not df_result:
                    logger.error(f"DataFlow node not found: {dataflow_name}")
                    return
                dataflow_id = df_result["dataflow_id"]

                # Step 2: get or create the source node.
                # Match by name_en first, then by name.
                source_query = f"""
                MATCH (source:{source_label})
                WHERE source.name_en = $source_table OR source.name = $source_table
                RETURN id(source) as source_id
                LIMIT 1
                """
                source_result = session.run(
                    source_query,  # type: ignore[arg-type]
                    {"source_table": source_table},
                ).single()
                if not source_result:
                    logger.warning(
                        "Source node not found: %s, creating a new node",
                        source_table,
                    )
                    # Create the source node
                    create_source_query = f"""
                    CREATE (source:{source_label} {{
                        name: $source_table,
                        name_en: $source_table,
                        created_at: $created_at,
                        type: 'source'
                    }})
                    RETURN id(source) as source_id
                    """
                    source_result = session.run(
                        create_source_query,  # type: ignore[arg-type]
                        {
                            "source_table": source_table,
                            "created_at": get_formatted_time(),
                        },
                    ).single()
                source_id = source_result["source_id"] if source_result else None

                # Step 3: get or create the target node
                target_query = f"""
                MATCH (target:{target_label})
                WHERE target.name_en = $target_table OR target.name = $target_table
                RETURN id(target) as target_id
                LIMIT 1
                """
                target_result = session.run(
                    target_query,  # type: ignore[arg-type]
                    {"target_table": target_table},
                ).single()
                if not target_result:
                    logger.warning(
                        "Target node not found: %s, creating a new node",
                        target_table,
                    )
                    # Create the target node
                    create_target_query = f"""
                    CREATE (target:{target_label} {{
                        name: $target_table,
                        name_en: $target_table,
                        created_at: $created_at,
                        type: 'target'
                    }})
                    RETURN id(target) as target_id
                    """
                    target_result = session.run(
                        create_target_query,  # type: ignore[arg-type]
                        {
                            "target_table": target_table,
                            "created_at": get_formatted_time(),
                        },
                    ).single()
                target_id = target_result["target_id"] if target_result else None

                if not source_id or not target_id:
                    logger.error(
                        "Could not resolve source or target node ID: "
                        "source_id=%s, target_id=%s",
                        source_id,
                        target_id,
                    )
                    return

                # Step 4: create the INPUT relationship (source)-[:INPUT]->(dataflow)
                create_input_query = """
                MATCH (source), (dataflow:DataFlow)
                WHERE id(source) = $source_id AND id(dataflow) = $dataflow_id
                MERGE (source)-[r:INPUT]->(dataflow)
                ON CREATE SET r.created_at = $created_at
                ON MATCH SET r.updated_at = $created_at
                RETURN r
                """
                input_result = session.run(
                    create_input_query,  # type: ignore[arg-type]
                    {
                        "source_id": source_id,
                        "dataflow_id": dataflow_id,
                        "created_at": get_formatted_time(),
                    },
                ).single()
                if input_result:
                    logger.info(
                        "Created INPUT relationship: %s -> %s",
                        source_table,
                        dataflow_name,
                    )
                else:
                    logger.warning(
                        "INPUT relationship creation failed or already exists: %s -> %s",
                        source_table,
                        dataflow_name,
                    )

                # Step 5: create the OUTPUT relationship (dataflow)-[:OUTPUT]->(target)
                create_output_query = """
                MATCH (dataflow:DataFlow), (target)
                WHERE id(dataflow) = $dataflow_id AND id(target) = $target_id
                MERGE (dataflow)-[r:OUTPUT]->(target)
                ON CREATE SET r.created_at = $created_at
                ON MATCH SET r.updated_at = $created_at
                RETURN r
                """
                output_result = session.run(
                    create_output_query,  # type: ignore[arg-type]
                    {
                        "dataflow_id": dataflow_id,
                        "target_id": target_id,
                        "created_at": get_formatted_time(),
                    },
                ).single()
                if output_result:
                    logger.info(
                        "Created OUTPUT relationship: %s -> %s",
                        dataflow_name,
                        target_table,
                    )
                else:
                    logger.warning(
                        "OUTPUT relationship creation failed or already exists: %s -> %s",
                        dataflow_name,
                        target_table,
                    )

                logger.info(
                    "Lineage created: %s -[INPUT]-> %s -[OUTPUT]-> %s",
                    source_table,
                    dataflow_name,
                    target_table,
                )
        except Exception as e:
            logger.error(f"Failed to handle script relationships: {str(e)}")
            raise e
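
    # The resulting lineage can be inspected in Neo4j with a query along these
    # lines ($name is a placeholder for the dataflow's name_zh):
    #
    #     MATCH (s)-[:INPUT]->(df:DataFlow {name_zh: $name})-[:OUTPUT]->(t)
    #     RETURN s.name_en, df.name_zh, t.name_en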

    @staticmethod
    def get_business_domain_list() -> List[Dict[str, Any]]:
        """
        Get the list of BusinessDomain nodes.

        Returns:
            List of BusinessDomain nodes; each contains id, name_zh, name_en, tag
        """
        try:
            logger.info("Querying the BusinessDomain node list")
            with connect_graph().session() as session:
                # Query all BusinessDomain nodes and the tags linked via LABEL
                # relationships (supports multiple tags)
                query = """
                MATCH (bd:BusinessDomain)
                OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
                RETURN id(bd) as id,
                       bd.name_zh as name_zh,
                       bd.name_en as name_en,
                       bd.create_time as create_time,
                       collect({
                           id: id(label),
                           name_zh: label.name_zh,
                           name_en: label.name_en
                       }) as tags
                ORDER BY create_time DESC
                """
                result = session.run(query)

                bd_list = []
                for record in result:
                    # Process the tag array, filtering out empty tags
                    tags = record.get("tags", [])
                    tag_list = [tag for tag in tags if tag.get("id") is not None]

                    bd_item = {
                        "id": record["id"],
                        "name_zh": record.get("name_zh", "") or "",
                        "name_en": record.get("name_en", "") or "",
                        "tag": tag_list,
                    }
                    bd_list.append(bd_item)

                logger.info(f"Found {len(bd_list)} BusinessDomain nodes")
                return bd_list
        except Exception as e:
            logger.error(f"Failed to query the BusinessDomain node list: {str(e)}")
            raise e

    @staticmethod
    def _register_data_product(
        data: Dict[str, Any],
        dataflow_name: str,
        name_en: str,
        dataflow_id: Optional[int] = None,
    ) -> None:
        """
        Register a data product with the data service.

        After a dataflow is created, it is automatically registered as a data
        product so it can be shown and managed in the data service module.

        The BusinessDomain IDs are taken from script_requirement.target_table;
        Neo4j is then queried for each node's name_zh and name_en, which are
        used as the data product name.

        Args:
            data: Dataflow configuration data
            dataflow_name: Dataflow name (Chinese)
            name_en: English name of the dataflow
            dataflow_id: Dataflow ID (Neo4j node ID)
        """
        try:
            # Get target_table (the list of BusinessDomain IDs) from
            # script_requirement
            script_requirement = data.get("script_requirement")
            description = data.get("describe", "")

            # Parse script_requirement
            req_json: Optional[Dict[str, Any]] = None
            if script_requirement:
                if isinstance(script_requirement, dict):
                    req_json = script_requirement
                elif isinstance(script_requirement, str):
                    try:
                        parsed = json.loads(script_requirement)
                        if isinstance(parsed, dict):
                            req_json = parsed
                    except (json.JSONDecodeError, TypeError):
                        pass

            # Get the BusinessDomain IDs from target_table
            target_bd_ids: List[int] = []
            if req_json:
                target_table_ids = req_json.get("target_table", [])
                if isinstance(target_table_ids, list):
                    target_bd_ids = [
                        int(bid) for bid in target_table_ids if bid is not None
                    ]
                elif target_table_ids is not None:
                    target_bd_ids = [int(target_table_ids)]

                # If there is a rule field, add it to the description
                rule = req_json.get("rule", "")
                if rule and not description:
                    description = rule

            # Without target_table IDs, no data product is registered
            if not target_bd_ids:
                logger.warning(
                    f"Dataflow {dataflow_name} has no target_table, "
                    f"skipping data product registration"
                )
                return

            # For each BusinessDomain, query Neo4j for name_zh/name_en and the
            # schema of the associated data source
            with connect_graph().session() as session:
                for bd_id in target_bd_ids:
                    try:
                        # Query the BusinessDomain node info and the schema of
                        # its associated data source
                        query = """
                        MATCH (bd:BusinessDomain)
                        WHERE id(bd) = $bd_id
                        OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
                        RETURN bd.name_zh as name_zh,
                               bd.name_en as name_en,
                               bd.describe as describe,
                               ds.schema as ds_schema
                        """
                        result = session.run(query, bd_id=bd_id).single()
                        if not result:
                            logger.warning(
                                f"BusinessDomain node with ID {bd_id} not found, skipping"
                            )
                            continue

                        # Use the BusinessDomain node's name_zh and name_en
                        product_name = result.get("name_zh") or ""
                        product_name_en = result.get("name_en") or ""

                        # Fall back to name_en if name_zh is missing
                        if not product_name:
                            product_name = product_name_en

                        # Derive name_en from name_zh if missing
                        if not product_name_en:
                            product_name_en = product_name.lower().replace(" ", "_")

                        # Use the BusinessDomain's name_en as the target table name
                        target_table = product_name_en

                        # Use the BusinessDomain's describe if the current
                        # description is empty
                        bd_describe = result.get("describe") or ""
                        if bd_describe and not description:
                            description = bd_describe

                        # Get the schema from the associated data source;
                        # default to public
                        target_schema = result.get("ds_schema") or "public"

                        # Register via the data product service
                        DataProductService.register_data_product(
                            product_name=product_name,
                            product_name_en=product_name_en,
                            target_table=target_table,
                            target_schema=target_schema,
                            description=description,
                            source_dataflow_id=dataflow_id,
                            source_dataflow_name=dataflow_name,
                            created_by=data.get("created_by", "dataflow"),
                        )
                        logger.info(
                            f"Data product registered: {product_name} -> "
                            f"{target_schema}.{target_table}"
                        )
                    except Exception as bd_error:
                        logger.error(
                            f"Failed to process BusinessDomain {bd_id}: {str(bd_error)}"
                        )
                        # Continue with the next one
        except Exception as e:
            logger.error(f"Failed to register data product: {str(e)}")
            raise