dataflows.py

  1. import logging
  2. from typing import Dict, List, Optional, Any, Union
  3. from datetime import datetime
  4. import json
  5. from app.core.llm.llm_service import llm_sql
  6. from app.core.graph.graph_operations import (
  7. connect_graph,
  8. create_or_get_node,
  9. get_node,
  10. relationship_exists,
  11. )
  12. from app.core.meta_data import translate_and_parse, get_formatted_time
  13. from app import db
  14. from sqlalchemy import text
  15. logger = logging.getLogger(__name__)
  16. class DataFlowService:
  17. """数据流服务类,处理数据流相关的业务逻辑"""
  18. @staticmethod
  19. def get_dataflows(
  20. page: int = 1,
  21. page_size: int = 10,
  22. search: str = '',
  23. ) -> Dict[str, Any]:
  24. """
  25. 获取数据流列表
  26. Args:
  27. page: 页码
  28. page_size: 每页大小
  29. search: 搜索关键词
  30. Returns:
  31. 包含数据流列表和分页信息的字典
  32. """
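# Illustrative sketch of the returned structure (editorial example, not added
# behaviour); values are hypothetical:
#   {
#       'list': [{'id': 123, 'name_zh': '...', 'tag': [{'id': 1, ...}], ...}],
#       'pagination': {'page': 1, 'page_size': 10, 'total': 42, 'total_pages': 5}
#   }
# Each element of 'list' is the DataFlow node's properties plus its node id and
# the collected DataLabel tags.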
  33. try:
  34. # 从图数据库查询数据流列表
  35. skip_count = (page - 1) * page_size
  36. # 构建搜索条件
  37. where_clause = ""
  38. params: Dict[str, Union[int, str]] = {
  39. 'skip': skip_count,
  40. 'limit': page_size,
  41. }
  42. if search:
  43. where_clause = (
  44. "WHERE n.name_zh CONTAINS $search OR "
  45. "n.description CONTAINS $search"
  46. )
  47. params['search'] = search
  48. # 查询数据流列表(包含标签数组)
  49. # 使用WITH子句先分页,再聚合标签,避免分页结果不准确
  50. query = f"""
  51. MATCH (n:DataFlow)
  52. {where_clause}
  53. WITH n
  54. ORDER BY n.created_at DESC
  55. SKIP $skip
  56. LIMIT $limit
  57. OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
  58. RETURN n, id(n) as node_id,
  59. n.created_at as created_at,
  60. collect({{
  61. id: id(label),
  62. name_zh: label.name_zh,
  63. name_en: label.name_en
  64. }}) as tags
  65. ORDER BY created_at DESC
  66. """
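# Note on the query above: collect({...}) always produces at least one map; when
# a DataFlow has no LABEL relationship, the OPTIONAL MATCH leaves `label` as null
# and the collected entry has id: null. That is why the formatting loop below
# keeps only tags whose 'id' is not None.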
  67. # 获取Neo4j驱动(如果连接失败会抛出ConnectionError异常)
  68. try:
  69. with connect_graph().session() as session:
  70. list_result = session.run(query, params).data()
  71. # 查询总数
  72. count_query = f"""
  73. MATCH (n:DataFlow)
  74. {where_clause}
  75. RETURN count(n) as total
  76. """
  77. count_params = {'search': search} if search else {}
  78. count_result = session.run(
  79. count_query, count_params).single()
  80. total = count_result['total'] if count_result else 0
  81. except Exception as e:
  82. # Do not close the Neo4j driver here: connect_graph() may return a
  83. # shared (singleton) driver instance, and calling driver.close()
  84. # would prematurely close it for other sessions that still use it.
  85. # The `with ... session()` context manager above already releases
  86. # the session, which is the only cleanup this method should do,
  87. # so the driver itself is intentionally left open.
  88. logger.error(f"查询数据流失败: {str(e)}")
  89. raise e
  90. # 格式化结果
  91. dataflows = []
  92. for record in list_result:
  93. node = record['n']
  94. dataflow = dict(node)
  95. dataflow['id'] = record['node_id'] # 使用查询返回的node_id
  96. # 处理标签数组,过滤掉空标签
  97. tags = record.get('tags', [])
  98. dataflow['tag'] = [
  99. tag for tag in tags
  100. if tag.get('id') is not None
  101. ]
  102. dataflows.append(dataflow)
  103. return {
  104. 'list': dataflows,
  105. 'pagination': {
  106. 'page': page,
  107. 'page_size': page_size,
  108. 'total': total,
  109. 'total_pages': (total + page_size - 1) // page_size
  110. }
  111. }
  112. except Exception as e:
  113. logger.error(f"获取数据流列表失败: {str(e)}")
  114. raise e
  115. @staticmethod
  116. def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
  117. """
  118. 根据ID获取数据流详情
  119. Args:
  120. dataflow_id: 数据流ID
  121. Returns:
  122. 数据流详情字典,如果不存在则返回None
  123. """
  124. try:
  125. # 从Neo4j获取DataFlow节点的所有属性(包含标签数组)
  126. neo4j_query = """
  127. MATCH (n:DataFlow)
  128. WHERE id(n) = $dataflow_id
  129. OPTIONAL MATCH (n)-[:LABEL]->(label:DataLabel)
  130. RETURN n, id(n) as node_id,
  131. collect({
  132. id: id(label),
  133. name_zh: label.name_zh,
  134. name_en: label.name_en
  135. }) as tags
  136. """
  137. with connect_graph().session() as session:
  138. neo4j_result = session.run(
  139. neo4j_query, dataflow_id=dataflow_id).data()
  140. if not neo4j_result:
  141. logger.warning(f"未找到ID为 {dataflow_id} 的DataFlow节点")
  142. return None
  143. record = neo4j_result[0]
  144. node = record['n']
  145. # 将节点属性转换为字典
  146. dataflow = dict(node)
  147. dataflow['id'] = record['node_id']
  148. # 处理标签数组,过滤掉空标签
  149. tags = record.get('tags', [])
  150. dataflow['tag'] = [
  151. tag for tag in tags
  152. if tag.get('id') is not None
  153. ]
  154. # 处理 script_requirement:如果是JSON字符串,解析为对象
  155. script_requirement_str = dataflow.get('script_requirement', '')
  156. if script_requirement_str:
  157. try:
  158. # 尝试解析JSON字符串
  159. script_requirement_obj = json.loads(
  160. script_requirement_str)
  161. dataflow['script_requirement'] = script_requirement_obj
  162. logger.debug(
  163. "成功解析script_requirement: %s",
  164. script_requirement_obj,
  165. )
  166. except (json.JSONDecodeError, TypeError) as e:
  167. logger.warning(f"script_requirement解析失败,保持原值: {e}")
  168. # 保持原值(字符串)
  169. dataflow['script_requirement'] = script_requirement_str
  170. else:
  171. # 如果为空,设置为None
  172. dataflow['script_requirement'] = None
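# Illustrative example of the script_requirement payload this parsing expects
# (an assumption inferred from _save_to_pg_database below, which reads the
# 'rule', 'source_table' and 'target_table' keys from it):
#   {"rule": "<transformation description>", "source_table": [101, 102], "target_table": [203]}
# Anything that is not valid JSON is returned to the caller unchanged.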
  173. logger.info(
  174. "成功获取DataFlow详情,ID: %s, 名称: %s",
  175. dataflow_id,
  176. dataflow.get('name_zh'),
  177. )
  178. return dataflow
  179. except Exception as e:
  180. logger.error(f"获取数据流详情失败: {str(e)}")
  181. raise e
  182. @staticmethod
  183. def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
  184. """
  185. 创建新的数据流
  186. Args:
  187. data: 数据流配置数据
  188. Returns:
  189. 创建的数据流信息
  190. """
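# Illustrative example of the expected `data` payload (hypothetical values;
# field names taken from the reads below):
#   {
#       "name_zh": "销售日汇总", "describe": "按天汇总销售数据",
#       "tag": [12], "update_mode": "append", "script_type": "sql",
#       "source_table": "BusinessDomain:ods_sales",
#       "target_table": "BusinessDomain:dws_sales_daily",
#       "script_requirement": {"rule": "...", "source_table": [...], "target_table": [...]}
#   }
# Only name_zh and describe are mandatory; everything else falls back to the
# defaults used in node_data below.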
  191. try:
  192. # 验证必填字段
  193. required_fields = ['name_zh', 'describe']
  194. for field in required_fields:
  195. if field not in data:
  196. raise ValueError(f"缺少必填字段: {field}")
  197. dataflow_name = data['name_zh']
  198. # 使用LLM翻译名称生成英文名
  199. try:
  200. result_list = translate_and_parse(dataflow_name)
  201. name_en = (
  202. result_list[0]
  203. if result_list
  204. else dataflow_name.lower().replace(' ', '_')
  205. )
  206. except Exception as e:
  207. logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
  208. name_en = dataflow_name.lower().replace(' ', '_')
  209. # 处理 script_requirement,将其转换为 JSON 字符串
  210. script_requirement = data.get('script_requirement', None)
  211. if script_requirement is not None:
  212. # 如果是字典或列表,转换为 JSON 字符串
  213. if isinstance(script_requirement, (dict, list)):
  214. script_requirement_str = json.dumps(
  215. script_requirement, ensure_ascii=False)
  216. else:
  217. # 如果已经是字符串,直接使用
  218. script_requirement_str = str(script_requirement)
  219. else:
  220. script_requirement_str = ''
  221. # 准备节点数据(tag不作为节点属性存储,而是通过LABEL关系关联)
  222. node_data = {
  223. 'name_zh': dataflow_name,
  224. 'name_en': name_en,
  225. 'category': data.get('category', ''),
  226. 'organization': data.get('organization', ''),
  227. 'leader': data.get('leader', ''),
  228. 'frequency': data.get('frequency', ''),
  229. 'describe': data.get('describe', ''),
  230. 'status': data.get('status', 'inactive'),
  231. 'update_mode': data.get('update_mode', 'append'),
  232. 'script_type': data.get('script_type', 'python'),
  233. 'script_requirement': script_requirement_str,
  234. 'created_at': get_formatted_time(),
  235. 'updated_at': get_formatted_time()
  236. }
  237. # 创建或获取数据流节点
  238. dataflow_id = get_node('DataFlow', name=dataflow_name)
  239. if dataflow_id:
  240. raise ValueError(f"数据流 '{dataflow_name}' 已存在")
  241. dataflow_id = create_or_get_node('DataFlow', **node_data)
  242. # 处理标签关系(支持多标签数组)
  243. tag_list = data.get('tag', [])
  244. if tag_list:
  245. try:
  246. DataFlowService._handle_tag_relationships(
  247. dataflow_id, tag_list)
  248. except Exception as e:
  249. logger.warning(f"处理标签关系时出错: {str(e)}")
  250. # 成功创建图数据库节点后,写入PG数据库
  251. try:
  252. DataFlowService._save_to_pg_database(
  253. data, dataflow_name, name_en)
  254. logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")
  255. # PG数据库记录成功写入后,在neo4j图数据库中创建script关系
  256. try:
  257. DataFlowService._handle_script_relationships(
  258. data, dataflow_name, name_en)
  259. logger.info(f"脚本关系创建成功: {dataflow_name}")
  260. except Exception as script_error:
  261. logger.warning(f"创建脚本关系失败: {str(script_error)}")
  262. except Exception as pg_error:
  263. logger.error(f"写入PG数据库失败: {str(pg_error)}")
  264. # 注意:这里可以选择回滚图数据库操作,但目前保持图数据库数据
  265. # 在实际应用中,可能需要考虑分布式事务
  266. # 返回创建的数据流信息
  267. # 查询创建的节点获取完整信息
  268. query = (
  269. "MATCH (n:DataFlow {name_zh: $name_zh}) "
  270. "RETURN n, id(n) as node_id"
  271. )
  272. with connect_graph().session() as session:
  273. id_result = session.run(query, name_zh=dataflow_name).single()
  274. if id_result:
  275. dataflow_node = id_result['n']
  276. node_id = id_result['node_id']
  277. # 将节点属性转换为字典
  278. result = dict(dataflow_node)
  279. result['id'] = node_id
  280. else:
  281. # 如果查询失败,返回基本信息
  282. result = {
  283. 'id': (
  284. dataflow_id
  285. if isinstance(dataflow_id, int)
  286. else None
  287. ),
  288. 'name_zh': dataflow_name,
  289. 'name_en': name_en,
  290. 'created_at': get_formatted_time(),
  291. }
  292. logger.info(f"创建数据流成功: {dataflow_name}")
  293. return result
  294. except Exception as e:
  295. logger.error(f"创建数据流失败: {str(e)}")
  296. raise e
  297. @staticmethod
  298. def _save_to_pg_database(
  299. data: Dict[str, Any],
  300. script_name: str,
  301. name_en: str,
  302. ):
  303. """
  304. 将脚本信息保存到PG数据库
  305. Args:
  306. data: 包含脚本信息的数据
  307. script_name: 脚本名称
  308. name_en: 英文名称
  309. """
  310. try:
  311. # 提取脚本相关信息
  312. # 处理 script_requirement,确保保存为 JSON 字符串
  313. script_requirement_raw = data.get('script_requirement', None)
  314. # 用于保存从 script_requirement 中提取的 rule
  315. rule_from_requirement = ''
  316. if script_requirement_raw is not None:
  317. # 如果是字典,提取 rule 字段
  318. if isinstance(script_requirement_raw, dict):
  319. rule_from_requirement = script_requirement_raw.get(
  320. 'rule', '')
  321. script_requirement = json.dumps(
  322. script_requirement_raw, ensure_ascii=False)
  323. elif isinstance(script_requirement_raw, list):
  324. script_requirement = json.dumps(
  325. script_requirement_raw, ensure_ascii=False)
  326. else:
  327. # 如果已经是字符串,尝试解析以提取 rule
  328. script_requirement = str(script_requirement_raw)
  329. try:
  330. parsed_req = json.loads(script_requirement)
  331. if isinstance(parsed_req, dict):
  332. rule_from_requirement = parsed_req.get('rule', '')
  333. except (json.JSONDecodeError, TypeError):
  334. pass
  335. else:
  336. script_requirement = ''
  337. # 处理 script_content:优先使用前端传入的值,如果为空则使用从 script_requirement 提取的 rule
  338. script_content = data.get('script_content', '')
  339. if not script_content and rule_from_requirement:
  340. script_content = rule_from_requirement
  341. logger.info(
  342. "script_content为空,使用从script_requirement提取的rule: %s",
  343. rule_from_requirement,
  344. )
  345. # 安全处理 source_table 和 target_table(避免 None 值导致的 'in' 操作错误)
  346. source_table_raw = data.get('source_table') or ''
  347. source_table = (
  348. source_table_raw.split(':')[-1]
  349. if ':' in source_table_raw
  350. else source_table_raw
  351. )
  352. target_table_raw = data.get('target_table') or ''
  353. target_table = (
  354. target_table_raw.split(':')[-1]
  355. if ':' in target_table_raw
  356. else (target_table_raw or name_en)
  357. )
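# Example of the 'Label:name' convention handled above (hypothetical values):
#   source_table = "BusinessDomain:ods_orders"  -> source_table = "ods_orders"
#   target_table = ""                           -> falls back to name_en
# A value without a colon is used as-is.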
  358. script_type = data.get('script_type', 'python')
  359. user_name = data.get('created_by', 'system')
  360. target_dt_column = data.get('target_dt_column', '')
  361. # 验证必需字段
  362. if not target_table:
  363. target_table = name_en
  364. if not script_name:
  365. raise ValueError("script_name不能为空")
  366. # 构建插入SQL
  367. insert_sql = text(
  368. """
  369. INSERT INTO dags.data_transform_scripts
  370. (source_table, target_table, script_name, script_type,
  371. script_requirement, script_content, user_name, create_time,
  372. update_time, target_dt_column)
  373. VALUES
  374. (:source_table, :target_table, :script_name, :script_type,
  375. :script_requirement, :script_content, :user_name,
  376. :create_time, :update_time, :target_dt_column)
  377. ON CONFLICT (target_table, script_name)
  378. DO UPDATE SET
  379. source_table = EXCLUDED.source_table,
  380. script_type = EXCLUDED.script_type,
  381. script_requirement = EXCLUDED.script_requirement,
  382. script_content = EXCLUDED.script_content,
  383. user_name = EXCLUDED.user_name,
  384. update_time = EXCLUDED.update_time,
  385. target_dt_column = EXCLUDED.target_dt_column
  386. """
  387. )
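# Assumption behind the ON CONFLICT clause above: PostgreSQL requires a unique
# index or constraint on (target_table, script_name) in
# dags.data_transform_scripts for this upsert to work; if the schema lacks it,
# the INSERT raises an error instead of updating the existing row.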
  388. # 准备参数
  389. current_time = datetime.now()
  390. params = {
  391. 'source_table': source_table,
  392. 'target_table': target_table,
  393. 'script_name': script_name,
  394. 'script_type': script_type,
  395. 'script_requirement': script_requirement,
  396. 'script_content': script_content,
  397. 'user_name': user_name,
  398. 'create_time': current_time,
  399. 'update_time': current_time,
  400. 'target_dt_column': target_dt_column
  401. }
  402. # 执行插入操作
  403. db.session.execute(insert_sql, params)
  404. # 新增:保存到task_list表
  405. try:
  406. # 1. 解析script_requirement并构建详细的任务描述
  407. task_description_md = script_requirement
  408. try:
  409. # 尝试解析JSON
  410. try:
  411. req_json = json.loads(script_requirement)
  412. except (json.JSONDecodeError, TypeError):
  413. req_json = None
  414. if isinstance(req_json, dict):
  415. # 1. 从script_requirement中提取rule字段作为request_content_str
  416. request_content_str = req_json.get('rule', '')
  417. # 2. 从script_requirement中提取source_table和
  418. # target_table字段信息
  419. source_table_ids = req_json.get('source_table', [])
  420. target_table_ids = req_json.get('target_table', [])
  421. # 确保是列表格式
  422. if not isinstance(source_table_ids, list):
  423. source_table_ids = [
  424. source_table_ids] if source_table_ids else []
  425. if not isinstance(target_table_ids, list):
  426. target_table_ids = [
  427. target_table_ids] if target_table_ids else []
  428. # 合并所有BusinessDomain ID
  429. all_bd_ids = source_table_ids + target_table_ids
  430. # 4. 从data参数中提取update_mode
  431. update_mode = data.get('update_mode', 'append')
  432. # 生成Business Domain DDLs
  433. source_ddls = []
  434. target_ddls = []
  435. data_source_info = None
  436. if all_bd_ids:
  437. try:
  438. with connect_graph().session() as session:
  439. # 处理source tables
  440. for bd_id in source_table_ids:
  441. ddl_info = (
  442. DataFlowService
  443. ._generate_businessdomain_ddl(
  444. session,
  445. bd_id,
  446. is_target=False,
  447. )
  448. )
  449. if ddl_info:
  450. source_ddls.append(ddl_info['ddl'])
  451. # 3. 如果BELONGS_TO关系连接的是
  452. # "数据资源",获取数据源信息
  453. if (
  454. ddl_info.get('data_source')
  455. and not data_source_info
  456. ):
  457. data_source_info = ddl_info[
  458. 'data_source'
  459. ]
  460. # 处理target tables(5. 目标表缺省要有create_time字段)
  461. for bd_id in target_table_ids:
  462. ddl_info = (
  463. DataFlowService
  464. ._generate_businessdomain_ddl(
  465. session,
  466. bd_id,
  467. is_target=True,
  468. update_mode=update_mode,
  469. )
  470. )
  471. if ddl_info:
  472. target_ddls.append(ddl_info['ddl'])
  473. # 同样检查BELONGS_TO关系,获取数据源信息
  474. if (
  475. ddl_info.get('data_source')
  476. and not data_source_info
  477. ):
  478. data_source_info = ddl_info[
  479. 'data_source'
  480. ]
  481. except Exception as neo_e:
  482. logger.error(
  483. f"获取BusinessDomain DDL失败: {str(neo_e)}")
  484. # 构建Markdown格式的任务描述
  485. task_desc_parts = [f"# Task: {script_name}\n"]
  486. # 添加数据源信息
  487. if data_source_info:
  488. task_desc_parts.append("## Data Source")
  489. task_desc_parts.append(
  490. f"- **Type**: "
  491. f"{data_source_info.get('type', 'N/A')}"
  492. )
  493. task_desc_parts.append(
  494. f"- **Host**: "
  495. f"{data_source_info.get('host', 'N/A')}"
  496. )
  497. task_desc_parts.append(
  498. f"- **Port**: "
  499. f"{data_source_info.get('port', 'N/A')}"
  500. )
  501. task_desc_parts.append(
  502. f"- **Database**: "
  503. f"{data_source_info.get('database', 'N/A')}\n"
  504. )
  505. # 添加源表DDL
  506. if source_ddls:
  507. task_desc_parts.append("## Source Tables (DDL)")
  508. for ddl in source_ddls:
  509. task_desc_parts.append(f"```sql\n{ddl}\n```\n")
  510. # 添加目标表DDL
  511. if target_ddls:
  512. task_desc_parts.append("## Target Tables (DDL)")
  513. for ddl in target_ddls:
  514. task_desc_parts.append(f"```sql\n{ddl}\n```\n")
  515. # 添加更新模式说明
  516. task_desc_parts.append("## Update Mode")
  517. if update_mode == 'append':
  518. task_desc_parts.append("- **Mode**: Append (追加模式)")
  519. task_desc_parts.append(
  520. "- **Description**: 新数据将追加到目标表,不删除现有数据\n")
  521. else:
  522. task_desc_parts.append(
  523. "- **Mode**: Full Refresh (全量更新)")
  524. task_desc_parts.append(
  525. "- **Description**: 目标表将被清空后重新写入数据\n")
  526. # 添加请求内容(rule)
  527. if request_content_str:
  528. task_desc_parts.append("## Request Content")
  529. task_desc_parts.append(f"{request_content_str}\n")
  530. # 添加实施步骤(根据任务类型优化)
  531. task_desc_parts.append("## Implementation Steps")
  532. # 判断是否为远程数据源导入任务
  533. if data_source_info:
  534. # 从远程数据源导入数据的简化步骤
  535. task_desc_parts.append(
  536. "1. Create an n8n workflow to execute the "
  537. "data import task"
  538. )
  539. task_desc_parts.append(
  540. "2. Configure the workflow to call "
  541. "`import_resource_data.py` Python script"
  542. )
  543. task_desc_parts.append(
  544. "3. Pass the following parameters to the "
  545. "Python execution node:"
  546. )
  547. task_desc_parts.append(
  548. " - `--source-config`: JSON configuration "
  549. "for the remote data source"
  550. )
  551. task_desc_parts.append(
  552. " - `--target-table`: Target table name "
  553. "(data resource English name)"
  554. )
  555. task_desc_parts.append(
  556. f" - `--update-mode`: {update_mode}")
  557. task_desc_parts.append(
  558. "4. The Python script will automatically:")
  559. task_desc_parts.append(
  560. " - Connect to the remote data source")
  561. task_desc_parts.append(
  562. " - Extract data from the source table")
  563. task_desc_parts.append(
  564. f" - Write data to target table using "
  565. f"{update_mode} mode"
  566. )
  567. else:
  568. # 数据转换任务的完整步骤
  569. task_desc_parts.append(
  570. "1. Extract data from source tables as "
  571. "specified in the DDL"
  572. )
  573. task_desc_parts.append(
  574. "2. Apply transformation logic according "
  575. "to the rule:"
  576. )
  577. if request_content_str:
  578. task_desc_parts.append(
  579. f" - Rule: {request_content_str}")
  580. task_desc_parts.append(
  581. "3. Generate Python program to implement the "
  582. "data transformation logic"
  583. )
  584. task_desc_parts.append(
  585. f"4. Write transformed data to target table "
  586. f"using {update_mode} mode"
  587. )
  588. task_desc_parts.append(
  589. "5. Create an n8n workflow to schedule and "
  590. "execute the Python program"
  591. )
  592. task_description_md = "\n".join(task_desc_parts)
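# Rough shape of the Markdown built above (optional sections appear only when
# the corresponding data exists):
#   # Task: <script_name>
#   ## Data Source        (type / host / port / database)
#   ## Source Tables (DDL)
#   ## Target Tables (DDL)
#   ## Update Mode
#   ## Request Content
#   ## Implementation Steps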
  593. except Exception as parse_e:
  594. logger.warning(f"解析任务描述详情失败,使用原始描述: {str(parse_e)}")
  595. task_description_md = script_requirement
  596. # 假设运行根目录为项目根目录,dataflows.py在app/core/data_flow/
  597. code_path = 'app/core/data_flow'
  598. task_insert_sql = text(
  599. "INSERT INTO public.task_list\n"
  600. "(task_name, task_description, status, code_name, "
  601. "code_path, create_by, create_time, update_time)\n"
  602. "VALUES\n"
  603. "(:task_name, :task_description, :status, :code_name, "
  604. ":code_path, :create_by, :create_time, :update_time)"
  605. )
  606. task_params = {
  607. 'task_name': script_name,
  608. 'task_description': task_description_md,
  609. 'status': 'pending',
  610. 'code_name': script_name,
  611. 'code_path': code_path,
  612. 'create_by': 'cursor',
  613. 'create_time': current_time,
  614. 'update_time': current_time
  615. }
  616. # 使用嵌套事务,确保task_list插入失败不影响主流程
  617. with db.session.begin_nested():
  618. db.session.execute(task_insert_sql, task_params)
  619. logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")
  620. except Exception as task_error:
  621. # 记录错误但不中断主流程
  622. logger.error(f"写入task_list表失败: {str(task_error)}")
  623. # 如果要求必须成功写入任务列表,则这里应该raise task_error
  624. # raise task_error
  625. db.session.commit()
  626. logger.info(
  627. "成功将脚本信息写入PG数据库: target_table=%s, script_name=%s",
  628. target_table,
  629. script_name,
  630. )
  631. except Exception as e:
  632. db.session.rollback()
  633. logger.error(f"写入PG数据库失败: {str(e)}")
  634. raise e
  635. @staticmethod
  636. def _handle_children_relationships(dataflow_node, children_ids):
  637. """处理子节点关系"""
  638. logger.debug(
  639. "处理子节点关系,原始children_ids: %s, 类型: %s",
  640. children_ids,
  641. type(children_ids),
  642. )
  643. # 确保children_ids是列表格式
  644. if not isinstance(children_ids, (list, tuple)):
  645. if children_ids is not None:
  646. children_ids = [children_ids] # 如果是单个值,转换为列表
  647. logger.debug(f"将单个值转换为列表: {children_ids}")
  648. else:
  649. children_ids = [] # 如果是None,转换为空列表
  650. logger.debug("将None转换为空列表")
  651. for child_id in children_ids:
  652. try:
  653. # 查找子节点
  654. query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
  655. with connect_graph().session() as session:
  656. result = session.run(query, child_id=child_id).data()
  657. if result:
  658. # 获取dataflow_node的ID
  659. dataflow_id = getattr(dataflow_node, 'identity', None)
  660. if dataflow_id is None:
  661. # 如果没有identity属性,从名称查询ID
  662. query_id = (
  663. "MATCH (n:DataFlow) WHERE n.name_zh = "
  664. "$name_zh RETURN id(n) as node_id"
  665. )
  666. id_result = session.run(
  667. query_id,
  668. name_zh=dataflow_node.get('name_zh'),
  669. ).single()
  670. dataflow_id = (
  671. id_result['node_id'] if id_result else None
  672. )
  673. # 创建关系 - 使用ID调用relationship_exists
  674. if dataflow_id and not relationship_exists(
  675. dataflow_id, 'child', child_id
  676. ):
  677. session.run(
  678. "MATCH (a), (b) WHERE id(a) = $dataflow_id "
  679. "AND id(b) = $child_id "
  680. "CREATE (a)-[:child]->(b)",
  681. dataflow_id=dataflow_id,
  682. child_id=child_id,
  683. )
  684. logger.info(
  685. f"创建子节点关系: {dataflow_id} -> {child_id}")
  686. except Exception as e:
  687. logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
  688. @staticmethod
  689. def _handle_tag_relationships(dataflow_id, tag_list):
  690. """
  691. 处理多标签关系
  692. Args:
  693. dataflow_id: 数据流节点ID
  694. tag_list: 标签列表,可以是ID数组或包含id字段的对象数组
  695. """
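# Accepted tag_list shapes, as handled below (illustrative):
#   [{"id": 12}, {"id": 34}]   # objects with an 'id' field
#   [12, "34"]                 # raw ids, int or numeric string
#   12                         # a single value is wrapped into a list
# Entries that cannot be converted to an int are silently skipped.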
  696. # 确保tag_list是列表格式
  697. if not isinstance(tag_list, list):
  698. tag_list = [tag_list] if tag_list else []
  699. for tag_item in tag_list:
  700. tag_id = None
  701. if isinstance(tag_item, dict) and 'id' in tag_item:
  702. tag_id = int(tag_item['id'])
  703. elif isinstance(tag_item, (int, str)):
  704. try:
  705. tag_id = int(tag_item)
  706. except (ValueError, TypeError):
  707. pass
  708. if tag_id:
  709. DataFlowService._handle_single_tag_relationship(
  710. dataflow_id, tag_id)
  711. @staticmethod
  712. def _handle_single_tag_relationship(dataflow_id, tag_id):
  713. """处理单个标签关系"""
  714. try:
  715. # 查找标签节点
  716. query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
  717. with connect_graph().session() as session:
  718. result = session.run(query, tag_id=tag_id).data()
  719. if result:
  720. # 创建关系 - 使用ID调用relationship_exists
  721. if dataflow_id and not relationship_exists(
  722. dataflow_id, 'LABEL', tag_id
  723. ):
  724. session.run(
  725. "MATCH (a), (b) WHERE id(a) = $dataflow_id "
  726. "AND id(b) = $tag_id "
  727. "CREATE (a)-[:LABEL]->(b)",
  728. dataflow_id=dataflow_id,
  729. tag_id=tag_id,
  730. )
  731. logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
  732. except Exception as e:
  733. logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
  734. @staticmethod
  735. def update_dataflow(
  736. dataflow_id: int,
  737. data: Dict[str, Any],
  738. ) -> Optional[Dict[str, Any]]:
  739. """
  740. 更新数据流
  741. Args:
  742. dataflow_id: 数据流ID
  743. data: 更新的数据
  744. Returns:
  745. 更新后的数据流信息,如果不存在则返回None
  746. """
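# Notes on the update below (editorial):
# - 'tag' is popped from data and re-linked via LABEL relationships instead of
#   being stored as a node property; 'id' and 'created_at' are never updated.
# - property names from `data` are interpolated directly into the SET clause
#   (f"n.{key} = ${key}"), so callers are assumed to pass trusted keys only.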
  747. try:
  748. # 提取 tag 数组(不作为节点属性存储)
  749. tag_list = data.pop('tag', None)
  750. # 查找节点
  751. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  752. with connect_graph().session() as session:
  753. result = session.run(query, dataflow_id=dataflow_id).data()
  754. if not result:
  755. return None
  756. # 更新节点属性
  757. update_fields = []
  758. params: Dict[str, Any] = {'dataflow_id': dataflow_id}
  759. for key, value in data.items():
  760. if key not in ['id', 'created_at']: # 保护字段
  761. # 复杂对象序列化为 JSON 字符串
  762. if key in ['config', 'script_requirement']:
  763. if isinstance(value, dict):
  764. value = json.dumps(value, ensure_ascii=False)
  765. update_fields.append(f"n.{key} = ${key}")
  766. params[key] = value
  767. if update_fields:
  768. params['updated_at'] = get_formatted_time()
  769. update_fields.append("n.updated_at = $updated_at")
  770. update_query = f"""
  771. MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
  772. SET {', '.join(update_fields)}
  773. RETURN n, id(n) as node_id
  774. """
  775. result = session.run(update_query, params).data()
  776. # 处理 tag 关系(支持多标签数组)
  777. if tag_list is not None:
  778. # 确保是列表格式
  779. if not isinstance(tag_list, list):
  780. tag_list = [tag_list] if tag_list else []
  781. # 先删除现有的 LABEL 关系
  782. delete_query = """
  783. MATCH (n:DataFlow)-[r:LABEL]->(:DataLabel)
  784. WHERE id(n) = $dataflow_id
  785. DELETE r
  786. """
  787. session.run(delete_query, dataflow_id=dataflow_id)
  788. logger.info(f"删除数据流 {dataflow_id} 的现有标签关系")
  789. # 为每个 tag 创建新的 LABEL 关系
  790. for tag_item in tag_list:
  791. tag_id = None
  792. if isinstance(tag_item, dict) and 'id' in tag_item:
  793. tag_id = int(tag_item['id'])
  794. elif isinstance(tag_item, (int, str)):
  795. try:
  796. tag_id = int(tag_item)
  797. except (ValueError, TypeError):
  798. pass
  799. if tag_id:
  800. DataFlowService._handle_single_tag_relationship(
  801. dataflow_id, tag_id
  802. )
  803. if result:
  804. node = result[0]['n']
  805. updated_dataflow = dict(node)
  806. # 使用查询返回的node_id
  807. updated_dataflow['id'] = result[0]['node_id']
  808. logger.info(f"更新数据流成功: ID={dataflow_id}")
  809. return updated_dataflow
  810. return None
  811. except Exception as e:
  812. logger.error(f"更新数据流失败: {str(e)}")
  813. raise e
  814. @staticmethod
  815. def delete_dataflow(dataflow_id: int) -> bool:
  816. """
  817. 删除数据流
  818. Args:
  819. dataflow_id: 数据流ID
  820. Returns:
  821. 删除是否成功
  822. """
  823. try:
  824. # 删除节点及其关系
  825. query = """
  826. MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
  827. DETACH DELETE n
  828. RETURN count(n) as deleted_count
  829. """
  830. with connect_graph().session() as session:
  831. delete_result = session.run(
  832. query, dataflow_id=dataflow_id).single()
  833. result = delete_result['deleted_count'] if delete_result else 0
  834. if result and result > 0:
  835. logger.info(f"删除数据流成功: ID={dataflow_id}")
  836. return True
  837. return False
  838. except Exception as e:
  839. logger.error(f"删除数据流失败: {str(e)}")
  840. raise e
  841. @staticmethod
  842. def execute_dataflow(
  843. dataflow_id: int,
  844. params: Optional[Dict[str, Any]] = None,
  845. ) -> Dict[str, Any]:
  846. """
  847. 执行数据流
  848. Args:
  849. dataflow_id: 数据流ID
  850. params: 执行参数
  851. Returns:
  852. 执行结果信息
  853. """
  854. try:
  855. # 检查数据流是否存在
  856. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  857. with connect_graph().session() as session:
  858. result = session.run(query, dataflow_id=dataflow_id).data()
  859. if not result:
  860. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  861. execution_id = (
  862. f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
  863. )
  864. # TODO: 这里应该实际执行数据流
  865. # 目前返回模拟结果
  866. result = {
  867. 'execution_id': execution_id,
  868. 'dataflow_id': dataflow_id,
  869. 'status': 'running',
  870. 'started_at': datetime.now().isoformat(),
  871. 'params': params or {},
  872. 'progress': 0
  873. }
  874. logger.info(
  875. "开始执行数据流: ID=%s, execution_id=%s",
  876. dataflow_id,
  877. execution_id,
  878. )
  879. return result
  880. except Exception as e:
  881. logger.error(f"执行数据流失败: {str(e)}")
  882. raise e
  883. @staticmethod
  884. def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
  885. """
  886. 获取数据流执行状态
  887. Args:
  888. dataflow_id: 数据流ID
  889. Returns:
  890. 执行状态信息
  891. """
  892. try:
  893. # TODO: 这里应该查询实际的执行状态
  894. # 目前返回模拟状态
  895. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  896. with connect_graph().session() as session:
  897. result = session.run(query, dataflow_id=dataflow_id).data()
  898. if not result:
  899. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  900. status = ['running', 'completed', 'failed', 'pending'][
  901. dataflow_id % 4
  902. ]
  903. return {
  904. 'dataflow_id': dataflow_id,
  905. 'status': status,
  906. 'progress': (
  907. 100
  908. if status == 'completed'
  909. else (dataflow_id * 10) % 100
  910. ),
  911. 'started_at': datetime.now().isoformat(),
  912. 'completed_at': (
  913. datetime.now().isoformat()
  914. if status == 'completed'
  915. else None
  916. ),
  917. 'error_message': (
  918. '执行过程中发生错误' if status == 'failed' else None
  919. ),
  920. }
  921. except Exception as e:
  922. logger.error(f"获取数据流状态失败: {str(e)}")
  923. raise e
  924. @staticmethod
  925. def get_dataflow_logs(
  926. dataflow_id: int,
  927. page: int = 1,
  928. page_size: int = 50,
  929. ) -> Dict[str, Any]:
  930. """
  931. 获取数据流执行日志
  932. Args:
  933. dataflow_id: 数据流ID
  934. page: 页码
  935. page_size: 每页大小
  936. Returns:
  937. 执行日志列表和分页信息
  938. """
  939. try:
  940. # TODO: 这里应该查询实际的执行日志
  941. # 目前返回模拟日志
  942. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  943. with connect_graph().session() as session:
  944. result = session.run(query, dataflow_id=dataflow_id).data()
  945. if not result:
  946. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  947. mock_logs = [
  948. {
  949. 'id': i,
  950. 'timestamp': datetime.now().isoformat(),
  951. 'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
  952. 'message': f'数据流执行日志消息 {i}',
  953. 'component': ['source', 'transform', 'target'][i % 3]
  954. }
  955. for i in range(1, 101)
  956. ]
  957. # 分页处理
  958. total = len(mock_logs)
  959. start = (page - 1) * page_size
  960. end = start + page_size
  961. logs = mock_logs[start:end]
  962. return {
  963. 'logs': logs,
  964. 'pagination': {
  965. 'page': page,
  966. 'page_size': page_size,
  967. 'total': total,
  968. 'total_pages': (total + page_size - 1) // page_size
  969. }
  970. }
  971. except Exception as e:
  972. logger.error(f"获取数据流日志失败: {str(e)}")
  973. raise e
  974. @staticmethod
  975. def create_script(request_data: Union[Dict[str, Any], str]) -> str:
  976. """
  977. 使用Deepseek模型生成SQL脚本
  978. Args:
  979. request_data: 包含input, output, request_content的请求数据字典,或JSON字符串
  980. Returns:
  981. 生成的SQL脚本内容
  982. """
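# Illustrative request_data payload (hypothetical values; note that the rule
# text is read from the 'request_data' key below, even though the docstring
# calls it request_content):
#   {
#       "input": "BusinessDomain:ods_orders,BusinessDomain:ods_users",
#       "output": "BusinessDomain:dws_order_summary",
#       "request_data": "<p>Aggregate order amounts per user</p>"
#   }
# HTML markup in the rule text is stripped before it is used in the prompt.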
  983. try:
  984. logger.info(f"开始处理脚本生成请求: {request_data}")
  985. logger.info(f"request_data类型: {type(request_data)}")
  986. # 类型检查和处理
  987. if isinstance(request_data, str):
  988. logger.warning(f"request_data是字符串,尝试解析为JSON: {request_data}")
  989. try:
  990. import json
  991. request_data = json.loads(request_data)
  992. except json.JSONDecodeError as e:
  993. raise ValueError(f"无法解析request_data为JSON: {str(e)}")
  994. if not isinstance(request_data, dict):
  995. raise ValueError(
  996. f"request_data必须是字典类型,实际类型: {type(request_data)}")
  997. # 1. 从传入的request_data中解析input, output, request_content内容
  998. input_data = request_data.get('input', '')
  999. output_data = request_data.get('output', '')
  1000. request_content = request_data.get('request_data', '')
  1001. # 如果request_content是HTML格式,提取纯文本
  1002. if request_content and (
  1003. request_content.startswith('<p>') or '<' in request_content
  1004. ):
  1005. # 简单的HTML标签清理
  1006. import re
  1007. request_content = re.sub(
  1008. r'<[^>]+>', '', request_content).strip()
  1009. if not input_data or not output_data or not request_content:
  1010. raise ValueError(
  1011. "缺少必要参数:input='{}', output='{}', "
  1012. "request_content='{}' 不能为空".format(
  1013. input_data,
  1014. output_data,
  1015. request_content[:100] if request_content else '',
  1016. )
  1017. )
  1018. logger.info(
  1019. "解析得到 - input: %s, output: %s, request_content: %s",
  1020. input_data,
  1021. output_data,
  1022. request_content,
  1023. )
  1024. # 2. 解析input中的多个数据表并生成源表DDL
  1025. source_tables_ddl = []
  1026. input_tables = []
  1027. if input_data:
  1028. tables = [table.strip()
  1029. for table in input_data.split(',') if table.strip()]
  1030. for table in tables:
  1031. ddl = DataFlowService._parse_table_and_get_ddl(
  1032. table, 'input')
  1033. if ddl:
  1034. input_tables.append(table)
  1035. source_tables_ddl.append(ddl)
  1036. else:
  1037. logger.warning(f"无法获取输入表 {table} 的DDL结构")
  1038. # 3. 解析output中的数据表并生成目标表DDL
  1039. target_table_ddl = ""
  1040. if output_data:
  1041. target_table_ddl = DataFlowService._parse_table_and_get_ddl(
  1042. output_data.strip(), 'output')
  1043. if not target_table_ddl:
  1044. logger.warning(f"无法获取输出表 {output_data} 的DDL结构")
  1045. # 4. 按照Deepseek-prompt.txt的框架构建提示语
  1046. prompt_parts = []
  1047. # 开场白 - 角色定义
  1048. prompt_parts.append(
  1049. "你是一名数据库工程师,正在构建一个PostgreSQL数据中的汇总逻辑。"
  1050. "请为以下需求生成一段标准的 PostgreSQL SQL 脚本:"
  1051. )
  1052. # 动态生成源表部分(第1点)
  1053. for i, (table, ddl) in enumerate(
  1054. zip(input_tables, source_tables_ddl), 1
  1055. ):
  1056. table_name = table.split(':')[-1] if ':' in table else table
  1057. prompt_parts.append(f"{i}.有一个源表: {table_name},它的定义语句如下:")
  1058. prompt_parts.append(ddl)
  1059. prompt_parts.append("") # 添加空行分隔
  1060. # 动态生成目标表部分(第2点)
  1061. if target_table_ddl:
  1062. target_table_name = output_data.split(
  1063. ':')[-1] if ':' in output_data else output_data
  1064. next_index = len(input_tables) + 1
  1065. prompt_parts.append(
  1066. f"{next_index}.有一个目标表:{target_table_name},它的定义语句如下:")
  1067. prompt_parts.append(target_table_ddl)
  1068. prompt_parts.append("") # 添加空行分隔
  1069. # 动态生成处理逻辑部分(第3点)
  1070. next_index = (
  1071. len(input_tables) + 2
  1072. if target_table_ddl
  1073. else len(input_tables) + 1
  1074. )
  1075. prompt_parts.append(f"{next_index}.处理逻辑为:{request_content}")
  1076. prompt_parts.append("") # 添加空行分隔
  1077. # 固定的技术要求部分(第4-8点)
  1078. tech_requirements = [
  1079. (
  1080. f"{next_index + 1}.脚本应使用标准的 PostgreSQL 语法,"
  1081. "适合在 Airflow、Python 脚本、或调度系统中调用;"
  1082. ),
  1083. f"{next_index + 2}.无需使用 UPSERT 或 ON CONFLICT",
  1084. f"{next_index + 3}.请直接输出SQL,无需进行解释。",
  1085. (
  1086. f"{next_index + 4}.请给这段sql起个英文名,不少于三个英文单词,使用\"_\"分隔,"
  1087. "采用蛇形命名法。把sql的名字作为注释写在返回的sql中。"
  1088. ),
  1089. (
  1090. f"{next_index + 5}.生成的sql在向目标表插入数据的时候,向create_time字段写入当前日期"
  1091. "时间now(),不用处理update_time字段"
  1092. ),
  1093. ]
  1094. prompt_parts.extend(tech_requirements)
  1095. # 组合完整的提示语
  1096. full_prompt = "\n".join(prompt_parts)
  1097. logger.info(f"构建的完整提示语长度: {len(full_prompt)}")
  1098. logger.info(f"完整提示语内容: {full_prompt}")
  1099. # 5. 调用LLM生成SQL脚本
  1100. logger.info("开始调用Deepseek模型生成SQL脚本")
  1101. script_content = llm_sql(full_prompt)
  1102. if not script_content:
  1103. raise ValueError("Deepseek模型返回空内容")
  1104. # 确保返回的是文本格式
  1105. if not isinstance(script_content, str):
  1106. script_content = str(script_content)
  1107. logger.info(f"SQL脚本生成成功,内容长度: {len(script_content)}")
  1108. return script_content
  1109. except Exception as e:
  1110. logger.error(f"生成SQL脚本失败: {str(e)}")
  1111. raise e
  1112. @staticmethod
  1113. def _parse_table_and_get_ddl(table_str: str, table_type: str) -> str:
  1114. """
  1115. 解析表格式(A:B)并从Neo4j查询元数据生成DDL
  1116. Args:
  1117. table_str: 表格式字符串,格式为"label:name_en"
  1118. table_type: 表类型,用于日志记录(input/output)
  1119. Returns:
  1120. DDL格式的表结构字符串
  1121. """
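# Illustrative input/output (hypothetical names): table_str "BusinessDomain:ods_orders"
# splits into label "BusinessDomain" and name_en "ods_orders", and the metadata
# reached via INCLUDES -> DataMeta becomes column definitions, e.g.
#   CREATE TABLE ods_orders (
#     order_id BIGINT COMMENT '订单ID',
#     amount NUMERIC(18,2) COMMENT '金额'
#   );
#   COMMENT ON TABLE ods_orders IS '订单表';
# The inline COMMENT clauses are MySQL-style rather than valid PostgreSQL DDL,
# but within this module the string is only used as context for the LLM prompt.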
  1122. try:
  1123. # 解析A:B格式
  1124. if ':' not in table_str:
  1125. logger.error(f"表格式错误,应为'label:name_en'格式: {table_str}")
  1126. return ""
  1127. parts = table_str.split(':', 1)
  1128. if len(parts) != 2:
  1129. logger.error(f"表格式解析失败: {table_str}")
  1130. return ""
  1131. label = parts[0].strip()
  1132. name_en = parts[1].strip()
  1133. if not label or not name_en:
  1134. logger.error(f"标签或英文名为空: label={label}, name_en={name_en}")
  1135. return ""
  1136. logger.info(f"开始查询{table_type}表: label={label}, name_en={name_en}")
  1137. # 从Neo4j查询节点及其关联的元数据
  1138. with connect_graph().session() as session:
  1139. # 查询节点及其关联的元数据
  1140. cypher = f"""
  1141. MATCH (n:{label} {{name_en: $name_en}})
  1142. OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
  1143. RETURN n, collect(m) as metadata
  1144. """
  1145. result = session.run(
  1146. cypher, # type: ignore[arg-type]
  1147. {'name_en': name_en},
  1148. )
  1149. record = result.single()
  1150. if not record:
  1151. logger.error(f"未找到节点: label={label}, name_en={name_en}")
  1152. return ""
  1153. node = record['n']
  1154. metadata = record['metadata']
  1155. logger.info(f"找到节点,关联元数据数量: {len(metadata)}")
  1156. # 生成DDL格式的表结构
  1157. ddl_lines = []
  1158. ddl_lines.append(f"CREATE TABLE {name_en} (")
  1159. if metadata:
  1160. column_definitions = []
  1161. for meta in metadata:
  1162. if meta: # 确保meta不为空
  1163. meta_props = dict(meta)
  1164. column_name = meta_props.get(
  1165. 'name_en',
  1166. meta_props.get('name_zh', 'unknown_column'),
  1167. )
  1168. data_type = meta_props.get(
  1169. 'data_type', 'VARCHAR(255)')
  1170. comment = meta_props.get('name_zh', '')
  1171. # 构建列定义
  1172. column_def = f" {column_name} {data_type}"
  1173. if comment:
  1174. column_def += f" COMMENT '{comment}'"
  1175. column_definitions.append(column_def)
  1176. if column_definitions:
  1177. ddl_lines.append(",\n".join(column_definitions))
  1178. else:
  1179. ddl_lines.append(
  1180. " id BIGINT PRIMARY KEY COMMENT '主键ID'")
  1181. else:
  1182. # 如果没有元数据,添加默认列
  1183. ddl_lines.append(
  1184. " id BIGINT PRIMARY KEY COMMENT '主键ID'")
  1185. ddl_lines.append(");")
  1186. # 添加表注释
  1187. node_props = dict(node)
  1188. table_comment = node_props.get(
  1189. 'name_zh', node_props.get('describe', name_en))
  1190. if table_comment and table_comment != name_en:
  1191. ddl_lines.append(
  1192. f"COMMENT ON TABLE {name_en} IS '{table_comment}';")
  1193. ddl_content = "\n".join(ddl_lines)
  1194. logger.info(f"{table_type}表DDL生成成功: {name_en}")
  1195. logger.debug(f"生成的DDL: {ddl_content}")
  1196. return ddl_content
  1197. except Exception as e:
  1198. logger.error(f"解析表格式和生成DDL失败: {str(e)}")
  1199. return ""
  1200. @staticmethod
  1201. def _generate_businessdomain_ddl(
  1202. session,
  1203. bd_id: int,
  1204. is_target: bool = False,
  1205. update_mode: str = 'append',
  1206. ) -> Optional[Dict[str, Any]]:
  1207. """
  1208. 根据BusinessDomain节点ID生成DDL
  1209. Args:
  1210. session: Neo4j session对象
  1211. bd_id: BusinessDomain节点ID
  1212. is_target: 是否为目标表(目标表需要添加create_time字段)
  1213. update_mode: 更新模式(append或full)
  1214. Returns:
  1215. 包含ddl和data_source信息的字典,如果节点不存在则返回None
  1216. """
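# Sketch of the returned structure (illustrative values):
#   {
#       'ddl': "CREATE TABLE dws_sales_daily (...);\nCOMMENT ON TABLE ...;",
#       'table_name': 'dws_sales_daily',
#       'data_source': {'type': 'mysql', 'host': '...', 'port': 3306,
#                       'database': '...'}   # or None
#   }
# data_source is only filled in when the node's DataLabel is "数据资源" and a
# COME_FROM DataSource exists; target tables additionally get a create_time column.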
  1217. try:
  1218. # 查询BusinessDomain节点、元数据、标签关系和数据源关系
  1219. cypher = """
  1220. MATCH (bd:BusinessDomain)
  1221. WHERE id(bd) = $bd_id
  1222. OPTIONAL MATCH (bd)-[:INCLUDES]->(m:DataMeta)
  1223. OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
  1224. OPTIONAL MATCH (bd)-[:COME_FROM]->(ds:DataSource)
  1225. RETURN bd,
  1226. collect(DISTINCT m) as metadata,
  1227. collect(DISTINCT {
  1228. id: id(label),
  1229. name_zh: label.name_zh,
  1230. name_en: label.name_en
  1231. }) as labels,
  1232. ds.type as ds_type,
  1233. ds.host as ds_host,
  1234. ds.port as ds_port,
  1235. ds.database as ds_database
  1236. """
  1237. result = session.run(cypher, bd_id=bd_id).single()
  1238. if not result or not result['bd']:
  1239. logger.warning(f"未找到ID为 {bd_id} 的BusinessDomain节点")
  1240. return None
  1241. node = result['bd']
  1242. metadata = result['metadata']
  1243. # 处理标签数组,获取第一个有效标签名称用于判断
  1244. labels = result.get('labels', [])
  1245. valid_labels = [
  1246. label for label in labels if label.get('id') is not None
  1247. ]
  1248. label_name = (
  1249. valid_labels[0].get('name_zh') if valid_labels else None
  1250. )
  1251. # 生成DDL
  1252. node_props = dict(node)
  1253. table_name = node_props.get('name_en', f'table_{bd_id}')
  1254. ddl_lines = []
  1255. ddl_lines.append(f"CREATE TABLE {table_name} (")
  1256. column_definitions = []
  1257. # 添加元数据列
  1258. if metadata:
  1259. for meta in metadata:
  1260. if meta:
  1261. meta_props = dict(meta)
  1262. column_name = meta_props.get(
  1263. 'name_en',
  1264. meta_props.get('name_zh', 'unknown_column'),
  1265. )
  1266. data_type = meta_props.get('data_type', 'VARCHAR(255)')
  1267. comment = meta_props.get('name_zh', '')
  1268. column_def = f" {column_name} {data_type}"
  1269. if comment:
  1270. column_def += f" COMMENT '{comment}'"
  1271. column_definitions.append(column_def)
  1272. # 如果没有元数据,添加默认主键
  1273. if not column_definitions:
  1274. column_definitions.append(
  1275. " id BIGINT PRIMARY KEY COMMENT '主键ID'")
  1276. # 5. 如果是目标表,添加create_time字段
  1277. if is_target:
  1278. column_definitions.append(
  1279. " create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP "
  1280. "COMMENT '数据创建时间'"
  1281. )
  1282. ddl_lines.append(",\n".join(column_definitions))
  1283. ddl_lines.append(");")
  1284. # 添加表注释
  1285. table_comment = node_props.get(
  1286. 'name_zh', node_props.get('describe', table_name))
  1287. if table_comment and table_comment != table_name:
  1288. ddl_lines.append(
  1289. f"COMMENT ON TABLE {table_name} IS '{table_comment}';")
  1290. ddl_content = "\n".join(ddl_lines)
  1291. # 3. 如果关联的DataLabel(LABEL关系)是"数据资源"且存在COME_FROM数据源,则返回数据源信息
  1292. data_source = None
  1293. if label_name == '数据资源' and result['ds_type']:
  1294. data_source = {
  1295. 'type': result['ds_type'],
  1296. 'host': result['ds_host'],
  1297. 'port': result['ds_port'],
  1298. 'database': result['ds_database']
  1299. }
  1300. logger.info(f"获取到数据源信息: {data_source}")
  1301. logger.debug(
  1302. f"生成BusinessDomain DDL成功: {table_name}, is_target={is_target}")
  1303. return {
  1304. 'ddl': ddl_content,
  1305. 'table_name': table_name,
  1306. 'data_source': data_source
  1307. }
  1308. except Exception as e:
  1309. logger.error(f"生成BusinessDomain DDL失败,ID={bd_id}: {str(e)}")
  1310. return None
  1311. @staticmethod
  1312. def _handle_script_relationships(
  1313. data: Dict[str, Any],
  1314. dataflow_name: str,
  1315. name_en: str,
  1316. ):
  1317. """
  1318. Handle script relationships: create a DERIVED_FROM relationship in Neo4j
  1319. pointing from the target_table node back to the source_table node.
  1320. Args:
  1321. data: dict containing script_type, status, source_table, target_table
  1322. and update_mode; dataflow_name is used as the script name
  1323. """
  1324. try:
  1325. # 从data中读取键值对
  1326. script_name = dataflow_name
  1327. script_type = data.get('script_type', 'sql')
  1328. schedule_status = data.get('status', 'inactive')
  1329. source_table_full = data.get('source_table', '')
  1330. target_table_full = data.get('target_table', '')
  1331. update_mode = data.get('update_mode', 'full')
  1332. # 处理source_table和target_table的格式
  1333. source_table = source_table_full.split(
  1334. ':')[-1] if ':' in source_table_full else source_table_full
  1335. target_table = target_table_full.split(
  1336. ':')[-1] if ':' in target_table_full else target_table_full
  1337. source_label = source_table_full.split(
  1338. ':')[0] if ':' in source_table_full else source_table_full
  1339. target_label = target_table_full.split(
  1340. ':')[0] if ':' in target_table_full else target_table_full
  1341. # 验证必要字段
  1342. if not source_table or not target_table:
  1343. logger.warning(
  1344. "source_table或target_table为空,跳过关系创建: "
  1345. "source_table=%s, target_table=%s",
  1346. source_table,
  1347. target_table,
  1348. )
  1349. return
  1350. logger.info(f"开始创建脚本关系: {source_table} -> {target_table}")
  1351. with connect_graph().session() as session:
  1352. # 创建或获取source和target节点
  1353. create_nodes_query = f"""
  1354. MERGE (source:{source_label} {{name: $source_table}})
  1355. ON CREATE SET source.created_at = $created_at,
  1356. source.type = 'source'
  1357. WITH source
  1358. MERGE (target:{target_label} {{name: $target_table}})
  1359. ON CREATE SET target.created_at = $created_at,
  1360. target.type = 'target'
  1361. RETURN source, target, id(source) as source_id,
  1362. id(target) as target_id
  1363. """
  1364. # 执行创建节点的查询
  1365. result = session.run(
  1366. create_nodes_query, # type: ignore[arg-type]
  1367. {
  1368. 'source_table': source_table,
  1369. 'target_table': target_table,
  1370. 'created_at': get_formatted_time(),
  1371. },
  1372. ).single()
  1373. if result:
  1374. source_id = result['source_id']
  1375. target_id = result['target_id']
  1376. # 检查并创建关系
  1377. create_relationship_query = f"""
  1378. MATCH (source:{source_label}), (target:{target_label})
  1379. WHERE id(source) = $source_id AND id(target) = $target_id
  1380. AND NOT EXISTS((target)-[:DERIVED_FROM]->(source))
  1381. CREATE (target)-[r:DERIVED_FROM]->(source)
  1382. SET r.script_name = $script_name,
  1383. r.script_type = $script_type,
  1384. r.schedule_status = $schedule_status,
  1385. r.update_mode = $update_mode,
  1386. r.created_at = $created_at,
  1387. r.updated_at = $created_at
  1388. RETURN r
  1389. """
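# Resulting graph pattern (illustrative): (target {name: target_table})
#   -[:DERIVED_FROM {script_name, script_type, schedule_status, update_mode,
#                    created_at, updated_at}]-> (source {name: source_table}),
# created only when no DERIVED_FROM edge already exists between the two nodes.
# The source/target labels come straight from the 'Label:name' prefixes and are
# interpolated into the Cypher text, so they are assumed to be trusted input.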
  1390. relationship_result = session.run(
  1391. create_relationship_query, # type: ignore[arg-type]
  1392. {
  1393. 'source_id': source_id,
  1394. 'target_id': target_id,
  1395. 'script_name': script_name,
  1396. 'script_type': script_type,
  1397. 'schedule_status': schedule_status,
  1398. 'update_mode': update_mode,
  1399. 'created_at': get_formatted_time(),
  1400. },
  1401. ).single()
  1402. if relationship_result:
  1403. logger.info(
  1404. "成功创建DERIVED_FROM关系: %s -> %s (script: %s)",
  1405. target_table,
  1406. source_table,
  1407. script_name,
  1408. )
  1409. else:
  1410. logger.info(
  1411. "DERIVED_FROM关系已存在: %s -> %s",
  1412. target_table,
  1413. source_table,
  1414. )
  1415. else:
  1416. logger.error(
  1417. "创建表节点失败: source_table=%s, target_table=%s",
  1418. source_table,
  1419. target_table,
  1420. )
  1421. except Exception as e:
  1422. logger.error(f"处理脚本关系失败: {str(e)}")
  1423. raise e
  1424. @staticmethod
  1425. def get_business_domain_list() -> List[Dict[str, Any]]:
  1426. """
  1427. 获取BusinessDomain节点列表
  1428. Returns:
  1429. BusinessDomain节点列表,每个节点包含 id, name_zh, name_en, tag
  1430. """
  1431. try:
  1432. logger.info("开始查询BusinessDomain节点列表")
  1433. with connect_graph().session() as session:
  1434. # 查询所有BusinessDomain节点及其LABEL关系指向的标签(支持多标签)
  1435. query = """
  1436. MATCH (bd:BusinessDomain)
  1437. OPTIONAL MATCH (bd)-[:LABEL]->(label:DataLabel)
  1438. RETURN id(bd) as id,
  1439. bd.name_zh as name_zh,
  1440. bd.name_en as name_en,
  1441. bd.create_time as create_time,
  1442. collect({
  1443. id: id(label),
  1444. name_zh: label.name_zh,
  1445. name_en: label.name_en
  1446. }) as tags
  1447. ORDER BY create_time DESC
  1448. """
  1449. result = session.run(query)
  1450. bd_list = []
  1451. for record in result:
  1452. # 处理标签数组,过滤掉空标签
  1453. tags = record.get("tags", [])
  1454. tag_list = [
  1455. tag for tag in tags
  1456. if tag.get('id') is not None
  1457. ]
  1458. bd_item = {
  1459. "id": record["id"],
  1460. "name_zh": record.get("name_zh", "") or "",
  1461. "name_en": record.get("name_en", "") or "",
  1462. "tag": tag_list,
  1463. }
  1464. bd_list.append(bd_item)
  1465. logger.info(f"成功查询到 {len(bd_list)} 个BusinessDomain节点")
  1466. return bd_list
  1467. except Exception as e:
  1468. logger.error(f"查询BusinessDomain节点列表失败: {str(e)}")
  1469. raise e