dataops_productline_execute_dag.py 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900
  1. """
  2. 统一数据产品线执行器 DAG
  3. 功能:
  4. 1. 面向脚本的作业编排,不再是面向表
  5. 2. 基于dataops_productline_prepare_dag生成的执行计划执行脚本
  6. 3. 支持对脚本执行顺序的优化
  7. 4. 提供详细的执行日志和错误处理
  8. """
  9. from airflow import DAG
  10. from airflow.operators.python import PythonOperator, ShortCircuitOperator
  11. from airflow.operators.empty import EmptyOperator
  12. from airflow.utils.task_group import TaskGroup
  13. from datetime import datetime, timedelta, date
  14. import logging
  15. import networkx as nx
  16. import json
  17. import os
  18. import pendulum
  19. from decimal import Decimal
  20. from utils import (
  21. get_pg_conn,
  22. get_neo4j_driver,
  23. get_today_date,
  24. get_cn_exec_date
  25. )
  26. from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
  27. import pytz
  28. import pandas as pd
  29. import sys
  30. # 创建日志记录器
  31. logger = logging.getLogger(__name__)
  32. # 开启详细诊断日志记录
  33. ENABLE_DEBUG_LOGGING = True
  34. def log_debug(message):
  35. """记录调试日志,但只在启用调试模式时"""
  36. if ENABLE_DEBUG_LOGGING:
  37. logger.info(f"[DEBUG] {message}")
  38. # 在DAG启动时输出诊断信息
  39. log_debug("======== 诊断信息 ========")
  40. log_debug(f"当前工作目录: {os.getcwd()}")
  41. log_debug(f"SCRIPTS_BASE_PATH: {SCRIPTS_BASE_PATH}")
  42. log_debug(f"导入的common模块路径: {get_pg_conn.__module__}")
  43. #############################################
  44. # 通用工具函数
  45. #############################################
  46. def json_serial(obj):
  47. """将日期对象序列化为ISO格式字符串的JSON序列化器"""
  48. if isinstance(obj, (datetime, date)):
  49. return obj.isoformat()
  50. raise TypeError(f"类型 {type(obj)} 不能被序列化为JSON")
  51. # 添加自定义JSON编码器解决Decimal序列化问题
  52. class DecimalEncoder(json.JSONEncoder):
  53. def default(self, obj):
  54. if isinstance(obj, Decimal):
  55. return float(obj)
  56. # 处理日期类型
  57. elif isinstance(obj, (datetime, date)):
  58. return obj.isoformat()
  59. # 让父类处理其他类型
  60. return super(DecimalEncoder, self).default(obj)
  61. #############################################
  62. # 脚本执行函数
  63. #############################################
  64. def execute_python_script(script_id, script_name, target_table, script_exec_mode, frequency, **kwargs):
  65. """
  66. 执行Python脚本文件并返回执行结果
  67. 参数:
  68. script_id: 脚本ID
  69. script_name: 脚本文件名(.py文件)
  70. target_table: 目标表名
  71. script_exec_mode: 执行模式
  72. frequency: 执行频率
  73. **kwargs: 其他参数,如source_tables、target_type等
  74. 返回:
  75. bool: 脚本执行结果
  76. """
  77. # 获取执行日期
  78. logical_date = kwargs.get('logical_date', datetime.now())
  79. exec_date, local_logical_date = get_cn_exec_date(logical_date)
  80. # 添加详细日志
  81. logger.info(f"===== 开始执行Python脚本文件 {script_id} =====")
  82. logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
  83. logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
  84. logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
  85. logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
  86. logger.info(f"frequency: {frequency}, 类型: {type(frequency)}")
  87. logger.info(f"【时间参数】execute_python_script: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
  88. # 记录额外参数
  89. for key, value in kwargs.items():
  90. logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
  91. # 检查script_name是否为空
  92. if not script_name:
  93. logger.error(f"脚本ID {script_id} 的script_name为空,无法执行")
  94. return False
  95. # 记录执行开始时间
  96. start_time = datetime.now()
  97. try:
  98. # 导入和执行脚本模块
  99. import importlib.util
  100. import sys
  101. script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
  102. if not os.path.exists(script_path):
  103. logger.error(f"脚本文件不存在: {script_path}")
  104. return False
  105. # 动态导入模块
  106. spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
  107. module = importlib.util.module_from_spec(spec)
  108. spec.loader.exec_module(module)
  109. # 检查并调用标准入口函数run
  110. if hasattr(module, "run"):
  111. logger.info(f"调用脚本文件 {script_name} 的标准入口函数 run()")
  112. # 构建完整的参数字典
  113. run_params = {
  114. "table_name": target_table,
  115. "execution_mode": script_exec_mode,
  116. "exec_date": exec_date,
  117. "frequency": frequency
  118. }
  119. ## 添加可能的额外参数
  120. for key in ['target_type', 'storage_location', 'source_tables']:
  121. if key in kwargs and kwargs[key] is not None:
  122. run_params[key] = kwargs[key]
  123. # 调用脚本的run函数
  124. logger.info(f"调用run函数并传递参数: {run_params}")
  125. result = module.run(**run_params)
  126. logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
  127. # 确保result是布尔值
  128. if result is None:
  129. logger.warning(f"脚本返回值为None,转换为False")
  130. result = False
  131. elif not isinstance(result, bool):
  132. original_result = result
  133. result = bool(result)
  134. logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
  135. # 记录结束时间和结果
  136. end_time = datetime.now()
  137. duration = (end_time - start_time).total_seconds()
  138. logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  139. return result
  140. else:
  141. logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
  142. return False
  143. except Exception as e:
  144. # 处理异常
  145. logger.error(f"执行脚本 {script_id} 出错: {str(e)}")
  146. end_time = datetime.now()
  147. duration = (end_time - start_time).total_seconds()
  148. logger.error(f"脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
  149. logger.info(f"===== 脚本执行异常结束 =====")
  150. import traceback
  151. logger.error(traceback.format_exc())
  152. # 确保不会阻塞DAG
  153. return False
  154. # 使用execute_sql函数代替之前的execute_sql_script
  155. def execute_sql(script_id, script_name, target_table, script_exec_mode, frequency, **kwargs):
  156. """
  157. 执行SQL脚本并返回执行结果
  158. 参数:
  159. script_id: 脚本ID
  160. script_name: 脚本名称(数据库中的名称)
  161. target_table: 目标表名
  162. script_exec_mode: 执行模式
  163. frequency: 执行频率
  164. **kwargs: 其他参数
  165. 返回:
  166. bool: 脚本执行结果
  167. """
  168. # 获取执行日期
  169. logical_date = kwargs.get('logical_date', datetime.now())
  170. exec_date, local_logical_date = get_cn_exec_date(logical_date)
  171. # 添加详细日志
  172. logger.info(f"===== 开始执行SQL脚本 {script_id} =====")
  173. logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
  174. logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
  175. logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
  176. logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
  177. logger.info(f"frequency: {frequency}, 类型: {type(frequency)}")
  178. logger.info(f"【时间参数】execute_sql: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
  179. # 记录额外参数
  180. for key, value in kwargs.items():
  181. logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
  182. # 记录执行开始时间
  183. start_time = datetime.now()
  184. try:
  185. # 导入和执行execution_sql模块
  186. import importlib.util
  187. import sys
  188. exec_sql_path = os.path.join(SCRIPTS_BASE_PATH, "execution_sql.py")
  189. # 对于SQL类型的脚本,我们不检查它是否作为文件存在
  190. # 但是我们需要检查execution_sql.py是否存在
  191. if not os.path.exists(exec_sql_path):
  192. logger.error(f"SQL执行脚本文件不存在: {exec_sql_path}")
  193. return False
  194. # 动态导入execution_sql模块
  195. try:
  196. spec = importlib.util.spec_from_file_location("execution_sql", exec_sql_path)
  197. exec_sql_module = importlib.util.module_from_spec(spec)
  198. spec.loader.exec_module(exec_sql_module)
  199. logger.info(f"成功导入 execution_sql 模块")
  200. except Exception as import_err:
  201. logger.error(f"导入 execution_sql 模块时出错: {str(import_err)}")
  202. import traceback
  203. logger.error(traceback.format_exc())
  204. return False
  205. # 检查并调用标准入口函数run
  206. if hasattr(exec_sql_module, "run"):
  207. logger.info(f"调用执行SQL脚本的标准入口函数 run()")
  208. # 构建完整的参数字典
  209. run_params = {
  210. "script_type": "sql",
  211. "target_table": target_table,
  212. "script_name": script_name,
  213. "exec_date": exec_date,
  214. "frequency": frequency,
  215. "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签,用于ETL幂等性判断
  216. "execution_mode": script_exec_mode # 传递执行模式参数
  217. }
  218. # 添加可能的额外参数
  219. for key in ['target_type', 'storage_location', 'source_tables']:
  220. if key in kwargs and kwargs[key] is not None:
  221. run_params[key] = kwargs[key]
  222. # 调用execution_sql.py的run函数
  223. logger.info(f"调用SQL执行脚本的run函数并传递参数: {run_params}")
  224. result = exec_sql_module.run(**run_params)
  225. logger.info(f"SQL脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
  226. # 确保result是布尔值
  227. if result is None:
  228. logger.warning(f"SQL脚本返回值为None,转换为False")
  229. result = False
  230. elif not isinstance(result, bool):
  231. original_result = result
  232. result = bool(result)
  233. logger.warning(f"SQL脚本返回非布尔值 {original_result},转换为布尔值: {result}")
  234. # 记录结束时间和结果
  235. end_time = datetime.now()
  236. duration = (end_time - start_time).total_seconds()
  237. logger.info(f"SQL脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  238. return result
  239. else:
  240. logger.error(f"执行SQL脚本 execution_sql.py 中未定义标准入口函数 run(),无法执行")
  241. return False
  242. except Exception as e:
  243. # 处理异常
  244. logger.error(f"执行SQL脚本 {script_id} 出错: {str(e)}")
  245. end_time = datetime.now()
  246. duration = (end_time - start_time).total_seconds()
  247. logger.error(f"SQL脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
  248. logger.info(f"===== SQL脚本执行异常结束 =====")
  249. import traceback
  250. logger.error(traceback.format_exc())
  251. # 确保不会阻塞DAG
  252. return False
  253. # 使用execute_python函数代替之前的execute_python_script
  254. def execute_python(script_id, script_name, target_table, script_exec_mode, frequency, **kwargs):
  255. """
  256. 执行Python脚本并返回执行结果
  257. 参数:
  258. script_id: 脚本ID
  259. script_name: 脚本名称(数据库中的名称)
  260. target_table: 目标表名
  261. script_exec_mode: 执行模式
  262. frequency: 执行频率
  263. **kwargs: 其他参数
  264. 返回:
  265. bool: 脚本执行结果
  266. """
  267. # 获取执行日期
  268. logical_date = kwargs.get('logical_date', datetime.now())
  269. exec_date, local_logical_date = get_cn_exec_date(logical_date)
  270. # 添加详细日志
  271. logger.info(f"===== 开始执行Python脚本 {script_id} =====")
  272. logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
  273. logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
  274. logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
  275. logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
  276. logger.info(f"frequency: {frequency}, 类型: {type(frequency)}")
  277. logger.info(f"【时间参数】execute_python: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
  278. # 记录额外参数
  279. for key, value in kwargs.items():
  280. logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
  281. # 记录执行开始时间
  282. start_time = datetime.now()
  283. try:
  284. # 导入和执行execution_python模块
  285. import importlib.util
  286. import sys
  287. exec_python_path = os.path.join(SCRIPTS_BASE_PATH, "execution_python.py")
  288. # 对于Python类型的脚本,我们不检查它是否作为文件存在
  289. # 但是我们需要检查execution_python.py是否存在
  290. if not os.path.exists(exec_python_path):
  291. logger.error(f"Python执行脚本文件不存在: {exec_python_path}")
  292. return False
  293. # 动态导入execution_python模块
  294. try:
  295. spec = importlib.util.spec_from_file_location("execution_python", exec_python_path)
  296. exec_python_module = importlib.util.module_from_spec(spec)
  297. spec.loader.exec_module(exec_python_module)
  298. logger.info(f"成功导入 execution_python 模块")
  299. except Exception as import_err:
  300. logger.error(f"导入 execution_python 模块时出错: {str(import_err)}")
  301. import traceback
  302. logger.error(traceback.format_exc())
  303. return False
  304. # 检查并调用标准入口函数run
  305. if hasattr(exec_python_module, "run"):
  306. logger.info(f"调用执行Python脚本的标准入口函数 run()")
  307. # 构建完整的参数字典
  308. run_params = {
  309. "script_type": "python",
  310. "target_table": target_table,
  311. "script_name": script_name,
  312. "exec_date": exec_date,
  313. "frequency": frequency,
  314. "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签
  315. "execution_mode": script_exec_mode # 传递执行模式参数
  316. }
  317. # 添加可能的额外参数
  318. for key in ['target_type', 'storage_location', 'source_tables']:
  319. if key in kwargs and kwargs[key] is not None:
  320. run_params[key] = kwargs[key]
  321. # 调用execution_python.py的run函数
  322. logger.info(f"调用Python执行脚本的run函数并传递参数: {run_params}")
  323. result = exec_python_module.run(**run_params)
  324. logger.info(f"Python脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
  325. # 确保result是布尔值
  326. if result is None:
  327. logger.warning(f"Python脚本返回值为None,转换为False")
  328. result = False
  329. elif not isinstance(result, bool):
  330. original_result = result
  331. result = bool(result)
  332. logger.warning(f"Python脚本返回非布尔值 {original_result},转换为布尔值: {result}")
  333. # 记录结束时间和结果
  334. end_time = datetime.now()
  335. duration = (end_time - start_time).total_seconds()
  336. logger.info(f"Python脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  337. return result
  338. else:
  339. logger.error(f"执行Python脚本 execution_python.py 中未定义标准入口函数 run(),无法执行")
  340. return False
  341. except Exception as e:
  342. # 处理异常
  343. logger.error(f"执行Python脚本 {script_id} 出错: {str(e)}")
  344. end_time = datetime.now()
  345. duration = (end_time - start_time).total_seconds()
  346. logger.error(f"Python脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
  347. logger.info(f"===== Python脚本执行异常结束 =====")
  348. import traceback
  349. logger.error(traceback.format_exc())
  350. # 确保不会阻塞DAG
  351. return False
  352. #############################################
  353. # 执行计划获取和处理函数
  354. #############################################
  355. def get_execution_plan_from_db(ds):
  356. """
  357. 从数据库获取产品线执行计划
  358. 参数:
  359. ds (str): 执行日期,格式为'YYYY-MM-DD'
  360. 返回:
  361. dict: 执行计划字典,如果找不到则返回None
  362. """
  363. # 记录输入参数详细信息
  364. if isinstance(ds, datetime):
  365. if ds.tzinfo:
  366. logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 带时区: {ds.tzinfo}")
  367. else:
  368. logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 无时区")
  369. else:
  370. logger.debug(f"【执行日期】get_execution_plan_from_db接收到: {ds}, 类型: {type(ds)}")
  371. logger.info(f"尝试从数据库获取执行日期 {ds} 的产品线执行计划")
  372. conn = get_pg_conn()
  373. cursor = conn.cursor()
  374. execution_plan = None
  375. try:
  376. # 查询条件a: 当前日期=表的exec_date,如果有多条记录,取logical_date最大的一条
  377. cursor.execute("""
  378. SELECT plan
  379. FROM airflow_exec_plans
  380. WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date = %s
  381. ORDER BY logical_date DESC
  382. LIMIT 1
  383. """, (ds,))
  384. result = cursor.fetchone()
  385. if result:
  386. # 获取计划
  387. plan_json = result[0]
  388. # 处理plan_json可能已经是dict的情况
  389. if isinstance(plan_json, dict):
  390. execution_plan = plan_json
  391. else:
  392. execution_plan = json.loads(plan_json)
  393. logger.info(f"找到当前日期 exec_date={ds} 的执行计划记录")
  394. return execution_plan
  395. # 查询条件b: 找不到当前日期的记录,查找exec_date<当前ds的最新记录
  396. logger.info(f"未找到当前日期 exec_date={ds} 的执行计划记录,尝试查找历史记录")
  397. cursor.execute("""
  398. SELECT plan, exec_date
  399. FROM airflow_exec_plans
  400. WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date < %s
  401. ORDER BY exec_date DESC, logical_date DESC
  402. LIMIT 1
  403. """, (ds,))
  404. result = cursor.fetchone()
  405. if result:
  406. # 获取计划和exec_date
  407. plan_json, plan_ds = result
  408. # 处理plan_json可能已经是dict的情况
  409. if isinstance(plan_json, dict):
  410. execution_plan = plan_json
  411. else:
  412. execution_plan = json.loads(plan_json)
  413. logger.info(f"找到历史执行计划记录,exec_date: {plan_ds}")
  414. return execution_plan
  415. # 找不到任何执行计划记录
  416. logger.error(f"在数据库中未找到任何执行计划记录,当前DAG exec_date={ds}")
  417. return None
  418. except Exception as e:
  419. logger.error(f"从数据库获取执行计划时出错: {str(e)}")
  420. import traceback
  421. logger.error(traceback.format_exc())
  422. return None
  423. finally:
  424. cursor.close()
  425. conn.close()
  426. def check_execution_plan(**kwargs):
  427. """
  428. 检查执行计划是否存在且有效
  429. 返回False将阻止所有下游任务执行
  430. """
  431. dag_run = kwargs.get('dag_run')
  432. logical_date = dag_run.logical_date
  433. exec_date, local_logical_date = get_cn_exec_date(logical_date)
  434. # 检查是否是手动触发
  435. is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
  436. if is_manual_trigger:
  437. logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
  438. # 记录重要的时间参数
  439. logger.info(f"【时间参数】check_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
  440. logger.info("检查数据库中的执行计划是否存在且有效")
  441. # 从数据库获取执行计划
  442. execution_plan = get_execution_plan_from_db(exec_date)
  443. # 检查是否成功获取到执行计划
  444. if not execution_plan:
  445. logger.error(f"未找到执行日期 {exec_date} 的执行计划")
  446. return False
  447. # 检查执行计划是否包含必要字段
  448. if "exec_date" not in execution_plan:
  449. logger.error("执行计划缺少exec_date字段")
  450. return False
  451. if not isinstance(execution_plan.get("scripts", []), list):
  452. logger.error("执行计划的scripts字段无效")
  453. return False
  454. if not isinstance(execution_plan.get("script_dependencies", {}), dict):
  455. logger.error("执行计划的script_dependencies字段无效")
  456. return False
  457. # 检查是否有脚本数据
  458. scripts = execution_plan.get("scripts", [])
  459. if not scripts:
  460. logger.warning("执行计划不包含任何脚本")
  461. # 如果没有脚本,则阻止下游任务执行
  462. return False
  463. logger.info(f"执行计划验证成功: 包含 {len(scripts)} 个脚本")
  464. # 保存执行计划到XCom以便下游任务使用
  465. kwargs['ti'].xcom_push(key='execution_plan', value=execution_plan)
  466. return True
  467. def optimize_execution_order(scripts, script_dependencies):
  468. """
  469. 使用NetworkX优化脚本执行顺序
  470. 参数:
  471. scripts (list): 脚本信息列表
  472. script_dependencies (dict): 脚本依赖关系字典
  473. 返回:
  474. list: 优化后的脚本执行顺序(脚本ID列表)
  475. """
  476. logger.info("开始使用NetworkX优化脚本执行顺序")
  477. # 构建依赖图
  478. G = nx.DiGraph()
  479. # 添加所有脚本作为节点
  480. for script in scripts:
  481. script_id = script['script_id']
  482. G.add_node(script_id)
  483. # 添加依赖边
  484. for script_id, dependencies in script_dependencies.items():
  485. for dep_id in dependencies:
  486. # 添加从script_id到dep_id的边,表示script_id依赖于dep_id
  487. G.add_edge(script_id, dep_id)
  488. logger.debug(f"添加依赖边: {script_id} -> {dep_id}")
  489. # 检查是否有循环依赖
  490. try:
  491. cycles = list(nx.simple_cycles(G))
  492. if cycles:
  493. logger.warning(f"检测到循环依赖: {cycles}")
  494. # 处理循环依赖,可以通过删除一些边来打破循环
  495. for cycle in cycles:
  496. # 选择一条边删除,这里简单地选择第一条边
  497. if len(cycle) > 1:
  498. G.remove_edge(cycle[0], cycle[1])
  499. logger.warning(f"删除边 {cycle[0]} -> {cycle[1]} 以打破循环")
  500. except Exception as e:
  501. logger.error(f"检测循环依赖时出错: {str(e)}")
  502. # 使用拓扑排序获取执行顺序
  503. try:
  504. # 反转图,因为我们的边表示"依赖于"关系,而拓扑排序需要"优先于"关系
  505. reverse_G = G.reverse()
  506. execution_order = list(nx.topological_sort(reverse_G))
  507. # 反转结果,使上游任务先执行
  508. execution_order.reverse()
  509. logger.info(f"NetworkX优化后的脚本执行顺序: {execution_order}")
  510. return execution_order
  511. except Exception as e:
  512. logger.error(f"生成脚本执行顺序时出错: {str(e)}")
  513. # 出错时返回原始脚本ID列表,不进行优化
  514. return [script['script_id'] for script in scripts]
  515. def create_execution_plan(**kwargs):
  516. """
  517. 创建或获取执行计划
  518. """
  519. try:
  520. dag_run = kwargs.get('dag_run')
  521. logical_date = dag_run.logical_date
  522. exec_date, local_logical_date = get_cn_exec_date(logical_date)
  523. # 检查是否是手动触发
  524. is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
  525. if is_manual_trigger:
  526. logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
  527. # 记录重要的时间参数
  528. logger.info(f"【时间参数】create_execution_plan: exec_date={exec_date}, logical_date={logical_date}")
  529. # 从XCom获取执行计划
  530. execution_plan = kwargs['ti'].xcom_pull(task_ids='check_execution_plan', key='execution_plan')
  531. # 如果找不到执行计划,则从数据库获取
  532. if not execution_plan:
  533. logger.info(f"未找到执行计划,从数据库获取。使用执行日期: {exec_date}")
  534. execution_plan = get_execution_plan_from_db(exec_date)
  535. if not execution_plan:
  536. logger.error(f"执行日期 {exec_date} 没有找到执行计划")
  537. return None
  538. # 验证执行计划结构
  539. scripts = execution_plan.get("scripts", [])
  540. script_dependencies = execution_plan.get("script_dependencies", {})
  541. execution_order = execution_plan.get("execution_order", [])
  542. # 如果执行计划中没有execution_order或为空,使用NetworkX优化
  543. if not execution_order:
  544. logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
  545. execution_order = optimize_execution_order(scripts, script_dependencies)
  546. execution_plan["execution_order"] = execution_order
  547. # 保存完整的执行计划到XCom
  548. kwargs['ti'].xcom_push(key='full_execution_plan', value=execution_plan)
  549. logger.info(f"成功处理执行计划,包含 {len(scripts)} 个脚本")
  550. return execution_plan
  551. except Exception as e:
  552. logger.error(f"创建执行计划时出错: {str(e)}")
  553. import traceback
  554. logger.error(traceback.format_exc())
  555. return None
  556. # 创建DAG
  557. with DAG(
  558. "dataops_productline_execute_dag",
  559. start_date=datetime(2024, 1, 1),
  560. schedule_interval="@daily", # 设置为每日调度
  561. catchup=False,
  562. default_args={
  563. 'owner': 'airflow',
  564. 'depends_on_past': False,
  565. 'email_on_failure': False,
  566. 'email_on_retry': False,
  567. 'retries': 1,
  568. 'retry_delay': timedelta(minutes=5)
  569. },
  570. params={
  571. 'MANUAL_TRIGGER': False,
  572. }
  573. ) as dag:
  574. # 记录DAG实例化时的重要信息
  575. now = datetime.now()
  576. now_with_tz = now.replace(tzinfo=pytz.timezone('Asia/Shanghai'))
  577. default_exec_date = get_today_date()
  578. logger.info(f"【DAG初始化】当前时间: {now} / {now_with_tz}, 默认执行日期(用于初始化,非实际执行日期): {default_exec_date}")
  579. #############################################
  580. # 准备阶段: 检查并创建执行计划
  581. #############################################
  582. with TaskGroup("prepare_phase") as prepare_group:
  583. # 检查执行计划是否存在
  584. check_plan = ShortCircuitOperator(
  585. task_id="check_execution_plan",
  586. python_callable=check_execution_plan,
  587. provide_context=True
  588. )
  589. # 创建执行计划
  590. create_plan = PythonOperator(
  591. task_id="create_execution_plan",
  592. python_callable=create_execution_plan,
  593. provide_context=True
  594. )
  595. # 设置任务依赖
  596. check_plan >> create_plan
  597. #############################################
  598. # 执行阶段: 按依赖关系执行脚本
  599. #############################################
  600. with TaskGroup("execution_phase") as execution_group:
  601. try:
  602. # 获取当前DAG的执行日期
  603. exec_date = get_today_date() # 使用当天日期作为默认值
  604. logger.info(f"当前DAG执行日期 ds={exec_date},尝试从数据库获取执行计划")
  605. # 从数据库获取执行计划
  606. execution_plan = get_execution_plan_from_db(exec_date)
  607. # 检查是否成功获取到执行计划
  608. if execution_plan is None:
  609. error_msg = f"无法从数据库获取有效的执行计划,当前DAG exec_date={exec_date}"
  610. logger.error(error_msg)
  611. # 使用全局变量而不是异常来强制DAG失败
  612. raise ValueError(error_msg)
  613. # 提取信息
  614. exec_date = execution_plan.get("exec_date", exec_date)
  615. scripts = execution_plan.get("scripts", [])
  616. script_dependencies = execution_plan.get("script_dependencies", {})
  617. execution_order = execution_plan.get("execution_order", [])
  618. # 如果执行计划中没有execution_order或为空,使用NetworkX优化
  619. if not execution_order:
  620. logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
  621. execution_order = optimize_execution_order(scripts, script_dependencies)
  622. logger.info(f"执行计划: exec_date={exec_date}, scripts数量={len(scripts)}")
  623. # 如果执行计划为空(没有脚本),也应该失败
  624. if not scripts:
  625. error_msg = f"执行计划中没有任何脚本,当前DAG exec_date={exec_date}"
  626. logger.error(error_msg)
  627. raise ValueError(error_msg)
  628. # 1. 创建开始和结束任务
  629. start_execution = EmptyOperator(
  630. task_id="start_execution"
  631. )
  632. execution_completed = EmptyOperator(
  633. task_id="execution_completed",
  634. trigger_rule="none_failed_min_one_success" # 只要有一个任务成功且没有失败的任务就标记为完成
  635. )
  636. # 创建脚本任务字典,用于管理任务依赖
  637. task_dict = {}
  638. # 2. 先创建所有脚本任务,不设置依赖关系
  639. for script in scripts:
  640. script_id = script['script_id']
  641. script_name = script.get("script_name")
  642. target_table = script.get("target_table")
  643. script_type = script.get("script_type", "python")
  644. script_exec_mode = script.get("script_exec_mode", "append")
  645. source_tables = script.get("source_tables", [])
  646. target_table_label = script.get("target_table_label", "")
  647. # 使用描述性的任务ID,包含脚本名称和目标表
  648. # 提取文件名
  649. if "/" in script_name:
  650. script_file = script_name.split("/")[-1] # 获取文件名部分
  651. else:
  652. script_file = script_name
  653. # 确保任务ID不包含不允许的特殊字符
  654. safe_script_name = script_file.replace(" ", "_")
  655. safe_target_table = target_table.replace("-", "_").replace(" ", "_")
  656. # 按照指定格式创建任务ID
  657. task_id = f"{safe_script_name}-TO-{safe_target_table}"
  658. # 构建op_kwargs参数
  659. op_kwargs = {
  660. "script_id": script_id,
  661. "script_name": script_name,
  662. "target_table": target_table,
  663. "script_exec_mode": script_exec_mode,
  664. "source_tables": source_tables,
  665. "frequency": script.get("frequency", "daily"), # 显式添加frequency参数
  666. "target_table_label": target_table_label,
  667. # logical_date会在任务执行时由Airflow自动添加
  668. }
  669. # 添加特殊参数(如果有)
  670. for key in ['target_type', 'storage_location']:
  671. if key in script and script[key] is not None:
  672. op_kwargs[key] = script[key]
  673. # 根据脚本类型和目标表标签选择执行函数
  674. if script_type.lower() == 'sql' and target_table_label == 'DataModel':
  675. # 使用SQL脚本执行函数
  676. logger.info(f"脚本 {script_id} 是SQL类型且目标表标签为DataModel,使用execute_sql函数执行")
  677. python_callable = execute_sql
  678. elif script_type.lower() == 'python' and target_table_label == 'DataModel':
  679. # 使用Python脚本执行函数
  680. logger.info(f"脚本 {script_id} 是Python类型且目标表标签为DataModel,使用execute_python函数执行")
  681. python_callable = execute_python
  682. elif script_type.lower() == 'python_script':
  683. # 使用Python脚本文件执行函数
  684. logger.info(f"脚本 {script_id} 是python_script类型,使用execute_python_script函数执行")
  685. python_callable = execute_python_script
  686. else:
  687. # 默认使用Python脚本文件执行函数
  688. logger.warning(f"未识别的脚本类型 {script_type},使用默认execute_python_script函数执行")
  689. python_callable = execute_python_script
  690. # 创建任务
  691. script_task = PythonOperator(
  692. task_id=task_id,
  693. python_callable=python_callable,
  694. op_kwargs=op_kwargs,
  695. retries=TASK_RETRY_CONFIG["retries"],
  696. retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
  697. )
  698. # 将任务添加到字典
  699. task_dict[script_id] = script_task
  700. # 3. 设置开始任务与所有无依赖的脚本任务的关系
  701. no_dep_scripts = []
  702. for script_id, dependencies in script_dependencies.items():
  703. if not dependencies: # 如果没有依赖
  704. if script_id in task_dict:
  705. no_dep_scripts.append(script_id)
  706. start_execution >> task_dict[script_id]
  707. logger.info(f"设置无依赖脚本: start_execution >> {script_id}")
  708. # 4. 设置脚本间的依赖关系
  709. for script_id, dependencies in script_dependencies.items():
  710. for dep_id in dependencies:
  711. if script_id in task_dict and dep_id in task_dict:
  712. # 正确的依赖关系:依赖任务 >> 当前任务
  713. task_dict[dep_id] >> task_dict[script_id]
  714. logger.info(f"设置脚本依赖: {dep_id} >> {script_id}")
  715. # 5. 找出所有叶子节点(没有下游任务的节点)并连接到execution_completed
  716. # 首先,构建一个下游节点集合
  717. has_downstream = set()
  718. for script_id, dependencies in script_dependencies.items():
  719. for dep_id in dependencies:
  720. has_downstream.add(dep_id)
  721. # 然后,找出没有下游节点的任务
  722. leaf_nodes = []
  723. for script_id in task_dict:
  724. if script_id not in has_downstream:
  725. leaf_nodes.append(script_id)
  726. task_dict[script_id] >> execution_completed
  727. logger.info(f"将叶子节点连接到completion: {script_id} >> execution_completed")
  728. # 如果没有找到叶子节点,则将所有任务都连接到completion
  729. if not leaf_nodes:
  730. logger.warning("未找到叶子节点,将所有任务连接到completion")
  731. for script_id, task in task_dict.items():
  732. task >> execution_completed
  733. # 设置TaskGroup与prepare_phase的依赖关系
  734. prepare_group >> start_execution
  735. logger.info(f"成功创建 {len(task_dict)} 个脚本执行任务")
  736. except Exception as e:
  737. logger.error(f"加载执行计划或创建任务时出错: {str(e)}")
  738. import traceback
  739. logger.error(traceback.format_exc())
  740. # 添加触发finalize DAG的任务
  741. from airflow.operators.trigger_dagrun import TriggerDagRunOperator
  742. trigger_finalize_dag = TriggerDagRunOperator(
  743. task_id="trigger_finalize_dag",
  744. trigger_dag_id="dataops_productline_finalize_dag",
  745. conf={"execution_date": "{{ ds }}", "parent_execution_date": "{{ execution_date }}", "parent_run_id": "{{ run_id }}"},
  746. reset_dag_run=True,
  747. wait_for_completion=False,
  748. poke_interval=60,
  749. )
  750. # 设置依赖关系,确保执行阶段完成后触发finalize DAG
  751. execution_group >> trigger_finalize_dag
  752. logger.info(f"DAG dataops_productline_execute_dag 定义完成")