# dataflows.py
  1. import logging
  2. from typing import Dict, List, Optional, Any, Union
  3. from datetime import datetime
  4. import json
  5. from app.core.llm.llm_service import llm_client, llm_sql
  6. from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node, relationship_exists
  7. from app.core.meta_data import translate_and_parse, get_formatted_time
  8. from py2neo import Relationship
  9. from app import db
  10. from sqlalchemy import text
  11. logger = logging.getLogger(__name__)
  12. class DataFlowService:
  13. """数据流服务类,处理数据流相关的业务逻辑"""
  14. @staticmethod
  15. def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]:
  16. """
  17. 获取数据流列表
  18. Args:
  19. page: 页码
  20. page_size: 每页大小
  21. search: 搜索关键词
  22. Returns:
  23. 包含数据流列表和分页信息的字典
  24. """
  25. try:
  26. # 从图数据库查询数据流列表
  27. skip_count = (page - 1) * page_size
  28. # 构建搜索条件
  29. where_clause = ""
  30. params = {'skip': skip_count, 'limit': page_size}
  31. if search:
  32. where_clause = "WHERE n.name_zh CONTAINS $search OR n.description CONTAINS $search"
  33. params['search'] = search
  34. # 查询数据流列表
  35. query = f"""
  36. MATCH (n:DataFlow)
  37. {where_clause}
  38. RETURN n, id(n) as node_id
  39. ORDER BY n.created_at DESC
  40. SKIP $skip
  41. LIMIT $limit
  42. """
  43. # 获取Neo4j驱动(如果连接失败会抛出ConnectionError异常)
  44. try:
  45. with connect_graph().session() as session:
  46. list_result = session.run(query, **params).data()
  47. # 查询总数
  48. count_query = f"""
  49. MATCH (n:DataFlow)
  50. {where_clause}
  51. RETURN count(n) as total
  52. """
  53. count_params = {'search': search} if search else {}
  54. count_result = session.run(count_query, **count_params).single()
  55. total = count_result['total'] if count_result else 0
  56. except Exception as e:
  57. # 确保 driver 被正确关闭,避免资源泄漏 - 这里不再需要手动关闭driver,因为connect_graph返回的可能是单例或新实例,
  58. # 但如果是新实例,我们没有引用它去关闭。如果connect_graph设计为每次返回新实例且需要关闭,
  59. # 那么之前的代码是对的。如果connect_graph返回单例,则不应关闭。
  60. # 根据用户反馈:The driver.close() call prematurely closes a shared driver instance.
  61. # 所以我们直接使用 session,并不关闭 driver。
  62. logger.error(f"查询数据流失败: {str(e)}")
  63. raise e
  64. # 格式化结果
  65. dataflows = []
  66. for record in list_result:
  67. node = record['n']
  68. dataflow = dict(node)
  69. dataflow['id'] = record['node_id'] # 使用查询返回的node_id
  70. dataflows.append(dataflow)
  71. return {
  72. 'list': dataflows,
  73. 'pagination': {
  74. 'page': page,
  75. 'page_size': page_size,
  76. 'total': total,
  77. 'total_pages': (total + page_size - 1) // page_size
  78. }
  79. }
  80. except Exception as e:
  81. logger.error(f"获取数据流列表失败: {str(e)}")
  82. raise e
  83. @staticmethod
  84. def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
  85. """
  86. 根据ID获取数据流详情
  87. Args:
  88. dataflow_id: 数据流ID
  89. Returns:
  90. 数据流详情字典,如果不存在则返回None
  91. """
  92. try:
  93. # 从Neo4j获取基本信息
  94. neo4j_query = """
  95. MATCH (n:DataFlow)
  96. WHERE id(n) = $dataflow_id
  97. OPTIONAL MATCH (n)-[:LABEL]-(la:DataLabel)
  98. RETURN n, id(n) as node_id,
  99. collect(DISTINCT {id: id(la), name: la.name}) as tags
  100. """
  101. with connect_graph().session() as session:
  102. neo4j_result = session.run(neo4j_query, dataflow_id=dataflow_id).data()
  103. if not neo4j_result:
  104. return None
  105. record = neo4j_result[0]
  106. node = record['n']
  107. dataflow = dict(node)
  108. dataflow['id'] = record['node_id']
  109. dataflow['tags'] = record['tags']
  110. # 从PostgreSQL获取额外信息
  111. pg_query = """
  112. SELECT
  113. source_table,
  114. target_table,
  115. script_name,
  116. script_type,
  117. script_requirement,
  118. script_content,
  119. user_name,
  120. create_time,
  121. update_time,
  122. target_dt_column
  123. FROM dags.data_transform_scripts
  124. WHERE script_name = :script_name
  125. """
  126. with db.engine.connect() as conn:
  127. pg_result = conn.execute(text(pg_query), {"script_name": dataflow.get('name_zh')}).fetchone()
  128. if pg_result:
  129. # 将PostgreSQL数据添加到结果中
  130. dataflow.update({
  131. 'source_table': pg_result.source_table,
  132. 'target_table': pg_result.target_table,
  133. 'script_type': pg_result.script_type,
  134. 'script_requirement': pg_result.script_requirement,
  135. 'script_content': pg_result.script_content,
  136. 'created_by': pg_result.user_name,
  137. 'pg_created_at': pg_result.create_time,
  138. 'pg_updated_at': pg_result.update_time,
  139. 'target_dt_column': pg_result.target_dt_column
  140. })
  141. return dataflow
  142. except Exception as e:
  143. logger.error(f"获取数据流详情失败: {str(e)}")
  144. raise e
    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new dataflow.

        Writes a DataFlow node to Neo4j, mirrors the script info into
        PostgreSQL, then creates script relationships in the graph.  The
        PG write and relationship creation are best-effort: their failures
        are logged but do not roll back the already-created graph node.

        Args:
            data: dataflow configuration; must contain 'name_zh' and 'describe'.

        Returns:
            Dict describing the created dataflow (node properties + 'id').

        Raises:
            ValueError: on missing required fields or duplicate name.
            Exception: re-raised after logging on any other failure.
        """
        try:
            # Validate required fields up front.
            required_fields = ['name_zh', 'describe']
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"缺少必填字段: {field}")
            dataflow_name = data['name_zh']
            # Derive an English name via LLM translation; fall back to a
            # snake_cased version of the original name if translation fails.
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
            except Exception as e:
                logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
                name_en = dataflow_name.lower().replace(' ', '_')
            # Node payload with audit timestamps.
            node_data = {
                'name_zh': dataflow_name,
                'name_en': name_en,
                'category': data.get('category', ''),
                'organization': data.get('organization', ''),
                'leader': data.get('leader', ''),
                'frequency': data.get('frequency', ''),
                'tag': data.get('tag', ''),
                'describe': data.get('describe', ''),
                'status': data.get('status', 'inactive'),
                'update_mode': data.get('update_mode', 'append'),
                'created_at': get_formatted_time(),
                'updated_at': get_formatted_time()
            }
            # Reject duplicates before creating the node.
            dataflow_id = get_node('DataFlow', name=dataflow_name)
            if dataflow_id:
                raise ValueError(f"数据流 '{dataflow_name}' 已存在")
            dataflow_id = create_or_get_node('DataFlow', **node_data)
            # Tag linking is best-effort: failures are logged, not raised.
            tag_id = data.get('tag')
            if tag_id is not None:
                try:
                    DataFlowService._handle_tag_relationship(dataflow_id, tag_id)
                except Exception as e:
                    logger.warning(f"处理标签关系时出错: {str(e)}")
            # After the graph node exists, mirror the script into PostgreSQL;
            # only on PG success are script relationships created in Neo4j.
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")
                try:
                    DataFlowService._handle_script_relationships(data,dataflow_name,name_en)
                    logger.info(f"脚本关系创建成功: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"创建脚本关系失败: {str(script_error)}")
            except Exception as pg_error:
                # Deliberately NOT rolling back the graph node here; a
                # distributed transaction would be needed for atomicity.
                logger.error(f"写入PG数据库失败: {str(pg_error)}")
            # Re-query the node to return its full, persisted state.
            query = "MATCH (n:DataFlow {name_zh: $name_zh}) RETURN n, id(n) as node_id"
            with connect_graph().session() as session:
                id_result = session.run(query, name_zh=dataflow_name).single()
                if id_result:
                    dataflow_node = id_result['n']
                    node_id = id_result['node_id']
                    # Convert node properties to a plain dict and attach id.
                    result = dict(dataflow_node)
                    result['id'] = node_id
                else:
                    # Fallback when the re-query fails: return minimal info.
                    result = {
                        'id': dataflow_id if isinstance(dataflow_id, int) else None,
                        'name_zh': dataflow_name,
                        'name_en': name_en,
                        'created_at': get_formatted_time()
                    }
            logger.info(f"创建数据流成功: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"创建数据流失败: {str(e)}")
            raise e
    @staticmethod
    def _save_to_pg_database(data: Dict[str, Any], script_name: str, name_en: str):
        """
        Persist script information to PostgreSQL.

        Upserts a row into dags.data_transform_scripts (keyed on
        target_table + script_name), then — best-effort, inside a nested
        transaction — inserts a work item into public.task_list with a
        Markdown task description built from the script requirement.

        Args:
            data: incoming payload with script fields.
            script_name: script (and dataflow) name used as the key.
            name_en: English name; used as target_table fallback.

        Raises:
            ValueError: when script_name is empty.
            Exception: re-raised after rollback on DB failure.
        """
        try:
            # Extract script-related fields; table names may arrive as
            # "label:name" — keep only the part after the colon.
            script_requirement = data.get('script_requirement', '')
            script_content = data.get('script_content', '')
            source_table = data.get('source_table', '').split(':')[-1] if ':' in data.get('source_table', '') else data.get('source_table', '')
            target_table = data.get('target_table', '').split(':')[-1] if ':' in data.get('target_table', '') else data.get('target_table', name_en)  # default to the English name
            script_type = data.get('script_type', 'python')
            user_name = data.get('created_by', 'system')
            target_dt_column = data.get('target_dt_column', '')
            # Validate required fields.
            if not target_table:
                target_table = name_en
            if not script_name:
                raise ValueError("script_name不能为空")
            # Upsert keyed on (target_table, script_name); create_time is only
            # set on first insert, update_time refreshes on conflict.
            insert_sql = text("""
                INSERT INTO dags.data_transform_scripts
                (source_table, target_table, script_name, script_type, script_requirement,
                 script_content, user_name, create_time, update_time, target_dt_column)
                VALUES
                (:source_table, :target_table, :script_name, :script_type, :script_requirement,
                 :script_content, :user_name, :create_time, :update_time, :target_dt_column)
                ON CONFLICT (target_table, script_name)
                DO UPDATE SET
                    source_table = EXCLUDED.source_table,
                    script_type = EXCLUDED.script_type,
                    script_requirement = EXCLUDED.script_requirement,
                    script_content = EXCLUDED.script_content,
                    user_name = EXCLUDED.user_name,
                    update_time = EXCLUDED.update_time,
                    target_dt_column = EXCLUDED.target_dt_column
            """)
            current_time = datetime.now()
            params = {
                'source_table': source_table,
                'target_table': target_table,
                'script_name': script_name,
                'script_type': script_type,
                'script_requirement': script_requirement,
                'script_content': script_content,
                'user_name': user_name,
                'create_time': current_time,
                'update_time': current_time,
                'target_dt_column': target_dt_column
            }
            db.session.execute(insert_sql, params)
            # Additionally record a task in task_list (best-effort).
            try:
                # Build a detailed Markdown task description from the
                # requirement; fall back to the raw text on any failure.
                task_description_md = script_requirement
                try:
                    import json
                    try:
                        req_json = json.loads(script_requirement)
                    except (json.JSONDecodeError, TypeError):
                        req_json = None
                    if isinstance(req_json, dict):
                        # Extract the structured fields of the requirement.
                        business_domains = []
                        bd_str = req_json.get('business_domain', '')
                        if bd_str:
                            business_domains = [d.strip() for d in bd_str.split(',') if d.strip()]
                        data_source = req_json.get('data_source', '')
                        request_content_str = req_json.get('request_content', '')
                        # Generate a DDL snippet per referenced BusinessDomain.
                        domain_ddls = []
                        if business_domains:
                            try:
                                with connect_graph().session() as session:
                                    for domain in business_domains:
                                        # Match the domain by any of its name
                                        # properties, plus its DataMeta columns.
                                        cypher = """
                                        MATCH (n:BusinessDomain)
                                        WHERE n.name = $name OR n.name_zh = $name OR n.name_en = $name
                                        OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
                                        RETURN n, collect(m) as metadata
                                        """
                                        result = session.run(cypher, name=domain).single()
                                        if result:
                                            node = result['n']
                                            metadata = result['metadata']
                                            node_props = dict(node)
                                            # Prefer the English name as the table name.
                                            table_name = node_props.get('name_en', domain)
                                            ddl_lines = []
                                            ddl_lines.append(f"CREATE TABLE {table_name} (")
                                            if metadata:
                                                column_definitions = []
                                                for meta in metadata:
                                                    if meta:
                                                        meta_props = dict(meta)
                                                        column_name = meta_props.get('name_en', meta_props.get('name_zh', 'unknown_column'))
                                                        data_type = meta_props.get('data_type', 'VARCHAR(255)')
                                                        comment = meta_props.get('name_zh', '')
                                                        column_def = f" {column_name} {data_type}"
                                                        if comment:
                                                            column_def += f" COMMENT '{comment}'"
                                                        column_definitions.append(column_def)
                                                if column_definitions:
                                                    ddl_lines.append(",\n".join(column_definitions))
                                                else:
                                                    ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
                                            else:
                                                ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
                                            ddl_lines.append(");")
                                            table_comment = node_props.get('name_zh', node_props.get('describe', table_name))
                                            if table_comment and table_comment != table_name:
                                                ddl_lines.append(f"COMMENT ON TABLE {table_name} IS '{table_comment}';")
                                            domain_ddls.append("\n".join(ddl_lines))
                            except Exception as neo_e:
                                logger.error(f"获取BusinessDomain DDL失败: {str(neo_e)}")
                        # Assemble the Markdown document.
                        task_desc_parts = [f"# Task: {script_name}\n"]
                        if data_source:
                            task_desc_parts.append(f"## Data Source\n{data_source}\n")
                        if domain_ddls:
                            task_desc_parts.append("## Business Domain Structures (DDL)")
                            for ddl in domain_ddls:
                                task_desc_parts.append(f"```sql\n{ddl}\n```\n")
                        task_desc_parts.append(f"## Request Content\n{request_content_str}\n")
                        task_desc_parts.append("## Implementation Steps")
                        task_desc_parts.append("1. Generate a Python program to implement the logic.")
                        task_desc_parts.append("2. Generate an n8n workflow to schedule and execute the Python program.")
                        task_description_md = "\n".join(task_desc_parts)
                except Exception as parse_e:
                    logger.warning(f"解析任务描述详情失败,使用原始描述: {str(parse_e)}")
                    task_description_md = script_requirement
                # Assumes the process runs from the project root, with this
                # module living under app/core/data_flow/.
                code_path = 'app/core/data_flow'
                task_insert_sql = text("""
                    INSERT INTO public.task_list
                    (task_name, task_description, status, code_name, code_path, create_by, create_time, update_time)
                    VALUES
                    (:task_name, :task_description, :status, :code_name, :code_path, :create_by, :create_time, :update_time)
                """)
                task_params = {
                    'task_name': script_name,
                    'task_description': task_description_md,
                    'status': 'pending',
                    'code_name': script_name,
                    'code_path': code_path,
                    'create_by': 'cursor',
                    'create_time': current_time,
                    'update_time': current_time
                }
                # Nested transaction so a task_list failure cannot poison the
                # outer transaction holding the scripts upsert.
                with db.session.begin_nested():
                    db.session.execute(task_insert_sql, task_params)
                logger.info(f"成功将任务信息写入task_list表: task_name={script_name}")
            except Exception as task_error:
                # Log but do not interrupt the main flow.
                logger.error(f"写入task_list表失败: {str(task_error)}")
                # If task_list writes must succeed, re-raise here instead:
                # raise task_error
            db.session.commit()
            logger.info(f"成功将脚本信息写入PG数据库: target_table={target_table}, script_name={script_name}")
        except Exception as e:
            db.session.rollback()
            logger.error(f"写入PG数据库失败: {str(e)}")
            raise e
  407. @staticmethod
  408. def _handle_children_relationships(dataflow_node, children_ids):
  409. """处理子节点关系"""
  410. logger.debug(f"处理子节点关系,原始children_ids: {children_ids}, 类型: {type(children_ids)}")
  411. # 确保children_ids是列表格式
  412. if not isinstance(children_ids, (list, tuple)):
  413. if children_ids is not None:
  414. children_ids = [children_ids] # 如果是单个值,转换为列表
  415. logger.debug(f"将单个值转换为列表: {children_ids}")
  416. else:
  417. children_ids = [] # 如果是None,转换为空列表
  418. logger.debug("将None转换为空列表")
  419. for child_id in children_ids:
  420. try:
  421. # 查找子节点
  422. query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
  423. with connect_graph().session() as session:
  424. result = session.run(query, child_id=child_id).data()
  425. if result:
  426. child_node = result[0]['n']
  427. # 获取dataflow_node的ID
  428. dataflow_id = getattr(dataflow_node, 'identity', None)
  429. if dataflow_id is None:
  430. # 如果没有identity属性,从名称查询ID
  431. query_id = "MATCH (n:DataFlow) WHERE n.name_zh = $name_zh RETURN id(n) as node_id"
  432. id_result = session.run(query_id, name_zh=dataflow_node.get('name_zh')).single()
  433. dataflow_id = id_result['node_id'] if id_result else None
  434. # 创建关系 - 使用ID调用relationship_exists
  435. if dataflow_id and not relationship_exists(dataflow_id, 'child', child_id):
  436. session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $child_id CREATE (a)-[:child]->(b)",
  437. dataflow_id=dataflow_id, child_id=child_id)
  438. logger.info(f"创建子节点关系: {dataflow_id} -> {child_id}")
  439. except Exception as e:
  440. logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")
  441. @staticmethod
  442. def _handle_tag_relationship(dataflow_id, tag_id):
  443. """处理标签关系"""
  444. try:
  445. # 查找标签节点
  446. query = "MATCH (n:DataLabel) WHERE id(n) = $tag_id RETURN n"
  447. with connect_graph().session() as session:
  448. result = session.run(query, tag_id=tag_id).data()
  449. if result:
  450. tag_node = result[0]['n']
  451. # 创建关系 - 使用ID调用relationship_exists
  452. if dataflow_id and not relationship_exists(dataflow_id, 'LABEL', tag_id):
  453. session.run("MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $tag_id CREATE (a)-[:LABEL]->(b)",
  454. dataflow_id=dataflow_id, tag_id=tag_id)
  455. logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
  456. except Exception as e:
  457. logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")
  458. @staticmethod
  459. def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
  460. """
  461. 更新数据流
  462. Args:
  463. dataflow_id: 数据流ID
  464. data: 更新的数据
  465. Returns:
  466. 更新后的数据流信息,如果不存在则返回None
  467. """
  468. try:
  469. # 查找节点
  470. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  471. with connect_graph().session() as session:
  472. result = session.run(query, dataflow_id=dataflow_id).data()
  473. if not result:
  474. return None
  475. # 更新节点属性
  476. update_fields = []
  477. params = {'dataflow_id': dataflow_id}
  478. for key, value in data.items():
  479. if key not in ['id', 'created_at']: # 保护字段
  480. if key == 'config' and isinstance(value, dict):
  481. value = json.dumps(value, ensure_ascii=False)
  482. update_fields.append(f"n.{key} = ${key}")
  483. params[key] = value
  484. if update_fields:
  485. params['updated_at'] = get_formatted_time()
  486. update_fields.append("n.updated_at = $updated_at")
  487. update_query = f"""
  488. MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
  489. SET {', '.join(update_fields)}
  490. RETURN n, id(n) as node_id
  491. """
  492. result = session.run(update_query, **params).data()
  493. if result:
  494. node = result[0]['n']
  495. updated_dataflow = dict(node)
  496. updated_dataflow['id'] = result[0]['node_id'] # 使用查询返回的node_id
  497. logger.info(f"更新数据流成功: ID={dataflow_id}")
  498. return updated_dataflow
  499. return None
  500. except Exception as e:
  501. logger.error(f"更新数据流失败: {str(e)}")
  502. raise e
  503. @staticmethod
  504. def delete_dataflow(dataflow_id: int) -> bool:
  505. """
  506. 删除数据流
  507. Args:
  508. dataflow_id: 数据流ID
  509. Returns:
  510. 删除是否成功
  511. """
  512. try:
  513. # 删除节点及其关系
  514. query = """
  515. MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
  516. DETACH DELETE n
  517. RETURN count(n) as deleted_count
  518. """
  519. with connect_graph().session() as session:
  520. delete_result = session.run(query, dataflow_id=dataflow_id).single()
  521. result = delete_result['deleted_count'] if delete_result else 0
  522. if result and result > 0:
  523. logger.info(f"删除数据流成功: ID={dataflow_id}")
  524. return True
  525. return False
  526. except Exception as e:
  527. logger.error(f"删除数据流失败: {str(e)}")
  528. raise e
  529. @staticmethod
  530. def execute_dataflow(dataflow_id: int, params: Dict[str, Any] = None) -> Dict[str, Any]:
  531. """
  532. 执行数据流
  533. Args:
  534. dataflow_id: 数据流ID
  535. params: 执行参数
  536. Returns:
  537. 执行结果信息
  538. """
  539. try:
  540. # 检查数据流是否存在
  541. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  542. with connect_graph().session() as session:
  543. result = session.run(query, dataflow_id=dataflow_id).data()
  544. if not result:
  545. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  546. execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"
  547. # TODO: 这里应该实际执行数据流
  548. # 目前返回模拟结果
  549. result = {
  550. 'execution_id': execution_id,
  551. 'dataflow_id': dataflow_id,
  552. 'status': 'running',
  553. 'started_at': datetime.now().isoformat(),
  554. 'params': params or {},
  555. 'progress': 0
  556. }
  557. logger.info(f"开始执行数据流: ID={dataflow_id}, execution_id={execution_id}")
  558. return result
  559. except Exception as e:
  560. logger.error(f"执行数据流失败: {str(e)}")
  561. raise e
  562. @staticmethod
  563. def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
  564. """
  565. 获取数据流执行状态
  566. Args:
  567. dataflow_id: 数据流ID
  568. Returns:
  569. 执行状态信息
  570. """
  571. try:
  572. # TODO: 这里应该查询实际的执行状态
  573. # 目前返回模拟状态
  574. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  575. with connect_graph().session() as session:
  576. result = session.run(query, dataflow_id=dataflow_id).data()
  577. if not result:
  578. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  579. status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]
  580. return {
  581. 'dataflow_id': dataflow_id,
  582. 'status': status,
  583. 'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100,
  584. 'started_at': datetime.now().isoformat(),
  585. 'completed_at': datetime.now().isoformat() if status == 'completed' else None,
  586. 'error_message': '执行过程中发生错误' if status == 'failed' else None
  587. }
  588. except Exception as e:
  589. logger.error(f"获取数据流状态失败: {str(e)}")
  590. raise e
  591. @staticmethod
  592. def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]:
  593. """
  594. 获取数据流执行日志
  595. Args:
  596. dataflow_id: 数据流ID
  597. page: 页码
  598. page_size: 每页大小
  599. Returns:
  600. 执行日志列表和分页信息
  601. """
  602. try:
  603. # TODO: 这里应该查询实际的执行日志
  604. # 目前返回模拟日志
  605. query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
  606. with connect_graph().session() as session:
  607. result = session.run(query, dataflow_id=dataflow_id).data()
  608. if not result:
  609. raise ValueError(f"数据流不存在: ID={dataflow_id}")
  610. mock_logs = [
  611. {
  612. 'id': i,
  613. 'timestamp': datetime.now().isoformat(),
  614. 'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
  615. 'message': f'数据流执行日志消息 {i}',
  616. 'component': ['source', 'transform', 'target'][i % 3]
  617. }
  618. for i in range(1, 101)
  619. ]
  620. # 分页处理
  621. total = len(mock_logs)
  622. start = (page - 1) * page_size
  623. end = start + page_size
  624. logs = mock_logs[start:end]
  625. return {
  626. 'logs': logs,
  627. 'pagination': {
  628. 'page': page,
  629. 'page_size': page_size,
  630. 'total': total,
  631. 'total_pages': (total + page_size - 1) // page_size
  632. }
  633. }
  634. except Exception as e:
  635. logger.error(f"获取数据流日志失败: {str(e)}")
  636. raise e
    @staticmethod
    def create_script(request_data: Union[Dict[str, Any], str]) -> str:
        """
        Generate a PostgreSQL script via the Deepseek LLM.

        Builds a structured prompt from the source/target table DDLs (looked
        up in Neo4j) and the request content, then calls llm_sql().

        Args:
            request_data: dict (or JSON string) carrying 'input', 'output',
                and the request content.

        Returns:
            The generated SQL script text.

        Raises:
            ValueError: on unparseable/missing parameters or empty LLM output.
            Exception: re-raised after logging on any other failure.
        """
        try:
            logger.info(f"开始处理脚本生成请求: {request_data}")
            logger.info(f"request_data类型: {type(request_data)}")
            # Accept a JSON string as well as a dict.
            if isinstance(request_data, str):
                logger.warning(f"request_data是字符串,尝试解析为JSON: {request_data}")
                try:
                    import json
                    request_data = json.loads(request_data)
                except json.JSONDecodeError as e:
                    raise ValueError(f"无法解析request_data为JSON: {str(e)}")
            if not isinstance(request_data, dict):
                raise ValueError(f"request_data必须是字典类型,实际类型: {type(request_data)}")
            # 1. Extract input, output and the request content.
            input_data = request_data.get('input', '')
            output_data = request_data.get('output', '')
            # NOTE(review): key here is 'request_data', but the docstring and
            # the error message below refer to 'request_content' -- confirm
            # which key the caller actually sends.
            request_content = request_data.get('request_data', '')
            # Strip HTML tags if the content came from a rich-text editor.
            if request_content and (request_content.startswith('<p>') or '<' in request_content):
                import re
                request_content = re.sub(r'<[^>]+>', '', request_content).strip()
            if not input_data or not output_data or not request_content:
                raise ValueError(f"缺少必要参数:input='{input_data}', output='{output_data}', request_content='{request_content[:100] if request_content else ''}' 不能为空")
            logger.info(f"解析得到 - input: {input_data}, output: {output_data}, request_content: {request_content}")
            # 2. Resolve each comma-separated input table to its DDL.
            source_tables_ddl = []
            input_tables = []
            if input_data:
                tables = [table.strip() for table in input_data.split(',') if table.strip()]
                for table in tables:
                    ddl = DataFlowService._parse_table_and_get_ddl(table, 'input')
                    if ddl:
                        input_tables.append(table)
                        source_tables_ddl.append(ddl)
                    else:
                        logger.warning(f"无法获取输入表 {table} 的DDL结构")
            # 3. Resolve the output table to its DDL.
            target_table_ddl = ""
            if output_data:
                target_table_ddl = DataFlowService._parse_table_and_get_ddl(output_data.strip(), 'output')
                if not target_table_ddl:
                    logger.warning(f"无法获取输出表 {output_data} 的DDL结构")
            # 4. Assemble the prompt following the Deepseek-prompt.txt frame.
            prompt_parts = []
            # Role definition preamble.
            prompt_parts.append("你是一名数据库工程师,正在构建一个PostgreSQL数据中的汇总逻辑。请为以下需求生成一段标准的 PostgreSQL SQL 脚本:")
            # Numbered source-table sections.
            for i, (table, ddl) in enumerate(zip(input_tables, source_tables_ddl), 1):
                table_name = table.split(':')[-1] if ':' in table else table
                prompt_parts.append(f"{i}.有一个源表: {table_name},它的定义语句如下:")
                prompt_parts.append(ddl)
                prompt_parts.append("")  # blank-line separator
            # Target-table section, numbered after the sources.
            if target_table_ddl:
                target_table_name = output_data.split(':')[-1] if ':' in output_data else output_data
                next_index = len(input_tables) + 1
                prompt_parts.append(f"{next_index}.有一个目标表:{target_table_name},它的定义语句如下:")
                prompt_parts.append(target_table_ddl)
                prompt_parts.append("")  # blank-line separator
            # Processing-logic section.
            next_index = len(input_tables) + 2 if target_table_ddl else len(input_tables) + 1
            prompt_parts.append(f"{next_index}.处理逻辑为:{request_content}")
            prompt_parts.append("")  # blank-line separator
            # Fixed technical requirements.
            tech_requirements = [
                f"{next_index + 1}.脚本应使用标准的 PostgreSQL 语法,适合在 Airflow、Python 脚本、或调度系统中调用;",
                f"{next_index + 2}.无需使用 UPSERT 或 ON CONFLICT",
                f"{next_index + 3}.请直接输出SQL,无需进行解释。",
                f"{next_index + 4}.请给这段sql起个英文名,不少于三个英文单词,使用\"_\"分隔,采用蛇形命名法。把sql的名字作为注释写在返回的sql中。",
                f"{next_index + 5}.生成的sql在向目标表插入数据的时候,向create_time字段写入当前日期时间now(),不用处理update_time字段"
            ]
            prompt_parts.extend(tech_requirements)
            full_prompt = "\n".join(prompt_parts)
            logger.info(f"构建的完整提示语长度: {len(full_prompt)}")
            logger.info(f"完整提示语内容: {full_prompt}")
            # 5. Call the LLM and normalize the result to a string.
            logger.info("开始调用Deepseek模型生成SQL脚本")
            script_content = llm_sql(full_prompt)
            if not script_content:
                raise ValueError("Deepseek模型返回空内容")
            if not isinstance(script_content, str):
                script_content = str(script_content)
            logger.info(f"SQL脚本生成成功,内容长度: {len(script_content)}")
            return script_content
        except Exception as e:
            logger.error(f"生成SQL脚本失败: {str(e)}")
            raise e
  736. @staticmethod
  737. def _parse_table_and_get_ddl(table_str: str, table_type: str) -> str:
  738. """
  739. 解析表格式(A:B)并从Neo4j查询元数据生成DDL
  740. Args:
  741. table_str: 表格式字符串,格式为"label:name_en"
  742. table_type: 表类型,用于日志记录(input/output)
  743. Returns:
  744. DDL格式的表结构字符串
  745. """
  746. try:
  747. # 解析A:B格式
  748. if ':' not in table_str:
  749. logger.error(f"表格式错误,应为'label:name_en'格式: {table_str}")
  750. return ""
  751. parts = table_str.split(':', 1)
  752. if len(parts) != 2:
  753. logger.error(f"表格式解析失败: {table_str}")
  754. return ""
  755. label = parts[0].strip()
  756. name_en = parts[1].strip()
  757. if not label or not name_en:
  758. logger.error(f"标签或英文名为空: label={label}, name_en={name_en}")
  759. return ""
  760. logger.info(f"开始查询{table_type}表: label={label}, name_en={name_en}")
  761. # 从Neo4j查询节点及其关联的元数据
  762. with connect_graph().session() as session:
  763. # 查询节点及其关联的元数据
  764. cypher = f"""
  765. MATCH (n:{label} {{name_en: $name_en}})
  766. OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
  767. RETURN n, collect(m) as metadata
  768. """
  769. result = session.run(cypher, name_en=name_en)
  770. record = result.single()
  771. if not record:
  772. logger.error(f"未找到节点: label={label}, name_en={name_en}")
  773. return ""
  774. node = record['n']
  775. metadata = record['metadata']
  776. logger.info(f"找到节点,关联元数据数量: {len(metadata)}")
  777. # 生成DDL格式的表结构
  778. ddl_lines = []
  779. ddl_lines.append(f"CREATE TABLE {name_en} (")
  780. if metadata:
  781. column_definitions = []
  782. for meta in metadata:
  783. if meta: # 确保meta不为空
  784. meta_props = dict(meta)
  785. column_name = meta_props.get('name_en', meta_props.get('name_zh', 'unknown_column'))
  786. data_type = meta_props.get('data_type', 'VARCHAR(255)')
  787. comment = meta_props.get('name_zh', '')
  788. # 构建列定义
  789. column_def = f" {column_name} {data_type}"
  790. if comment:
  791. column_def += f" COMMENT '{comment}'"
  792. column_definitions.append(column_def)
  793. if column_definitions:
  794. ddl_lines.append(",\n".join(column_definitions))
  795. else:
  796. ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
  797. else:
  798. # 如果没有元数据,添加默认列
  799. ddl_lines.append(" id BIGINT PRIMARY KEY COMMENT '主键ID'")
  800. ddl_lines.append(");")
  801. # 添加表注释
  802. node_props = dict(node)
  803. table_comment = node_props.get('name_zh', node_props.get('describe', name_en))
  804. if table_comment and table_comment != name_en:
  805. ddl_lines.append(f"COMMENT ON TABLE {name_en} IS '{table_comment}';")
  806. ddl_content = "\n".join(ddl_lines)
  807. logger.info(f"{table_type}表DDL生成成功: {name_en}")
  808. logger.debug(f"生成的DDL: {ddl_content}")
  809. return ddl_content
  810. except Exception as e:
  811. logger.error(f"解析表格式和生成DDL失败: {str(e)}")
  812. return ""
  813. @staticmethod
  814. def _handle_script_relationships(data: Dict[str, Any],dataflow_name:str,name_en:str):
  815. """
  816. 处理脚本关系,在Neo4j图数据库中创建从source_table到target_table之间的DERIVED_FROM关系
  817. Args:
  818. data: 包含脚本信息的数据字典,应包含script_name, script_type, schedule_status, source_table, target_table, update_mode
  819. """
  820. try:
  821. # 从data中读取键值对
  822. script_name = dataflow_name,
  823. script_type = data.get('script_type', 'sql')
  824. schedule_status = data.get('status', 'inactive')
  825. source_table_full = data.get('source_table', '')
  826. target_table_full = data.get('target_table', '')
  827. update_mode = data.get('update_mode', 'full')
  828. # 处理source_table和target_table的格式
  829. source_table = source_table_full.split(':')[-1] if ':' in source_table_full else source_table_full
  830. target_table = target_table_full.split(':')[-1] if ':' in target_table_full else target_table_full
  831. source_label = source_table_full.split(':')[0] if ':' in source_table_full else source_table_full
  832. target_label = target_table_full.split(':')[0] if ':' in target_table_full else target_table_full
  833. # 验证必要字段
  834. if not source_table or not target_table:
  835. logger.warning(f"source_table或target_table为空,跳过关系创建: source_table={source_table}, target_table={target_table}")
  836. return
  837. logger.info(f"开始创建脚本关系: {source_table} -> {target_table}")
  838. with connect_graph().session() as session:
  839. # 创建或获取source和target节点
  840. create_nodes_query = f"""
  841. MERGE (source:{source_label} {{name: $source_table}})
  842. ON CREATE SET source.created_at = $created_at,
  843. source.type = 'source'
  844. WITH source
  845. MERGE (target:{target_label} {{name: $target_table}})
  846. ON CREATE SET target.created_at = $created_at,
  847. target.type = 'target'
  848. RETURN source, target, id(source) as source_id, id(target) as target_id
  849. """
  850. # 执行创建节点的查询
  851. result = session.run(create_nodes_query,
  852. source_table=source_table,
  853. target_table=target_table,
  854. created_at=get_formatted_time()).single()
  855. if result:
  856. source_id = result['source_id']
  857. target_id = result['target_id']
  858. # 检查并创建关系
  859. create_relationship_query = f"""
  860. MATCH (source:{source_label}), (target:{target_label})
  861. WHERE id(source) = $source_id AND id(target) = $target_id
  862. AND NOT EXISTS((target)-[:DERIVED_FROM]->(source))
  863. CREATE (target)-[r:DERIVED_FROM]->(source)
  864. SET r.script_name = $script_name,
  865. r.script_type = $script_type,
  866. r.schedule_status = $schedule_status,
  867. r.update_mode = $update_mode,
  868. r.created_at = $created_at,
  869. r.updated_at = $created_at
  870. RETURN r
  871. """
  872. relationship_result = session.run(create_relationship_query,
  873. source_id=source_id,
  874. target_id=target_id,
  875. script_name=script_name,
  876. script_type=script_type,
  877. schedule_status=schedule_status,
  878. update_mode=update_mode,
  879. created_at=get_formatted_time()).single()
  880. if relationship_result:
  881. logger.info(f"成功创建DERIVED_FROM关系: {target_table} -> {source_table} (script: {script_name})")
  882. else:
  883. logger.info(f"DERIVED_FROM关系已存在: {target_table} -> {source_table}")
  884. else:
  885. logger.error(f"创建表节点失败: source_table={source_table}, target_table={target_table}")
  886. except Exception as e:
  887. logger.error(f"处理脚本关系失败: {str(e)}")
  888. raise e