common.py 9.5 KB


  1. # common.py
  2. import psycopg2
  3. from neo4j import GraphDatabase
  4. import logging
  5. import importlib.util
  6. from pathlib import Path
  7. import networkx as nx
  8. import os
  9. from datetime import datetime, timedelta
  10. from config import PG_CONFIG, NEO4J_CONFIG, SCRIPTS_BASE_PATH
  11. # 创建统一的日志记录器
  12. logger = logging.getLogger("airflow.task")
  13. def get_pg_conn():
  14. """获取PostgreSQL连接"""
  15. return psycopg2.connect(**PG_CONFIG)
  16. def get_neo4j_driver():
  17. """获取Neo4j连接驱动"""
  18. uri = NEO4J_CONFIG['uri']
  19. auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  20. return GraphDatabase.driver(uri, auth=auth)
  21. def update_task_start_time(exec_date, target_table, script_name, start_time):
  22. """更新任务开始时间"""
  23. conn = get_pg_conn()
  24. cursor = conn.cursor()
  25. try:
  26. cursor.execute("""
  27. UPDATE airflow_dag_schedule
  28. SET exec_start_time = %s
  29. WHERE exec_date = %s AND target_table = %s AND script_name = %s
  30. """, (start_time, exec_date, target_table, script_name))
  31. conn.commit()
  32. except Exception as e:
  33. logger.error(f"更新任务开始时间失败: {str(e)}")
  34. conn.rollback()
  35. finally:
  36. cursor.close()
  37. conn.close()
  38. def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
  39. """更新任务完成信息"""
  40. conn = get_pg_conn()
  41. cursor = conn.cursor()
  42. try:
  43. cursor.execute("""
  44. UPDATE airflow_dag_schedule
  45. SET exec_result = %s, exec_end_time = %s, exec_duration = %s
  46. WHERE exec_date = %s AND target_table = %s AND script_name = %s
  47. """, (success, end_time, duration, exec_date, target_table, script_name))
  48. conn.commit()
  49. except Exception as e:
  50. logger.error(f"更新任务完成信息失败: {str(e)}")
  51. conn.rollback()
  52. finally:
  53. cursor.close()
  54. conn.close()
  55. def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
  56. """执行脚本并监控执行情况"""
  57. # 检查script_name是否为空
  58. if not script_name:
  59. logger.error(f"表 {target_table} 的script_name为空,无法执行")
  60. # 记录执行失败
  61. now = datetime.now()
  62. update_task_completion(exec_date, target_table, script_name or "", False, now, 0)
  63. return False
  64. # 记录执行开始时间
  65. start_time = datetime.now()
  66. update_task_start_time(exec_date, target_table, script_name, start_time)
  67. try:
  68. # 执行实际脚本
  69. success = execute_script(script_name, target_table, script_exec_mode)
  70. # 记录结束时间和结果
  71. end_time = datetime.now()
  72. duration = (end_time - start_time).total_seconds()
  73. update_task_completion(exec_date, target_table, script_name, success, end_time, duration)
  74. return success
  75. except Exception as e:
  76. # 处理异常
  77. logger.error(f"执行任务出错: {str(e)}")
  78. end_time = datetime.now()
  79. duration = (end_time - start_time).total_seconds()
  80. update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
  81. raise e
  82. def execute_script(script_name, table_name, execution_mode):
  83. """执行脚本并返回结果"""
  84. if not script_name:
  85. logger.error("未提供脚本名称,无法执行")
  86. return False
  87. try:
  88. # 检查脚本路径
  89. script_path = Path(SCRIPTS_BASE_PATH) / script_name
  90. logger.info(f"准备执行脚本,完整路径: {script_path}")
  91. # 检查脚本路径是否存在
  92. if not os.path.exists(script_path):
  93. logger.error(f"脚本文件不存在: {script_path}")
  94. logger.error(f"请确认脚本文件已部署到正确路径: {SCRIPTS_BASE_PATH}")
  95. # 尝试列出脚本目录中的文件
  96. try:
  97. script_dir = Path(SCRIPTS_BASE_PATH)
  98. if os.path.exists(script_dir):
  99. files = os.listdir(script_dir)
  100. logger.info(f"可用脚本文件: {files}")
  101. else:
  102. logger.error(f"脚本目录不存在: {script_dir}")
  103. except Exception as le:
  104. logger.error(f"尝试列出脚本目录内容时出错: {str(le)}")
  105. return False
  106. logger.info(f"脚本文件存在,开始导入: {script_path}")
  107. # 动态导入模块
  108. try:
  109. spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
  110. if spec is None:
  111. logger.error(f"无法加载脚本规范: {script_path}")
  112. return False
  113. module = importlib.util.module_from_spec(spec)
  114. spec.loader.exec_module(module)
  115. logger.info(f"成功导入脚本模块: {script_name}")
  116. except ImportError as ie:
  117. logger.error(f"导入脚本时出错: {str(ie)}")
  118. import traceback
  119. logger.error(traceback.format_exc())
  120. return False
  121. except SyntaxError as se:
  122. logger.error(f"脚本语法错误: {str(se)}")
  123. logger.error(f"错误位置: {se.filename}, 行 {se.lineno}, 列 {se.offset}")
  124. return False
  125. # 验证run函数存在
  126. if not hasattr(module, "run"):
  127. available_funcs = [func for func in dir(module) if callable(getattr(module, func)) and not func.startswith("_")]
  128. logger.error(f"脚本 {script_name} 未定义标准入口函数 run(),无法执行")
  129. logger.error(f"可用函数: {available_funcs}")
  130. return False
  131. # 执行run函数
  132. logger.info(f"执行脚本 {script_name} 的run函数,参数: table_name={table_name}, execution_mode={execution_mode}")
  133. result = module.run(table_name=table_name, execution_mode=execution_mode)
  134. logger.info(f"脚本 {script_name} 执行结果: {result}")
  135. return result
  136. except Exception as e:
  137. logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
  138. import traceback
  139. logger.error(traceback.format_exc())
  140. return False
  141. def generate_optimized_execution_order(table_names, dependency_dict):
  142. """
  143. 生成优化的执行顺序,处理循环依赖
  144. 参数:
  145. table_names: 表名列表
  146. dependency_dict: 依赖关系字典 {表名: [依赖表1, 依赖表2, ...]}
  147. 返回:
  148. list: 优化后的执行顺序列表
  149. """
  150. # 创建有向图
  151. G = nx.DiGraph()
  152. # 添加所有节点
  153. for table_name in table_names:
  154. G.add_node(table_name)
  155. # 添加依赖边
  156. for target, sources in dependency_dict.items():
  157. for source in sources:
  158. if source in table_names: # 确保只考虑目标表集合中的表
  159. # 从依赖指向目标,表示依赖需要先执行
  160. G.add_edge(source, target)
  161. # 检测循环依赖
  162. cycles = list(nx.simple_cycles(G))
  163. if cycles:
  164. logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
  165. # 打破循环依赖(简单策略:移除每个循环中的一条边)
  166. for cycle in cycles:
  167. # 移除循环中的最后一条边
  168. G.remove_edge(cycle[-1], cycle[0])
  169. logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
  170. # 生成拓扑排序
  171. try:
  172. execution_order = list(nx.topological_sort(G))
  173. return execution_order
  174. except Exception as e:
  175. logger.error(f"生成执行顺序失败: {str(e)}")
  176. # 返回原始列表作为备选
  177. return table_names
  178. def get_datamodel_dependency_from_neo4j(table_names):
  179. """
  180. 从Neo4j获取DataModel表间的依赖关系
  181. 参数:
  182. table_names: 表名列表
  183. 返回:
  184. dict: 依赖关系字典 {目标表: [依赖表1, 依赖表2, ...]}
  185. """
  186. logger.info(f"开始获取 {len(table_names)} 个表的依赖关系")
  187. # 创建Neo4j连接
  188. driver = get_neo4j_driver()
  189. dependency_dict = {name: [] for name in table_names}
  190. try:
  191. with driver.session() as session:
  192. # 使用一次性查询获取所有表之间的依赖关系
  193. query = """
  194. MATCH (source:DataModel)-[:DERIVED_FROM]->(target:DataModel)
  195. WHERE source.en_name IN $table_names AND target.en_name IN $table_names
  196. RETURN source.en_name AS source, target.en_name AS target
  197. """
  198. result = session.run(query, table_names=table_names)
  199. # 处理结果
  200. for record in result:
  201. source = record.get("source")
  202. target = record.get("target")
  203. if source and target:
  204. # 目标依赖于源
  205. if source in dependency_dict:
  206. dependency_dict[source].append(target)
  207. logger.debug(f"依赖关系: {source} 依赖于 {target}")
  208. except Exception as e:
  209. logger.error(f"从Neo4j获取依赖关系时出错: {str(e)}")
  210. finally:
  211. driver.close()
  212. # 记录依赖关系
  213. for table, deps in dependency_dict.items():
  214. if deps:
  215. logger.info(f"表 {table} 依赖于: {deps}")
  216. else:
  217. logger.info(f"表 {table} 没有依赖")
  218. return dependency_dict
  219. def get_today_date():
  220. """获取今天的日期,返回YYYY-MM-DD格式字符串"""
  221. return datetime.now().strftime("%Y-%m-%d")