
Merge branch 'dev'

wangxq 1 month ago
parent commit 3ac997d603
2 changed files with 35 additions and 41 deletions
  1. dags/config.py (+0 -2)
  2. dags/dag_data_resource.py (+35 -39)

+ 0 - 2
dags/config.py

@@ -30,5 +30,3 @@ SCRIPTS_BASE_PATH = "/opt/airflow/dataops/scripts"
 # Local development environment script path (if environments need to be distinguished)
 # LOCAL_SCRIPTS_BASE_PATH = "/path/to/local/scripts"
 
-# Parallel degree for resource table loading
-RESOURCE_LOADING_PARALLEL_DEGREE = 4  # adjust per environment
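The removed constant existed only to drive the manual batch slicing deleted from dags/dag_data_resource.py below. As a minimal standalone sketch of that slicing pattern (the table names here are placeholders, not the real config), the old knob worked like this:

```python
# Sketch of the batch slicing the deleted constant used to drive.
RESOURCE_LOADING_PARALLEL_DEGREE = 4  # the removed knob

tables = [f"table_{i}" for i in range(10)]  # placeholder table names
batch_size = RESOURCE_LOADING_PARALLEL_DEGREE
batches = [tables[i:i + batch_size] for i in range(0, len(tables), batch_size)]
print([len(b) for b in batches])  # [4, 4, 2]
```

With the constant gone, throttling is delegated to the scheduler through the max_active_tasks DAG argument introduced below.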

+ 35 - 39
dags/dag_data_resource.py

@@ -11,7 +11,7 @@ import pendulum
 import logging
 import sys
 from airflow.operators.empty import EmptyOperator
-from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, RESOURCE_LOADING_PARALLEL_DEGREE
+from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH
 from neo4j import GraphDatabase
 
 # Create the logger
@@ -95,29 +95,38 @@ def load_table_data(table_name, execution_mode):
         logger.error(traceback.format_exc())
         return False
 
-with DAG("dag_data_resource", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
+with DAG(
+    "dag_data_resource", 
+    start_date=datetime(2024, 1, 1), 
+    schedule_interval="@daily", 
+    catchup=False,
+    # DAG-level parallelism control: at most 8 tasks of this DAG run concurrently
+    max_active_tasks=8
+    # However many run instances of this DAG are active (e.g. yesterday's and today's runs), all of them combined run at most 5 tasks at the same time
+    # concurrency=5
+) as dag:
     today = pendulum.today()
     is_monday = today.day_of_week == 0
-    is_first_day = today.day == 1
+    is_first_day_of_month = today.day == 1
+    is_first_day_of_year = today.month == 1 and today.day == 1
 
     all_resource_tables = []
 
-    # Daily
-    daily_enabled = get_enabled_tables("daily")
-    all_resource_tables.extend(get_resource_subscribed_tables(daily_enabled))
-    all_resource_tables.extend(get_dependency_resource_tables(daily_enabled))
+    # Handle the different load frequencies with one loop
+    frequency_configs = [
+        {"name": "daily", "condition": True},
+        {"name": "weekly", "condition": is_monday},
+        {"name": "monthly", "condition": is_first_day_of_month},
+        {"name": "yearly", "condition": is_first_day_of_year}
+    ]
 
-    # Weekly
-    if is_monday:
-        weekly_enabled = get_enabled_tables("weekly")
-        all_resource_tables.extend(get_resource_subscribed_tables(weekly_enabled))
-        all_resource_tables.extend(get_dependency_resource_tables(weekly_enabled))
-
-    # Monthly
-    if is_first_day:
-        monthly_enabled = get_enabled_tables("monthly")
-        all_resource_tables.extend(get_resource_subscribed_tables(monthly_enabled))
-        all_resource_tables.extend(get_dependency_resource_tables(monthly_enabled))
+    for config in frequency_configs:
+        frequency = config["name"]
+        if config["condition"]:
+            enabled_tables = get_enabled_tables(frequency)
+            all_resource_tables.extend(get_resource_subscribed_tables(enabled_tables))
+            all_resource_tables.extend(get_dependency_resource_tables(enabled_tables))
+            logger.info(f"Added resource tables for the {frequency} frequency")
 
 # Deduplicate (by table name)
     unique_resources = {}
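A self-contained sketch of the new frequency gating followed by the name-based deduplication; get_enabled_tables is stubbed here (the stub body and table names are hypothetical) so the snippet runs without the Neo4j backend:

```python
import pendulum

# Hypothetical stub standing in for the real lookup
def get_enabled_tables(frequency):
    return [{"table_name": f"{frequency}_example", "execution_mode": "append"}]

today = pendulum.today()
frequency_configs = [
    {"name": "daily", "condition": True},
    {"name": "weekly", "condition": today.day_of_week == 0},
    {"name": "monthly", "condition": today.day == 1},
    {"name": "yearly", "condition": today.month == 1 and today.day == 1},
]

all_resource_tables = []
for config in frequency_configs:
    if config["condition"]:
        all_resource_tables.extend(get_enabled_tables(config["name"]))

# Deduplicate by table name; a later entry overwrites an earlier duplicate
unique_resources = {item["table_name"]: item for item in all_resource_tables}
print(list(unique_resources.values()))
```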
@@ -134,26 +143,13 @@ with DAG("dag_data_resource", start_date=datetime(2024, 1, 1), schedule_interval
 # Create the end task
     end_loading = EmptyOperator(task_id="finish_resource_loading")
     
-    # Group into batches for parallel processing
-    batch_size = RESOURCE_LOADING_PARALLEL_DEGREE
-    batched_tables = [resource_tables[i:i+batch_size] for i in range(0, len(resource_tables), batch_size)]
-    
-    logger.info(f"Split {len(resource_tables)} resource tables into {len(batched_tables)} batches of at most {batch_size} tables each")
     
-    for batch_idx, batch in enumerate(batched_tables):
-        batch_tasks = []
-        for item in batch:
-            task = PythonOperator(
-                task_id=f"load_{item['table_name']}",
-                python_callable=load_table_data,
-                op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
-            )
-            batch_tasks.append(task)
-            
-            # Upstream dependency
-            start_loading >> task
-            
-            # Downstream dependency
-            task >> end_loading
+    for item in resource_tables:
+        task = PythonOperator(
+            task_id=f"load_{item['table_name']}",
+            python_callable=load_table_data,
+            op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
+        )
         
-        logger.info(f"Batch {batch_idx+1}: created {len(batch_tasks)} table loading tasks")
+        # Wire the dependencies
+        start_loading >> task >> end_loading
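
Taken together, the change replaces hand-built batches with a flat fan-out throttled by the scheduler. A runnable minimal sketch of the resulting pattern (the DAG id, table names, and execution mode are placeholders, not the project's real values):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

def load_table_data(table_name, execution_mode):
    # Placeholder body; the real task loads the table via external scripts
    print(f"loading {table_name} ({execution_mode})")

with DAG(
    "fan_out_sketch",  # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    max_active_tasks=8,  # the scheduler runs at most 8 of this DAG's tasks at once
) as dag:
    start_loading = EmptyOperator(task_id="start_resource_loading")
    end_loading = EmptyOperator(task_id="finish_resource_loading")

    for name in ["t_orders", "t_users", "t_items"]:  # placeholder table names
        task = PythonOperator(
            task_id=f"load_{name}",
            python_callable=load_table_data,
            op_kwargs={"table_name": name, "execution_mode": "append"},
        )
        start_loading >> task >> end_loading
```

Every load task still sits between the two marker operators, so downstream DAGs can keep depending on finish_resource_loading while the concurrency cap is enforced declaratively.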