common.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. # common.py
  2. import psycopg2
  3. from neo4j import GraphDatabase
  4. import logging
  5. import importlib.util
  6. from pathlib import Path
  7. import networkx as nx
  8. import os
  9. from datetime import datetime, timedelta
  10. from config import PG_CONFIG, NEO4J_CONFIG, SCRIPTS_BASE_PATH
  11. # 创建统一的日志记录器
  12. logger = logging.getLogger("airflow.task")
  13. def get_pg_conn():
  14. """获取PostgreSQL连接"""
  15. return psycopg2.connect(**PG_CONFIG)
  16. def get_neo4j_driver():
  17. """获取Neo4j连接驱动"""
  18. uri = NEO4J_CONFIG['uri']
  19. auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  20. return GraphDatabase.driver(uri, auth=auth)
  21. def update_task_start_time(exec_date, target_table, script_name, start_time):
  22. """更新任务开始时间"""
  23. conn = get_pg_conn()
  24. cursor = conn.cursor()
  25. try:
  26. cursor.execute("""
  27. UPDATE airflow_dag_schedule
  28. SET exec_start_time = %s
  29. WHERE exec_date = %s AND target_table = %s AND script_name = %s
  30. """, (start_time, exec_date, target_table, script_name))
  31. conn.commit()
  32. except Exception as e:
  33. logger.error(f"更新任务开始时间失败: {str(e)}")
  34. conn.rollback()
  35. finally:
  36. cursor.close()
  37. conn.close()
  38. def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
  39. """更新任务完成信息"""
  40. conn = get_pg_conn()
  41. cursor = conn.cursor()
  42. try:
  43. cursor.execute("""
  44. UPDATE airflow_dag_schedule
  45. SET exec_result = %s, exec_end_time = %s, exec_duration = %s
  46. WHERE exec_date = %s AND target_table = %s AND script_name = %s
  47. """, (success, end_time, duration, exec_date, target_table, script_name))
  48. conn.commit()
  49. except Exception as e:
  50. logger.error(f"更新任务完成信息失败: {str(e)}")
  51. conn.rollback()
  52. finally:
  53. cursor.close()
  54. conn.close()
  55. def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
  56. """执行脚本并监控执行情况"""
  57. # 检查script_name是否为空
  58. if not script_name:
  59. logger.error(f"表 {target_table} 的script_name为空,无法执行")
  60. # 记录执行失败
  61. now = datetime.now()
  62. update_task_completion(exec_date, target_table, script_name or "", False, now, 0)
  63. return False
  64. # 记录执行开始时间
  65. start_time = datetime.now()
  66. update_task_start_time(exec_date, target_table, script_name, start_time)
  67. try:
  68. # 执行实际脚本
  69. success = execute_script(script_name, target_table, script_exec_mode)
  70. # 记录结束时间和结果
  71. end_time = datetime.now()
  72. duration = (end_time - start_time).total_seconds()
  73. update_task_completion(exec_date, target_table, script_name, success, end_time, duration)
  74. return success
  75. except Exception as e:
  76. # 处理异常
  77. logger.error(f"执行任务出错: {str(e)}")
  78. end_time = datetime.now()
  79. duration = (end_time - start_time).total_seconds()
  80. update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
  81. raise e
  82. def execute_script(script_name, table_name, execution_mode):
  83. """执行脚本并返回结果"""
  84. if not script_name:
  85. logger.error("未提供脚本名称,无法执行")
  86. return False
  87. try:
  88. # 直接使用配置的部署路径
  89. script_path = Path(SCRIPTS_BASE_PATH) / script_name
  90. logger.info(f"使用配置的Airflow部署路径: {script_path}")
  91. # 动态导入模块
  92. spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
  93. module = importlib.util.module_from_spec(spec)
  94. spec.loader.exec_module(module)
  95. # 使用标准入口函数run
  96. if hasattr(module, "run"):
  97. logger.info(f"执行脚本 {script_name} 的标准入口函数 run()")
  98. result = module.run(table_name=table_name, execution_mode=execution_mode)
  99. logger.info(f"脚本 {script_name} 执行结果: {result}")
  100. return result
  101. else:
  102. logger.warning(f"脚本 {script_name} 未定义标准入口函数 run(),无法执行")
  103. return False
  104. except Exception as e:
  105. logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
  106. import traceback
  107. logger.error(traceback.format_exc())
  108. return False
  109. def generate_optimized_execution_order(table_names, dependency_dict):
  110. """
  111. 生成优化的执行顺序,处理循环依赖
  112. 参数:
  113. table_names: 表名列表
  114. dependency_dict: 依赖关系字典 {表名: [依赖表1, 依赖表2, ...]}
  115. 返回:
  116. list: 优化后的执行顺序列表
  117. """
  118. # 创建有向图
  119. G = nx.DiGraph()
  120. # 添加所有节点
  121. for table_name in table_names:
  122. G.add_node(table_name)
  123. # 添加依赖边
  124. for target, sources in dependency_dict.items():
  125. for source in sources:
  126. if source in table_names: # 确保只考虑目标表集合中的表
  127. # 从依赖指向目标,表示依赖需要先执行
  128. G.add_edge(source, target)
  129. # 检测循环依赖
  130. cycles = list(nx.simple_cycles(G))
  131. if cycles:
  132. logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
  133. # 打破循环依赖(简单策略:移除每个循环中的一条边)
  134. for cycle in cycles:
  135. # 移除循环中的最后一条边
  136. G.remove_edge(cycle[-1], cycle[0])
  137. logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
  138. # 生成拓扑排序
  139. try:
  140. execution_order = list(nx.topological_sort(G))
  141. return execution_order
  142. except Exception as e:
  143. logger.error(f"生成执行顺序失败: {str(e)}")
  144. # 返回原始列表作为备选
  145. return table_names
  146. def get_datamodel_dependency_from_neo4j(table_names):
  147. """
  148. 从Neo4j获取DataModel表间的依赖关系
  149. 参数:
  150. table_names: 表名列表
  151. 返回:
  152. dict: 依赖关系字典 {目标表: [依赖表1, 依赖表2, ...]}
  153. """
  154. logger.info(f"开始获取 {len(table_names)} 个表的依赖关系")
  155. # 创建Neo4j连接
  156. driver = get_neo4j_driver()
  157. dependency_dict = {name: [] for name in table_names}
  158. try:
  159. with driver.session() as session:
  160. # 使用一次性查询获取所有表之间的依赖关系
  161. query = """
  162. MATCH (source:DataModel)-[:DERIVED_FROM]->(target:DataModel)
  163. WHERE source.en_name IN $table_names AND target.en_name IN $table_names
  164. RETURN source.en_name AS source, target.en_name AS target
  165. """
  166. result = session.run(query, table_names=table_names)
  167. # 处理结果
  168. for record in result:
  169. source = record.get("source")
  170. target = record.get("target")
  171. if source and target:
  172. # 目标依赖于源
  173. if source in dependency_dict:
  174. dependency_dict[source].append(target)
  175. logger.debug(f"依赖关系: {source} 依赖于 {target}")
  176. except Exception as e:
  177. logger.error(f"从Neo4j获取依赖关系时出错: {str(e)}")
  178. finally:
  179. driver.close()
  180. # 记录依赖关系
  181. for table, deps in dependency_dict.items():
  182. if deps:
  183. logger.info(f"表 {table} 依赖于: {deps}")
  184. else:
  185. logger.info(f"表 {table} 没有依赖")
  186. return dependency_dict
  187. def get_today_date():
  188. """获取今天的日期,返回YYYY-MM-DD格式字符串"""
  189. return datetime.now().strftime("%Y-%m-%d")