dag_data_model_monthly.py 1.3 KB

123456789101112131415161718192021222324252627282930313233
from datetime import datetime

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.sensors.external_task import ExternalTaskSensor

from utils import get_subscribed_tables, get_neo4j_dependencies
  7. def process_monthly_model(table_name, execution_mode):
  8. deps = get_neo4j_dependencies(table_name)
  9. print(f"Processing monthly model for {table_name} with dependencies: {deps}, mode: {execution_mode}")
  10. def is_first_day():
  11. return pendulum.now().day == 1
  12. with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
  13. wait_for_weekly = ExternalTaskSensor(
  14. task_id="wait_for_weekly_model",
  15. external_dag_id="dag_data_model_weekly",
  16. external_task_id=None,
  17. mode="poke",
  18. timeout=3600,
  19. poke_interval=30
  20. )
  21. if is_first_day():
  22. monthly_tables = get_subscribed_tables('monthly')
  23. for item in monthly_tables:
  24. t = PythonOperator(
  25. task_id=f"process_monthly_{item['table_name']}",
  26. python_callable=process_monthly_model,
  27. op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
  28. )
  29. wait_for_weekly >> t