# dataops_productline_execute_dag.py

  1. """
  2. 统一数据产品线执行器 DAG
  3. 功能:
  4. 1. 面向脚本的作业编排,不再是面向表
  5. 2. 基于dataops_productline_prepare_dag生成的执行计划执行脚本
  6. 3. 支持对脚本执行顺序的优化
  7. 4. 提供详细的执行日志和错误处理
  8. 预期的执行计划模式:
  9. {
  10. "version": "2.0",
  11. "exec_date": "YYYY-MM-DD",
  12. "scripts": [
  13. {
  14. "task_id": "唯一任务ID",
  15. "script_id": "唯一脚本ID",
  16. "script_name": "脚本文件名或标识符",
  17. "script_type": "python|sql|python_script",
  18. "target_type": "structure|null",
  19. "update_mode": "append|full_refresh",
  20. "target_table": "表名",
  21. "source_tables": ["表1", "表2"],
  22. "schedule_status": true,
  23. "storage_location": "/路径/模式" 或 null,
  24. "schedule_frequency": "daily|weekly|monthly|quarterly|yearly",
  25. "target_table_label": "DataModel|DataResource|DataSource"
  26. },
  27. ...
  28. ],
  29. "model_scripts": ["script_id1", "script_id2", ...],
  30. "resource_scripts": ["script_id3", "script_id4", ...],
  31. "execution_order": ["script_id1", "script_id3", "script_id2", "script_id4", ...],
  32. "script_dependencies": {
  33. "script_id1": ["script_id3", "script_id4"],
  34. "script_id2": [],
  35. ...
  36. }
  37. }
  38. """
from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta, date
from airflow.models import Variable
import logging
import networkx as nx
import json
import os
import pendulum
from decimal import Decimal
from utils import (
    get_pg_conn,
    get_neo4j_driver,
    get_today_date,
    get_cn_exec_date
)
from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
import pytz
import pandas as pd
import sys

# Create the logger
logger = logging.getLogger(__name__)

# Enable verbose diagnostic logging
ENABLE_DEBUG_LOGGING = True

def log_debug(message):
    """Log a debug message, but only when debug mode is enabled"""
    if ENABLE_DEBUG_LOGGING:
        logger.info(f"[DEBUG] {message}")

# Emit diagnostic information when the DAG module is loaded
log_debug("======== Diagnostics ========")
log_debug(f"Current working directory: {os.getcwd()}")
log_debug(f"SCRIPTS_BASE_PATH: {SCRIPTS_BASE_PATH}")
log_debug(f"Module providing get_pg_conn: {get_pg_conn.__module__}")
#############################################
# Common utility functions
#############################################

def json_serial(obj):
    """JSON serializer that converts date objects to ISO-format strings"""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")

# Custom JSON encoder to handle Decimal serialization
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle date types
        elif isinstance(obj, (datetime, date)):
            return obj.isoformat()
        # Let the parent class handle other types
        return super(DecimalEncoder, self).default(obj)
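# Usage sketch for the serializers above: either hook plugs into json.dumps,
# e.g. when persisting an execution plan that may contain Decimal or date values.
#
#     json.dumps(execution_plan, cls=DecimalEncoder)   # Decimal- and date-aware
#     json.dumps(execution_plan, default=json_serial)  # date-aware only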
#############################################
# Script execution functions
#############################################

def execute_python_script(script_id, script_name, target_table, update_mode, schedule_frequency, **kwargs):
    """
    Execute a Python script file and return the result

    Parameters:
        script_id: script ID
        script_name: script file name (a .py file)
        target_table: target table name
        update_mode: execution mode
        schedule_frequency: execution frequency
        **kwargs: extra parameters such as source_tables, target_type, etc.

    Returns:
        bool: result of the script execution
    """
    # Resolve the execution date
    logical_date = kwargs.get('logical_date', datetime.now())
    exec_date, local_logical_date = get_cn_exec_date(logical_date)

    # Verbose logging
    logger.info(f"===== Starting Python script file {script_id} =====")
    logger.info(f"script_id: {script_id}, type: {type(script_id)}")
    logger.info(f"script_name: {script_name}, type: {type(script_name)}")
    logger.info(f"target_table: {target_table}, type: {type(target_table)}")
    logger.info(f"update_mode: {update_mode}, type: {type(update_mode)}")
    logger.info(f"schedule_frequency: {schedule_frequency}, type: {type(schedule_frequency)}")
    logger.info(f"[time parameters] execute_python_script: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")

    # Log extra parameters
    for key, value in kwargs.items():
        logger.info(f"Extra parameter - {key}: {value}, type: {type(value)}")

    # Make sure script_name is not empty
    if not script_name:
        logger.error(f"script_name is empty for script ID {script_id}, cannot execute")
        return False

    # Record the start time
    start_time = datetime.now()
    try:
        # Import and execute the script module
        import importlib.util
        import sys
        script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
        if not os.path.exists(script_path):
            logger.error(f"Script file does not exist: {script_path}")
            return False

        # Dynamically import the module
        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Look for and invoke the standard entry function run()
        if hasattr(module, "run"):
            logger.info(f"Calling the standard entry function run() of script file {script_name}")
            # Build the full parameter dict
            run_params = {
                "table_name": target_table,
                "update_mode": update_mode,
                "exec_date": exec_date,
                "schedule_frequency": schedule_frequency,
                "script_name": script_name,
            }
            # Add optional extra parameters
            for key in ['target_type', 'storage_location', 'source_tables']:
                if key in kwargs and kwargs[key] is not None:
                    run_params[key] = kwargs[key]

            # Call the script's run function
            logger.info(f"Calling run with parameters: {run_params}")
            result = module.run(**run_params)
            logger.info(f"Script finished, raw return value: {result}, type: {type(result)}")

            # Coerce the result to a boolean
            if result is None:
                logger.warning("Script returned None, coercing to False")
                result = False
            elif not isinstance(result, bool):
                original_result = result
                result = bool(result)
                logger.warning(f"Script returned non-boolean {original_result}, coerced to: {result}")

            # Record the end time and result
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            logger.info(f"Script {script_name} finished, result: {result}, duration: {duration:.2f}s")
            return result
        else:
            logger.error(f"Script {script_name} does not define the standard entry function run(), cannot execute")
            return False
    except Exception as e:
        # Handle the exception
        logger.error(f"Error executing script {script_id}: {str(e)}")
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        logger.error(f"Script {script_name} failed, duration: {duration:.2f}s")
        logger.info("===== Script execution ended with an exception =====")
        import traceback
        logger.error(traceback.format_exc())
        # Make sure the DAG is not blocked
        return False
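# For reference, a target script loaded by execute_python_script only needs to
# expose a run() entry point accepting the keyword arguments built above. A
# minimal conforming sketch (file name and body are illustrative assumptions):
#
#     # <SCRIPTS_BASE_PATH>/ods_orders_script.py
#     def run(table_name, update_mode, exec_date, schedule_frequency,
#             script_name, **kwargs):
#         # ... load data into `table_name` here ...
#         return True  # must return a boolean success flag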
# execute_sql replaces the former execute_sql_script
def execute_sql(script_id, script_name, target_table, update_mode, schedule_frequency, **kwargs):
    """
    Execute a SQL script and return the result

    Parameters:
        script_id: script ID
        script_name: script name (as stored in the database)
        target_table: target table name
        update_mode: execution mode
        schedule_frequency: execution frequency
        **kwargs: extra parameters

    Returns:
        bool: result of the script execution
    """
    # Resolve the execution date
    logical_date = kwargs.get('logical_date', datetime.now())
    exec_date, local_logical_date = get_cn_exec_date(logical_date)

    # Verbose logging
    logger.info(f"===== Starting SQL script {script_id} =====")
    logger.info(f"script_id: {script_id}, type: {type(script_id)}")
    logger.info(f"script_name: {script_name}, type: {type(script_name)}")
    logger.info(f"target_table: {target_table}, type: {type(target_table)}")
    logger.info(f"update_mode: {update_mode}, type: {type(update_mode)}")
    logger.info(f"schedule_frequency: {schedule_frequency}, type: {type(schedule_frequency)}")
    logger.info(f"[time parameters] execute_sql: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")

    # Log extra parameters
    for key, value in kwargs.items():
        logger.info(f"Extra parameter - {key}: {value}, type: {type(value)}")

    # Record the start time
    start_time = datetime.now()
    try:
        # Import and execute the execution_sql module
        import importlib.util
        import sys
        exec_sql_path = os.path.join(SCRIPTS_BASE_PATH, "execution_sql.py")

        # For SQL-type scripts we do not check that the script itself exists as
        # a file, but execution_sql.py must exist
        if not os.path.exists(exec_sql_path):
            logger.error(f"SQL executor script file does not exist: {exec_sql_path}")
            return False

        # Dynamically import the execution_sql module
        try:
            spec = importlib.util.spec_from_file_location("execution_sql", exec_sql_path)
            exec_sql_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(exec_sql_module)
            logger.info("Successfully imported the execution_sql module")
        except Exception as import_err:
            logger.error(f"Error importing the execution_sql module: {str(import_err)}")
            import traceback
            logger.error(traceback.format_exc())
            return False

        # Look for and invoke the standard entry function run()
        if hasattr(exec_sql_module, "run"):
            logger.info("Calling the standard entry function run() of the SQL executor")
            # Build the full parameter dict
            run_params = {
                "script_type": "sql",
                "target_table": target_table,
                "script_name": script_name,
                "exec_date": exec_date,
                "schedule_frequency": schedule_frequency,
                "target_table_label": kwargs.get('target_table_label', ''),  # target table label, used for ETL idempotency checks
                "update_mode": update_mode  # pass the execution mode
            }
            # Add optional extra parameters
            for key in ['target_type', 'storage_location', 'source_tables']:
                if key in kwargs and kwargs[key] is not None:
                    run_params[key] = kwargs[key]

            # Call execution_sql.py's run function
            logger.info(f"Calling the SQL executor's run function with parameters: {run_params}")
            result = exec_sql_module.run(**run_params)
            logger.info(f"SQL script finished, raw return value: {result}, type: {type(result)}")

            # Coerce the result to a boolean
            if result is None:
                logger.warning("SQL script returned None, coercing to False")
                result = False
            elif not isinstance(result, bool):
                original_result = result
                result = bool(result)
                logger.warning(f"SQL script returned non-boolean {original_result}, coerced to: {result}")

            # Record the end time and result
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            logger.info(f"SQL script {script_name} finished, result: {result}, duration: {duration:.2f}s")
            return result
        else:
            logger.error("The SQL executor execution_sql.py does not define the standard entry function run(), cannot execute")
            return False
    except Exception as e:
        # Handle the exception
        logger.error(f"Error executing SQL script {script_id}: {str(e)}")
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        logger.error(f"SQL script {script_name} failed, duration: {duration:.2f}s")
        logger.info("===== SQL script execution ended with an exception =====")
        import traceback
        logger.error(traceback.format_exc())
        # Make sure the DAG is not blocked
        return False
# execute_python replaces the former execute_python_script for DataModel targets
def execute_python(script_id, script_name, target_table, update_mode, schedule_frequency, **kwargs):
    """
    Execute a Python script and return the result

    Parameters:
        script_id: script ID
        script_name: script name (as stored in the database)
        target_table: target table name
        update_mode: execution mode
        schedule_frequency: execution frequency
        **kwargs: extra parameters

    Returns:
        bool: result of the script execution
    """
    # Resolve the execution date
    logical_date = kwargs.get('logical_date', datetime.now())
    exec_date, local_logical_date = get_cn_exec_date(logical_date)

    # Verbose logging
    logger.info(f"===== Starting Python script {script_id} =====")
    logger.info(f"script_id: {script_id}, type: {type(script_id)}")
    logger.info(f"script_name: {script_name}, type: {type(script_name)}")
    logger.info(f"target_table: {target_table}, type: {type(target_table)}")
    logger.info(f"update_mode: {update_mode}, type: {type(update_mode)}")
    logger.info(f"schedule_frequency: {schedule_frequency}, type: {type(schedule_frequency)}")
    logger.info(f"[time parameters] execute_python: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")

    # Log extra parameters
    for key, value in kwargs.items():
        logger.info(f"Extra parameter - {key}: {value}, type: {type(value)}")

    # Record the start time
    start_time = datetime.now()
    try:
        # Import and execute the execution_python module
        import importlib.util
        import sys
        exec_python_path = os.path.join(SCRIPTS_BASE_PATH, "execution_python.py")

        # For Python-type scripts we do not check that the script itself exists
        # as a file, but execution_python.py must exist
        if not os.path.exists(exec_python_path):
            logger.error(f"Python executor script file does not exist: {exec_python_path}")
            return False

        # Dynamically import the execution_python module
        try:
            spec = importlib.util.spec_from_file_location("execution_python", exec_python_path)
            exec_python_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(exec_python_module)
            logger.info("Successfully imported the execution_python module")
        except Exception as import_err:
            logger.error(f"Error importing the execution_python module: {str(import_err)}")
            import traceback
            logger.error(traceback.format_exc())
            return False

        # Look for and invoke the standard entry function run()
        if hasattr(exec_python_module, "run"):
            logger.info("Calling the standard entry function run() of the Python executor")
            # Build the full parameter dict
            run_params = {
                "script_type": "python",
                "target_table": target_table,
                "script_name": script_name,
                "exec_date": exec_date,
                "schedule_frequency": schedule_frequency,
                "target_table_label": kwargs.get('target_table_label', ''),  # target table label
                "update_mode": update_mode  # pass the execution mode
            }
            # Add optional extra parameters
            for key in ['target_type', 'storage_location', 'source_tables']:
                if key in kwargs and kwargs[key] is not None:
                    run_params[key] = kwargs[key]

            # Call execution_python.py's run function
            logger.info(f"Calling the Python executor's run function with parameters: {run_params}")
            result = exec_python_module.run(**run_params)
            logger.info(f"Python script finished, raw return value: {result}, type: {type(result)}")

            # Coerce the result to a boolean
            if result is None:
                logger.warning("Python script returned None, coercing to False")
                result = False
            elif not isinstance(result, bool):
                original_result = result
                result = bool(result)
                logger.warning(f"Python script returned non-boolean {original_result}, coerced to: {result}")

            # Record the end time and result
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            logger.info(f"Python script {script_name} finished, result: {result}, duration: {duration:.2f}s")
            return result
        else:
            logger.error("The Python executor execution_python.py does not define the standard entry function run(), cannot execute")
            return False
    except Exception as e:
        # Handle the exception
        logger.error(f"Error executing Python script {script_id}: {str(e)}")
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        logger.error(f"Python script {script_name} failed, duration: {duration:.2f}s")
        logger.info("===== Python script execution ended with an exception =====")
        import traceback
        logger.error(traceback.format_exc())
        # Make sure the DAG is not blocked
        return False
#############################################
# Execution plan retrieval and processing
#############################################

def get_execution_plan_from_db(ds):
    """
    Fetch the product line execution plan from the database

    Parameters:
        ds (str): execution date in 'YYYY-MM-DD' format

    Returns:
        dict: the execution plan, or None if not found
    """
    # Log details about the input parameter
    if isinstance(ds, datetime):
        if ds.tzinfo:
            logger.debug(f"[execution date] get_execution_plan_from_db received a datetime object: {ds}, with timezone: {ds.tzinfo}")
        else:
            logger.debug(f"[execution date] get_execution_plan_from_db received a datetime object: {ds}, without timezone")
    else:
        logger.debug(f"[execution date] get_execution_plan_from_db received: {ds}, type: {type(ds)}")

    logger.info(f"Trying to fetch the product line execution plan for execution date {ds} from the database")
    conn = get_pg_conn()
    cursor = conn.cursor()
    execution_plan = None
    try:
        # Query a: the current date equals the table's exec_date; if there are
        # multiple records, take the one with the largest logical_date
        cursor.execute("""
            SELECT plan
            FROM airflow_exec_plans
            WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date = %s
            ORDER BY logical_date DESC
            LIMIT 1
        """, (ds,))
        result = cursor.fetchone()
        if result:
            # Extract the plan
            plan_json = result[0]
            # plan_json may already be a dict
            if isinstance(plan_json, dict):
                execution_plan = plan_json
            else:
                execution_plan = json.loads(plan_json)
            logger.info(f"Found an execution plan record for the current date exec_date={ds}")
            return execution_plan

        # Query b: no record for the current date, so look for the latest
        # record with exec_date earlier than the current ds
        logger.info(f"No execution plan record found for the current date exec_date={ds}, falling back to historical records")
        cursor.execute("""
            SELECT plan, exec_date
            FROM airflow_exec_plans
            WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date < %s
            ORDER BY exec_date DESC, logical_date DESC
            LIMIT 1
        """, (ds,))
        result = cursor.fetchone()
        if result:
            # Extract the plan and its exec_date
            plan_json, plan_ds = result
            # plan_json may already be a dict
            if isinstance(plan_json, dict):
                execution_plan = plan_json
            else:
                execution_plan = json.loads(plan_json)
            logger.info(f"Found a historical execution plan record, exec_date: {plan_ds}")
            return execution_plan

        # No execution plan record found at all
        logger.error(f"No execution plan record found in the database, current DAG exec_date={ds}")
        return None
    except Exception as e:
        logger.error(f"Error fetching the execution plan from the database: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return None
    finally:
        cursor.close()
        conn.close()
def check_execution_plan(**kwargs):
    """
    Check that an execution plan exists and is valid.
    Returning False prevents all downstream tasks from running.
    """
    dag_run = kwargs.get('dag_run')
    logical_date = dag_run.logical_date
    exec_date, local_logical_date = get_cn_exec_date(logical_date)

    # Check whether this run was triggered manually
    logger.info(f"This DAG run was triggered via: {dag_run.run_type}")
    if dag_run.external_trigger:
        logger.info(f"[manual trigger] This DAG run was triggered manually, using the provided logical_date: {logical_date}")

    # Log the important time parameters
    logger.info(f"[time parameters] check_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
    logger.info("Checking whether a valid execution plan exists in the database")

    # Fetch the execution plan from the database
    execution_plan = get_execution_plan_from_db(exec_date)

    # Make sure we actually got a plan
    if not execution_plan:
        logger.error(f"No execution plan found for execution date {exec_date}")
        return False

    # Make sure the plan contains the required fields
    if "exec_date" not in execution_plan:
        logger.error("Execution plan is missing the exec_date field")
        return False
    if not isinstance(execution_plan.get("scripts", []), list):
        logger.error("The scripts field of the execution plan is invalid")
        return False
    if not isinstance(execution_plan.get("resource_scripts", []), list):
        logger.error("The resource_scripts field of the execution plan is invalid")
        return False
    if not isinstance(execution_plan.get("model_scripts", []), list):
        logger.error("The model_scripts field of the execution plan is invalid")
        return False

    # Check the script payloads
    scripts = execution_plan.get("scripts", [])
    resource_scripts = execution_plan.get("resource_scripts", [])
    model_scripts = execution_plan.get("model_scripts", [])
    logger.info(f"Execution plan validated: {len(scripts)} scripts, {len(resource_scripts)} resource scripts and {len(model_scripts)} model scripts")

    # Push the plan to XCom for downstream tasks
    kwargs['ti'].xcom_push(key='execution_plan', value=execution_plan)
    return True
def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
    """
    Save the execution plan to the airflow_exec_plans table

    Parameters:
        execution_plan (dict): the execution plan
        dag_id (str): the DAG ID
        run_id (str): the DAG run ID
        logical_date (datetime): the logical date
        ds (str): date string in YYYY-MM-DD format

    Returns:
        bool: whether the operation succeeded
    """
    try:
        conn = get_pg_conn()
        cursor = conn.cursor()
        try:
            # Serialize the execution plan to a JSON string
            plan_json = json.dumps(execution_plan)
            # Compute the local time
            local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
            # Insert the record
            cursor.execute("""
                INSERT INTO airflow_exec_plans
                (dag_id, run_id, logical_date, local_logical_date, exec_date, plan)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (dag_id, run_id, logical_date, local_logical_date, ds, plan_json))
            conn.commit()
            logger.info(f"Saved the execution plan to airflow_exec_plans, dag_id={dag_id}, run_id={run_id}, exec_date={ds}")
            return True
        except Exception as e:
            logger.error(f"Error saving the execution plan to the database: {str(e)}")
            conn.rollback()
            raise Exception(f"Failed to save the execution plan to PostgreSQL: {str(e)}")
        finally:
            cursor.close()
            conn.close()
    except Exception as e:
        logger.error(f"Failed to connect to the PostgreSQL database: {str(e)}")
        raise Exception(f"Cannot connect to the PostgreSQL database: {str(e)}")
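# For reference, a table definition that would satisfy the reads and writes in
# this module. The column list comes from the INSERT above; the types are
# assumptions, not the authoritative schema:
#
#     CREATE TABLE airflow_exec_plans (
#         dag_id             VARCHAR(250) NOT NULL,
#         run_id             VARCHAR(250) NOT NULL,
#         logical_date       TIMESTAMPTZ  NOT NULL,
#         local_logical_date TIMESTAMP,
#         exec_date          DATE         NOT NULL,
#         plan               JSONB        NOT NULL
#     );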
def generate_task_id(script_name, source_tables, target_table):
    """
    Generate a unique task ID from a script name and its tables

    Parameters:
        script_name (str): script file name
        source_tables (list): list of source tables
        target_table (str): target table name

    Returns:
        str: a unique task ID
    """
    # Strip the file extension from the script name
    script_base = os.path.splitext(script_name)[0]

    # For special scripts such as load_file.py, use the target table name directly
    if script_name.lower() in ['load_file.py']:
        return f"{script_base}_{target_table}"

    # Handle the source table part
    if source_tables:
        # Sort all source tables alphabetically and join them
        source_part = "_".join(sorted(source_tables))
        # Task ID format: script_sourcetables_to_targettable
        return f"{script_base}_{source_part}_to_{target_table}"
    else:
        # No source tables: use only the script name and the target table
        return f"{script_base}_{target_table}"
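# Example IDs produced by generate_task_id (derived from the rules above;
# table names are illustrative):
#
#     generate_task_id("transform.py", ["ods_b", "ods_a"], "dm_c")
#     # -> "transform_ods_a_ods_b_to_dm_c"   (sources sorted alphabetically)
#     generate_task_id("load_file.py", [], "ods_orders")
#     # -> "load_file_ods_orders"            (special-cased script)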
def prepare_scripts_from_tables(tables_info):
    """
    Convert table info into script info

    Parameters:
        tables_info (list): list of table info dicts

    Returns:
        list: list of script info dicts
    """
    scripts = []
    for table in tables_info:
        target_table = table['target_table']
        target_table_label = table.get('target_table_label')
        schedule_frequency = table.get('schedule_frequency')

        # Handle the table's script info
        if 'scripts_info' in table and table['scripts_info']:
            # The table has multiple scripts
            for script_name, script_info in table['scripts_info'].items():
                source_tables = script_info.get('sources', [])
                script_type = script_info.get('script_type', 'python')
                update_mode = script_info.get('script_exec_mode', 'append')

                # Generate the task ID
                task_id = generate_task_id(script_name, source_tables, target_table)

                # Build the script info
                script = {
                    "script_id": task_id,
                    "script_name": script_name,
                    "source_tables": source_tables,
                    "target_table": target_table,
                    "target_table_label": target_table_label,
                    "script_type": script_type,
                    "update_mode": update_mode,
                    "schedule_frequency": schedule_frequency,
                    "task_id": task_id
                }

                # Add special attributes for structure-type targets
                if table.get('target_type') == "structure":
                    script["target_type"] = "structure"
                    script["storage_location"] = table.get('storage_location')

                scripts.append(script)
                logger.info(f"Created script {script_name} for table {target_table}, task ID: {task_id}")
        else:
            # The table has a single script, or no explicit script info
            script_name = table.get('script_name')

            # Fall back to a default if script_name is missing
            if not script_name:
                script_name = f"{target_table}_script.py"
                logger.warning(f"Table {target_table} does not specify a script name, using the default: {script_name}")

            source_tables = table.get('source_tables', [])
            script_type = table.get('script_type', 'python')
            update_mode = table.get('update_mode', 'append')

            # Generate the task ID
            task_id = generate_task_id(script_name, source_tables, target_table)

            # Build the script info
            script = {
                "script_id": task_id,
                "script_name": script_name,
                "source_tables": source_tables,
                "target_table": target_table,
                "target_table_label": target_table_label,
                "script_type": script_type,
                "update_mode": update_mode,
                "schedule_frequency": schedule_frequency,
                "task_id": task_id
            }

            # Add special attributes for structure-type targets
            if table.get('target_type') == "structure":
                script["target_type"] = "structure"
                script["storage_location"] = table.get('storage_location')

            scripts.append(script)
            logger.info(f"Created script {script_name} for table {target_table}, task ID: {task_id}")
    return scripts
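# A small illustrative input/output for prepare_scripts_from_tables (the table
# and script names are made up for the example):
#
#     tables_info = [{
#         "target_table": "dm_sales",
#         "target_table_label": "DataModel",
#         "schedule_frequency": "daily",
#         "scripts_info": {
#             "agg_sales.py": {"sources": ["ods_orders"],
#                              "script_type": "python",
#                              "script_exec_mode": "append"}
#         }
#     }]
#     prepare_scripts_from_tables(tables_info)
#     # -> [{"script_id": "agg_sales_ods_orders_to_dm_sales",
#     #      "script_name": "agg_sales.py", "source_tables": ["ods_orders"],
#     #      "target_table": "dm_sales", "script_type": "python",
#     #      "update_mode": "append", ...}]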
def build_script_dependency_graph(scripts):
    """
    Work out the dependencies between scripts

    Parameters:
        scripts (list): list of script info dicts

    Returns:
        tuple: (dependency dict, graph object)
    """
    # Log every script's source tables, for debugging
    logger.info("Building the script dependency graph; current scripts:")
    for script in scripts:
        script_id = script['script_id']
        script_name = script['script_name']
        target_table = script['target_table']
        source_tables = script['source_tables']
        logger.info(f"Script: {script_id} ({script_name}), target table: {target_table}, source tables: {source_tables}")

    # Build a mapping from target table to script IDs
    table_to_scripts = {}
    for script in scripts:
        target_table = script['target_table']
        if target_table not in table_to_scripts:
            table_to_scripts[target_table] = []
        table_to_scripts[target_table].append(script['script_id'])

    # Log the table-to-script mapping
    logger.info("Table-to-script mapping:")
    for table, script_ids in table_to_scripts.items():
        logger.info(f"Table {table} is produced by scripts {script_ids}")

    # Build the script dependencies
    script_dependencies = {}
    for script in scripts:
        script_id = script['script_id']
        source_tables = script['source_tables']
        target_table = script['target_table']

        # Initialize the dependency list
        script_dependencies[script_id] = []

        # Find the scripts that produce the source tables
        if source_tables:
            logger.info(f"Resolving dependencies of script {script_id}, source tables: {source_tables}")
            for source_table in source_tables:
                if source_table in table_to_scripts:
                    # Every script that produces a source table becomes a dependency
                    for source_script_id in table_to_scripts[source_table]:
                        if source_script_id != script_id:  # avoid self-dependency
                            script_dependencies[script_id].append(source_script_id)
                            logger.info(f"Added dependency: {script_id} depends on {source_script_id} (table {target_table} depends on table {source_table})")
                else:
                    logger.warning(f"Source table {source_table} has no producing script; cannot create a dependency for script {script_id}")
        else:
            logger.info(f"Script {script_id} has no source table dependencies")

    # Additionally query Neo4j for dependencies (for scripts without explicit source_tables)
    try:
        driver = get_neo4j_driver()
    except Exception as e:
        logger.error(f"Failed to connect to the Neo4j database: {str(e)}")
        raise Exception(f"Cannot connect to the Neo4j database: {str(e)}")
    try:
        with driver.session() as session:
            # Verify the connection
            try:
                test_result = session.run("RETURN 1 as test")
                test_record = test_result.single()
                if not test_record or test_record.get("test") != 1:
                    logger.error("Neo4j connection test failed")
                    raise Exception("Neo4j connection test failed")
            except Exception as e:
                logger.error(f"Neo4j connection test failed: {str(e)}")
                raise Exception(f"Neo4j connection test failed: {str(e)}")

            for script in scripts:
                script_id = script['script_id']
                target_table = script['target_table']

                # Only handle scripts without source tables
                if not script['source_tables'] and not script_dependencies[script_id]:
                    logger.info(f"Script {script_id} has no source tables; querying Neo4j directly for the dependencies of table {target_table}")
                    # Query the table's direct dependencies
                    query = """
                        MATCH (target {en_name: $table_name})-[rel]->(dep)
                        RETURN dep.en_name AS dep_name
                    """
                    try:
                        result = session.run(query, table_name=target_table)
                        records = list(result)
                        for record in records:
                            dep_name = record.get("dep_name")
                            if dep_name and dep_name in table_to_scripts:
                                for dep_script_id in table_to_scripts[dep_name]:
                                    if dep_script_id != script_id:  # avoid self-dependency
                                        script_dependencies[script_id].append(dep_script_id)
                                        logger.info(f"Added extra dependency from Neo4j: {script_id} depends on {dep_script_id} (table {target_table} depends on table {dep_name})")
                    except Exception as e:
                        logger.warning(f"Error querying Neo4j for the dependencies of table {target_table}: {str(e)}")
                        raise Exception(f"Neo4j table dependency query failed: {str(e)}")
    except Exception as e:
        if "Neo4j" in str(e):
            # Already-handled error, re-raise as-is
            raise
        else:
            logger.error(f"Error accessing Neo4j for extra dependencies: {str(e)}")
            raise Exception(f"Neo4j dependency query failed: {str(e)}")
    finally:
        driver.close()

    # Build the dependency graph
    G = nx.DiGraph()
    # Add every script as a node
    for script in scripts:
        G.add_node(script['script_id'])
    # Add the dependency edges
    for script_id, dependencies in script_dependencies.items():
        if dependencies:
            for dep_id in dependencies:
                # An edge from script_id to dep_id means script_id depends on dep_id
                G.add_edge(script_id, dep_id)
                logger.debug(f"Added dependency edge: {script_id} -> {dep_id}")
        else:
            logger.info(f"Script {script_id} has no upstream script dependencies")

    # Make sure every script ID is present in the dependency dict
    for script in scripts:
        script_id = script['script_id']
        if script_id not in script_dependencies:
            script_dependencies[script_id] = []

    # Log the number of dependencies per script
    for script_id, deps in script_dependencies.items():
        logger.info(f"Script {script_id} has {len(deps)} dependencies: {deps}")
    return script_dependencies, G
def optimize_script_execution_order(scripts, script_dependencies, G):
    """
    Optimize the script execution order with NetworkX

    Parameters:
        scripts (list): list of script info dicts
        script_dependencies (dict): script dependency dict
        G (nx.DiGraph): the dependency graph

    Returns:
        list: the optimized execution order (a list of script IDs)
    """
    # Check for circular dependencies
    try:
        cycles = list(nx.simple_cycles(G))
        if cycles:
            logger.warning(f"Detected circular dependencies: {cycles}")
            # Break each cycle by removing one of its edges
            for cycle in cycles:
                # Pick an edge to remove; here simply the first edge of the cycle
                if len(cycle) > 1:
                    G.remove_edge(cycle[0], cycle[1])
                    logger.warning(f"Removed edge {cycle[0]} -> {cycle[1]} to break the cycle")
    except Exception as e:
        logger.error(f"Error detecting circular dependencies: {str(e)}")

    # Use a topological sort to get the execution order
    try:
        # Reverse the graph: our edges mean "depends on", while a topological
        # sort needs "runs before". Sorting the reversed graph therefore
        # already yields upstream scripts first, so no further reversal is
        # needed (the original double reversal put dependents first)
        reverse_G = G.reverse()
        execution_order = list(nx.topological_sort(reverse_G))
        logger.info(f"Generated the optimized script execution order: {execution_order}")
        return execution_order
    except Exception as e:
        logger.error(f"Error generating the script execution order: {str(e)}")
        # On error, fall back to the unoptimized original script ID list
        return [script['script_id'] for script in scripts]
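# A small worked example of the ordering logic above (illustrative IDs): with
# dependencies {"dm_report": ["ods_a", "ods_b"], "ods_a": [], "ods_b": []},
# the graph holds edges dm_report -> ods_a and dm_report -> ods_b ("depends
# on"), the reversed graph holds ods_a -> dm_report and ods_b -> dm_report
# ("runs before"), and a topological sort of the reversed graph yields e.g.
# ["ods_a", "ods_b", "dm_report"]: sources first, dependents last.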
def create_execution_plan(**kwargs):
    """
    Create or fetch the execution plan
    """
    try:
        dag_run = kwargs.get('dag_run')
        logical_date = dag_run.logical_date
        exec_date, local_logical_date = get_cn_exec_date(logical_date)
        logger.info(f"This DAG run was triggered via: {dag_run.run_type}")

        # Check whether this run was triggered manually
        if dag_run.external_trigger:
            logger.info(f"[manual trigger] This DAG run was triggered manually, using the provided logical_date: {logical_date}")

        # Log the important time parameters
        logger.info(f"[time parameters] create_execution_plan: exec_date={exec_date}, logical_date={logical_date}")

        # Fetch the execution plan from XCom
        execution_plan = kwargs['ti'].xcom_pull(task_ids='check_execution_plan', key='execution_plan')

        # If no plan is found in XCom, fall back to the database
        if not execution_plan:
            logger.info(f"No execution plan found in XCom, fetching from the database. Execution date: {exec_date}")
            execution_plan = get_execution_plan_from_db(exec_date)
            if not execution_plan:
                logger.error(f"No execution plan found for execution date {exec_date}")
                return None

        # Validate the plan structure
        scripts = execution_plan.get("scripts", [])
        script_dependencies = execution_plan.get("script_dependencies", {})
        execution_order = execution_plan.get("execution_order", [])

        # If the plan has no execution_order (or it is empty), optimize with NetworkX
        if not execution_order:
            logger.info("The execution plan has no execution_order, optimizing with NetworkX")
            # Rebuild the dependency graph from the dependency dict, since
            # optimize_script_execution_order needs it as its third argument
            G = nx.DiGraph()
            for script_id, deps in script_dependencies.items():
                G.add_node(script_id)
                for dep_id in deps:
                    G.add_edge(script_id, dep_id)
            execution_order = optimize_script_execution_order(scripts, script_dependencies, G)
            execution_plan["execution_order"] = execution_order

        # Push the full plan to XCom
        kwargs['ti'].xcom_push(key='full_execution_plan', value=execution_plan)
        logger.info(f"Execution plan processed successfully, containing {len(scripts)} scripts")
        return execution_plan
    except Exception as e:
        logger.error(f"Error creating the execution plan: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return None
# Create the DAG
with DAG(
    "dataops_productline_execute_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",  # daily schedule
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    params={"TRIGGERED_VIA_UI": True},  # make the trigger UI show the config page
) as dag:
    # Log key information at DAG instantiation time
    now = datetime.now()
    # Note: pytz timezones must be attached with localize(); datetime.replace()
    # would pick up the zone's raw LMT offset
    now_with_tz = pytz.timezone('Asia/Shanghai').localize(now)
    default_exec_date = get_today_date()
    logger.info(f"[DAG init] current time: {now} / {now_with_tz}, default execution date (for initialization only, not the actual execution date): {default_exec_date}")

    #############################################
    # Prepare phase: check and create the execution plan
    #############################################
    with TaskGroup("prepare_phase") as prepare_group:
        # Check that the execution plan exists
        check_plan = ShortCircuitOperator(
            task_id="check_execution_plan",
            python_callable=check_execution_plan,
            provide_context=True
        )
        # Create the execution plan
        create_plan = PythonOperator(
            task_id="create_execution_plan",
            python_callable=create_execution_plan,
            provide_context=True
        )
        # Set the task dependency
        check_plan >> create_plan
    #############################################
    # Execution phase: run the scripts according to their dependencies
    #############################################
    with TaskGroup("execution_phase") as execution_group:
        try:
            # Get the current DAG execution date
            exec_date = get_today_date()  # default to today's date
            logger.info(f"Current DAG execution date ds={exec_date}, trying to fetch the execution plan from the database")

            # Fetch the execution plan from the database
            execution_plan = get_execution_plan_from_db(exec_date)

            # Make sure we actually got a plan
            if execution_plan is None:
                error_msg = f"Cannot fetch a valid execution plan from the database, current DAG exec_date={exec_date}"
                logger.error(error_msg)
                # Raise to force the DAG to fail
                raise ValueError(error_msg)

            # Extract the plan contents
            exec_date = execution_plan.get("exec_date", exec_date)
            scripts = execution_plan.get("scripts", [])
            script_dependencies = execution_plan.get("script_dependencies", {})
            execution_order = execution_plan.get("execution_order", [])

            # If the plan has no execution_order (or it is empty), optimize with NetworkX
            if not execution_order:
                logger.info("The execution plan has no execution_order, optimizing with NetworkX")
                # Rebuild the dependency graph from the dependency dict first;
                # passing an empty graph would produce an empty ordering
                dep_graph = nx.DiGraph()
                for sid, deps in script_dependencies.items():
                    dep_graph.add_node(sid)
                    for dep_id in deps:
                        dep_graph.add_edge(sid, dep_id)
                execution_order = optimize_script_execution_order(scripts, script_dependencies, dep_graph)

            logger.info(f"Execution plan: exec_date={exec_date}, number of scripts={len(scripts)}")

            # An empty plan (no scripts) should also fail
            if not scripts:
                error_msg = f"The execution plan contains no scripts, current DAG exec_date={exec_date}"
                logger.error(error_msg)
                raise ValueError(error_msg)

            # 1. Create the start and end tasks
            start_execution = EmptyOperator(
                task_id="start_execution"
            )
            execution_completed = EmptyOperator(
                task_id="execution_completed",
                trigger_rule="none_failed_min_one_success"  # complete once at least one task succeeded and none failed
            )

            # Task dict for managing dependencies between script tasks
            task_dict = {}

            # 2. First create all script tasks, without dependencies
            for script in scripts:
                script_id = script['script_id']
                script_name = script.get("script_name")
                target_table = script.get("target_table")
                script_type = script.get("script_type", "python")
                update_mode = script.get("update_mode", "append")
                source_tables = script.get("source_tables", [])
                target_table_label = script.get("target_table_label", "")

                # Use a descriptive task ID containing the script name and target table.
                # Extract the file name
                if "/" in script_name:
                    script_file = script_name.split("/")[-1]  # file name part
                else:
                    script_file = script_name

                # Make sure the task ID contains no disallowed characters
                safe_script_name = script_file.replace(" ", "_")
                safe_target_table = target_table.replace("-", "_").replace(" ", "_")

                # Build the task ID in the agreed format
                task_id = f"{safe_script_name}-TO-{safe_target_table}"

                # Build the op_kwargs
                op_kwargs = {
                    "script_id": script_id,
                    "script_name": script_name,
                    "target_table": target_table,
                    "update_mode": update_mode,
                    "source_tables": source_tables,
                    "schedule_frequency": script.get("schedule_frequency", "daily"),
                    "target_table_label": target_table_label,
                    # logical_date is added automatically by Airflow at runtime
                }

                # Add special parameters, if any
                for key in ['target_type', 'storage_location']:
                    if key in script and script[key] is not None:
                        op_kwargs[key] = script[key]

                # Pick the execution function based on script type and target table label
                if script_type.lower() == 'sql' and target_table_label == 'DataModel':
                    # Use the SQL execution function
                    logger.info(f"Script {script_id} is SQL-typed with target table label DataModel, executing with execute_sql")
                    python_callable = execute_sql
                elif script_type.lower() == 'python' and target_table_label == 'DataModel':
                    # Use the Python execution function
                    logger.info(f"Script {script_id} is Python-typed with target table label DataModel, executing with execute_python")
                    python_callable = execute_python
                elif script_type.lower() == 'python_script':
                    # Use the Python script file execution function
                    logger.info(f"Script {script_id} is python_script-typed, executing with execute_python_script")
                    python_callable = execute_python_script
                else:
                    # Default to the Python script file execution function
                    logger.warning(f"Unrecognized script type {script_type}, falling back to execute_python_script")
                    python_callable = execute_python_script

                # Create the task
                script_task = PythonOperator(
                    task_id=task_id,
                    python_callable=python_callable,
                    op_kwargs=op_kwargs,
                    retries=TASK_RETRY_CONFIG["retries"],
                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
                )

                # Register the task
                task_dict[script_id] = script_task

            # 3. Wire the start task to every script task without dependencies
            no_dep_scripts = []
            for script_id, dependencies in script_dependencies.items():
                if not dependencies:  # no dependencies
                    if script_id in task_dict:
                        no_dep_scripts.append(script_id)
                        start_execution >> task_dict[script_id]
                        logger.info(f"Wired dependency-free script: start_execution >> {script_id}")

            # 4. Wire the dependencies between script tasks
            for script_id, dependencies in script_dependencies.items():
                for dep_id in dependencies:
                    if script_id in task_dict and dep_id in task_dict:
                        # Correct direction: dependency task >> current task
                        task_dict[dep_id] >> task_dict[script_id]
                        logger.info(f"Wired script dependency: {dep_id} >> {script_id}")

            # 5. Find all leaf nodes (tasks without downstream tasks) and wire them to execution_completed.
            # First, build the set of tasks that have downstream tasks
            has_downstream = set()
            for script_id, dependencies in script_dependencies.items():
                for dep_id in dependencies:
                    has_downstream.add(dep_id)

            # Then find the tasks without downstream tasks
            leaf_nodes = []
            for script_id in task_dict:
                if script_id not in has_downstream:
                    leaf_nodes.append(script_id)
                    task_dict[script_id] >> execution_completed
                    logger.info(f"Wired leaf node to completion: {script_id} >> execution_completed")

            # If no leaf nodes were found, wire every task to completion
            if not leaf_nodes:
                logger.warning("No leaf nodes found, wiring all tasks to completion")
                for script_id, task in task_dict.items():
                    task >> execution_completed

            # Wire the TaskGroup to the prepare phase
            prepare_group >> start_execution
            logger.info(f"Created {len(task_dict)} script execution tasks")
        except Exception as e:
            logger.error(f"Error loading the execution plan or creating tasks: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
    # Task that triggers the finalize DAG
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    trigger_finalize_dag = TriggerDagRunOperator(
        task_id="trigger_finalize_dag",
        trigger_dag_id="dataops_productline_finalize_dag",
        conf={"execution_date": "{{ ds }}", "parent_execution_date": "{{ execution_date }}", "parent_run_id": "{{ run_id }}"},
        reset_dag_run=True,
        wait_for_completion=False,
        poke_interval=60,
    )

    # Make sure the finalize DAG is triggered only after the execution phase completes
    execution_group >> trigger_finalize_dag

    logger.info("DAG dataops_productline_execute_dag definition complete")
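# For reference, a manual run can be started from the Airflow CLI (shown as a
# usage sketch with the standard trigger command):
#
#     airflow dags trigger dataops_productline_execute_dag
#
# Because params sets TRIGGERED_VIA_UI, triggering from the web UI pops up the
# run-configuration form instead of starting the run immediately.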