# dag_data_model_weekly.py (1.2 KB)

from datetime import datetime

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.sensors.external_task import ExternalTaskSensor

from utils import get_subscribed_tables, get_neo4j_dependencies
  7. def process_weekly_model(table_name, execution_mode):
  8. deps = get_neo4j_dependencies(table_name)
  9. print(f"Processing weekly model for {table_name} with dependencies: {deps}, mode: {execution_mode}")
  10. def is_monday():
  11. return pendulum.now().day_of_week == 0
  12. with DAG("dag_data_model_weekly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
  13. wait_for_daily = ExternalTaskSensor(
  14. task_id="wait_for_daily_model",
  15. external_dag_id="dag_data_model_daily",
  16. external_task_id=None,
  17. mode="poke",
  18. timeout=3600,
  19. poke_interval=30
  20. )
  21. if is_monday():
  22. weekly_tables = get_subscribed_tables('weekly')
  23. for item in weekly_tables:
  24. t = PythonOperator(
  25. task_id=f"process_weekly_{item['table_name']}",
  26. python_callable=process_weekly_model,
  27. op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
  28. )
  29. wait_for_daily >> t