common.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. # common.py
  2. import psycopg2
  3. from neo4j import GraphDatabase
  4. import logging
  5. import importlib.util
  6. from pathlib import Path
  7. import networkx as nx
  8. import os
  9. from datetime import datetime, timedelta
  10. from config import PG_CONFIG, NEO4J_CONFIG, SCRIPTS_BASE_PATH
  11. import functools
  12. import time
# Unified logger shared by every helper in this module; the "airflow.task"
# name routes records into the per-task log handler managed by Airflow.
logger = logging.getLogger("airflow.task")
  15. def get_pg_conn():
  16. """获取PostgreSQL连接"""
  17. return psycopg2.connect(**PG_CONFIG)
  18. def get_neo4j_driver():
  19. """获取Neo4j连接驱动"""
  20. uri = NEO4J_CONFIG['uri']
  21. auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
  22. return GraphDatabase.driver(uri, auth=auth)
  23. def update_task_start_time(exec_date, target_table, script_name, start_time):
  24. """更新任务开始时间"""
  25. logger.info(f"===== 更新任务开始时间 =====")
  26. logger.info(f"参数: exec_date={exec_date} ({type(exec_date).__name__}), target_table={target_table}, script_name={script_name}")
  27. conn = get_pg_conn()
  28. cursor = conn.cursor()
  29. try:
  30. # 首先检查记录是否存在
  31. cursor.execute("""
  32. SELECT COUNT(*)
  33. FROM airflow_dag_schedule
  34. WHERE exec_date = %s AND target_table = %s AND script_name = %s
  35. """, (exec_date, target_table, script_name))
  36. count = cursor.fetchone()[0]
  37. logger.info(f"查询到符合条件的记录数: {count}")
  38. if count == 0:
  39. logger.warning(f"未找到匹配的记录: exec_date={exec_date}, target_table={target_table}, script_name={script_name}")
  40. logger.info("尝试记录在airflow_dag_schedule表中找到的记录:")
  41. cursor.execute("""
  42. SELECT exec_date, target_table, script_name
  43. FROM airflow_dag_schedule
  44. LIMIT 5
  45. """)
  46. sample_records = cursor.fetchall()
  47. for record in sample_records:
  48. logger.info(f"样本记录: exec_date={record[0]} ({type(record[0]).__name__}), target_table={record[1]}, script_name={record[2]}")
  49. # 执行更新
  50. sql = """
  51. UPDATE airflow_dag_schedule
  52. SET exec_start_time = %s
  53. WHERE exec_date = %s AND target_table = %s AND script_name = %s
  54. """
  55. logger.info(f"执行SQL: {sql}")
  56. logger.info(f"参数: start_time={start_time}, exec_date={exec_date}, target_table={target_table}, script_name={script_name}")
  57. cursor.execute(sql, (start_time, exec_date, target_table, script_name))
  58. affected_rows = cursor.rowcount
  59. logger.info(f"更新影响的行数: {affected_rows}")
  60. conn.commit()
  61. logger.info("事务已提交")
  62. except Exception as e:
  63. logger.error(f"更新任务开始时间失败: {str(e)}")
  64. import traceback
  65. logger.error(f"错误堆栈: {traceback.format_exc()}")
  66. conn.rollback()
  67. logger.info("事务已回滚")
  68. raise
  69. finally:
  70. cursor.close()
  71. conn.close()
  72. logger.info("数据库连接已关闭")
  73. logger.info("===== 更新任务开始时间完成 =====")
  74. def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
  75. """更新任务完成信息"""
  76. logger.info(f"===== 更新任务完成信息 =====")
  77. logger.info(f"参数: exec_date={exec_date} ({type(exec_date).__name__}), target_table={target_table}, script_name={script_name}")
  78. logger.info(f"参数: success={success} ({type(success).__name__}), end_time={end_time}, duration={duration}")
  79. conn = get_pg_conn()
  80. cursor = conn.cursor()
  81. try:
  82. # 首先检查记录是否存在
  83. cursor.execute("""
  84. SELECT COUNT(*)
  85. FROM airflow_dag_schedule
  86. WHERE exec_date = %s AND target_table = %s AND script_name = %s
  87. """, (exec_date, target_table, script_name))
  88. count = cursor.fetchone()[0]
  89. logger.info(f"查询到符合条件的记录数: {count}")
  90. if count == 0:
  91. logger.warning(f"未找到匹配的记录: exec_date={exec_date}, target_table={target_table}, script_name={script_name}")
  92. # 查询表中前几条记录作为参考
  93. cursor.execute("""
  94. SELECT exec_date, target_table, script_name
  95. FROM airflow_dag_schedule
  96. LIMIT 5
  97. """)
  98. sample_records = cursor.fetchall()
  99. logger.info("airflow_dag_schedule表中的样本记录:")
  100. for record in sample_records:
  101. logger.info(f"样本记录: exec_date={record[0]} ({type(record[0]).__name__}), target_table={record[1]}, script_name={record[2]}")
  102. # 确保success是布尔类型
  103. if not isinstance(success, bool):
  104. original_success = success
  105. success = bool(success)
  106. logger.warning(f"success参数不是布尔类型,原始值: {original_success},转换为: {success}")
  107. # 执行更新
  108. sql = """
  109. UPDATE airflow_dag_schedule
  110. SET exec_result = %s, exec_end_time = %s, exec_duration = %s
  111. WHERE exec_date = %s AND target_table = %s AND script_name = %s
  112. """
  113. logger.info(f"执行SQL: {sql}")
  114. logger.info(f"参数: success={success}, end_time={end_time}, duration={duration}, exec_date={exec_date}, target_table={target_table}, script_name={script_name}")
  115. cursor.execute(sql, (success, end_time, duration, exec_date, target_table, script_name))
  116. affected_rows = cursor.rowcount
  117. logger.info(f"更新影响的行数: {affected_rows}")
  118. if affected_rows == 0:
  119. logger.warning("更新操作没有影响任何行,可能是因为条件不匹配")
  120. # 尝试用不同格式的exec_date查询
  121. if isinstance(exec_date, str):
  122. try:
  123. # 尝试解析日期字符串
  124. from datetime import datetime
  125. parsed_date = datetime.strptime(exec_date, "%Y-%m-%d").date()
  126. logger.info(f"尝试使用解析后的日期格式: {parsed_date}")
  127. cursor.execute("""
  128. SELECT COUNT(*)
  129. FROM airflow_dag_schedule
  130. WHERE exec_date = %s AND target_table = %s AND script_name = %s
  131. """, (parsed_date, target_table, script_name))
  132. parsed_count = cursor.fetchone()[0]
  133. logger.info(f"使用解析日期后查询到的记录数: {parsed_count}")
  134. if parsed_count > 0:
  135. # 尝试用解析的日期更新
  136. cursor.execute("""
  137. UPDATE airflow_dag_schedule
  138. SET exec_result = %s, exec_end_time = %s, exec_duration = %s
  139. WHERE exec_date = %s AND target_table = %s AND script_name = %s
  140. """, (success, end_time, duration, parsed_date, target_table, script_name))
  141. new_affected_rows = cursor.rowcount
  142. logger.info(f"使用解析日期后更新影响的行数: {new_affected_rows}")
  143. except Exception as parse_e:
  144. logger.error(f"尝试解析日期格式时出错: {str(parse_e)}")
  145. conn.commit()
  146. logger.info("事务已提交")
  147. except Exception as e:
  148. logger.error(f"更新任务完成信息失败: {str(e)}")
  149. import traceback
  150. logger.error(f"错误堆栈: {traceback.format_exc()}")
  151. conn.rollback()
  152. logger.info("事务已回滚")
  153. raise
  154. finally:
  155. cursor.close()
  156. conn.close()
  157. logger.info("数据库连接已关闭")
  158. logger.info("===== 更新任务完成信息完成 =====")
  159. def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
  160. """执行脚本并监控执行情况"""
  161. # 添加详细日志
  162. logger.info(f"===== 开始监控执行 =====")
  163. logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
  164. logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
  165. logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
  166. logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
  167. # 检查script_name是否为空
  168. if not script_name:
  169. logger.error(f"表 {target_table} 的script_name为空,无法执行")
  170. # 记录执行失败
  171. now = datetime.now()
  172. update_task_completion(exec_date, target_table, script_name or "", False, now, 0)
  173. return False
  174. # 记录执行开始时间
  175. start_time = datetime.now()
  176. # 尝试更新开始时间并记录结果
  177. try:
  178. update_task_start_time(exec_date, target_table, script_name, start_time)
  179. logger.info(f"成功更新任务开始时间: {start_time}")
  180. except Exception as e:
  181. logger.error(f"更新任务开始时间失败: {str(e)}")
  182. try:
  183. # 执行实际脚本
  184. logger.info(f"开始执行脚本: {script_name}")
  185. result = execute_script(script_name, target_table, script_exec_mode)
  186. logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
  187. # 确保result是布尔值
  188. if result is None:
  189. logger.warning(f"脚本返回值为None,转换为False")
  190. result = False
  191. elif not isinstance(result, bool):
  192. original_result = result
  193. result = bool(result)
  194. logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
  195. # 记录结束时间和结果
  196. end_time = datetime.now()
  197. duration = (end_time - start_time).total_seconds()
  198. # 尝试更新完成状态并记录结果
  199. try:
  200. logger.info(f"尝试更新完成状态: result={result}, end_time={end_time}, duration={duration}")
  201. update_task_completion(exec_date, target_table, script_name, result, end_time, duration)
  202. logger.info(f"成功更新任务完成状态,结果: {result}")
  203. except Exception as e:
  204. logger.error(f"更新任务完成状态失败: {str(e)}")
  205. logger.info(f"===== 监控执行完成 =====")
  206. return result
  207. except Exception as e:
  208. # 处理异常
  209. logger.error(f"执行任务出错: {str(e)}")
  210. end_time = datetime.now()
  211. duration = (end_time - start_time).total_seconds()
  212. # 尝试更新失败状态并记录结果
  213. try:
  214. logger.info(f"尝试更新失败状态: end_time={end_time}, duration={duration}")
  215. update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
  216. logger.info(f"成功更新任务失败状态")
  217. except Exception as update_e:
  218. logger.error(f"更新任务失败状态失败: {str(update_e)}")
  219. logger.info(f"===== 监控执行异常结束 =====")
  220. raise e
  221. def ensure_boolean_result(func):
  222. """装饰器:确保函数返回布尔值"""
  223. @functools.wraps(func)
  224. def wrapper(*args, **kwargs):
  225. try:
  226. result = func(*args, **kwargs)
  227. logger.debug(f"脚本原始返回值: {result} (类型: {type(result).__name__})")
  228. # 处理None值
  229. if result is None:
  230. logger.warning(f"脚本函数 {func.__name__} 返回了None,默认设置为False")
  231. return False
  232. # 处理非布尔值
  233. if not isinstance(result, bool):
  234. try:
  235. # 尝试转换为布尔值
  236. bool_result = bool(result)
  237. logger.warning(f"脚本函数 {func.__name__} 返回非布尔值 {result},已转换为布尔值 {bool_result}")
  238. return bool_result
  239. except Exception as e:
  240. logger.error(f"无法将脚本返回值 {result} 转换为布尔值: {str(e)}")
  241. return False
  242. return result
  243. except Exception as e:
  244. logger.error(f"脚本函数 {func.__name__} 执行出错: {str(e)}")
  245. return False
  246. return wrapper
  247. def execute_script(script_path=None, script_name=None, script_exec_mode=None, table_name=None, execution_mode=None, args=None):
  248. """
  249. 执行指定的脚本,并返回执行结果
  250. 支持两种调用方式:
  251. 1. execute_script(script_path, script_name, script_exec_mode, args={})
  252. 2. execute_script(script_name, table_name, execution_mode)
  253. """
  254. # 确定调用方式并统一参数
  255. if script_path and script_name and script_exec_mode is not None:
  256. # 第一种调用方式
  257. if args is None:
  258. args = {}
  259. elif script_name and table_name and execution_mode is not None:
  260. # 第二种调用方式
  261. script_path = os.path.join(SCRIPTS_BASE_PATH, f"{script_name}.py")
  262. script_exec_mode = execution_mode
  263. args = {"table_name": table_name}
  264. else:
  265. logger.error("参数不正确,无法执行脚本")
  266. return False
  267. try:
  268. # 确保脚本路径存在
  269. if not os.path.exists(script_path):
  270. logger.error(f"脚本路径 {script_path} 不存在")
  271. return False
  272. # 加载脚本模块
  273. spec = importlib.util.spec_from_file_location("script_module", script_path)
  274. module = importlib.util.module_from_spec(spec)
  275. spec.loader.exec_module(module)
  276. # 检查并记录所有可用的函数
  277. module_functions = [f for f in dir(module) if callable(getattr(module, f)) and not f.startswith('_')]
  278. logger.debug(f"模块 {script_name} 中的可用函数: {module_functions}")
  279. # 获取脚本的运行函数
  280. if not hasattr(module, "run"):
  281. logger.error(f"脚本 {script_name} 没有run函数")
  282. return False
  283. # 装饰run函数,确保返回布尔值
  284. original_run = module.run
  285. module.run = ensure_boolean_result(original_run)
  286. logger.info(f"开始执行脚本 {script_name},执行模式: {script_exec_mode}, 参数: {args}")
  287. start_time = time.time()
  288. # 执行脚本
  289. if table_name is not None:
  290. # 第二种调用方式的参数格式
  291. exec_result = module.run(table_name=table_name, execution_mode=script_exec_mode)
  292. else:
  293. # 第一种调用方式的参数格式
  294. exec_result = module.run(script_exec_mode, args)
  295. end_time = time.time()
  296. duration = end_time - start_time
  297. logger.info(f"脚本 {script_name} 执行完成,结果: {exec_result}, 耗时: {duration:.2f}秒")
  298. return exec_result
  299. except Exception as e:
  300. logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
  301. import traceback
  302. logger.error(traceback.format_exc())
  303. return False
  304. def generate_optimized_execution_order(table_names, dependency_dict):
  305. """
  306. 生成优化的执行顺序,处理循环依赖
  307. 参数:
  308. table_names: 表名列表
  309. dependency_dict: 依赖关系字典 {表名: [依赖表1, 依赖表2, ...]}
  310. 返回:
  311. list: 优化后的执行顺序列表
  312. """
  313. # 创建有向图
  314. G = nx.DiGraph()
  315. # 添加所有节点
  316. for table_name in table_names:
  317. G.add_node(table_name)
  318. # 添加依赖边
  319. for target, sources in dependency_dict.items():
  320. for source in sources:
  321. if source in table_names: # 确保只考虑目标表集合中的表
  322. # 从依赖指向目标,表示依赖需要先执行
  323. G.add_edge(source, target)
  324. # 检测循环依赖
  325. cycles = list(nx.simple_cycles(G))
  326. if cycles:
  327. logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
  328. # 打破循环依赖(简单策略:移除每个循环中的一条边)
  329. for cycle in cycles:
  330. # 移除循环中的最后一条边
  331. G.remove_edge(cycle[-1], cycle[0])
  332. logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
  333. # 生成拓扑排序
  334. try:
  335. execution_order = list(nx.topological_sort(G))
  336. return execution_order
  337. except Exception as e:
  338. logger.error(f"生成执行顺序失败: {str(e)}")
  339. # 返回原始列表作为备选
  340. return table_names
  341. def get_datamodel_dependency_from_neo4j(table_names):
  342. """
  343. 从Neo4j获取DataModel表间的依赖关系
  344. 参数:
  345. table_names: 表名列表
  346. 返回:
  347. dict: 依赖关系字典 {目标表: [依赖表1, 依赖表2, ...]}
  348. """
  349. logger.info(f"开始获取 {len(table_names)} 个表的依赖关系")
  350. # 创建Neo4j连接
  351. driver = get_neo4j_driver()
  352. dependency_dict = {name: [] for name in table_names}
  353. try:
  354. with driver.session() as session:
  355. # 使用一次性查询获取所有表之间的依赖关系
  356. query = """
  357. MATCH (source:DataModel)-[:DERIVED_FROM]->(target:DataModel)
  358. WHERE source.en_name IN $table_names AND target.en_name IN $table_names
  359. RETURN source.en_name AS source, target.en_name AS target
  360. """
  361. result = session.run(query, table_names=table_names)
  362. # 处理结果
  363. for record in result:
  364. source = record.get("source")
  365. target = record.get("target")
  366. if source and target:
  367. # 目标依赖于源
  368. if source in dependency_dict:
  369. dependency_dict[source].append(target)
  370. logger.debug(f"依赖关系: {source} 依赖于 {target}")
  371. except Exception as e:
  372. logger.error(f"从Neo4j获取依赖关系时出错: {str(e)}")
  373. finally:
  374. driver.close()
  375. # 记录依赖关系
  376. for table, deps in dependency_dict.items():
  377. if deps:
  378. logger.info(f"表 {table} 依赖于: {deps}")
  379. else:
  380. logger.info(f"表 {table} 没有依赖")
  381. return dependency_dict
  382. def get_today_date():
  383. """获取今天的日期,返回YYYY-MM-DD格式字符串"""
  384. return datetime.now().strftime("%Y-%m-%d")