123456789101112131415161718192021222324252627282930313233 |
- from airflow import DAG
- from airflow.operators.python import PythonOperator
- from airflow.sensors.external_task import ExternalTaskSensor
- from datetime import datetime
- from utils import get_subscribed_tables, get_neo4j_dependencies
- import pendulum
- def process_weekly_model(table_name, execution_mode):
- deps = get_neo4j_dependencies(table_name)
- print(f"Processing weekly model for {table_name} with dependencies: {deps}, mode: {execution_mode}")
- def is_monday():
- return pendulum.now().day_of_week == 0
- with DAG("dag_data_model_weekly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
- wait_for_daily = ExternalTaskSensor(
- task_id="wait_for_daily_model",
- external_dag_id="dag_data_model_daily",
- external_task_id=None,
- mode="poke",
- timeout=3600,
- poke_interval=30
- )
- if is_monday():
- weekly_tables = get_subscribed_tables('weekly')
- for item in weekly_tables:
- t = PythonOperator(
- task_id=f"process_weekly_{item['table_name']}",
- python_callable=process_weekly_model,
- op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
- )
- wait_for_daily >> t
|