dag_data_model_yearly.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. # dag_data_model_yearly.py
  2. from airflow import DAG
  3. from airflow.operators.python import PythonOperator
  4. from airflow.operators.empty import EmptyOperator
  5. from airflow.sensors.external_task import ExternalTaskSensor
  6. from datetime import datetime
  7. from utils import (
  8. get_enabled_tables, is_data_model_table, run_model_script,
  9. get_model_dependency_graph, process_model_tables
  10. )
  11. from config import NEO4J_CONFIG
  12. import pendulum
  13. import logging
  14. # 创建日志记录器
  15. logger = logging.getLogger(__name__)
  16. def is_first_day_of_year():
  17. return True
  18. # 生产环境中应使用实际判断
  19. # return pendulum.now().month == 1 and pendulum.now().day == 1
  20. with DAG("dag_data_model_yearly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
  21. logger.info("初始化 dag_data_model_yearly DAG")
  22. # 等待月模型 DAG 完成
  23. wait_for_monthly = ExternalTaskSensor(
  24. task_id="wait_for_monthly_model",
  25. external_dag_id="dag_data_model_monthly",
  26. external_task_id="monthly_processing_completed", # 指定完成标记任务
  27. mode="poke",
  28. timeout=3600,
  29. poke_interval=30
  30. )
  31. logger.info("创建月模型等待任务 - wait_for_monthly_model")
  32. # 创建一个完成标记任务,确保即使没有处理任务也能标记DAG完成
  33. yearly_completed = EmptyOperator(
  34. task_id="yearly_processing_completed",
  35. dag=dag
  36. )
  37. logger.info("创建任务完成标记 - yearly_processing_completed")
  38. # 检查今天是否是年初
  39. if is_first_day_of_year():
  40. logger.info("今天是年初,开始处理年模型")
  41. # 获取启用的 yearly 模型表
  42. try:
  43. enabled_tables = get_enabled_tables("yearly")
  44. # 使用公共函数处理模型表
  45. process_model_tables(enabled_tables, "yearly", wait_for_monthly, yearly_completed, dag)
  46. except Exception as e:
  47. logger.error(f"获取 yearly 模型表时出错: {str(e)}")
  48. # 出错时也要确保完成标记被触发
  49. wait_for_monthly >> yearly_completed
  50. raise
  51. else:
  52. # 如果不是年初,直接将等待任务与完成标记相连接,跳过处理
  53. logger.info("今天不是年初,跳过年模型处理")
  54. wait_for_monthly >> yearly_completed