dataflows.py

import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

from py2neo import Relationship
from sqlalchemy import text

from app import db
from app.core.common.functions import get_formatted_time
from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node, relationship_exists
from app.core.llm.llm_service import llm_client
from app.core.meta_data import translate_and_parse

logger = logging.getLogger(__name__)


class DataFlowService:
    """Data flow service class; handles data-flow-related business logic."""

    @staticmethod
    def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]:
        """
        Get the list of data flows.

        Args:
            page: Page number
            page_size: Page size
            search: Search keyword

        Returns:
            Dict containing the data flow list and pagination info
        """
        try:
            # Query the data flow list from the graph database
            skip_count = (page - 1) * page_size

            # Build the search condition
            where_clause = ""
            params = {'skip': skip_count, 'limit': page_size}
            if search:
                where_clause = "WHERE n.name CONTAINS $search OR n.description CONTAINS $search"
                params['search'] = search

            # Query the data flow list
            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            RETURN n
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            """
            list_result = connect_graph.run(query, **params).data()

            # Query the total count
            count_query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            RETURN count(n) as total
            """
            count_params = {'search': search} if search else {}
            total = connect_graph.run(count_query, **count_params).evaluate() or 0

            # Format the results
            dataflows = []
            for record in list_result:
                node = record['n']
                dataflow = dict(node)
                dataflow['id'] = node.identity
                dataflows.append(dataflow)

            return {
                'list': dataflows,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"Failed to get data flow list: {str(e)}")
            raise
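
    # A minimal usage sketch (hypothetical), assuming connect_graph points at a
    # live Neo4j instance that already contains DataFlow nodes; the search term
    # is illustrative:
    #
    #   page1 = DataFlowService.get_dataflows(page=1, page_size=10, search="order")
    #   print(page1['pagination']['total'], [d['name'] for d in page1['list']])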

    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """
        Get data flow details by ID.

        Args:
            dataflow_id: Data flow ID

        Returns:
            Data flow detail dict, or None if it does not exist
        """
        try:
            query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:label]-(la:data_label)
            OPTIONAL MATCH (n)-[:child]-(child)
            OPTIONAL MATCH (parent)-[:child]-(n)
            RETURN n,
                   collect(DISTINCT {id: id(la), name: la.name}) as tags,
                   collect(DISTINCT {id: id(child), name: child.name}) as children,
                   collect(DISTINCT {id: id(parent), name: parent.name}) as parents
            """
            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
            if not result:
                return None

            record = result[0]
            node = record['n']
            dataflow = dict(node)
            dataflow['id'] = node.identity
            dataflow['tags'] = record['tags']
            dataflow['children'] = record['children']
            dataflow['parents'] = record['parents']
            return dataflow
        except Exception as e:
            logger.error(f"Failed to get data flow details: {str(e)}")
            raise
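
    # Note: the [:child] patterns in the detail query above are undirected, so
    # the "children" and "parents" collections can overlap. If strict direction
    # matters, the patterns would need explicit arrows, e.g. (n)-[:child]->(child)
    # and (parent)-[:child]->(n).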

    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create a new data flow.

        Args:
            data: Data flow configuration data

        Returns:
            Info about the created data flow
        """
        try:
            # Validate required fields
            required_fields = ['name', 'description']
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"Missing required field: {field}")

            dataflow_name = data['name']

            # Use the LLM to translate the name into an English name
            try:
                result_list = translate_and_parse(dataflow_name)
                en_name = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
            except Exception as e:
                logger.warning(f"Translation failed, using default English name: {str(e)}")
                en_name = dataflow_name.lower().replace(' ', '_')

            # Prepare node properties
            node_data = {
                'name': dataflow_name,
                'en_name': en_name,
                'description': data.get('description', ''),
                'status': data.get('status', 'inactive'),
                'created_at': get_formatted_time(),
                'updated_at': get_formatted_time(),
                'created_by': data.get('created_by', 'system'),
                'config': json.dumps(data.get('config', {}), ensure_ascii=False)
            }

            # Create the data flow node, refusing duplicates by name
            dataflow_node = get_node('DataFlow', name=dataflow_name)
            if dataflow_node:
                raise ValueError(f"Data flow '{dataflow_name}' already exists")
            dataflow_node = create_or_get_node('DataFlow', **node_data)

            # Handle child-node relationships
            if data.get('children_ids'):
                DataFlowService._handle_children_relationships(dataflow_node, data['children_ids'])

            # Handle the tag relationship
            if data.get('tag_id'):
                DataFlowService._handle_tag_relationship(dataflow_node, data['tag_id'])

            # After the graph node is created, write to the PostgreSQL database
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, en_name)
                logger.info(f"Data flow info written to PG database: {dataflow_name}")
            except Exception as pg_error:
                logger.error(f"Failed to write to PG database: {str(pg_error)}")
                # Note: the graph operation could be rolled back here, but for now
                # the graph data is kept. A real deployment may need a distributed
                # transaction.

            # Return info about the created data flow
            result = dict(dataflow_node)
            result['id'] = dataflow_node.identity
            logger.info(f"Data flow created successfully: {dataflow_name}")
            return result
        except Exception as e:
            logger.error(f"Failed to create data flow: {str(e)}")
            raise
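
    # A sketch of the payload create_dataflow reads; the keys below are the ones
    # the method actually consumes, while the values are illustrative only:
    #
    #   DataFlowService.create_dataflow({
    #       'name': 'Order cleansing flow',       # required
    #       'description': 'Cleans raw orders',   # required
    #       'status': 'active',                   # optional, defaults to 'inactive'
    #       'created_by': 'alice',                # optional, defaults to 'system'
    #       'config': {'batch_size': 500},        # optional, stored as JSON
    #       'children_ids': [101, 102],           # optional graph node ids
    #       'tag_id': 7,                          # optional data_label node id
    #   })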

    @staticmethod
    def _save_to_pg_database(data: Dict[str, Any], script_name: str, en_name: str):
        """
        Save script info to the PostgreSQL database.

        Args:
            data: Data containing the script info
            script_name: Script name
            en_name: English name
        """
        try:
            # Extract script-related info
            script_requirement = data.get('script_requirement', '')
            script_content = data.get('script_content', '')
            source_table = data.get('source_table', '')
            target_table = data.get('target_table', en_name)  # Fall back to the English name if no target table is given
            script_type = data.get('script_type', 'python')
            user_name = data.get('created_by', 'system')
            target_dt_column = data.get('target_dt_column', '')

            # Validate required fields
            if not target_table:
                target_table = en_name
            if not script_name:
                raise ValueError("script_name must not be empty")

            # Build the upsert SQL
            insert_sql = text("""
                INSERT INTO dags.data_transform_scripts
                (source_table, target_table, script_name, script_type, script_requirement,
                 script_content, user_name, create_time, update_time, target_dt_column)
                VALUES
                (:source_table, :target_table, :script_name, :script_type, :script_requirement,
                 :script_content, :user_name, :create_time, :update_time, :target_dt_column)
                ON CONFLICT (target_table, script_name)
                DO UPDATE SET
                    source_table = EXCLUDED.source_table,
                    script_type = EXCLUDED.script_type,
                    script_requirement = EXCLUDED.script_requirement,
                    script_content = EXCLUDED.script_content,
                    user_name = EXCLUDED.user_name,
                    update_time = EXCLUDED.update_time,
                    target_dt_column = EXCLUDED.target_dt_column
            """)

            # Prepare parameters
            current_time = datetime.now()
            params = {
                'source_table': source_table,
                'target_table': target_table,
                'script_name': script_name,
                'script_type': script_type,
                'script_requirement': script_requirement,
                'script_content': script_content,
                'user_name': user_name,
                'create_time': current_time,
                'update_time': current_time,
                'target_dt_column': target_dt_column
            }

            # Execute the upsert
            db.session.execute(insert_sql, params)
            db.session.commit()
            logger.info(f"Script info written to PG database: target_table={target_table}, script_name={script_name}")
        except Exception as e:
            db.session.rollback()
            logger.error(f"Failed to write to PG database: {str(e)}")
            raise
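
    # Note: the ON CONFLICT (target_table, script_name) clause above only works
    # if dags.data_transform_scripts has a unique constraint or unique index on
    # exactly that column pair; otherwise PostgreSQL rejects the statement.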

    @staticmethod
    def _handle_children_relationships(dataflow_node, children_ids):
        """Handle child-node relationships."""
        for child_id in children_ids:
            try:
                # Look up the child node
                query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
                result = connect_graph.run(query, child_id=child_id).data()
                if result:
                    child_node = result[0]['n']
                    # Create the relationship if it does not already exist
                    if not relationship_exists(dataflow_node, 'child', child_node):
                        connect_graph.create(Relationship(dataflow_node, 'child', child_node))
                        logger.info(f"Created child relationship: {dataflow_node.identity} -> {child_id}")
            except Exception as e:
                logger.warning(f"Failed to create child relationship {child_id}: {str(e)}")

    @staticmethod
    def _handle_tag_relationship(dataflow_node, tag_id):
        """Handle the tag relationship."""
        try:
            # Look up the tag node
            query = "MATCH (n:data_label) WHERE id(n) = $tag_id RETURN n"
            result = connect_graph.run(query, tag_id=tag_id).data()
            if result:
                tag_node = result[0]['n']
                # Create the relationship if it does not already exist
                if not relationship_exists(dataflow_node, 'label', tag_node):
                    connect_graph.create(Relationship(dataflow_node, 'label', tag_node))
                    logger.info(f"Created tag relationship: {dataflow_node.identity} -> {tag_id}")
        except Exception as e:
            logger.warning(f"Failed to create tag relationship {tag_id}: {str(e)}")

    @staticmethod
    def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Update a data flow.

        Args:
            dataflow_id: Data flow ID
            data: Data to update

        Returns:
            Updated data flow info, or None if it does not exist
        """
        try:
            # Look up the node
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
            if not result:
                return None

            # Update node properties
            update_fields = []
            params = {'dataflow_id': dataflow_id}
            for key, value in data.items():
                if key not in ['id', 'created_at']:  # Protected fields
                    if key == 'config' and isinstance(value, dict):
                        value = json.dumps(value, ensure_ascii=False)
                    update_fields.append(f"n.{key} = ${key}")
                    params[key] = value

            if update_fields:
                params['updated_at'] = get_formatted_time()
                update_fields.append("n.updated_at = $updated_at")
                update_query = f"""
                MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                SET {', '.join(update_fields)}
                RETURN n
                """
                result = connect_graph.run(update_query, **params).data()
                if result:
                    node = result[0]['n']
                    updated_dataflow = dict(node)
                    updated_dataflow['id'] = node.identity
                    logger.info(f"Data flow updated successfully: ID={dataflow_id}")
                    return updated_dataflow
            return None
        except Exception as e:
            logger.error(f"Failed to update data flow: {str(e)}")
            raise
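
    # Caution (an observation, not a behavior change): update_dataflow splices
    # incoming dict keys directly into the SET clause, so callers must ensure
    # the keys are trusted property names; only the values are parameterized.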

    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """
        Delete a data flow.

        Args:
            dataflow_id: Data flow ID

        Returns:
            Whether the deletion succeeded
        """
        try:
            # Delete the node along with its relationships
            query = """
            MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """
            result = connect_graph.run(query, dataflow_id=dataflow_id).evaluate()
            if result and result > 0:
                logger.info(f"Data flow deleted successfully: ID={dataflow_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Failed to delete data flow: {str(e)}")
            raise

    @staticmethod
    def execute_dataflow(dataflow_id: int, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Execute a data flow.

        Args:
            dataflow_id: Data flow ID
            params: Execution parameters

        Returns:
            Execution result info
        """
        try:
            # Check that the data flow exists
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
            if not result:
                raise ValueError(f"Data flow does not exist: ID={dataflow_id}")

            execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"

            # TODO: actually execute the data flow here.
            # For now, return a mocked result.
            result = {
                'execution_id': execution_id,
                'dataflow_id': dataflow_id,
                'status': 'running',
                'started_at': datetime.now().isoformat(),
                'params': params or {},
                'progress': 0
            }
            logger.info(f"Started executing data flow: ID={dataflow_id}, execution_id={execution_id}")
            return result
        except Exception as e:
            logger.error(f"Failed to execute data flow: {str(e)}")
            raise

    @staticmethod
    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
        """
        Get the execution status of a data flow.

        Args:
            dataflow_id: Data flow ID

        Returns:
            Execution status info
        """
        try:
            # TODO: query the actual execution status here.
            # For now, return a mocked status derived from the ID.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
            if not result:
                raise ValueError(f"Data flow does not exist: ID={dataflow_id}")

            status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]
            return {
                'dataflow_id': dataflow_id,
                'status': status,
                'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100,
                'started_at': datetime.now().isoformat(),
                'completed_at': datetime.now().isoformat() if status == 'completed' else None,
                'error_message': 'An error occurred during execution' if status == 'failed' else None
            }
        except Exception as e:
            logger.error(f"Failed to get data flow status: {str(e)}")
            raise

    @staticmethod
    def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]:
        """
        Get the execution logs of a data flow.

        Args:
            dataflow_id: Data flow ID
            page: Page number
            page_size: Page size

        Returns:
            Execution log list and pagination info
        """
        try:
            # TODO: query the actual execution logs here.
            # For now, return mocked logs.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            result = connect_graph.run(query, dataflow_id=dataflow_id).data()
            if not result:
                raise ValueError(f"Data flow does not exist: ID={dataflow_id}")

            mock_logs = [
                {
                    'id': i,
                    'timestamp': datetime.now().isoformat(),
                    'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
                    'message': f'Data flow execution log message {i}',
                    'component': ['source', 'transform', 'target'][i % 3]
                }
                for i in range(1, 101)
            ]

            # Paginate
            total = len(mock_logs)
            start = (page - 1) * page_size
            end = start + page_size
            logs = mock_logs[start:end]

            return {
                'logs': logs,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"Failed to get data flow logs: {str(e)}")
            raise

    @staticmethod
    def create_script(request_data: str) -> str:
        """
        Generate a script with the Deepseek model.

        Args:
            request_data: Request data, a text description of the user's requirement

        Returns:
            The generated script content (plain text)
        """
        try:
            # Build the prompt
            prompt_parts = []

            # Add the system instruction
            prompt_parts.append("Please generate a data processing script for the following requirement:")

            # Append request_data directly to the prompt as the text description
            prompt_parts.append(request_data)

            # Add formatting requirements
            prompt_parts.append("\nPlease produce complete, executable script code with the necessary comments and error handling.")

            # Assemble the prompt
            full_prompt = "\n\n".join(prompt_parts)

            logger.info(f"Calling the Deepseek model to generate a script, prompt length: {len(full_prompt)}")

            # Call the LLM service
            script_content = llm_client(full_prompt)
            if not script_content:
                raise ValueError("Deepseek model returned empty content")

            # Ensure the return value is text
            if not isinstance(script_content, str):
                script_content = str(script_content)

            logger.info(f"Script generated successfully, content length: {len(script_content)}")
            return script_content
        except Exception as e:
            logger.error(f"Failed to generate script: {str(e)}")
            raise
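

# A minimal, hypothetical smoke test. It assumes connect_graph and db are
# already configured against live Neo4j and PostgreSQL instances, so treat it
# as a sketch of the call flow rather than something to run as-is.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # Create a throwaway flow, then exercise the mocked status/log endpoints.
    created = DataFlowService.create_dataflow({
        'name': 'demo flow',          # illustrative name
        'description': 'demo only',   # illustrative description
    })
    print(DataFlowService.get_dataflow_status(created['id']))
    print(DataFlowService.get_dataflow_logs(created['id'], page=1, page_size=5))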