@@ -11,7 +11,7 @@ import pendulum
 import logging
 import sys
 from airflow.operators.empty import EmptyOperator
-from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, RESOURCE_LOADING_PARALLEL_DEGREE
+from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH
 from neo4j import GraphDatabase
 
 # Create the logger
@@ -95,29 +95,38 @@ def load_table_data(table_name, execution_mode):
         logger.error(traceback.format_exc())
         return False
 
-with DAG("dag_data_resource", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
+with DAG(
+    "dag_data_resource",
+    start_date=datetime(2024, 1, 1),
+    schedule_interval="@daily",
+    catchup=False,
+    # DAG-level parallelism control: allow at most 8 task instances of this DAG to run concurrently
+    max_active_tasks=8,
+    # Across all active runs of this DAG (e.g. yesterday's and today's), no more than 5 tasks would run at once; concurrency is the deprecated alias of max_active_tasks
+    # concurrency=5
+) as dag:
     today = pendulum.today()
     is_monday = today.day_of_week == 0
-    is_first_day = today.day == 1
+    is_first_day_of_month = today.day == 1
+    is_first_day_of_year = today.month == 1 and today.day == 1
 
     all_resource_tables = []
 
-    # Daily
-    daily_enabled = get_enabled_tables("daily")
-    all_resource_tables.extend(get_resource_subscribed_tables(daily_enabled))
-    all_resource_tables.extend(get_dependency_resource_tables(daily_enabled))
+    # Handle the different load frequencies in one loop
+    frequency_configs = [
+        {"name": "daily", "condition": True},
+        {"name": "weekly", "condition": is_monday},
+        {"name": "monthly", "condition": is_first_day_of_month},
+        {"name": "yearly", "condition": is_first_day_of_year}
+    ]
 
-    # Weekly
-    if is_monday:
-        weekly_enabled = get_enabled_tables("weekly")
-        all_resource_tables.extend(get_resource_subscribed_tables(weekly_enabled))
-        all_resource_tables.extend(get_dependency_resource_tables(weekly_enabled))
-
-    # Monthly
-    if is_first_day:
-        monthly_enabled = get_enabled_tables("monthly")
-        all_resource_tables.extend(get_resource_subscribed_tables(monthly_enabled))
-        all_resource_tables.extend(get_dependency_resource_tables(monthly_enabled))
+    for config in frequency_configs:
+        frequency = config["name"]
+        if config["condition"]:
+            enabled_tables = get_enabled_tables(frequency)
+            all_resource_tables.extend(get_resource_subscribed_tables(enabled_tables))
+            all_resource_tables.extend(get_dependency_resource_tables(enabled_tables))
+            logger.info(f"Added resource tables for frequency: {frequency}")
 
     # Deduplicate (by table name)
     unique_resources = {}
@@ -134,26 +143,13 @@ with DAG("dag_data_resource", start_date=datetime(2024, 1, 1), schedule_interval
     # Create the end task
     end_loading = EmptyOperator(task_id="finish_resource_loading")
 
-    # Group the tables into batches for parallel processing
-    batch_size = RESOURCE_LOADING_PARALLEL_DEGREE
-    batched_tables = [resource_tables[i:i+batch_size] for i in range(0, len(resource_tables), batch_size)]
-
-    logger.info(f"Split {len(resource_tables)} resource tables into {len(batched_tables)} batches of at most {batch_size} tables each")
 
-    for batch_idx, batch in enumerate(batched_tables):
-        batch_tasks = []
-        for item in batch:
-            task = PythonOperator(
-                task_id=f"load_{item['table_name']}",
-                python_callable=load_table_data,
-                op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
-            )
-            batch_tasks.append(task)
-
-            # Set the dependency on the start task
-            start_loading >> task
-
-            # Set the dependency on the end task
-            task >> end_loading
+    for item in resource_tables:
+        task = PythonOperator(
+            task_id=f"load_{item['table_name']}",
+            python_callable=load_table_data,
+            op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
+        )
 
-        logger.info(f"Batch {batch_idx+1}: created {len(batch_tasks)} table loading tasks")
+        # Set dependencies
+        start_loading >> task >> end_loading