dataops_productline_execute_dag.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636
  1. """
  2. 统一数据产品线执行器 DAG
  3. 功能:
  4. 1. 面向脚本的作业编排,不再是面向表
  5. 2. 基于dataops_productline_prepare_dag生成的执行计划执行脚本
  6. 3. 支持对脚本执行顺序的优化
  7. 4. 提供详细的执行日志和错误处理
  8. """
  9. from airflow import DAG
  10. from airflow.operators.python import PythonOperator, ShortCircuitOperator
  11. from airflow.operators.empty import EmptyOperator
  12. from airflow.utils.task_group import TaskGroup
  13. from datetime import datetime, timedelta, date
  14. import logging
  15. import networkx as nx
  16. import json
  17. import os
  18. import pendulum
  19. from decimal import Decimal
  20. from common import (
  21. get_pg_conn,
  22. get_neo4j_driver,
  23. get_today_date
  24. )
  25. from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
  26. import pytz
  27. # 创建日志记录器
  28. logger = logging.getLogger(__name__)
  29. # 开启详细诊断日志记录
  30. ENABLE_DEBUG_LOGGING = True
  31. def log_debug(message):
  32. """记录调试日志,但只在启用调试模式时"""
  33. if ENABLE_DEBUG_LOGGING:
  34. logger.info(f"[DEBUG] {message}")
  35. # 在DAG启动时输出诊断信息
  36. log_debug("======== 诊断信息 ========")
  37. log_debug(f"当前工作目录: {os.getcwd()}")
  38. log_debug(f"SCRIPTS_BASE_PATH: {SCRIPTS_BASE_PATH}")
  39. log_debug(f"导入的common模块路径: {get_pg_conn.__module__}")
  40. #############################################
  41. # 通用工具函数
  42. #############################################
  43. def json_serial(obj):
  44. """将日期对象序列化为ISO格式字符串的JSON序列化器"""
  45. if isinstance(obj, (datetime, date)):
  46. return obj.isoformat()
  47. raise TypeError(f"类型 {type(obj)} 不能被序列化为JSON")
  48. # 添加自定义JSON编码器解决Decimal序列化问题
  49. class DecimalEncoder(json.JSONEncoder):
  50. def default(self, obj):
  51. if isinstance(obj, Decimal):
  52. return float(obj)
  53. # 处理日期类型
  54. elif isinstance(obj, (datetime, date)):
  55. return obj.isoformat()
  56. # 让父类处理其他类型
  57. return super(DecimalEncoder, self).default(obj)
  58. #############################################
  59. # 脚本执行函数
  60. #############################################
  61. def execute_script(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
  62. """
  63. 执行单个脚本并返回执行结果
  64. 参数:
  65. script_id: 脚本ID
  66. script_name: 脚本文件名
  67. target_table: 目标表名
  68. exec_date: 执行日期
  69. script_exec_mode: 执行模式
  70. **kwargs: 其他参数,如source_tables、target_type等
  71. 返回:
  72. bool: 脚本执行结果
  73. """
  74. # 添加详细日志
  75. logger.info(f"===== 开始执行脚本 {script_id} =====")
  76. logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
  77. logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
  78. logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
  79. logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
  80. logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
  81. # 记录额外参数
  82. for key, value in kwargs.items():
  83. logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
  84. # 检查script_name是否为空
  85. if not script_name:
  86. logger.error(f"脚本ID {script_id} 的script_name为空,无法执行")
  87. return False
  88. # 记录执行开始时间
  89. start_time = datetime.now()
  90. try:
  91. # 导入和执行脚本模块
  92. import importlib.util
  93. import sys
  94. script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
  95. if not os.path.exists(script_path):
  96. logger.error(f"脚本文件不存在: {script_path}")
  97. return False
  98. # 动态导入模块
  99. spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
  100. module = importlib.util.module_from_spec(spec)
  101. spec.loader.exec_module(module)
  102. # 检查并调用标准入口函数run
  103. if hasattr(module, "run"):
  104. logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
  105. # 构建完整的参数字典
  106. run_params = {
  107. "table_name": target_table,
  108. "execution_mode": script_exec_mode,
  109. "exec_date": exec_date
  110. }
  111. ## 添加可能的额外参数
  112. for key in ['target_type', 'storage_location', 'frequency', 'source_tables']:
  113. if key in kwargs and kwargs[key] is not None:
  114. run_params[key] = kwargs[key]
  115. # 调用脚本的run函数
  116. logger.info(f"调用run函数并传递参数: {run_params}")
  117. result = module.run(**run_params)
  118. logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
  119. # 确保result是布尔值
  120. if result is None:
  121. logger.warning(f"脚本返回值为None,转换为False")
  122. result = False
  123. elif not isinstance(result, bool):
  124. original_result = result
  125. result = bool(result)
  126. logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
  127. # 记录结束时间和结果
  128. end_time = datetime.now()
  129. duration = (end_time - start_time).total_seconds()
  130. logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
  131. return result
  132. else:
  133. logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
  134. return False
  135. except Exception as e:
  136. # a处理异常
  137. logger.error(f"执行脚本 {script_id} 出错: {str(e)}")
  138. end_time = datetime.now()
  139. duration = (end_time - start_time).total_seconds()
  140. logger.error(f"脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
  141. logger.info(f"===== 脚本执行异常结束 =====")
  142. import traceback
  143. logger.error(traceback.format_exc())
  144. # 确保不会阻塞DAG
  145. return False
  146. #############################################
  147. # 执行计划获取和处理函数
  148. #############################################
  149. def get_execution_plan_from_db(ds):
  150. """
  151. 从数据库获取产品线执行计划
  152. 参数:
  153. ds (str): 执行日期,格式为'YYYY-MM-DD'
  154. 返回:
  155. dict: 执行计划字典,如果找不到则返回None
  156. """
  157. # 记录输入参数详细信息
  158. if isinstance(ds, datetime):
  159. if ds.tzinfo:
  160. logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 带时区: {ds.tzinfo}")
  161. else:
  162. logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 无时区")
  163. else:
  164. logger.debug(f"【执行日期】get_execution_plan_from_db接收到: {ds}, 类型: {type(ds)}")
  165. logger.info(f"尝试从数据库获取执行日期 {ds} 的产品线执行计划")
  166. conn = get_pg_conn()
  167. cursor = conn.cursor()
  168. execution_plan = None
  169. try:
  170. # 查询条件a: 当前日期=表的exec_date,如果有多条记录,取logical_date最大的一条
  171. cursor.execute("""
  172. SELECT plan
  173. FROM airflow_exec_plans
  174. WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date = %s
  175. ORDER BY logical_date DESC
  176. LIMIT 1
  177. """, (ds,))
  178. result = cursor.fetchone()
  179. if result:
  180. # 获取计划
  181. plan_json = result[0]
  182. # 处理plan_json可能已经是dict的情况
  183. if isinstance(plan_json, dict):
  184. execution_plan = plan_json
  185. else:
  186. execution_plan = json.loads(plan_json)
  187. logger.info(f"找到当前日期 exec_date={ds} 的执行计划记录")
  188. return execution_plan
  189. # 查询条件b: 找不到当前日期的记录,查找exec_date<当前ds的最新记录
  190. logger.info(f"未找到当前日期 exec_date={ds} 的执行计划记录,尝试查找历史记录")
  191. cursor.execute("""
  192. SELECT plan, exec_date
  193. FROM airflow_exec_plans
  194. WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date < %s
  195. ORDER BY exec_date DESC, logical_date DESC
  196. LIMIT 1
  197. """, (ds,))
  198. result = cursor.fetchone()
  199. if result:
  200. # 获取计划和exec_date
  201. plan_json, plan_ds = result
  202. # 处理plan_json可能已经是dict的情况
  203. if isinstance(plan_json, dict):
  204. execution_plan = plan_json
  205. else:
  206. execution_plan = json.loads(plan_json)
  207. logger.info(f"找到历史执行计划记录,exec_date: {plan_ds}")
  208. return execution_plan
  209. # 找不到任何执行计划记录
  210. logger.error(f"在数据库中未找到任何执行计划记录,当前DAG exec_date={ds}")
  211. return None
  212. except Exception as e:
  213. logger.error(f"从数据库获取执行计划时出错: {str(e)}")
  214. import traceback
  215. logger.error(traceback.format_exc())
  216. return None
  217. finally:
  218. cursor.close()
  219. conn.close()
  220. def check_execution_plan(**kwargs):
  221. """
  222. 检查执行计划是否存在且有效
  223. 返回False将阻止所有下游任务执行
  224. """
  225. dag_run = kwargs.get('dag_run')
  226. logical_date = dag_run.logical_date
  227. local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
  228. exec_date = local_logical_date.strftime('%Y-%m-%d')
  229. # 检查是否是手动触发
  230. is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
  231. if is_manual_trigger:
  232. logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
  233. # 记录重要的时间参数
  234. logger.info(f"【时间参数】check_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
  235. logger.info("检查数据库中的执行计划是否存在且有效")
  236. # 从数据库获取执行计划
  237. execution_plan = get_execution_plan_from_db(exec_date)
  238. # 检查是否成功获取到执行计划
  239. if not execution_plan:
  240. logger.error(f"未找到执行日期 {exec_date} 的执行计划")
  241. return False
  242. # 检查执行计划是否包含必要字段
  243. if "exec_date" not in execution_plan:
  244. logger.error("执行计划缺少exec_date字段")
  245. return False
  246. if not isinstance(execution_plan.get("scripts", []), list):
  247. logger.error("执行计划的scripts字段无效")
  248. return False
  249. if not isinstance(execution_plan.get("script_dependencies", {}), dict):
  250. logger.error("执行计划的script_dependencies字段无效")
  251. return False
  252. # 检查是否有脚本数据
  253. scripts = execution_plan.get("scripts", [])
  254. if not scripts:
  255. logger.warning("执行计划不包含任何脚本")
  256. # 如果没有脚本,则阻止下游任务执行
  257. return False
  258. logger.info(f"执行计划验证成功: 包含 {len(scripts)} 个脚本")
  259. # 保存执行计划到XCom以便下游任务使用
  260. kwargs['ti'].xcom_push(key='execution_plan', value=execution_plan)
  261. return True
  262. def optimize_execution_order(scripts, script_dependencies):
  263. """
  264. 使用NetworkX优化脚本执行顺序
  265. 参数:
  266. scripts (list): 脚本信息列表
  267. script_dependencies (dict): 脚本依赖关系字典
  268. 返回:
  269. list: 优化后的脚本执行顺序(脚本ID列表)
  270. """
  271. logger.info("开始使用NetworkX优化脚本执行顺序")
  272. # 构建依赖图
  273. G = nx.DiGraph()
  274. # 添加所有脚本作为节点
  275. for script in scripts:
  276. script_id = script['script_id']
  277. G.add_node(script_id)
  278. # 添加依赖边
  279. for script_id, dependencies in script_dependencies.items():
  280. for dep_id in dependencies:
  281. # 添加从script_id到dep_id的边,表示script_id依赖于dep_id
  282. G.add_edge(script_id, dep_id)
  283. logger.debug(f"添加依赖边: {script_id} -> {dep_id}")
  284. # 检查是否有循环依赖
  285. try:
  286. cycles = list(nx.simple_cycles(G))
  287. if cycles:
  288. logger.warning(f"检测到循环依赖: {cycles}")
  289. # 处理循环依赖,可以通过删除一些边来打破循环
  290. for cycle in cycles:
  291. # 选择一条边删除,这里简单地选择第一条边
  292. if len(cycle) > 1:
  293. G.remove_edge(cycle[0], cycle[1])
  294. logger.warning(f"删除边 {cycle[0]} -> {cycle[1]} 以打破循环")
  295. except Exception as e:
  296. logger.error(f"检测循环依赖时出错: {str(e)}")
  297. # 使用拓扑排序获取执行顺序
  298. try:
  299. # 反转图,因为我们的边表示"依赖于"关系,而拓扑排序需要"优先于"关系
  300. reverse_G = G.reverse()
  301. execution_order = list(nx.topological_sort(reverse_G))
  302. # 反转结果,使上游任务先执行
  303. execution_order.reverse()
  304. logger.info(f"NetworkX优化后的脚本执行顺序: {execution_order}")
  305. return execution_order
  306. except Exception as e:
  307. logger.error(f"生成脚本执行顺序时出错: {str(e)}")
  308. # 出错时返回原始脚本ID列表,不进行优化
  309. return [script['script_id'] for script in scripts]
  310. def create_execution_plan(**kwargs):
  311. """
  312. 创建或获取执行计划
  313. """
  314. try:
  315. dag_run = kwargs.get('dag_run')
  316. logical_date = dag_run.logical_date
  317. local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
  318. exec_date = local_logical_date.strftime('%Y-%m-%d')
  319. # 检查是否是手动触发
  320. is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
  321. if is_manual_trigger:
  322. logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
  323. # 记录重要的时间参数
  324. logger.info(f"【时间参数】create_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
  325. # 从XCom获取执行计划
  326. execution_plan = kwargs['ti'].xcom_pull(task_ids='check_execution_plan', key='execution_plan')
  327. # 如果找不到执行计划,则从数据库获取
  328. if not execution_plan:
  329. logger.info(f"未找到执行计划,从数据库获取。使用执行日期: {exec_date}")
  330. execution_plan = get_execution_plan_from_db(exec_date)
  331. if not execution_plan:
  332. logger.error(f"执行日期 {exec_date} 没有找到执行计划")
  333. return None
  334. # 验证执行计划结构
  335. scripts = execution_plan.get("scripts", [])
  336. script_dependencies = execution_plan.get("script_dependencies", {})
  337. execution_order = execution_plan.get("execution_order", [])
  338. # 如果执行计划中没有execution_order或为空,使用NetworkX优化
  339. if not execution_order:
  340. logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
  341. execution_order = optimize_execution_order(scripts, script_dependencies)
  342. execution_plan["execution_order"] = execution_order
  343. # 保存完整的执行计划到XCom
  344. kwargs['ti'].xcom_push(key='full_execution_plan', value=execution_plan)
  345. logger.info(f"成功处理执行计划,包含 {len(scripts)} 个脚本")
  346. return execution_plan
  347. except Exception as e:
  348. logger.error(f"创建执行计划时出错: {str(e)}")
  349. import traceback
  350. logger.error(traceback.format_exc())
  351. return None
  352. # 创建DAG
  353. with DAG(
  354. "dataops_productline_execute_dag",
  355. start_date=datetime(2024, 1, 1),
  356. schedule_interval="@daily", # 设置为每日调度
  357. catchup=False,
  358. default_args={
  359. 'owner': 'airflow',
  360. 'depends_on_past': False,
  361. 'email_on_failure': False,
  362. 'email_on_retry': False,
  363. 'retries': 1,
  364. 'retry_delay': timedelta(minutes=5)
  365. },
  366. params={
  367. 'MANUAL_TRIGGER': False,
  368. }
  369. ) as dag:
  370. # 记录DAG实例化时的重要信息
  371. now = datetime.now()
  372. now_with_tz = now.replace(tzinfo=pytz.timezone('Asia/Shanghai'))
  373. default_exec_date = get_today_date()
  374. logger.info(f"【DAG初始化】当前时间: {now} / {now_with_tz}, 默认执行日期: {default_exec_date}")
  375. #############################################
  376. # 准备阶段: 检查并创建执行计划
  377. #############################################
  378. with TaskGroup("prepare_phase") as prepare_group:
  379. # 检查执行计划是否存在
  380. check_plan = ShortCircuitOperator(
  381. task_id="check_execution_plan",
  382. python_callable=check_execution_plan,
  383. provide_context=True
  384. )
  385. # 创建执行计划
  386. create_plan = PythonOperator(
  387. task_id="create_execution_plan",
  388. python_callable=create_execution_plan,
  389. provide_context=True
  390. )
  391. # 设置任务依赖
  392. check_plan >> create_plan
  393. #############################################
  394. # 执行阶段: 按依赖关系执行脚本
  395. #############################################
  396. with TaskGroup("execution_phase") as execution_group:
  397. try:
  398. # 获取当前DAG的执行日期
  399. exec_date = get_today_date() # 使用当天日期作为默认值
  400. logger.info(f"当前DAG执行日期 ds={exec_date},尝试从数据库获取执行计划")
  401. # 从数据库获取执行计划
  402. execution_plan = get_execution_plan_from_db(exec_date)
  403. # 检查是否成功获取到执行计划
  404. if execution_plan is None:
  405. error_msg = f"无法从数据库获取有效的执行计划,当前DAG exec_date={exec_date}"
  406. logger.error(error_msg)
  407. # 使用全局变量而不是异常来强制DAG失败
  408. raise ValueError(error_msg)
  409. # 提取信息
  410. exec_date = execution_plan.get("exec_date", exec_date)
  411. scripts = execution_plan.get("scripts", [])
  412. script_dependencies = execution_plan.get("script_dependencies", {})
  413. execution_order = execution_plan.get("execution_order", [])
  414. # 如果执行计划中没有execution_order或为空,使用NetworkX优化
  415. if not execution_order:
  416. logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
  417. execution_order = optimize_execution_order(scripts, script_dependencies)
  418. logger.info(f"执行计划: exec_date={exec_date}, scripts数量={len(scripts)}")
  419. # 如果执行计划为空(没有脚本),也应该失败
  420. if not scripts:
  421. error_msg = f"执行计划中没有任何脚本,当前DAG exec_date={exec_date}"
  422. logger.error(error_msg)
  423. raise ValueError(error_msg)
  424. # 1. 创建开始和结束任务
  425. start_execution = EmptyOperator(
  426. task_id="start_execution"
  427. )
  428. execution_completed = EmptyOperator(
  429. task_id="execution_completed",
  430. trigger_rule="none_failed_min_one_success" # 只要有一个任务成功且没有失败的任务就标记为完成
  431. )
  432. # 创建脚本任务字典,用于管理任务依赖
  433. task_dict = {}
  434. # 2. 先创建所有脚本任务,不设置依赖关系
  435. for script in scripts:
  436. script_id = script['script_id']
  437. script_name = script.get("script_name")
  438. target_table = script.get("target_table")
  439. script_type = script.get("script_type", "python")
  440. script_exec_mode = script.get("script_exec_mode", "append")
  441. source_tables = script.get("source_tables", [])
  442. # 使用描述性的任务ID,包含脚本名称和目标表
  443. # 提取文件名
  444. if "/" in script_name:
  445. script_file = script_name.split("/")[-1] # 获取文件名部分
  446. else:
  447. script_file = script_name
  448. # 确保任务ID不包含不允许的特殊字符
  449. safe_script_name = script_file.replace(" ", "_")
  450. safe_target_table = target_table.replace("-", "_").replace(" ", "_")
  451. # 按照指定格式创建任务ID
  452. task_id = f"{safe_script_name}-TO-{safe_target_table}"
  453. # 构建op_kwargs参数
  454. op_kwargs = {
  455. "script_id": script_id,
  456. "script_name": script_name,
  457. "target_table": target_table,
  458. "exec_date": str(exec_date),
  459. "script_exec_mode": script_exec_mode,
  460. "source_tables": source_tables
  461. }
  462. # 添加特殊参数(如果有)
  463. for key in ['target_type', 'storage_location', 'frequency']:
  464. if key in script and script[key] is not None:
  465. op_kwargs[key] = script[key]
  466. # 创建任务
  467. script_task = PythonOperator(
  468. task_id=task_id,
  469. python_callable=execute_script,
  470. op_kwargs=op_kwargs,
  471. retries=TASK_RETRY_CONFIG["retries"],
  472. retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
  473. )
  474. # 将任务添加到字典
  475. task_dict[script_id] = script_task
  476. # 3. 设置开始任务与所有无依赖的脚本任务的关系
  477. no_dep_scripts = []
  478. for script_id, dependencies in script_dependencies.items():
  479. if not dependencies: # 如果没有依赖
  480. if script_id in task_dict:
  481. no_dep_scripts.append(script_id)
  482. start_execution >> task_dict[script_id]
  483. logger.info(f"设置无依赖脚本: start_execution >> {script_id}")
  484. # 4. 设置脚本间的依赖关系
  485. for script_id, dependencies in script_dependencies.items():
  486. for dep_id in dependencies:
  487. if script_id in task_dict and dep_id in task_dict:
  488. # 正确的依赖关系:依赖任务 >> 当前任务
  489. task_dict[dep_id] >> task_dict[script_id]
  490. logger.info(f"设置脚本依赖: {dep_id} >> {script_id}")
  491. # 5. 找出所有叶子节点(没有下游任务的节点)并连接到execution_completed
  492. # 首先,构建一个下游节点集合
  493. has_downstream = set()
  494. for script_id, dependencies in script_dependencies.items():
  495. for dep_id in dependencies:
  496. has_downstream.add(dep_id)
  497. # 然后,找出没有下游节点的任务
  498. leaf_nodes = []
  499. for script_id in task_dict:
  500. if script_id not in has_downstream:
  501. leaf_nodes.append(script_id)
  502. task_dict[script_id] >> execution_completed
  503. logger.info(f"将叶子节点连接到completion: {script_id} >> execution_completed")
  504. # 如果没有找到叶子节点,则将所有任务都连接到completion
  505. if not leaf_nodes:
  506. logger.warning("未找到叶子节点,将所有任务连接到completion")
  507. for script_id, task in task_dict.items():
  508. task >> execution_completed
  509. # 设置TaskGroup与prepare_phase的依赖关系
  510. prepare_group >> start_execution
  511. logger.info(f"成功创建 {len(task_dict)} 个脚本执行任务")
  512. except Exception as e:
  513. logger.error(f"加载执行计划或创建任务时出错: {str(e)}")
  514. import traceback
  515. logger.error(traceback.format_exc())
  516. # 添加触发finalize DAG的任务
  517. from airflow.operators.trigger_dagrun import TriggerDagRunOperator
  518. trigger_finalize_dag = TriggerDagRunOperator(
  519. task_id="trigger_finalize_dag",
  520. trigger_dag_id="dataops_productline_finalize_dag",
  521. conf={"execution_date": "{{ ds }}", "parent_execution_date": "{{ execution_date }}", "parent_run_id": "{{ run_id }}"},
  522. reset_dag_run=True,
  523. wait_for_completion=False,
  524. poke_interval=60,
  525. )
  526. # 设置依赖关系,确保执行阶段完成后触发finalize DAG
  527. execution_group >> trigger_finalize_dag
  528. logger.info(f"DAG dataops_productline_execute_dag 定义完成")