# dataflows.py

import logging
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
import json

from app.core.llm.llm_service import llm_client, llm_sql
from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node, relationship_exists
from app.core.meta_data import translate_and_parse, get_formatted_time
from py2neo import Relationship
from app import db
from sqlalchemy import text

logger = logging.getLogger(__name__)


class DataFlowService:
    """Data flow service: handles the business logic related to data flows."""

    @staticmethod
    def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]:
        """
        Get a paginated list of data flows.

        Args:
            page: page number
            page_size: number of items per page
            search: search keyword

        Returns:
            Dict containing the list of data flows and pagination info
        """
        try:
            # Query the list of data flows from the graph database
            skip_count = (page - 1) * page_size

            # Build the search condition
            where_clause = ""
            params = {'skip': skip_count, 'limit': page_size}
            if search:
                where_clause = "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search"
                params['search'] = search

            # Query the list of data flows
            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            RETURN n, id(n) as node_id
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            """

            # Get the Neo4j driver (connect_graph raises ConnectionError if the connection fails)
            driver = None
            try:
                driver = connect_graph()
                with driver.session() as session:
                    list_result = session.run(query, **params).data()

                    # Query the total count
                    count_query = f"""
                    MATCH (n:DataFlow)
                    {where_clause}
                    RETURN count(n) as total
                    """
                    count_params = {'search': search} if search else {}
                    count_result = session.run(count_query, **count_params).single()
                    total = count_result['total'] if count_result else 0
            finally:
                # Make sure the driver is closed to avoid leaking connections
                if driver:
                    driver.close()

            # Format the results
            dataflows = []
            for record in list_result:
                node = record['n']
                dataflow = dict(node)
                dataflow['id'] = record['node_id']  # use the node_id returned by the query
                dataflows.append(dataflow)

            return {
                'list': dataflows,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"Failed to get the data flow list: {str(e)}")
            raise e
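
    # Illustrative shape of the value returned by get_dataflows() (values are made up):
    #
    #   {
    #       "list": [{"id": 123, "name_zh": "...", "name_en": "...", "status": "inactive", ...}],
    #       "pagination": {"page": 1, "page_size": 10, "total": 42, "total_pages": 5}
    #   }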

    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        Get the details of a data flow by ID.

        Args:
            dataflow_id: data flow ID

        Returns:
            Dict with the data flow details, or None if it does not exist
        """
        try:
            # Get the basic information from Neo4j
            neo4j_query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:LABEL]-(la:DataLabel)
            RETURN n, id(n) as node_id,
                   collect(DISTINCT {id: id(la), name: la.name}) as tags
            """
            with connect_graph().session() as session:
                neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data()
                if not neo4j_result:
                    return None

                record = neo4j_result[0]
                node = record['n']
                dataflow = dict(node)
                dataflow['id'] = record['node_id']
                dataflow['tags'] = record['tags']

            # Get additional information from PostgreSQL; the record is keyed by
            # script_name, which create_dataflow stores as the data flow's name_zh
            pg_query = """
            SELECT
                source_table,
                target_table,
                script_name,
                script_type,
                script_requirement,
                script_content,
                user_name,
                create_time,
                update_time,
                target_dt_column
            FROM dags.data_transform_scripts
            WHERE script_name = :script_name
            """
            with db.engine.connect() as conn:
                pg_result = conn.execute(text(pg_query), {"script_name": dataflow.get('name_zh')}).fetchone()

                if pg_result:
                    # Merge the PostgreSQL data into the result
                    dataflow.update({
                        'source_table': pg_result.source_table,
                        'target_table': pg_result.target_table,
                        'script_type': pg_result.script_type,
                        'script_requirement': pg_result.script_requirement,
                        'script_content': pg_result.script_content,
                        'created_by': pg_result.user_name,
                        'pg_created_at': pg_result.create_time,
                        'pg_updated_at': pg_result.update_time,
                        'target_dt_column': pg_result.target_dt_column
                    })

            return dataflow
        except Exception as e:
            logger.error(f"Failed to get the data flow details: {str(e)}")
            raise e

    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new data flow.

        Args:
            data: data flow configuration

        Returns:
            Information about the created data flow
        """
        try:
            # Validate required fields
            required_fields = ['name_zh', 'describe']
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"Missing required field: {field}")

            dataflow_name = data['name_zh']

            # Use the LLM to translate the name into an English name
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
            except Exception as e:
                logger.warning(f"Translation failed, falling back to a default English name: {str(e)}")
                name_en = dataflow_name.lower().replace(' ', '_')

            # Prepare the node data
            node_data = {
                'name_zh': dataflow_name,
                'name_en': name_en,
                'category': data.get('category', ''),
                'organization': data.get('organization', ''),
                'leader': data.get('leader', ''),
                'frequency': data.get('frequency', ''),
                'tag': data.get('tag', ''),
                'describe': data.get('describe', ''),
                'status': data.get('status', 'inactive'),
                'update_mode': data.get('update_mode', 'append'),
                'created_at': get_formatted_time(),
                'updated_at': get_formatted_time()
            }

            # Create the data flow node if it does not already exist
            dataflow_id = get_node('DataFlow', name=dataflow_name)
            if dataflow_id:
                raise ValueError(f"Data flow '{dataflow_name}' already exists")
            dataflow_id = create_or_get_node('DataFlow', **node_data)

            # Handle the tag relationship
            tag_id = data.get('tag')
            if tag_id is not None:
                try:
                    DataFlowService._handle_tag_relationship(dataflow_id, tag_id)
                except Exception as e:
                    logger.warning(f"Error while handling the tag relationship: {str(e)}")

            # After the graph node is created, write the record to the PostgreSQL database
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                logger.info(f"Data flow written to the PostgreSQL database: {dataflow_name}")

                # Once the PostgreSQL record is written, create the script relationships in Neo4j
                try:
                    DataFlowService._handle_script_relationships(data, dataflow_name, name_en)
                    logger.info(f"Script relationships created: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"Failed to create script relationships: {str(script_error)}")
            except Exception as pg_error:
                logger.error(f"Failed to write to the PostgreSQL database: {str(pg_error)}")
                # Note: the graph data is kept even if the relational write fails;
                # rolling both back atomically would require a distributed transaction.

            # Query the created node to return its full information
            query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id"
            with connect_graph().session() as session:
                id_result = session.run(query, name_zh=dataflow_name).single()
                if id_result:
                    dataflow_node = id_result['n']
                    node_id = id_result['node_id']

                    # Convert the node properties into a dict
                    result = dict(dataflow_node)
                    result['id'] = node_id
                else:
                    # Fall back to basic information if the query fails
                    result = {
                        'id': dataflow_id if isinstance(dataflow_id, int) else None,
                        'name_zh': dataflow_name,
                        'name_en': name_en,
                        'created_at': get_formatted_time()
                    }

            logger.info(f"Data flow created: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"Failed to create data flow: {str(e)}")
            raise e

    @staticmethod
    def _save_to_pg_database(data: Dict[str, Any], script_name: str, name_en: str):
        """
        Save the script information to the PostgreSQL database.

        Args:
            data: data containing the script information
            script_name: script name
            name_en: English name
        """
        try:
            # Extract the script-related fields
            script_requirement = data.get('script_requirement', '')
            script_content = data.get('script_content', '')
            source_table = data.get('source_table', '').split(':')[-1] if ':' in data.get('source_table', '') else data.get('source_table', '')
            target_table = data.get('target_table', '').split(':')[-1] if ':' in data.get('target_table', '') else data.get('target_table', name_en)  # fall back to the English name if no target table is given
            script_type = data.get('script_type', 'python')
            user_name = data.get('created_by', 'system')
            target_dt_column = data.get('target_dt_column', '')

            # Validate required fields
            if not target_table:
                target_table = name_en
            if not script_name:
                raise ValueError("script_name must not be empty")

            # Build the upsert SQL
            insert_sql = text("""
                INSERT INTO dags.data_transform_scripts
                (source_table, target_table, script_name, script_type, script_requirement,
                 script_content, user_name, create_time, update_time, target_dt_column)
                VALUES
                (:source_table, :target_table, :script_name, :script_type, :script_requirement,
                 :script_content, :user_name, :create_time, :update_time, :target_dt_column)
                ON CONFLICT (target_table, script_name)
                DO UPDATE SET
                    source_table = EXCLUDED.source_table,
                    script_type = EXCLUDED.script_type,
                    script_requirement = EXCLUDED.script_requirement,
                    script_content = EXCLUDED.script_content,
                    user_name = EXCLUDED.user_name,
                    update_time = EXCLUDED.update_time,
                    target_dt_column = EXCLUDED.target_dt_column
            """)

            # Prepare the parameters
            current_time = datetime.now()
            params = {
                'source_table': source_table,
                'target_table': target_table,
                'script_name': script_name,
                'script_type': script_type,
                'script_requirement': script_requirement,
                'script_content': script_content,
                'user_name': user_name,
                'create_time': current_time,
                'update_time': current_time,
                'target_dt_column': target_dt_column
            }

            # Execute the upsert
            db.session.execute(insert_sql, params)
            db.session.commit()
            logger.info(f"Script information written to the PostgreSQL database: target_table={target_table}, script_name={script_name}")
        except Exception as e:
            db.session.rollback()
            logger.error(f"Failed to write to the PostgreSQL database: {str(e)}")
            raise e
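
    # Note (assumption about the schema, not taken from this module): the
    # ON CONFLICT (target_table, script_name) clause above only works if
    # dags.data_transform_scripts has a matching unique constraint or index, e.g.:
    #
    #   ALTER TABLE dags.data_transform_scripts
    #       ADD CONSTRAINT uq_transform_scripts_target_script
    #       UNIQUE (target_table, script_name);
    #
    # Without it PostgreSQL raises "there is no unique or exclusion constraint
    # matching the ON CONFLICT specification". The constraint name is illustrative.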

    @staticmethod
    def _handle_children_relationships(dataflow_node, children_ids):
        """Handle child-node relationships."""
        logger.debug(f"Handling child relationships, raw children_ids: {children_ids}, type: {type(children_ids)}")

        # Make sure children_ids is a list
        if not isinstance(children_ids, (list, tuple)):
            if children_ids is not None:
                children_ids = [children_ids]  # wrap a single value in a list
                logger.debug(f"Wrapped single value in a list: {children_ids}")
            else:
                children_ids = []  # treat None as an empty list
                logger.debug("Converted None to an empty list")

        for child_id in children_ids:
            try:
                # Look up the child node
                query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
                with connect_graph().session() as session:
                    result = session.run(query, child_id=child_id).data()
                    if result:
                        child_node = result[0]['n']

                        # Get the ID of dataflow_node
                        dataflow_id = getattr(dataflow_node, 'identity', None)
                        if dataflow_id is None:
                            # No identity attribute: look up the ID by name instead
                            query_id = "MATCH (n:DataFlow) WHERE n.name_zh = $name_zh RETURN id(n) as node_id"
                            id_result = session.run(query_id, name_zh=dataflow_node.get('name_zh')).single()
                            dataflow_id = id_result['node_id'] if id_result else None

                        # Create the relationship, calling relationship_exists with node IDs
                        if dataflow_id and not relationship_exists(dataflow_id, 'child', child_id):
                            session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $child_id CREATE (a)-[:child]->(b)",
                                        dataflow_id=dataflow_id, child_id=child_id)
                            logger.info(f"Created child relationship: {dataflow_id} -> {child_id}")
            except Exception as e:
                logger.warning(f"Failed to create child relationship {child_id}: {str(e)}")

    @staticmethod
    def _handle_tag_relationship(dataflow_id, tag_id):
        """Handle the tag relationship."""
        try:
            # Look up the tag node
            query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, tag_id=tag_id).data()
                if result:
                    tag_node = result[0]['n']

                    # Create the relationship, calling relationship_exists with node IDs
                    if dataflow_id and not relationship_exists(dataflow_id, 'LABEL', tag_id):
                        session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $tag_id CREATE (a)-[:LABEL]->(b)",
                                    dataflow_id=dataflow_id, tag_id=tag_id)
                        logger.info(f"Created tag relationship: {dataflow_id} -> {tag_id}")
        except Exception as e:
            logger.warning(f"Failed to create tag relationship {tag_id}: {str(e)}")

    @staticmethod
    def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Update a data flow.

        Args:
            dataflow_id: data flow ID
            data: fields to update

        Returns:
            The updated data flow, or None if it does not exist
        """
        try:
            # Look up the node
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

                if not result:
                    return None

                # Build the property updates
                update_fields = []
                params = {'dataflow_id': dataflow_id}
                for key, value in data.items():
                    if key not in ['id', 'created_at']:  # protected fields
                        if key == 'config' and isinstance(value, dict):
                            value = json.dumps(value, ensure_ascii=False)
                        update_fields.append(f"n.{key} = ${key}")
                        params[key] = value

                if update_fields:
                    params['updated_at'] = get_formatted_time()
                    update_fields.append("n.updated_at = $updated_at")

                    update_query = f"""
                    MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                    SET {', '.join(update_fields)}
                    RETURN n, id(n) as node_id
                    """
                    result = session.run(update_query, **params).data()

                    if result:
                        node = result[0]['n']
                        updated_dataflow = dict(node)
                        updated_dataflow['id'] = result[0]['node_id']  # use the node_id returned by the query
                        logger.info(f"Data flow updated: ID={dataflow_id}")
                        return updated_dataflow

                return None
        except Exception as e:
            logger.error(f"Failed to update data flow: {str(e)}")
            raise e

    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """
        Delete a data flow.

        Args:
            dataflow_id: data flow ID

        Returns:
            True if the deletion succeeded
        """
        try:
            # Delete the node together with its relationships
            query = """
            MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            with connect_graph().session() as session:
                delete_result = session.run(query, dataflow_id=dataflow_id).single()
                result = delete_result['deleted_count'] if delete_result else 0

                if result and result > 0:
                    logger.info(f"Data flow deleted: ID={dataflow_id}")
                    return True
                return False
        except Exception as e:
            logger.error(f"Failed to delete data flow: {str(e)}")
            raise e

    @staticmethod
    def execute_dataflow(dataflow_id: int, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Execute a data flow.

        Args:
            dataflow_id: data flow ID
            params: execution parameters

        Returns:
            Information about the execution
        """
        try:
            # Check that the data flow exists
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

                if not result:
                    raise ValueError(f"Data flow does not exist: ID={dataflow_id}")

            execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"

            # TODO: actually execute the data flow here.
            # For now a mock result is returned.
            result = {
                'execution_id': execution_id,
                'dataflow_id': dataflow_id,
                'status': 'running',
                'started_at': datetime.now().isoformat(),
                'params': params or {},
                'progress': 0
            }

            logger.info(f"Data flow execution started: ID={dataflow_id}, execution_id={execution_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to execute data flow: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
        """
        Get the execution status of a data flow.

        Args:
            dataflow_id: data flow ID

        Returns:
            Execution status information
        """
        try:
            # TODO: query the real execution status here.
            # For now a mock status is returned.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

                if not result:
                    raise ValueError(f"Data flow does not exist: ID={dataflow_id}")

            status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]
            return {
                'dataflow_id': dataflow_id,
                'status': status,
                'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100,
                'started_at': datetime.now().isoformat(),
                'completed_at': datetime.now().isoformat() if status == 'completed' else None,
                'error_message': 'An error occurred during execution' if status == 'failed' else None
            }
        except Exception as e:
            logger.error(f"Failed to get data flow status: {str(e)}")
            raise e

    @staticmethod
    def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]:
        """
        Get the execution logs of a data flow.

        Args:
            dataflow_id: data flow ID
            page: page number
            page_size: number of items per page

        Returns:
            Execution log list and pagination info
        """
        try:
            # TODO: query the real execution logs here.
            # For now mock logs are returned.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

                if not result:
                    raise ValueError(f"Data flow does not exist: ID={dataflow_id}")

            mock_logs = [
                {
                    'id': i,
                    'timestamp': datetime.now().isoformat(),
                    'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
                    'message': f'Data flow execution log message {i}',
                    'component': ['source', 'transform', 'target'][i % 3]
                }
                for i in range(1, 101)
            ]

            # Apply pagination
            total = len(mock_logs)
            start = (page - 1) * page_size
            end = start + page_size
            logs = mock_logs[start:end]

            return {
                'logs': logs,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"Failed to get data flow logs: {str(e)}")
            raise e

    @staticmethod
    def create_script(request_data: Union[Dict[str, Any], str]) -> str:
        """
        Generate a SQL script with the Deepseek model.

        Args:
            request_data: request dict containing input, output and request_content,
                or a JSON string with the same fields

        Returns:
            The generated SQL script content
        """
        try:
            logger.info(f"Handling script generation request: {request_data}")
            logger.info(f"request_data type: {type(request_data)}")

            # Normalize the input type
            if isinstance(request_data, str):
                logger.warning(f"request_data is a string, trying to parse it as JSON: {request_data}")
                try:
                    request_data = json.loads(request_data)
                except json.JSONDecodeError as e:
                    raise ValueError(f"Unable to parse request_data as JSON: {str(e)}")

            if not isinstance(request_data, dict):
                raise ValueError(f"request_data must be a dict, got: {type(request_data)}")

            # 1. Parse input, output and request_content from request_data
            input_data = request_data.get('input', '')
            output_data = request_data.get('output', '')
            request_content = request_data.get('request_content', '')

            # If request_content is HTML, strip it down to plain text
            if request_content and (request_content.startswith('<p>') or '<' in request_content):
                # Simple HTML tag cleanup
                import re
                request_content = re.sub(r'<[^>]+>', '', request_content).strip()

            if not input_data or not output_data or not request_content:
                raise ValueError(f"Missing required parameters: input='{input_data}', output='{output_data}', request_content='{request_content[:100] if request_content else ''}' must not be empty")

            logger.info(f"Parsed - input: {input_data}, output: {output_data}, request_content: {request_content}")

            # 2. Parse the (possibly multiple) tables in input and build the source table DDL
            source_tables_ddl = []
            input_tables = []
            if input_data:
                tables = [table.strip() for table in input_data.split(',') if table.strip()]
                for table in tables:
                    ddl = DataFlowService._parse_table_and_get_ddl(table, 'input')
                    if ddl:
                        input_tables.append(table)
                        source_tables_ddl.append(ddl)
                    else:
                        logger.warning(f"Unable to get the DDL of input table {table}")

            # 3. Parse the table in output and build the target table DDL
            target_table_ddl = ""
            if output_data:
                target_table_ddl = DataFlowService._parse_table_and_get_ddl(output_data.strip(), 'output')
                if not target_table_ddl:
                    logger.warning(f"Unable to get the DDL of output table {output_data}")

            # 4. Build the prompt following the framework in Deepseek-prompt.txt
            prompt_parts = []

            # Opening - role definition
            prompt_parts.append("You are a database engineer building aggregation logic in a PostgreSQL database. Please generate a standard PostgreSQL SQL script for the following requirements:")

            # Source table sections (points 1..n)
            for i, (table, ddl) in enumerate(zip(input_tables, source_tables_ddl), 1):
                table_name = table.split(':')[-1] if ':' in table else table
                prompt_parts.append(f"{i}. There is a source table {table_name}, defined as follows:")
                prompt_parts.append(ddl)
                prompt_parts.append("")  # blank line separator

            # Target table section
            if target_table_ddl:
                target_table_name = output_data.split(':')[-1] if ':' in output_data else output_data
                next_index = len(input_tables) + 1
                prompt_parts.append(f"{next_index}. There is a target table {target_table_name}, defined as follows:")
                prompt_parts.append(target_table_ddl)
                prompt_parts.append("")  # blank line separator

            # Processing logic section
            next_index = len(input_tables) + 2 if target_table_ddl else len(input_tables) + 1
            prompt_parts.append(f"{next_index}. The processing logic is: {request_content}")
            prompt_parts.append("")  # blank line separator

            # Fixed technical requirements
            tech_requirements = [
                f"{next_index + 1}. The script should use standard PostgreSQL syntax, suitable for being called from Airflow, a Python script, or a scheduling system;",
                f"{next_index + 2}. There is no need to use UPSERT or ON CONFLICT;",
                f"{next_index + 3}. Output the SQL directly, without any explanation;",
                f"{next_index + 4}. Give the SQL an English name of at least three words in snake_case, separated by \"_\", and include that name as a comment in the returned SQL;",
                f"{next_index + 5}. When inserting into the target table, write the current timestamp now() into the create_time column and do not handle the update_time column."
            ]
            prompt_parts.extend(tech_requirements)

            # Assemble the full prompt
            full_prompt = "\n".join(prompt_parts)

            logger.info(f"Full prompt length: {len(full_prompt)}")
            logger.info(f"Full prompt content: {full_prompt}")

            # 5. Call the LLM to generate the SQL script
            logger.info("Calling the Deepseek model to generate the SQL script")
            script_content = llm_sql(full_prompt)

            if not script_content:
                raise ValueError("The Deepseek model returned empty content")

            # Make sure the returned value is a string
            if not isinstance(script_content, str):
                script_content = str(script_content)

            logger.info(f"SQL script generated, content length: {len(script_content)}")
            return script_content
        except Exception as e:
            logger.error(f"Failed to generate SQL script: {str(e)}")
            raise e
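
    # Illustrative payload for create_script() (an assumption, not part of the original
    # module): labels and table names are made up and must match nodes in Neo4j.
    #
    #   DataFlowService.create_script({
    #       "input": "DataResource:ods_orders,DataResource:ods_customers",
    #       "output": "DataModel:dws_customer_orders",
    #       "request_content": "<p>Aggregate daily order amounts per customer.</p>"
    #   })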

    @staticmethod
    def _parse_table_and_get_ddl(table_str: str, table_type: str) -> str:
        """
        Parse a table reference in "label:name_en" format and build a DDL statement
        from the metadata stored in Neo4j.

        Args:
            table_str: table reference string in "label:name_en" format
            table_type: table type, used for logging (input/output)

        Returns:
            The table structure as a DDL string
        """
        try:
            # Parse the "label:name_en" format
            if ':' not in table_str:
                logger.error(f"Invalid table format, expected 'label:name_en': {table_str}")
                return ""

            parts = table_str.split(':', 1)
            if len(parts) != 2:
                logger.error(f"Failed to parse table format: {table_str}")
                return ""

            label = parts[0].strip()
            name_en = parts[1].strip()

            if not label or not name_en:
                logger.error(f"Label or English name is empty: label={label}, name_en={name_en}")
                return ""

            logger.info(f"Querying {table_type} table: label={label}, name_en={name_en}")

            # Query the node and its associated metadata from Neo4j
            with connect_graph().session() as session:
                cypher = f"""
                MATCH (n:{label} {{name_en: $name_en}})
                OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
                RETURN n, collect(m) as metadata
                """
                result = session.run(cypher, name_en=name_en)
                record = result.single()

                if not record:
                    logger.error(f"Node not found: label={label}, name_en={name_en}")
                    return ""

                node = record['n']
                metadata = record['metadata']

                logger.info(f"Node found, number of associated metadata entries: {len(metadata)}")

                # Build the DDL for the table structure
                ddl_lines = []
                ddl_lines.append(f"CREATE TABLE {name_en} (")

                if metadata:
                    column_definitions = []
                    for meta in metadata:
                        if meta:  # make sure meta is not empty
                            meta_props = dict(meta)
                            column_name = meta_props.get('name_en', meta_props.get('name_zh', 'unknown_column'))
                            data_type = meta_props.get('data_type', 'VARCHAR(255)')
                            comment = meta_props.get('name_zh', '')

                            # Build the column definition
                            column_def = f" {column_name} {data_type}"
                            if comment:
                                column_def += f" COMMENT '{comment}'"
                            column_definitions.append(column_def)

                    if column_definitions:
                        ddl_lines.append(",\n".join(column_definitions))
                    else:
                        ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT 'Primary key ID'")
                else:
                    # Fall back to a default column when there is no metadata
                    ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT 'Primary key ID'")

                ddl_lines.append(");")

                # Add the table comment
                node_props = dict(node)
                table_comment = node_props.get('name_zh', node_props.get('describe', name_en))
                if table_comment and table_comment != name_en:
                    ddl_lines.append(f"COMMENT ON TABLE {name_en} IS '{table_comment}';")

                ddl_content = "\n".join(ddl_lines)
                logger.info(f"DDL for {table_type} table generated: {name_en}")
                logger.debug(f"Generated DDL: {ddl_content}")

                return ddl_content
        except Exception as e:
            logger.error(f"Failed to parse table format and generate DDL: {str(e)}")
            return ""
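
    # Note: the column-level "COMMENT '...'" syntax emitted above is MySQL-style;
    # PostgreSQL only supports comments via separate "COMMENT ON COLUMN ..." statements.
    # The DDL built here is only embedded into the LLM prompt as a table description
    # and is never executed against PostgreSQL directly.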

    @staticmethod
    def _handle_script_relationships(data: Dict[str, Any], dataflow_name: str, name_en: str):
        """
        Handle script relationships: create a DERIVED_FROM relationship between the
        source_table and target_table nodes in the Neo4j graph database.

        Args:
            data: dict with the script information; should contain script_type, status,
                source_table, target_table and update_mode
            dataflow_name: data flow name, used as the script name on the relationship
            name_en: English name of the data flow
        """
        try:
            # Read the key/value pairs from data
            script_name = dataflow_name
            script_type = data.get('script_type', 'sql')
            schedule_status = data.get('status', 'inactive')
            source_table_full = data.get('source_table', '')
            target_table_full = data.get('target_table', '')
            update_mode = data.get('update_mode', 'full')

            # Split source_table and target_table into label and table name
            source_table = source_table_full.split(':')[-1] if ':' in source_table_full else source_table_full
            target_table = target_table_full.split(':')[-1] if ':' in target_table_full else target_table_full
            source_label = source_table_full.split(':')[0] if ':' in source_table_full else source_table_full
            target_label = target_table_full.split(':')[0] if ':' in target_table_full else target_table_full

            # Validate required fields
            if not source_table or not target_table:
                logger.warning(f"source_table or target_table is empty, skipping relationship creation: source_table={source_table}, target_table={target_table}")
                return

            logger.info(f"Creating script relationship: {source_table} -> {target_table}")

            with connect_graph().session() as session:
                # Create or get the source and target nodes
                create_nodes_query = f"""
                MERGE (source:{source_label} {{name: $source_table}})
                ON CREATE SET source.created_at = $created_at,
                              source.type = 'source'
                WITH source
                MERGE (target:{target_label} {{name: $target_table}})
                ON CREATE SET target.created_at = $created_at,
                              target.type = 'target'
                RETURN source, target, id(source) as source_id, id(target) as target_id
                """

                # Run the node creation query
                result = session.run(create_nodes_query,
                                     source_table=source_table,
                                     target_table=target_table,
                                     created_at=get_formatted_time()).single()

                if result:
                    source_id = result['source_id']
                    target_id = result['target_id']

                    # Create the relationship if it does not exist yet
                    create_relationship_query = f"""
                    MATCH (source:{source_label}), (target:{target_label})
                    WHERE id(source) = $source_id AND id(target) = $target_id
                    AND NOT EXISTS((target)-[:DERIVED_FROM]->(source))
                    CREATE (target)-[r:DERIVED_FROM]->(source)
                    SET r.script_name = $script_name,
                        r.script_type = $script_type,
                        r.schedule_status = $schedule_status,
                        r.update_mode = $update_mode,
                        r.created_at = $created_at,
                        r.updated_at = $created_at
                    RETURN r
                    """

                    relationship_result = session.run(create_relationship_query,
                                                      source_id=source_id,
                                                      target_id=target_id,
                                                      script_name=script_name,
                                                      script_type=script_type,
                                                      schedule_status=schedule_status,
                                                      update_mode=update_mode,
                                                      created_at=get_formatted_time()).single()

                    if relationship_result:
                        logger.info(f"DERIVED_FROM relationship created: {target_table} -> {source_table} (script: {script_name})")
                    else:
                        logger.info(f"DERIVED_FROM relationship already exists: {target_table} -> {source_table}")
                else:
                    logger.error(f"Failed to create table nodes: source_table={source_table}, target_table={target_table}")
        except Exception as e:
            logger.error(f"Failed to handle script relationships: {str(e)}")
            raise e
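

# Usage sketch (illustrative, not part of the original module): how a caller such as a
# Flask route handler might exercise this service. It assumes a reachable Neo4j instance
# (via connect_graph) and, for the PostgreSQL-backed methods, an active Flask application
# context; the page size and search keyword below are arbitrary.
if __name__ == '__main__':
    # Page through the data flows and print the pagination summary
    listing = DataFlowService.get_dataflows(page=1, page_size=10, search='')
    print(listing['pagination'])

    # Fetch the details of the first returned data flow, if any
    if listing['list']:
        detail = DataFlowService.get_dataflow_by_id(listing['list'][0]['id'])
        print(detail)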