dataflows.py

import contextlib
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from sqlalchemy import text

from app import db
from app.core.data_service.data_product_service import DataProductService
from app.core.graph.graph_operations import (
    connect_graph,
    create_or_get_node,
    get_node,
    relationship_exists,
)
from app.core.meta_data import get_formatted_time, translate_and_parse

logger = logging.getLogger(__name__)

# Project root directory
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent


class DataFlowService:
    """Dataflow service: handles the business logic around dataflows."""
    @staticmethod
    def get_dataflows(
        page: int = 1,
        page_size: int = 10,
        search: str = "",
    ) -> Dict[str, Any]:
        """
        Get the list of dataflows.

        Args:
            page: Page number.
            page_size: Page size.
            search: Search keyword.

        Returns:
            A dict containing the dataflow list and pagination info.
        """
        try:
            # Query the dataflow list from the graph database
            skip_count = (page - 1) * page_size

            # Build the search condition
            where_clause = ""
            params: Dict[str, Union[int, str]] = {
                "skip": skip_count,
                "limit": page_size,
            }
            if search:
                where_clause = (
                    "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search"
                )
                params["search"] = search

            # Query the dataflow list (with tag arrays).
            # Paginate first via WITH, then aggregate tags, so the
            # OPTIONAL MATCH cannot skew the pagination.
            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            WITH n
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
            RETURN n, id(n) as node_id,
                   n.created_at as created_at,
                   collect({{
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }}) as tags
            """

            # Get the Neo4j driver (connect_graph raises ConnectionError on failure)
            try:
                with connect_graph().session() as session:
                    list_result = session.run(query, params).data()

                    # Query the total count
                    count_query = f"""
                    MATCH (n:DataFlow)
                    {where_clause}
                    RETURN count(n) as total
                    """
                    count_params = {"search": search} if search else {}
                    count_result = session.run(count_query, count_params).single()
                    total = count_result["total"] if count_result else 0
            except Exception as e:
                # Do not close the driver here: connect_graph() may return a
                # shared driver instance, and closing it prematurely would
                # break other sessions. Only the session (context manager)
                # is closed.
                logger.error(f"查询数据流失败: {str(e)}")
                raise e

            # Format the results
            dataflows = []
            for record in list_result:
                node = record["n"]
                dataflow = dict(node)
                dataflow["id"] = record["node_id"]  # use the node_id returned by the query

                # Process the tag array, filtering out empty tags
                tags = record.get("tags", [])
                dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]
                dataflows.append(dataflow)

            return {
                "list": dataflows,
                "pagination": {
                    "page": page,
                    "page_size": page_size,
                    "total": total,
                    "total_pages": (total + page_size - 1) // page_size,
                },
            }
        except Exception as e:
            logger.error(f"获取数据流列表失败: {str(e)}")
            raise e
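    # Usage sketch (illustrative only, not executed at import time; the
    # search keyword and field access below are hypothetical and assume a
    # reachable Neo4j populated with DataFlow nodes):
    #
    #   result = DataFlowService.get_dataflows(page=1, page_size=10, search="销售")
    #   for flow in result["list"]:
    #       print(flow["id"], flow["name_zh"], [t["name_zh"] for t in flow["tag"]])
    #   print(result["pagination"]["total_pages"])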
    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        Get dataflow details by ID.

        Args:
            dataflow_id: Dataflow ID.

        Returns:
            The dataflow detail dict, or None if it does not exist.
        """
        try:
            # Fetch all properties of the DataFlow node (with tags) from Neo4j
            neo4j_query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
            RETURN n, id(n) as node_id,
                   collect({
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }) as tags
            """
            with connect_graph().session() as session:
                neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data()
                if not neo4j_result:
                    logger.warning(f"未找到ID为 {dataflow_id} 的DataFlow节点")
                    return None

                record = neo4j_result[0]
                node = record["n"]

                # Convert the node properties to a dict
                dataflow = dict(node)
                dataflow["id"] = record["node_id"]

                # Process the tag array, filtering out empty tags
                tags = record.get("tags", [])
                dataflow["tag"] = [tag for tag in tags if tag.get("id") is not None]

                # script_requirement: if it is a JSON string, parse it into an object
                script_requirement_str = dataflow.get("script_requirement", "")
                if script_requirement_str:
                    try:
                        # Attempt to parse the JSON string
                        script_requirement_obj = json.loads(script_requirement_str)
                        dataflow["script_requirement"] = script_requirement_obj
                        logger.debug(
                            "成功解析script_requirement: %s",
                            script_requirement_obj,
                        )
                    except (json.JSONDecodeError, TypeError) as e:
                        logger.warning(f"script_requirement解析失败,保持原值: {e}")
                        # Keep the original (string) value
                        dataflow["script_requirement"] = script_requirement_str
                else:
                    # Empty value: normalize to None
                    dataflow["script_requirement"] = None

                logger.info(
                    "成功获取DataFlow详情,ID: %s, 名称: %s",
                    dataflow_id,
                    dataflow.get("name_zh"),
                )
                return dataflow
        except Exception as e:
            logger.error(f"获取数据流详情失败: {str(e)}")
            raise e
    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new dataflow.

        Args:
            data: Dataflow configuration data.

        Returns:
            Information about the created dataflow.
        """
        try:
            # Validate required fields
            required_fields = ["name_zh", "describe"]
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"缺少必填字段: {field}")

            dataflow_name = data["name_zh"]

            # Use the LLM translation to generate an English name
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = (
                    result_list[0]
                    if result_list
                    else dataflow_name.lower().replace(" ", "_")
                )
            except Exception as e:
                logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
                name_en = dataflow_name.lower().replace(" ", "_")

            # Normalize script_requirement into a JSON string
            script_requirement = data.get("script_requirement")
            if script_requirement is not None:
                if isinstance(script_requirement, (dict, list)):
                    # Dicts and lists are serialized to JSON
                    script_requirement_str = json.dumps(
                        script_requirement, ensure_ascii=False
                    )
                else:
                    # Strings are used as-is
                    script_requirement_str = str(script_requirement)
            else:
                script_requirement_str = ""

            # Prepare the node data (tags are not stored as node properties;
            # they are linked through LABEL relationships instead)
            node_data = {
                "name_zh": dataflow_name,
                "name_en": name_en,
                "category": data.get("category", ""),
                "organization": data.get("organization", ""),
                "leader": data.get("leader", ""),
                "frequency": data.get("frequency", ""),
                "describe": data.get("describe", ""),
                "status": data.get("status", "inactive"),
                "update_mode": data.get("update_mode", "append"),
                "script_type": data.get("script_type", "python"),
                "script_requirement": script_requirement_str,
                "script_path": "",  # script path, updated once the task completes
                "created_at": get_formatted_time(),
                "updated_at": get_formatted_time(),
            }

            # Create or get the dataflow node
            dataflow_id = get_node("DataFlow", name=dataflow_name)
            if dataflow_id:
                raise ValueError(f"数据流 '{dataflow_name}' 已存在")
            dataflow_id = create_or_get_node("DataFlow", **node_data)

            # Handle tag relationships (multiple tags supported)
            tag_list = data.get("tag", [])
            if tag_list:
                try:
                    DataFlowService._handle_tag_relationships(dataflow_id, tag_list)
                except Exception as e:
                    logger.warning(f"处理标签关系时出错: {str(e)}")

            # After the graph node is created, write to the PG database
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")

                # Once the PG record is written, create the script
                # relationships in the Neo4j graph
                try:
                    DataFlowService._handle_script_relationships(
                        data, dataflow_name, name_en
                    )
                    logger.info(f"脚本关系创建成功: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"创建脚本关系失败: {str(script_error)}")
            except Exception as pg_error:
                logger.error(f"写入PG数据库失败: {str(pg_error)}")
                # Note: we could roll back the graph operation here, but for
                # now the graph data is kept. A production deployment may
                # need a distributed transaction.

            # Return the created dataflow info: query the new node to get
            # its complete data
            query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id"
            with connect_graph().session() as session:
                id_result = session.run(query, name_zh=dataflow_name).single()
                if id_result:
                    dataflow_node = id_result["n"]
                    node_id = id_result["node_id"]

                    # Convert the node properties to a dict
                    result = dict(dataflow_node)
                    result["id"] = node_id
                else:
                    # If the lookup fails, fall back to basic info
                    result = {
                        "id": (dataflow_id if isinstance(dataflow_id, int) else None),
                        "name_zh": dataflow_name,
                        "name_en": name_en,
                        "created_at": get_formatted_time(),
                    }

            # Register the data product with the data service
            try:
                DataFlowService._register_data_product(
                    data=data,
                    dataflow_name=dataflow_name,
                    name_en=name_en,
                    dataflow_id=result.get("id"),
                )
                logger.info(f"数据产品注册成功: {dataflow_name}")
            except Exception as product_error:
                logger.warning(f"注册数据产品失败: {str(product_error)}")
                # Does not block the main flow; just log a warning

            logger.info(f"创建数据流成功: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"创建数据流失败: {str(e)}")
            raise e
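    # Example payload (a minimal sketch; every field value below is
    # hypothetical). script_requirement may be passed as a dict and is
    # serialized to a JSON string before being stored on the node:
    #
    #   payload = {
    #       "name_zh": "销售日报汇总",
    #       "describe": "按天汇总销售明细",
    #       "update_mode": "append",
    #       "tag": [{"id": 7}],
    #       "script_requirement": {
    #           "rule": "按 order_date 聚合销售金额",
    #           "source_table": [101],   # BusinessDomain node IDs
    #           "target_table": [202],
    #       },
    #   }
    #   created = DataFlowService.create_dataflow(payload)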
    @staticmethod
    def _save_to_pg_database(
        data: Dict[str, Any],
        script_name: str,
        name_en: str,
    ):
        """
        Save task information to the task_list table in the PG database.

        Args:
            data: Data containing the script information.
            script_name: Script name.
            name_en: English name.
        """
        from app.config.config import config, current_env

        try:
            # Get the configuration for the current environment
            current_config = config.get(current_env, config["default"])
            dataflow_schema = getattr(current_config, "DATAFLOW_SCHEMA", "dags")

            # Extract the script-related information.
            # Normalize script_requirement so it is stored as a JSON string.
            script_requirement_raw = data.get("script_requirement")
            if script_requirement_raw is not None:
                if isinstance(script_requirement_raw, (dict, list)):
                    script_requirement = json.dumps(
                        script_requirement_raw, ensure_ascii=False
                    )
                else:
                    script_requirement = str(script_requirement_raw)
            else:
                script_requirement = ""

            # Validate required fields
            if not script_name:
                raise ValueError("script_name不能为空")

            current_time = datetime.now()

            # Save to the task_list table
            try:
                # 1. Parse script_requirement and build a detailed task
                # description. Initialize everything referenced after the
                # parsing block up front, so a parse failure cannot leave a
                # name unbound (NameError).
                task_description_md = script_requirement
                request_content_str = ""
                # 4. Extract update_mode from the data parameter
                update_mode = data.get("update_mode", "append")
                data_source_info = None
                try:
                    # Try to parse the JSON
                    try:
                        req_json = json.loads(script_requirement)
                    except (json.JSONDecodeError, TypeError):
                        req_json = None

                    if isinstance(req_json, dict):
                        # 1. Extract the rule field as request_content_str
                        request_content_str = req_json.get("rule", "")

                        # 2. Extract the source_table and target_table fields
                        source_table_ids = req_json.get("source_table", [])
                        target_table_ids = req_json.get("target_table", [])

                        # Ensure both are lists
                        if not isinstance(source_table_ids, list):
                            source_table_ids = (
                                [source_table_ids] if source_table_ids else []
                            )
                        if not isinstance(target_table_ids, list):
                            target_table_ids = (
                                [target_table_ids] if target_table_ids else []
                            )

                        # Merge all BusinessDomain IDs
                        all_bd_ids = source_table_ids + target_table_ids

                        # Generate the BusinessDomain DDLs
                        source_ddls = []
                        target_ddls = []
                        if all_bd_ids:
                            try:
                                with connect_graph().session() as session:
                                    # Process source tables
                                    for bd_id in source_table_ids:
                                        ddl_info = DataFlowService._generate_businessdomain_ddl(
                                            session,
                                            bd_id,
                                            is_target=False,
                                        )
                                        if ddl_info:
                                            source_ddls.append(ddl_info["ddl"])
                                            # 3. If the domain is tagged
                                            # "数据资源" (data resource), pick
                                            # up its data source info
                                            if (
                                                ddl_info.get("data_source")
                                                and not data_source_info
                                            ):
                                                data_source_info = ddl_info[
                                                    "data_source"
                                                ]

                                    # Process target tables (5. target tables get
                                    # a create_time column by default)
                                    for bd_id in target_table_ids:
                                        ddl_info = DataFlowService._generate_businessdomain_ddl(
                                            session,
                                            bd_id,
                                            is_target=True,
                                            update_mode=update_mode,
                                        )
                                        if ddl_info:
                                            target_ddls.append(ddl_info["ddl"])
                                            # Same check for data source info
                                            if (
                                                ddl_info.get("data_source")
                                                and not data_source_info
                                            ):
                                                data_source_info = ddl_info[
                                                    "data_source"
                                                ]
                            except Exception as neo_e:
                                logger.error(
                                    f"获取BusinessDomain DDL失败: {str(neo_e)}"
                                )

                        # Build the Markdown task description
                        task_desc_parts = [f"# Task: {script_name}\n"]

                        # DataFlow schema configuration
                        task_desc_parts.append("## DataFlow Configuration")
                        task_desc_parts.append(f"- **Schema**: {dataflow_schema}\n")

                        # Data source info
                        if data_source_info:
                            task_desc_parts.append("## Data Source")
                            task_desc_parts.append(
                                f"- **Type**: {data_source_info.get('type', 'N/A')}"
                            )
                            task_desc_parts.append(
                                f"- **Host**: {data_source_info.get('host', 'N/A')}"
                            )
                            task_desc_parts.append(
                                f"- **Port**: {data_source_info.get('port', 'N/A')}"
                            )
                            task_desc_parts.append(
                                f"- **Database**: "
                                f"{data_source_info.get('database', 'N/A')}\n"
                            )

                        # Source table DDLs
                        if source_ddls:
                            task_desc_parts.append("## Source Tables (DDL)")
                            for ddl in source_ddls:
                                task_desc_parts.append(f"```sql\n{ddl}\n```\n")

                        # Target table DDLs
                        if target_ddls:
                            task_desc_parts.append("## Target Tables (DDL)")
                            for ddl in target_ddls:
                                task_desc_parts.append(f"```sql\n{ddl}\n```\n")

                        # Update mode notes
                        task_desc_parts.append("## Update Mode")
                        if update_mode == "append":
                            task_desc_parts.append("- **Mode**: Append (追加模式)")
                            task_desc_parts.append(
                                "- **Description**: 新数据将追加到目标表,不删除现有数据\n"
                            )
                        else:
                            task_desc_parts.append(
                                "- **Mode**: Full Refresh (全量更新)"
                            )
                            task_desc_parts.append(
                                "- **Description**: 目标表将被清空后重新写入数据\n"
                            )

                        # Request content (rule)
                        if request_content_str:
                            task_desc_parts.append("## Request Content")
                            task_desc_parts.append(f"{request_content_str}\n")

                        # Implementation steps (tailored to the task type)
                        task_desc_parts.append("## Implementation Steps")

                        # Is this a remote data source import task?
                        if data_source_info:
                            # Simplified steps for a remote data source import
                            task_desc_parts.append(
                                "1. Create an n8n workflow to execute the "
                                "data import task"
                            )
                            task_desc_parts.append(
                                "2. Configure the workflow to call "
                                "`import_resource_data.py` Python script"
                            )
                            task_desc_parts.append(
                                "3. Pass the following parameters to the "
                                "Python execution node:"
                            )
                            task_desc_parts.append(
                                " - `--source-config`: JSON configuration "
                                "for the remote data source"
                            )
                            task_desc_parts.append(
                                " - `--target-table`: Target table name "
                                "(data resource English name)"
                            )
                            task_desc_parts.append(
                                f" - `--update-mode`: {update_mode}"
                            )
                            task_desc_parts.append(
                                "4. The Python script will automatically:"
                            )
                            task_desc_parts.append(
                                " - Connect to the remote data source"
                            )
                            task_desc_parts.append(
                                " - Extract data from the source table"
                            )
                            task_desc_parts.append(
                                f" - Write data to target table using "
                                f"{update_mode} mode"
                            )
                        else:
                            # Full steps for a data transformation task
                            task_desc_parts.append(
                                "1. Extract data from source tables as "
                                "specified in the DDL"
                            )
                            task_desc_parts.append(
                                "2. Apply transformation logic according to the rule:"
                            )
                            if request_content_str:
                                task_desc_parts.append(
                                    f" - Rule: {request_content_str}"
                                )
                            task_desc_parts.append(
                                "3. Generate Python program to implement the "
                                "data transformation logic"
                            )
                            task_desc_parts.append(
                                f"4. Write transformed data to target table "
                                f"using {update_mode} mode"
                            )
                            task_desc_parts.append(
                                "5. Create an n8n workflow to schedule and "
                                "execute the Python program"
                            )

                        task_description_md = "\n".join(task_desc_parts)
                except Exception as parse_e:
                    logger.warning(
                        f"解析任务描述详情失败,使用原始描述: {str(parse_e)}"
                    )
                    task_description_md = script_requirement

                # Decide the task type and set code_path / code_name.
                # Remote import tasks reuse the generic import_resource_data.py.
                if data_source_info:
                    # Remote data source import task
                    code_path = "datafactory/scripts"
                    code_name = "import_resource_data.py"
                    logger.info(
                        f"检测到远程数据源导入任务,使用通用脚本: "
                        f"{code_path}/{code_name}"
                    )
                else:
                    # Data transformation task; a dedicated script will be generated
                    code_path = "datafactory/scripts"
                    code_name = script_name

                task_insert_sql = text(
                    "INSERT INTO public.task_list\n"
                    "(task_name, task_description, status, code_name, "
                    "code_path, create_by, create_time, update_time)\n"
                    "VALUES\n"
                    "(:task_name, :task_description, :status, :code_name, "
                    ":code_path, :create_by, :create_time, :update_time)"
                )
                task_params = {
                    "task_name": script_name,
                    "task_description": task_description_md,
                    "status": "pending",
                    "code_name": code_name,
                    "code_path": code_path,
                    "create_by": "cursor",
                    "create_time": current_time,
                    "update_time": current_time,
                }
                db.session.execute(task_insert_sql, task_params)
                db.session.commit()
                logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")

                # Automatically generate the n8n workflow JSON file
                try:
                    DataFlowService._generate_n8n_workflow(
                        script_name=script_name,
                        code_name=code_name,
                        code_path=code_path,
                        update_mode=update_mode,
                        is_import_task=bool(data_source_info),
                    )
                except Exception as wf_error:
                    logger.warning(f"生成n8n工作流文件失败: {str(wf_error)}")
                    # Does not affect the main flow
            except Exception as task_error:
                db.session.rollback()
                logger.error(f"写入task_list表失败: {str(task_error)}")
                raise task_error
        except Exception as e:
            db.session.rollback()
            logger.error(f"保存到PG数据库失败: {str(e)}")
            raise e
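    # Shape of the generated task_description_md (a sketch; which sections
    # appear depends on the parsed requirement, and all values here are
    # hypothetical):
    #
    #   # Task: 销售日报汇总
    #   ## DataFlow Configuration
    #   - **Schema**: dags
    #   ## Source Tables (DDL)
    #   (one fenced ```sql block per source table)
    #   ## Update Mode
    #   - **Mode**: Append (追加模式)
    #   ## Implementation Steps
    #   1. Extract data from source tables as specified in the DDL
    #   ...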
    @staticmethod
    def _generate_n8n_workflow(
        script_name: str,
        code_name: str,
        code_path: str,
        update_mode: str = "append",
        is_import_task: bool = False,
    ) -> Optional[str]:
        """
        Automatically generate an n8n workflow JSON file.

        Args:
            script_name: Script/task name.
            code_name: Code file name.
            code_path: Code path.
            update_mode: Update mode.
            is_import_task: Whether this is a data import task.

        Returns:
            The generated workflow file path, or None on failure.
        """
        try:
            # Make sure the workflow directory exists
            workflows_dir = PROJECT_ROOT / "datafactory" / "workflows"
            workflows_dir.mkdir(parents=True, exist_ok=True)

            # Build the workflow file name
            workflow_filename = f"{script_name}_workflow.json"
            workflow_path = workflows_dir / workflow_filename

            # Generate unique IDs
            def gen_id():
                return str(uuid.uuid4())

            # Build the full SSH command, including venv activation.
            # Note: the n8n server is separate from the application server,
            # so an SSH node must be used.
            ssh_command = (
                f"cd /opt/dataops-platform && source venv/bin/activate && "
                f"python {code_path}/{code_name}"
            )

            workflow_json = {
                "name": f"{script_name}_工作流",
                "nodes": [
                    {
                        "parameters": {},
                        "id": gen_id(),
                        "name": "Manual Trigger",
                        "type": "n8n-nodes-base.manualTrigger",
                        "typeVersion": 1,
                        "position": [250, 300],
                    },
                    {
                        "parameters": {
                            "resource": "command",
                            "operation": "execute",
                            "command": ssh_command,
                            "cwd": "/opt/dataops-platform",
                        },
                        "id": gen_id(),
                        "name": "Execute Script",
                        "type": "n8n-nodes-base.ssh",
                        "typeVersion": 1,
                        "position": [450, 300],
                        "credentials": {
                            "sshPassword": {
                                "id": "pYTwwuyC15caQe6y",
                                "name": "SSH Password account",
                            }
                        },
                    },
                    {
                        "parameters": {
                            "conditions": {
                                "options": {
                                    "caseSensitive": True,
                                    "leftValue": "",
                                    "typeValidation": "strict",
                                },
                                "conditions": [
                                    {
                                        "id": "condition-success",
                                        "leftValue": "={{ $json.code }}",
                                        "rightValue": 0,
                                        "operator": {
                                            "type": "number",
                                            "operation": "equals",
                                        },
                                    }
                                ],
                                "combinator": "and",
                            }
                        },
                        "id": gen_id(),
                        "name": "Check Result",
                        "type": "n8n-nodes-base.if",
                        "typeVersion": 2,
                        "position": [650, 300],
                    },
                    {
                        "parameters": {
                            "assignments": {
                                "assignments": [
                                    {
                                        "id": "result-success",
                                        "name": "status",
                                        "value": "success",
                                        "type": "string",
                                    },
                                    {
                                        "id": "result-message",
                                        "name": "message",
                                        "value": f"{script_name} 执行成功",
                                        "type": "string",
                                    },
                                    {
                                        "id": "result-output",
                                        "name": "output",
                                        "value": "={{ $json.stdout }}",
                                        "type": "string",
                                    },
                                    {
                                        "id": "result-time",
                                        "name": "executionTime",
                                        "value": "={{ $now.toISO() }}",
                                        "type": "string",
                                    },
                                ]
                            }
                        },
                        "id": gen_id(),
                        "name": "Success Response",
                        "type": "n8n-nodes-base.set",
                        "typeVersion": 3.4,
                        "position": [850, 200],
                    },
                    {
                        "parameters": {
                            "assignments": {
                                "assignments": [
                                    {
                                        "id": "error-status",
                                        "name": "status",
                                        "value": "error",
                                        "type": "string",
                                    },
                                    {
                                        "id": "error-message",
                                        "name": "message",
                                        "value": f"{script_name} 执行失败",
                                        "type": "string",
                                    },
                                    {
                                        "id": "error-output",
                                        "name": "error",
                                        "value": "={{ $json.stderr }}",
                                        "type": "string",
                                    },
                                    {
                                        "id": "error-code",
                                        "name": "exitCode",
                                        "value": "={{ $json.code }}",
                                        "type": "number",
                                    },
                                    {
                                        "id": "error-time",
                                        "name": "executionTime",
                                        "value": "={{ $now.toISO() }}",
                                        "type": "string",
                                    },
                                ]
                            }
                        },
                        "id": gen_id(),
                        "name": "Error Response",
                        "type": "n8n-nodes-base.set",
                        "typeVersion": 3.4,
                        "position": [850, 400],
                    },
                ],
                "connections": {
                    "Manual Trigger": {
                        "main": [
                            [
                                {
                                    "node": "Execute Script",
                                    "type": "main",
                                    "index": 0,
                                }
                            ]
                        ]
                    },
                    "Execute Script": {
                        "main": [
                            [
                                {
                                    "node": "Check Result",
                                    "type": "main",
                                    "index": 0,
                                }
                            ]
                        ]
                    },
                    "Check Result": {
                        "main": [
                            [
                                {
                                    "node": "Success Response",
                                    "type": "main",
                                    "index": 0,
                                }
                            ],
                            [
                                {
                                    "node": "Error Response",
                                    "type": "main",
                                    "index": 0,
                                }
                            ],
                        ]
                    },
                },
                "active": False,
                "settings": {"executionOrder": "v1"},
                "versionId": "1",
                "meta": {
                    "templateCredsSetupCompleted": False,
                    "instanceId": "dataops-platform",
                },
                "tags": [
                    {
                        "createdAt": datetime.now().isoformat() + "Z",
                        "updatedAt": datetime.now().isoformat() + "Z",
                        "id": "1",
                        "name": "数据导入" if is_import_task else "数据流程",
                    }
                ],
            }

            # Write the file
            with open(workflow_path, "w", encoding="utf-8") as f:
                json.dump(workflow_json, f, ensure_ascii=False, indent=2)

            logger.info(f"成功生成n8n工作流文件: {workflow_path}")
            return str(workflow_path)
        except Exception as e:
            logger.error(f"生成n8n工作流失败: {str(e)}")
            return None
    @staticmethod
    def _handle_children_relationships(dataflow_node, children_ids):
        """Handle child-node relationships."""
        logger.debug(
            "处理子节点关系,原始children_ids: %s, 类型: %s",
            children_ids,
            type(children_ids),
        )

        # Make sure children_ids is a list
        if not isinstance(children_ids, (list, tuple)):
            if children_ids is not None:
                children_ids = [children_ids]  # wrap a single value in a list
                logger.debug(f"将单个值转换为列表: {children_ids}")
            else:
                children_ids = []  # None becomes an empty list
                logger.debug("将None转换为空列表")

        for child_id in children_ids:
            try:
                # Look up the child node
                query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
                with connect_graph().session() as session:
                    result = session.run(query, child_id=child_id).data()
                    if result:
                        # Get the ID of dataflow_node
                        dataflow_id = getattr(dataflow_node, "identity", None)
                        if dataflow_id is None:
                            # No identity attribute: look up the ID by name
                            query_id = (
                                "MATCH (n:DataFlow) WHERE n.name_zh = "
                                "$name_zh RETURN id(n) as node_id"
                            )
                            id_result = session.run(
                                query_id,
                                name_zh=dataflow_node.get("name_zh"),
                            ).single()
                            dataflow_id = id_result["node_id"] if id_result else None

                        # Create the relationship, checking existence by ID
                        if dataflow_id and not relationship_exists(
                            dataflow_id, "child", child_id
                        ):
                            session.run(
                                "MATCH (a), (b) WHERE id(a) = $dataflow_id "
                                "AND id(b) = $child_id "
                                "CREATE (a)-[:child]->(b)",
                                dataflow_id=dataflow_id,
                                child_id=child_id,
                            )
                            logger.info(f"创建子节点关系: {dataflow_id} -> {child_id}")
            except Exception as e:
                logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
    @staticmethod
    def _handle_tag_relationships(dataflow_id, tag_list):
        """
        Handle multi-tag relationships.

        Args:
            dataflow_id: Dataflow node ID.
            tag_list: Tag list; either an array of IDs or an array of
                objects carrying an id field.
        """
        # Make sure tag_list is a list
        if not isinstance(tag_list, list):
            tag_list = [tag_list] if tag_list else []

        for tag_item in tag_list:
            tag_id = None
            if isinstance(tag_item, dict) and "id" in tag_item:
                tag_id = int(tag_item["id"])
            elif isinstance(tag_item, (int, str)):
                with contextlib.suppress(ValueError, TypeError):
                    tag_id = int(tag_item)
            if tag_id:
                DataFlowService._handle_single_tag_relationship(dataflow_id, tag_id)
    @staticmethod
    def _handle_single_tag_relationship(dataflow_id, tag_id):
        """Handle a single tag relationship."""
        try:
            # Look up the tag node
            query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, tag_id=tag_id).data()

                # Create the relationship, checking existence by ID
                if (
                    result
                    and dataflow_id
                    and not relationship_exists(dataflow_id, "LABEL", tag_id)
                ):
                    session.run(
                        "MATCH (a), (b) WHERE id(a) = $dataflow_id "
                        "AND id(b) = $tag_id "
                        "CREATE (a)-[:LABEL]->(b)",
                        dataflow_id=dataflow_id,
                        tag_id=tag_id,
                    )
                    logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
        except Exception as e:
            logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
    @staticmethod
    def update_dataflow_script_path(
        dataflow_name: str,
        script_path: str,
    ) -> bool:
        """
        Update the script path on a DataFlow node.

        When the task completes, the path of the generated Python script is
        written back to the DataFlow node.

        Args:
            dataflow_name: Dataflow name (Chinese).
            script_path: Full path of the Python script.

        Returns:
            Whether the update succeeded.
        """
        try:
            query = """
            MATCH (n:DataFlow {name_zh: $name_zh})
            SET n.script_path = $script_path, n.updated_at = $updated_at
            RETURN n
            """
            with connect_graph().session() as session:
                result = session.run(
                    query,
                    name_zh=dataflow_name,
                    script_path=script_path,
                    updated_at=get_formatted_time(),
                ).single()
                if result:
                    logger.info(
                        f"已更新 DataFlow 脚本路径: {dataflow_name} -> {script_path}"
                    )
                    return True
                else:
                    logger.warning(f"未找到 DataFlow 节点: {dataflow_name}")
                    return False
        except Exception as e:
            logger.error(f"更新 DataFlow 脚本路径失败: {str(e)}")
            return False
    @staticmethod
    def get_script_content(dataflow_id: int) -> Dict[str, Any]:
        """
        Get the script content associated with a DataFlow ID.

        Args:
            dataflow_id: Dataflow ID.

        Returns:
            A dict with the script content and metadata:
            - script_path: script path
            - script_content: script content
            - script_type: script type (e.g. python)
            - dataflow_name: dataflow name

        Raises:
            ValueError: If the DataFlow does not exist or its script path is empty.
            FileNotFoundError: If the script file does not exist.
        """
        try:
            # Fetch the DataFlow node from Neo4j
            query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            RETURN n, id(n) as node_id
            """
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).single()
                if not result:
                    raise ValueError(f"未找到 ID 为 {dataflow_id} 的 DataFlow 节点")

                node = result["n"]
                node_props = dict(node)

                # Get the script path
                script_path = node_props.get("script_path", "")
                if not script_path:
                    raise ValueError(
                        f"DataFlow (ID: {dataflow_id}) 的 script_path 属性为空"
                    )

                # Resolve the full script path; script_path may be relative
                # or absolute
                script_file = Path(script_path)

                # Relative paths are resolved against the project root
                # (the module-level PROJECT_ROOT)
                if not script_file.is_absolute():
                    script_file = PROJECT_ROOT / script_path

                # Check that the file exists
                if not script_file.exists():
                    raise FileNotFoundError(f"脚本文件不存在: {script_file}")

                # Read the script content
                with script_file.open("r", encoding="utf-8") as f:
                    script_content = f.read()

                # Determine the script type from the file suffix
                suffix = script_file.suffix.lower()
                script_type_map = {
                    ".py": "python",
                    ".js": "javascript",
                    ".ts": "typescript",
                    ".sql": "sql",
                    ".sh": "shell",
                }
                script_type = script_type_map.get(suffix, "text")

                logger.info(
                    f"成功读取脚本内容: DataFlow ID={dataflow_id}, "
                    f"路径={script_path}, 类型={script_type}"
                )
                return {
                    "script_path": script_path,
                    "script_content": script_content,
                    "script_type": script_type,
                    "dataflow_id": dataflow_id,
                    "dataflow_name": node_props.get("name_zh", ""),
                    "dataflow_name_en": node_props.get("name_en", ""),
                }
        except (ValueError, FileNotFoundError):
            raise
        except Exception as e:
            logger.error(f"获取脚本内容失败: {str(e)}")
            raise
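    # Usage sketch (illustrative; the ID is hypothetical and the node's
    # script_path must point at an existing file on disk):
    #
    #   info = DataFlowService.get_script_content(42)
    #   print(info["script_type"])          # e.g. "python"
    #   print(info["script_content"][:200])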
    @staticmethod
    def update_dataflow(
        dataflow_id: int,
        data: Dict[str, Any],
    ) -> Optional[Dict[str, Any]]:
        """
        Update a dataflow.

        Args:
            dataflow_id: Dataflow ID.
            data: The fields to update.

        Returns:
            The updated dataflow info, or None if it does not exist.
        """
        try:
            # Extract the tag array (not stored as a node property)
            tag_list = data.pop("tag", None)

            # Look up the node
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    return None

                # Update node properties
                update_fields = []
                params: Dict[str, Any] = {"dataflow_id": dataflow_id}
                for key, value in data.items():
                    if key not in ["id", "created_at"]:  # protected fields
                        # Serialize complex objects to JSON strings
                        if key in ["config", "script_requirement"] and isinstance(
                            value, dict
                        ):
                            value = json.dumps(value, ensure_ascii=False)
                        update_fields.append(f"n.{key} = ${key}")
                        params[key] = value

                if update_fields:
                    params["updated_at"] = get_formatted_time()
                    update_fields.append("n.updated_at = $updated_at")
                    update_query = f"""
                    MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                    SET {", ".join(update_fields)}
                    RETURN n, id(n) as node_id
                    """
                    result = session.run(update_query, params).data()

                # Handle tag relationships (multiple tags supported)
                if tag_list is not None:
                    # Make sure it is a list
                    if not isinstance(tag_list, list):
                        tag_list = [tag_list] if tag_list else []

                    # Delete the existing LABEL relationships first
                    delete_query = """
                    MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel)
                    WHERE id(n) = $dataflow_id
                    DELETE r
                    """
                    session.run(delete_query, dataflow_id=dataflow_id)
                    logger.info(f"删除数据流 {dataflow_id} 的现有标签关系")

                    # Create a new LABEL relationship for each tag
                    for tag_item in tag_list:
                        tag_id = None
                        if isinstance(tag_item, dict) and "id" in tag_item:
                            tag_id = int(tag_item["id"])
                        elif isinstance(tag_item, (int, str)):
                            with contextlib.suppress(ValueError, TypeError):
                                tag_id = int(tag_item)
                        if tag_id:
                            DataFlowService._handle_single_tag_relationship(
                                dataflow_id, tag_id
                            )

                if result:
                    node = result[0]["n"]
                    updated_dataflow = dict(node)
                    # Use the node_id returned by the query
                    updated_dataflow["id"] = result[0]["node_id"]

                    # Query the tag array and attach it to the response
                    tags_query = """
                    MATCH (n:DataFlow)
                    WHERE id(n) = $dataflow_id
                    OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
                    RETURN collect({
                        id: id(label),
                        name_zh: label.name_zh,
                        name_en: label.name_en
                    }) as tags
                    """
                    tags_result = session.run(
                        tags_query, dataflow_id=dataflow_id
                    ).single()
                    if tags_result:
                        tags = tags_result.get("tags", [])
                        updated_dataflow["tag"] = [
                            tag for tag in tags if tag.get("id") is not None
                        ]
                    else:
                        updated_dataflow["tag"] = []

                    logger.info(f"更新数据流成功: ID={dataflow_id}")
                    return updated_dataflow
                return None
        except Exception as e:
            logger.error(f"更新数据流失败: {str(e)}")
            raise e
    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """
        Delete a dataflow.

        Args:
            dataflow_id: Dataflow ID.

        Returns:
            Whether the deletion succeeded.
        """
        try:
            # Delete the node along with its relationships
            query = """
            MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            with connect_graph().session() as session:
                delete_result = session.run(query, dataflow_id=dataflow_id).single()
                result = delete_result["deleted_count"] if delete_result else 0
                if result and result > 0:
                    logger.info(f"删除数据流成功: ID={dataflow_id}")
                    return True
                return False
        except Exception as e:
            logger.error(f"删除数据流失败: {str(e)}")
            raise e
    @staticmethod
    def execute_dataflow(
        dataflow_id: int,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Execute a dataflow.

        Args:
            dataflow_id: Dataflow ID.
            params: Execution parameters.

        Returns:
            Execution result info.
        """
        try:
            # Check that the dataflow exists
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"数据流不存在: ID={dataflow_id}")

            execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"

            # TODO: actually execute the dataflow here.
            # For now, return a mocked result.
            result = {
                "execution_id": execution_id,
                "dataflow_id": dataflow_id,
                "status": "running",
                "started_at": datetime.now().isoformat(),
                "params": params or {},
                "progress": 0,
            }
            logger.info(
                "开始执行数据流: ID=%s, execution_id=%s",
                dataflow_id,
                execution_id,
            )
            return result
        except Exception as e:
            logger.error(f"执行数据流失败: {str(e)}")
            raise e
    @staticmethod
    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
        """
        Get the execution status of a dataflow.

        Args:
            dataflow_id: Dataflow ID.

        Returns:
            Execution status info.
        """
        try:
            # TODO: query the real execution status here.
            # For now, return a mocked status.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"数据流不存在: ID={dataflow_id}")

            status = ["running", "completed", "failed", "pending"][dataflow_id % 4]
            return {
                "dataflow_id": dataflow_id,
                "status": status,
                "progress": (
                    100 if status == "completed" else (dataflow_id * 10) % 100
                ),
                "started_at": datetime.now().isoformat(),
                "completed_at": (
                    datetime.now().isoformat() if status == "completed" else None
                ),
                "error_message": ("执行过程中发生错误" if status == "failed" else None),
            }
        except Exception as e:
            logger.error(f"获取数据流状态失败: {str(e)}")
            raise e
    @staticmethod
    def get_dataflow_logs(
        dataflow_id: int,
        page: int = 1,
        page_size: int = 50,
    ) -> Dict[str, Any]:
        """
        Get the execution logs of a dataflow.

        Args:
            dataflow_id: Dataflow ID.
            page: Page number.
            page_size: Page size.

        Returns:
            The execution log list and pagination info.
        """
        try:
            # TODO: query the real execution logs here.
            # For now, return mocked logs.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()
                if not result:
                    raise ValueError(f"数据流不存在: ID={dataflow_id}")

            mock_logs = [
                {
                    "id": i,
                    "timestamp": datetime.now().isoformat(),
                    "level": ["INFO", "WARNING", "ERROR"][i % 3],
                    "message": f"数据流执行日志消息 {i}",
                    "component": ["source", "transform", "target"][i % 3],
                }
                for i in range(1, 101)
            ]

            # Paginate
            total = len(mock_logs)
            start = (page - 1) * page_size
            end = start + page_size
            logs = mock_logs[start:end]

            return {
                "logs": logs,
                "pagination": {
                    "page": page,
                    "page_size": page_size,
                    "total": total,
                    "total_pages": (total + page_size - 1) // page_size,
                },
            }
        except Exception as e:
            logger.error(f"获取数据流日志失败: {str(e)}")
            raise e
    @staticmethod
    def _generate_businessdomain_ddl(
        session,
        bd_id: int,
        is_target: bool = False,
        update_mode: str = "append",
    ) -> Optional[Dict[str, Any]]:
        """
        Generate DDL for a BusinessDomain node by ID.

        Args:
            session: Neo4j session object.
            bd_id: BusinessDomain node ID.
            is_target: Whether this is a target table (target tables get an
                extra create_time column).
            update_mode: Update mode (append or full).

        Returns:
            A dict with the ddl and data_source info, or None if the node
            does not exist.
        """
        try:
            # Query the BusinessDomain node, its metadata, tag relationships
            # and data source relationships
            cypher = """
            MATCH (bd:BusinessDomain)
            WHERE id(bd) = $bd_id
            OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
            OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
            OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
            RETURN bd,
                   collect(DISTINCT m) as metadata,
                   collect(DISTINCT {
                       id: id(label),
                       name_zh: label.name_zh,
                       name_en: label.name_en
                   }) as labels,
                   ds.type as ds_type,
                   ds.host as ds_host,
                   ds.port as ds_port,
                   ds.database as ds_database
            """
            result = session.run(cypher, bd_id=bd_id).single()
            if not result or not result["bd"]:
                logger.warning(f"未找到ID为 {bd_id} 的BusinessDomain节点")
                return None

            node = result["bd"]
            metadata = result["metadata"]

            # Process the tag array; the first valid tag name drives the
            # data-resource check below
            labels = result.get("labels", [])
            valid_labels = [label for label in labels if label.get("id") is not None]
            label_name = valid_labels[0].get("name_zh") if valid_labels else None

            # Generate the DDL
            node_props = dict(node)
            table_name = node_props.get("name_en", f"table_{bd_id}")

            ddl_lines = []
            ddl_lines.append(f"CREATE TABLE {table_name} (")

            column_definitions = []

            # Add the metadata columns
            if metadata:
                for meta in metadata:
                    if meta:
                        meta_props = dict(meta)
                        column_name = meta_props.get(
                            "name_en",
                            meta_props.get("name_zh", "unknown_column"),
                        )
                        data_type = meta_props.get("data_type", "VARCHAR(255)")
                        comment = meta_props.get("name_zh", "")
                        column_def = f" {column_name} {data_type}"
                        if comment:
                            column_def += f" COMMENT '{comment}'"
                        column_definitions.append(column_def)

            # With no metadata, add a default primary key
            if not column_definitions:
                column_definitions.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")

            # 5. Target tables get a create_time column
            if is_target:
                column_definitions.append(
                    " create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
                    "COMMENT '数据创建时间'"
                )

            ddl_lines.append(",\n".join(column_definitions))
            ddl_lines.append(");")

            # Add the table comment
            table_comment = node_props.get(
                "name_zh", node_props.get("describe", table_name)
            )
            if table_comment and table_comment != table_name:
                ddl_lines.append(f"COMMENT ON TABLE {table_name} IS '{table_comment}';")

            ddl_content = "\n".join(ddl_lines)

            # 3. If the node is tagged "数据资源" (data resource) and has a
            # COME_FROM data source, return the data source info as well
            data_source = None
            if label_name == "数据资源" and result["ds_type"]:
                data_source = {
                    "type": result["ds_type"],
                    "host": result["ds_host"],
                    "port": result["ds_port"],
                    "database": result["ds_database"],
                }
                logger.info(f"获取到数据源信息: {data_source}")

            logger.debug(
                f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}"
            )
            return {
                "ddl": ddl_content,
                "table_name": table_name,
                "data_source": data_source,
            }
        except Exception as e:
            logger.error(f"生成BusinessDomain DDL失败,ID={bd_id}: {str(e)}")
            return None
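    # Example return value (a sketch; the table, columns and connection
    # details are hypothetical):
    #
    #   {
    #       "ddl": "CREATE TABLE sales_orders (\n"
    #              " order_id BIGINT COMMENT '订单ID',\n"
    #              " create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间'\n"
    #              ");",
    #       "table_name": "sales_orders",
    #       "data_source": {"type": "mysql", "host": "10.0.0.5",
    #                       "port": 3306, "database": "erp"},
    #       # data_source is only set when the node is tagged "数据资源"
    #   }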
    @staticmethod
    def _handle_script_relationships(
        data: Dict[str, Any],
        dataflow_name: str,
        name_en: str,
    ):
        """
        Handle script relationships: in the Neo4j graph, create an INPUT
        relationship from the source BusinessDomain to the DataFlow, and an
        OUTPUT relationship from the DataFlow to the target BusinessDomain.

        Relationship model:
        - (source:BusinessDomain)-[:INPUT]->(dataflow:DataFlow)
        - (dataflow:DataFlow)-[:OUTPUT]->(target:BusinessDomain)

        Args:
            data: Data dict with the script info; should contain script_name,
                script_type, schedule_status, source_table, target_table,
                update_mode.
        """
        try:
            # Read the key/value pairs from data
            source_table_full = data.get("source_table", "")
            target_table_full = data.get("target_table", "")

            # Parse the source_table / target_table format:
            # either "label:name" or a bare "name"
            source_table = (
                source_table_full.split(":")[-1]
                if ":" in source_table_full
                else source_table_full
            )
            target_table = (
                target_table_full.split(":")[-1]
                if ":" in target_table_full
                else target_table_full
            )
            source_label = (
                source_table_full.split(":")[0]
                if ":" in source_table_full
                else "BusinessDomain"
            )
            target_label = (
                target_table_full.split(":")[0]
                if ":" in target_table_full
                else "BusinessDomain"
            )

            # Validate the required fields
            if not source_table or not target_table:
                logger.warning(
                    "source_table或target_table为空,跳过关系创建: "
                    "source_table=%s, target_table=%s",
                    source_table,
                    target_table,
                )
                return

            logger.info(
                "开始创建INPUT/OUTPUT关系: %s -[INPUT]-> %s -[OUTPUT]-> %s",
                source_table,
                dataflow_name,
                target_table,
            )

            with connect_graph().session() as session:
                # Step 1: get the DataFlow node ID
                dataflow_query = """
                MATCH (df:DataFlow {name_zh: $dataflow_name})
                RETURN id(df) as dataflow_id
                """
                df_result = session.run(
                    dataflow_query,  # type: ignore[arg-type]
                    {"dataflow_name": dataflow_name},
                ).single()
                if not df_result:
                    logger.error(f"未找到DataFlow节点: {dataflow_name}")
                    return
                dataflow_id = df_result["dataflow_id"]

                # Step 2: get or create the source node.
                # Match on name_en first, then on name.
                source_query = f"""
                MATCH (source:{source_label})
                WHERE source.name_en = $source_table OR source.name = $source_table
                RETURN id(source) as source_id
                LIMIT 1
                """
                source_result = session.run(
                    source_query,  # type: ignore[arg-type]
                    {"source_table": source_table},
                ).single()
                if not source_result:
                    logger.warning(
                        "未找到source节点: %s,将创建新节点",
                        source_table,
                    )
                    # Create the source node
                    create_source_query = f"""
                    CREATE (source:{source_label} {{
                        name: $source_table,
                        name_en: $source_table,
                        created_at: $created_at,
                        type: 'source'
                    }})
                    RETURN id(source) as source_id
                    """
                    source_result = session.run(
                        create_source_query,  # type: ignore[arg-type]
                        {
                            "source_table": source_table,
                            "created_at": get_formatted_time(),
                        },
                    ).single()
                source_id = source_result["source_id"] if source_result else None

                # Step 3: get or create the target node
                target_query = f"""
                MATCH (target:{target_label})
                WHERE target.name_en = $target_table OR target.name = $target_table
                RETURN id(target) as target_id
                LIMIT 1
                """
                target_result = session.run(
                    target_query,  # type: ignore[arg-type]
                    {"target_table": target_table},
                ).single()
                if not target_result:
                    logger.warning(
                        "未找到target节点: %s,将创建新节点",
                        target_table,
                    )
                    # Create the target node
                    create_target_query = f"""
                    CREATE (target:{target_label} {{
                        name: $target_table,
                        name_en: $target_table,
                        created_at: $created_at,
                        type: 'target'
                    }})
                    RETURN id(target) as target_id
                    """
                    target_result = session.run(
                        create_target_query,  # type: ignore[arg-type]
                        {
                            "target_table": target_table,
                            "created_at": get_formatted_time(),
                        },
                    ).single()
                target_id = target_result["target_id"] if target_result else None

                if not source_id or not target_id:
                    logger.error(
                        "无法获取source或target节点ID: source_id=%s, target_id=%s",
                        source_id,
                        target_id,
                    )
                    return

                # Step 4: create the INPUT relationship (source)-[:INPUT]->(dataflow)
                create_input_query = """
                MATCH (source), (dataflow:DataFlow)
                WHERE id(source) = $source_id AND id(dataflow) = $dataflow_id
                MERGE (source)-[r:INPUT]->(dataflow)
                ON CREATE SET r.created_at = $created_at
                ON MATCH SET r.updated_at = $created_at
                RETURN r
                """
                input_result = session.run(
                    create_input_query,  # type: ignore[arg-type]
                    {
                        "source_id": source_id,
                        "dataflow_id": dataflow_id,
                        "created_at": get_formatted_time(),
                    },
                ).single()
                if input_result:
                    logger.info(
                        "成功创建INPUT关系: %s -> %s",
                        source_table,
                        dataflow_name,
                    )
                else:
                    logger.warning(
                        "INPUT关系创建失败或已存在: %s -> %s",
                        source_table,
                        dataflow_name,
                    )

                # Step 5: create the OUTPUT relationship (dataflow)-[:OUTPUT]->(target)
                create_output_query = """
                MATCH (dataflow:DataFlow), (target)
                WHERE id(dataflow) = $dataflow_id AND id(target) = $target_id
                MERGE (dataflow)-[r:OUTPUT]->(target)
                ON CREATE SET r.created_at = $created_at
                ON MATCH SET r.updated_at = $created_at
                RETURN r
                """
                output_result = session.run(
                    create_output_query,  # type: ignore[arg-type]
                    {
                        "dataflow_id": dataflow_id,
                        "target_id": target_id,
                        "created_at": get_formatted_time(),
                    },
                ).single()
                if output_result:
                    logger.info(
                        "成功创建OUTPUT关系: %s -> %s",
                        dataflow_name,
                        target_table,
                    )
                else:
                    logger.warning(
                        "OUTPUT关系创建失败或已存在: %s -> %s",
                        dataflow_name,
                        target_table,
                    )

                logger.info(
                    "血缘关系创建完成: %s -[INPUT]-> %s -[OUTPUT]-> %s",
                    source_table,
                    dataflow_name,
                    target_table,
                )
        except Exception as e:
            logger.error(f"处理脚本关系失败: {str(e)}")
            raise e
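    # Accepted source_table / target_table formats (the label and table
    # names below are hypothetical examples):
    #
    #   "sales_orders"                -> label defaults to BusinessDomain
    #   "DataResource:sales_orders"   -> label "DataResource", name "sales_orders"
    #
    # i.e. the part before the first ":" selects the node label and the
    # part after it is matched against name_en or name.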
    @staticmethod
    def get_business_domain_list() -> List[Dict[str, Any]]:
        """
        Get the list of BusinessDomain nodes.

        Returns:
            A list of BusinessDomain nodes, each with id, name_zh, name_en, tag.
        """
        try:
            logger.info("开始查询BusinessDomain节点列表")
            with connect_graph().session() as session:
                # Query all BusinessDomain nodes and the tags behind their
                # LABEL relationships (multiple tags supported)
                query = """
                MATCH (bd:BusinessDomain)
                OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
                RETURN id(bd) as id,
                       bd.name_zh as name_zh,
                       bd.name_en as name_en,
                       bd.create_time as create_time,
                       collect({
                           id: id(label),
                           name_zh: label.name_zh,
                           name_en: label.name_en
                       }) as tags
                ORDER BY create_time DESC
                """
                result = session.run(query)

                bd_list = []
                for record in result:
                    # Process the tag array, filtering out empty tags
                    tags = record.get("tags", [])
                    tag_list = [tag for tag in tags if tag.get("id") is not None]
                    bd_item = {
                        "id": record["id"],
                        "name_zh": record.get("name_zh", "") or "",
                        "name_en": record.get("name_en", "") or "",
                        "tag": tag_list,
                    }
                    bd_list.append(bd_item)

                logger.info(f"成功查询到 {len(bd_list)} 个BusinessDomain节点")
                return bd_list
        except Exception as e:
            logger.error(f"查询BusinessDomain节点列表失败: {str(e)}")
            raise e
    @staticmethod
    def _register_data_product(
        data: Dict[str, Any],
        dataflow_name: str,
        name_en: str,
        dataflow_id: Optional[int] = None,
    ) -> None:
        """
        Register a data product with the data service.

        After a dataflow is created successfully, it is automatically
        registered as a data product so it can be shown and managed in the
        data service module.

        The BusinessDomain IDs come from script_requirement.target_table;
        each node's name_zh and name_en are then looked up in Neo4j and used
        as the data product name.

        Args:
            data: Dataflow configuration data.
            dataflow_name: Dataflow name (Chinese).
            name_en: Dataflow English name.
            dataflow_id: Dataflow ID (Neo4j node ID).
        """
        try:
            # Get target_table (a list of BusinessDomain IDs) from
            # script_requirement
            script_requirement = data.get("script_requirement")
            description = data.get("describe", "")

            # Parse script_requirement
            req_json: Optional[Dict[str, Any]] = None
            if script_requirement:
                if isinstance(script_requirement, dict):
                    req_json = script_requirement
                elif isinstance(script_requirement, str):
                    try:
                        parsed = json.loads(script_requirement)
                        if isinstance(parsed, dict):
                            req_json = parsed
                    except (json.JSONDecodeError, TypeError):
                        pass

            # Collect the BusinessDomain IDs from target_table
            target_bd_ids: List[int] = []
            if req_json:
                target_table_ids = req_json.get("target_table", [])
                if isinstance(target_table_ids, list):
                    target_bd_ids = [
                        int(bid) for bid in target_table_ids if bid is not None
                    ]
                elif target_table_ids is not None:
                    target_bd_ids = [int(target_table_ids)]

                # If there is a rule field, use it as the description fallback
                rule = req_json.get("rule", "")
                if rule and not description:
                    description = rule

            # Without target_table IDs, skip the product registration
            if not target_bd_ids:
                logger.warning(
                    f"数据流 {dataflow_name} 没有指定target_table,跳过数据产品注册"
                )
                return

            # Look up name_zh and name_en for each BusinessDomain node in Neo4j
            with connect_graph().session() as session:
                for bd_id in target_bd_ids:
                    try:
                        query = """
                        MATCH (bd:BusinessDomain)
                        WHERE id(bd) = $bd_id
                        RETURN bd.name_zh as name_zh,
                               bd.name_en as name_en,
                               bd.describe as describe
                        """
                        result = session.run(query, bd_id=bd_id).single()
                        if not result:
                            logger.warning(
                                f"未找到ID为 {bd_id} 的BusinessDomain节点,跳过"
                            )
                            continue

                        # Use the BusinessDomain node's name_zh and name_en
                        product_name = result.get("name_zh") or ""
                        product_name_en = result.get("name_en") or ""

                        # Fall back to name_en when name_zh is missing
                        if not product_name:
                            product_name = product_name_en
                        # Derive name_en from name_zh when it is missing
                        if not product_name_en:
                            product_name_en = product_name.lower().replace(" ", "_")

                        # The target table name is the BusinessDomain's name_en
                        target_table = product_name_en

                        # Use the node's describe when description is still empty
                        bd_describe = result.get("describe") or ""
                        if bd_describe and not description:
                            description = bd_describe

                        # Resolve the target schema (defaults to public)
                        target_schema = "public"

                        # Register through the data product service
                        DataProductService.register_data_product(
                            product_name=product_name,
                            product_name_en=product_name_en,
                            target_table=target_table,
                            target_schema=target_schema,
                            description=description,
                            source_dataflow_id=dataflow_id,
                            source_dataflow_name=dataflow_name,
                            created_by=data.get("created_by", "dataflow"),
                        )
                        logger.info(
                            f"数据产品注册成功: {product_name} -> "
                            f"{target_schema}.{target_table}"
                        )
                    except Exception as bd_error:
                        logger.error(
                            f"处理BusinessDomain {bd_id} 失败: {str(bd_error)}"
                        )
                        # Continue with the next ID
        except Exception as e:
            logger.error(f"注册数据产品失败: {str(e)}")
            raise
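

# Minimal manual smoke test (a sketch, not part of the service; it assumes
# a configured Flask app context plus reachable Neo4j/PG, and importing
# `app` above already requires that environment):
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    flows = DataFlowService.get_dataflows(page=1, page_size=5)
    print(f"total dataflows: {flows['pagination']['total']}")
    if flows["list"]:
        detail = DataFlowService.get_dataflow_by_id(flows["list"][0]["id"])
        print(detail)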