dag_data_model_daily.py

# dag_data_model_daily.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime
from utils import get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph
from config import NEO4J_CONFIG
import pendulum
import logging

# Create a module-level logger
logger = logging.getLogger(__name__)

with DAG("dag_data_model_daily", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
    logger.info("Initializing the dag_data_model_daily DAG")

    # Wait for the resource-table DAG to finish first
    wait_for_resource = ExternalTaskSensor(
        task_id="wait_for_data_resource",
        external_dag_id="dag_data_resource",
        external_task_id=None,
        mode="poke",
        timeout=3600,
        poke_interval=30
    )
    logger.info("Created resource-table wait task - wait_for_data_resource")
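
    # Note on the sensor above (deployment assumption): with external_task_id=None it waits for the
    # whole dag_data_resource run at the same logical date, so dag_data_resource is expected to run
    # on the same @daily schedule; otherwise execution_delta / execution_date_fn would be required.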

    # Fetch the enabled daily model tables
    try:
        enabled_tables = get_enabled_tables("daily")
        model_tables = [t for t in enabled_tables if is_data_model_table(t['table_name'])]
        logger.info(f"Found {len(model_tables)} enabled daily model tables")
    except Exception as e:
        logger.error(f"Error while fetching daily model tables: {str(e)}")
        raise

    # Build the dependency graph for the enabled tables
    try:
        table_names = [t['table_name'] for t in model_tables]
        dependency_graph = get_model_dependency_graph(table_names)
        logger.info(f"Built a dependency graph covering {len(dependency_graph)} tables")
    except Exception as e:
        logger.error(f"Error while building the dependency graph: {str(e)}")
        raise
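
    # The graph above is assumed to have the shape {model_table: [upstream model tables]}, e.g.
    # {"dm_sales_daily": ["dm_orders", "dm_customers"], "dm_orders": []} (table names are purely
    # illustrative); the wiring below relies on exactly this orientation.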

    # Build the task objects (one PythonOperator per model table)
    task_dict = {}
    for item in model_tables:
        try:
            task = PythonOperator(
                task_id=f"process_model_{item['table_name']}",
                python_callable=run_model_script,
                op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
            )
            task_dict[item['table_name']] = task
            logger.info(f"Created model processing task: process_model_{item['table_name']}")
        except Exception as e:
            logger.error(f"Error while creating task process_model_{item['table_name']}: {str(e)}")
            raise

    # Wire task dependencies based on the DERIVED_FROM graph
    dependency_count = 0
    for target, upstream_list in dependency_graph.items():
        for upstream in upstream_list:
            if upstream in task_dict and target in task_dict:
                task_dict[upstream] >> task_dict[target]
                dependency_count += 1
                logger.debug(f"Added dependency: {upstream} >> {target}")
            else:
                logger.warning(f"Cannot add dependency, missing task: {upstream} or {target}")
    logger.info(f"Added {dependency_count} task dependencies in total")

    # Top-level tasks (models with no upstream model at all) must wait for the resource DAG to finish
    top_level_tasks = [t for t in table_names if not dependency_graph.get(t)]
    if top_level_tasks:
        logger.info(f"Found {len(top_level_tasks)} top-level tasks: {', '.join(top_level_tasks)}")
        for name in top_level_tasks:
            wait_for_resource >> task_dict[name]
    else:
        logger.warning("No top-level tasks found; please check that the dependency graph is correct")
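
# --- Assumed contract of the helpers imported from utils (a sketch, not the actual module) ---
# The DAG above only relies on the following hypothetical behaviour:
#   get_enabled_tables("daily")        -> list of dicts containing at least
#                                         {"table_name": str, "execution_mode": str}
#   is_data_model_table(table_name)    -> bool, True for DataModel-layer tables
#   run_model_script(table_name=..., execution_mode=...)
#                                         runs the ETL script for a single model table
#   get_model_dependency_graph(names)  -> {table_name: [upstream model tables]} derived from the
#                                         DERIVED_FROM relationships (NEO4J_CONFIG presumably
#                                         points at that Neo4j graph)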