Bläddra i källkod

已经完成对定时订阅功能的公共函数的优化和测试

wangxq 1 månad sedan
förälder
incheckning
fbebca0978

+ 6 - 0
dags/config.py

@@ -23,6 +23,12 @@ AIRFLOW_CONFIG = {
     "password": "admin",
 }
 
+# 任务重试配置
+TASK_RETRY_CONFIG = {
+    "retries": 2,  # 重试次数
+    "retry_delay_minutes": 1  # 重试延迟(分钟)
+}
+
 # 脚本文件基础路径配置
 # 部署到 Airflow 环境时使用此路径
 SCRIPTS_BASE_PATH = "/opt/airflow/dataops/scripts"

+ 6 - 98
dags/dag_data_model_daily.py

@@ -4,7 +4,10 @@ from airflow.operators.python import PythonOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.sensors.external_task import ExternalTaskSensor
 from datetime import datetime
-from utils import get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph
+from utils import (
+    get_enabled_tables, is_data_model_table, run_model_script, 
+    get_model_dependency_graph, process_model_tables
+)
 from config import NEO4J_CONFIG
 import pendulum
 import logging
@@ -78,103 +81,8 @@ with DAG("dag_data_model_daily", start_date=datetime(2024, 1, 1), schedule_inter
     # 获取启用的 daily 模型表
     try:
         enabled_tables = get_enabled_tables("daily")
-        model_tables = [t for t in enabled_tables if is_data_model_table(t['table_name'])]
-        logger.info(f"获取到 {len(model_tables)} 个启用的 daily 模型表")
-        
-        if not model_tables:
-            # 如果没有模型表需要处理,直接将等待任务与完成标记相连接
-            logger.info("没有找到需要处理的模型表,DAG将直接标记为完成")
-            wait_for_resource >> daily_completed
-        else:
-            # 获取表名列表
-            table_names = [t['table_name'] for t in model_tables]
-            
-            # 使用优化函数生成执行顺序,可以处理循环依赖
-            optimized_table_order = generate_optimized_execution_order(table_names)
-            logger.info(f"生成优化执行顺序, 共 {len(optimized_table_order)} 个表")
-            
-            # 获取依赖图 (仍然需要用于设置任务依赖关系)
-            try:
-                dependency_graph = get_model_dependency_graph(table_names)
-                logger.info(f"构建了 {len(dependency_graph)} 个表的依赖关系图")
-            except Exception as e:
-                logger.error(f"构建依赖关系图时出错: {str(e)}")
-                # 出错时也要确保完成标记被触发
-                wait_for_resource >> daily_completed
-                raise
-
-            # 构建 task 对象
-            task_dict = {}
-            for table_name in optimized_table_order:
-                # 获取表的配置信息
-                table_config = next((t for t in model_tables if t['table_name'] == table_name), None)
-                if table_config:
-                    try:
-                        task = PythonOperator(
-                            task_id=f"process_model_{table_name}",
-                            python_callable=run_model_script,
-                            op_kwargs={"table_name": table_name, "execution_mode": table_config['execution_mode']},
-                        )
-                        task_dict[table_name] = task
-                        logger.info(f"创建模型处理任务: process_model_{table_name}")
-                    except Exception as e:
-                        logger.error(f"创建任务 process_model_{table_name} 时出错: {str(e)}")
-                        # 出错时也要确保完成标记被触发
-                        wait_for_resource >> daily_completed
-                        raise
-
-            # 建立任务依赖(基于 DERIVED_FROM 图)
-            dependency_count = 0
-            for target, upstream_list in dependency_graph.items():
-                for upstream in upstream_list:
-                    if upstream in task_dict and target in task_dict:
-                        task_dict[upstream] >> task_dict[target]
-                        dependency_count += 1
-                        logger.debug(f"建立依赖关系: {upstream} >> {target}")
-                    else:
-                        logger.warning(f"无法建立依赖关系,缺少任务: {upstream} 或 {target}")
-
-            logger.info(f"总共建立了 {dependency_count} 个任务依赖关系")
-
-            # 最顶层的 task(没有任何上游)需要依赖资源任务完成
-            all_upstreams = set()
-            for upstreams in dependency_graph.values():
-                all_upstreams.update(upstreams)
-            top_level_tasks = [t for t in table_names if t not in all_upstreams]
-            
-            if top_level_tasks:
-                logger.info(f"发现 {len(top_level_tasks)} 个顶层任务: {', '.join(top_level_tasks)}")
-                for name in top_level_tasks:
-                    if name in task_dict:
-                        wait_for_resource >> task_dict[name]
-            else:
-                logger.warning("没有找到顶层任务,请检查依赖关系图是否正确")
-                # 如果没有顶层任务,直接将等待任务与完成标记相连接
-                wait_for_resource >> daily_completed
-            
-            # 连接所有末端任务(没有下游任务的)到完成标记
-            # 找出所有没有下游任务的任务(即终端任务)
-            terminal_tasks = []
-            for table_name, task in task_dict.items():
-                is_terminal = True
-                for upstream_list in dependency_graph.values():
-                    if table_name in upstream_list:
-                        is_terminal = False
-                        break
-                if is_terminal:
-                    terminal_tasks.append(task)
-                    logger.debug(f"发现终端任务: {table_name}")
-            
-            # 如果有终端任务,将它们连接到完成标记
-            if terminal_tasks:
-                logger.info(f"连接 {len(terminal_tasks)} 个终端任务到完成标记")
-                for task in terminal_tasks:
-                    task >> daily_completed
-            else:
-                # 如果没有终端任务(可能是因为存在循环依赖),直接将等待任务与完成标记相连接
-                logger.warning("没有找到终端任务,直接将等待任务与完成标记相连接")
-                wait_for_resource >> daily_completed
-                
+        # 使用公共函数处理模型表
+        process_model_tables(enabled_tables, "daily", wait_for_resource, daily_completed, dag)
     except Exception as e:
         logger.error(f"获取 daily 模型表时出错: {str(e)}")
         # 出错时也要确保完成标记被触发

+ 39 - 126
dags/dag_data_model_monthly.py

@@ -2,9 +2,12 @@ from airflow import DAG
 from airflow.operators.python import PythonOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import datetime
-from utils import get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph, check_table_relationship
-from config import NEO4J_CONFIG
+from datetime import datetime, timedelta
+from utils import (
+    get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph, 
+    check_table_relationship, process_model_tables
+)
+from config import NEO4J_CONFIG, TASK_RETRY_CONFIG
 import pendulum
 import logging
 import networkx as nx
@@ -61,16 +64,16 @@ def is_first_day():
 with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
     logger.info("初始化 dag_data_model_monthly DAG")
     
-    # 等待周模型 DAG 完成
-    wait_for_weekly = ExternalTaskSensor(
-        task_id="wait_for_weekly_model",
-        external_dag_id="dag_data_model_weekly",
-        external_task_id="weekly_processing_completed",  # 指定完成标记任务
+    # 修改依赖关系:直接依赖于daily.py而不是weekly.py
+    wait_for_daily = ExternalTaskSensor(
+        task_id="wait_for_daily_model",
+        external_dag_id="dag_data_model_daily",
+        external_task_id="daily_processing_completed",  # 指定完成标记任务
         mode="poke",
         timeout=3600,
         poke_interval=30
     )
-    logger.info("创建周模型等待任务 - wait_for_weekly_model")
+    logger.info("创建日模型等待任务 - wait_for_daily_model")
     
     # 创建一个完成标记任务,确保即使没有处理任务也能标记DAG完成
     monthly_completed = EmptyOperator(
@@ -85,126 +88,36 @@ with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_int
         # 获取启用的 monthly 模型表
         try:
             enabled_tables = get_enabled_tables("monthly")
-            model_tables = [t for t in enabled_tables if is_data_model_table(t['table_name'])]
-            logger.info(f"获取到 {len(model_tables)} 个启用的 monthly 模型表")
             
-            if not model_tables:
-                # 如果没有模型表需要处理,直接将等待任务与完成标记相连接
-                logger.info("没有找到需要处理的月模型表,DAG将直接标记为完成")
-                wait_for_weekly >> monthly_completed
-            else:
-                # 获取表名列表
-                table_names = [t['table_name'] for t in model_tables]
-                
-                # 特别检查两个表之间的关系
-                if 'book_sale_amt_yearly' in table_names and 'book_sale_amt_monthly' in table_names:
-                    logger.info("特别检查 book_sale_amt_yearly 和 book_sale_amt_monthly 之间的关系")
-                    relationship = check_table_relationship('book_sale_amt_yearly', 'book_sale_amt_monthly')
-                    logger.info(f"关系检查结果: {relationship}")
-                
-                # 使用优化函数生成执行顺序,可以处理循环依赖
-                optimized_table_order = generate_optimized_execution_order(table_names)
-                logger.info(f"生成优化执行顺序, 共 {len(optimized_table_order)} 个表")
-                
-                # 获取依赖图 (仍然需要用于设置任务依赖关系)
-                try:
-                    dependency_graph = get_model_dependency_graph(table_names)
-                    logger.info(f"构建了 {len(dependency_graph)} 个表的依赖关系图")
-                except Exception as e:
-                    logger.error(f"构建依赖关系图时出错: {str(e)}")
-                    # 出错时也要确保完成标记被触发
-                    wait_for_weekly >> monthly_completed
-                    raise
-
-                # 构建 task 对象
-                task_dict = {}
-                for table_name in optimized_table_order:
-                    # 获取表的配置信息
-                    table_config = next((t for t in model_tables if t['table_name'] == table_name), None)
-                    if table_config:
-                        try:
-                            task = PythonOperator(
-                                task_id=f"process_monthly_{table_name}",
-                                python_callable=run_model_script,
-                                op_kwargs={"table_name": table_name, "execution_mode": table_config['execution_mode']},
-                            )
-                            task_dict[table_name] = task
-                            logger.info(f"创建模型处理任务: process_monthly_{table_name}")
-                        except Exception as e:
-                            logger.error(f"创建任务 process_monthly_{table_name} 时出错: {str(e)}")
-                            # 出错时也要确保完成标记被触发
-                            wait_for_weekly >> monthly_completed
-                            raise
-
-                # 建立任务依赖(基于 DERIVED_FROM 图)
-                dependency_count = 0
-                logger.info("开始建立任务依赖关系...")
-                for target, upstream_list in dependency_graph.items():
-                    logger.info(f"处理目标表 {target} 的依赖关系")
-                    for upstream in upstream_list:
-                        if upstream in task_dict and target in task_dict:
-                            logger.info(f"建立依赖边: {upstream} >> {target}")
-                            task_dict[upstream] >> task_dict[target]
-                            dependency_count += 1
-                            logger.debug(f"建立依赖关系: {upstream} >> {target}")
-                        else:
-                            missing = []
-                            if upstream not in task_dict:
-                                missing.append(f"上游表 {upstream}")
-                            if target not in task_dict:
-                                missing.append(f"目标表 {target}")
-                            missing_str = " 和 ".join(missing)
-                            logger.warning(f"无法建立依赖关系: {upstream} >> {target},缺少任务: {missing_str}")
-
-                logger.info(f"总共建立了 {dependency_count} 个任务依赖关系")
-                logger.info(f"任务字典中的所有表: {list(task_dict.keys())}")
-
-                # 最顶层的 task(没有任何上游)需要依赖周模型任务完成
-                all_upstreams = set()
-                for upstreams in dependency_graph.values():
-                    all_upstreams.update(upstreams)
-                top_level_tasks = [t for t in table_names if t not in all_upstreams]
-                logger.info(f"所有上游表集合: {all_upstreams}")
-                logger.info(f"识别出的顶层表: {top_level_tasks}")
-                
-                if top_level_tasks:
-                    logger.info(f"发现 {len(top_level_tasks)} 个顶层任务: {', '.join(top_level_tasks)}")
-                    for name in top_level_tasks:
-                        if name in task_dict:
-                            wait_for_weekly >> task_dict[name]
-                else:
-                    logger.warning("没有找到顶层任务,请检查依赖关系图是否正确")
-                    # 如果没有顶层任务,直接将等待任务与完成标记相连接
-                    wait_for_weekly >> monthly_completed
-                
-                # 连接所有末端任务(没有下游任务的)到完成标记
-                # 找出所有没有下游任务的任务(即终端任务)
-                terminal_tasks = []
-                for table_name, task in task_dict.items():
-                    is_terminal = True
-                    for upstream_list in dependency_graph.values():
-                        if table_name in upstream_list:
-                            is_terminal = False
-                            break
-                    if is_terminal:
-                        terminal_tasks.append(task)
-                        logger.debug(f"发现终端任务: {table_name}")
-                
-                # 如果有终端任务,将它们连接到完成标记
-                if terminal_tasks:
-                    logger.info(f"连接 {len(terminal_tasks)} 个终端任务到完成标记")
-                    for task in terminal_tasks:
-                        task >> monthly_completed
-                else:
-                    # 如果没有终端任务(可能是因为存在循环依赖),直接将等待任务与完成标记相连接
-                    logger.warning("没有找到终端任务,直接将等待任务与完成标记相连接")
-                    wait_for_weekly >> monthly_completed
+            # 特别检查两个表之间的关系(这是monthly.py特有的)
+            table_names = [t['table_name'] for t in enabled_tables if is_data_model_table(t['table_name'])]
+            if 'book_sale_amt_yearly' in table_names and 'book_sale_amt_monthly' in table_names:
+                logger.info("特别检查 book_sale_amt_yearly 和 book_sale_amt_monthly 之间的关系")
+                relationship = check_table_relationship('book_sale_amt_yearly', 'book_sale_amt_monthly')
+                logger.info(f"关系检查结果: {relationship}")
+            
+            # 定义monthly特有的任务选项
+            task_options = {
+                'default': {
+                    'retries': TASK_RETRY_CONFIG["retries"],
+                    'retry_delay': timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
+                },
+                'book_sale_amt_monthly': {
+                    'trigger_rule': "none_failed",
+                    'retries': TASK_RETRY_CONFIG["retries"],
+                    'retry_delay': timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
+                }
+            }
+            
+            # 使用公共函数处理模型表 - 修改依赖任务名称
+            process_model_tables(enabled_tables, "monthly", wait_for_daily, monthly_completed, dag, **task_options)
+            
         except Exception as e:
             logger.error(f"获取 monthly 模型表时出错: {str(e)}")
-            # 出错时也要确保完成标记被触发
-            wait_for_weekly >> monthly_completed
+            # 出错时也要确保完成标记被触发 - 修改依赖任务名称
+            wait_for_daily >> monthly_completed
             raise
     else:
-        # 如果不是月初,直接将等待任务与完成标记相连接,跳过处理
+        # 如果不是月初,直接将等待任务与完成标记相连接,跳过处理 - 修改依赖任务名称
         logger.info("今天不是月初,跳过月模型处理")
-        wait_for_weekly >> monthly_completed
+        wait_for_daily >> monthly_completed

+ 6 - 97
dags/dag_data_model_weekly.py

@@ -4,7 +4,10 @@ from airflow.operators.python import PythonOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.sensors.external_task import ExternalTaskSensor
 from datetime import datetime
-from utils import get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph
+from utils import (
+    get_enabled_tables, is_data_model_table, run_model_script, 
+    get_model_dependency_graph, process_model_tables
+)
 from config import NEO4J_CONFIG
 import pendulum
 import logging
@@ -85,102 +88,8 @@ with DAG("dag_data_model_weekly", start_date=datetime(2024, 1, 1), schedule_inte
         # 获取启用的 weekly 模型表
         try:
             enabled_tables = get_enabled_tables("weekly")
-            model_tables = [t for t in enabled_tables if is_data_model_table(t['table_name'])]
-            logger.info(f"获取到 {len(model_tables)} 个启用的 weekly 模型表")
-            
-            if not model_tables:
-                # 如果没有模型表需要处理,直接将等待任务与完成标记相连接
-                logger.info("没有找到需要处理的周模型表,DAG将直接标记为完成")
-                wait_for_daily >> weekly_completed
-            else:
-                # 获取表名列表
-                table_names = [t['table_name'] for t in model_tables]
-                
-                # 使用优化函数生成执行顺序,可以处理循环依赖
-                optimized_table_order = generate_optimized_execution_order(table_names)
-                logger.info(f"生成优化执行顺序, 共 {len(optimized_table_order)} 个表")
-                
-                # 获取依赖图 (仍然需要用于设置任务依赖关系)
-                try:
-                    dependency_graph = get_model_dependency_graph(table_names)
-                    logger.info(f"构建了 {len(dependency_graph)} 个表的依赖关系图")
-                except Exception as e:
-                    logger.error(f"构建依赖关系图时出错: {str(e)}")
-                    # 出错时也要确保完成标记被触发
-                    wait_for_daily >> weekly_completed
-                    raise
-
-                # 构建 task 对象
-                task_dict = {}
-                for table_name in optimized_table_order:
-                    # 获取表的配置信息
-                    table_config = next((t for t in model_tables if t['table_name'] == table_name), None)
-                    if table_config:
-                        try:
-                            task = PythonOperator(
-                                task_id=f"process_weekly_{table_name}",
-                                python_callable=run_model_script,
-                                op_kwargs={"table_name": table_name, "execution_mode": table_config['execution_mode']},
-                            )
-                            task_dict[table_name] = task
-                            logger.info(f"创建模型处理任务: process_weekly_{table_name}")
-                        except Exception as e:
-                            logger.error(f"创建任务 process_weekly_{table_name} 时出错: {str(e)}")
-                            # 出错时也要确保完成标记被触发
-                            wait_for_daily >> weekly_completed
-                            raise
-                
-                # 建立任务依赖(基于 DERIVED_FROM 图)
-                dependency_count = 0
-                for target, upstream_list in dependency_graph.items():
-                    for upstream in upstream_list:
-                        if upstream in task_dict and target in task_dict:
-                            task_dict[upstream] >> task_dict[target]
-                            dependency_count += 1
-                            logger.debug(f"建立依赖关系: {upstream} >> {target}")
-                        else:
-                            logger.warning(f"无法建立依赖关系,缺少任务: {upstream} 或 {target}")
-
-                logger.info(f"总共建立了 {dependency_count} 个任务依赖关系")
-
-                # 最顶层的 task(没有任何上游)需要依赖日模型任务完成
-                all_upstreams = set()
-                for upstreams in dependency_graph.values():
-                    all_upstreams.update(upstreams)
-                top_level_tasks = [t for t in table_names if t not in all_upstreams]
-                
-                if top_level_tasks:
-                    logger.info(f"发现 {len(top_level_tasks)} 个顶层任务: {', '.join(top_level_tasks)}")
-                    for name in top_level_tasks:
-                        if name in task_dict:
-                            wait_for_daily >> task_dict[name]
-                else:
-                    logger.warning("没有找到顶层任务,请检查依赖关系图是否正确")
-                    # 如果没有顶层任务,直接将等待任务与完成标记相连接
-                    wait_for_daily >> weekly_completed
-                
-                # 连接所有末端任务(没有下游任务的)到完成标记
-                # 找出所有没有下游任务的任务(即终端任务)
-                terminal_tasks = []
-                for table_name, task in task_dict.items():
-                    is_terminal = True
-                    for upstream_list in dependency_graph.values():
-                        if table_name in upstream_list:
-                            is_terminal = False
-                            break
-                    if is_terminal:
-                        terminal_tasks.append(task)
-                        logger.debug(f"发现终端任务: {table_name}")
-                
-                # 如果有终端任务,将它们连接到完成标记
-                if terminal_tasks:
-                    logger.info(f"连接 {len(terminal_tasks)} 个终端任务到完成标记")
-                    for task in terminal_tasks:
-                        task >> weekly_completed
-                else:
-                    # 如果没有终端任务(可能是因为存在循环依赖),直接将等待任务与完成标记相连接
-                    logger.warning("没有找到终端任务,直接将等待任务与完成标记相连接")
-                    wait_for_daily >> weekly_completed
+            # 使用公共函数处理模型表
+            process_model_tables(enabled_tables, "weekly", wait_for_daily, weekly_completed, dag)
         except Exception as e:
             logger.error(f"获取 weekly 模型表时出错: {str(e)}")
             # 出错时也要确保完成标记被触发

+ 6 - 87
dags/dag_data_model_yearly.py

@@ -4,7 +4,10 @@ from airflow.operators.python import PythonOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.sensors.external_task import ExternalTaskSensor
 from datetime import datetime
-from utils import get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph
+from utils import (
+    get_enabled_tables, is_data_model_table, run_model_script, 
+    get_model_dependency_graph, process_model_tables
+)
 from config import NEO4J_CONFIG
 import pendulum
 import logging
@@ -44,92 +47,8 @@ with DAG("dag_data_model_yearly", start_date=datetime(2024, 1, 1), schedule_inte
         # 获取启用的 yearly 模型表
         try:
             enabled_tables = get_enabled_tables("yearly")
-            model_tables = [t for t in enabled_tables if is_data_model_table(t['table_name'])]
-            logger.info(f"获取到 {len(model_tables)} 个启用的 yearly 模型表")
-            
-            if not model_tables:
-                # 如果没有模型表需要处理,直接将等待任务与完成标记相连接
-                logger.info("没有找到需要处理的年模型表,DAG将直接标记为完成")
-                wait_for_monthly >> yearly_completed
-            else:
-                # 获取依赖图
-                try:
-                    table_names = [t['table_name'] for t in model_tables]
-                    dependency_graph = get_model_dependency_graph(table_names)
-                    logger.info(f"构建了 {len(dependency_graph)} 个表的依赖关系图")
-                except Exception as e:
-                    logger.error(f"构建依赖关系图时出错: {str(e)}")
-                    # 出错时也要确保完成标记被触发
-                    wait_for_monthly >> yearly_completed
-                    raise
-
-                # 构建 task 对象
-                task_dict = {}
-                for item in model_tables:
-                    try:
-                        task = PythonOperator(
-                            task_id=f"process_yearly_{item['table_name']}",
-                            python_callable=run_model_script,
-                            op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
-                        )
-                        task_dict[item['table_name']] = task
-                        logger.info(f"创建模型处理任务: process_yearly_{item['table_name']}")
-                    except Exception as e:
-                        logger.error(f"创建任务 process_yearly_{item['table_name']} 时出错: {str(e)}")
-                        # 出错时也要确保完成标记被触发
-                        wait_for_monthly >> yearly_completed
-                        raise
-
-                # 建立任务依赖(基于 DERIVED_FROM 图)
-                dependency_count = 0
-                for target, upstream_list in dependency_graph.items():
-                    for upstream in upstream_list:
-                        if upstream in task_dict and target in task_dict:
-                            task_dict[upstream] >> task_dict[target]
-                            dependency_count += 1
-                            logger.debug(f"建立依赖关系: {upstream} >> {target}")
-                        else:
-                            logger.warning(f"无法建立依赖关系,缺少任务: {upstream} 或 {target}")
-
-                logger.info(f"总共建立了 {dependency_count} 个任务依赖关系")
-
-                # 最顶层的 task(没有任何上游)需要依赖月模型任务完成
-                all_upstreams = set()
-                for upstreams in dependency_graph.values():
-                    all_upstreams.update(upstreams)
-                top_level_tasks = [t for t in table_names if t not in all_upstreams]
-                
-                if top_level_tasks:
-                    logger.info(f"发现 {len(top_level_tasks)} 个顶层任务: {', '.join(top_level_tasks)}")
-                    for name in top_level_tasks:
-                        wait_for_monthly >> task_dict[name]
-                else:
-                    logger.warning("没有找到顶层任务,请检查依赖关系图是否正确")
-                    # 如果没有顶层任务,直接将等待任务与完成标记相连接
-                    wait_for_monthly >> yearly_completed
-                
-                # 连接所有末端任务(没有下游任务的)到完成标记
-                # 找出所有没有下游任务的任务(即终端任务)
-                terminal_tasks = []
-                for table_name, task in task_dict.items():
-                    is_terminal = True
-                    for upstream_list in dependency_graph.values():
-                        if table_name in upstream_list:
-                            is_terminal = False
-                            break
-                    if is_terminal:
-                        terminal_tasks.append(task)
-                        logger.debug(f"发现终端任务: {table_name}")
-                
-                # 如果有终端任务,将它们连接到完成标记
-                if terminal_tasks:
-                    logger.info(f"连接 {len(terminal_tasks)} 个终端任务到完成标记")
-                    for task in terminal_tasks:
-                        task >> yearly_completed
-                else:
-                    # 如果没有终端任务(可能是因为存在循环依赖),直接将等待任务与完成标记相连接
-                    logger.warning("没有找到终端任务,直接将等待任务与完成标记相连接")
-                    wait_for_monthly >> yearly_completed
+            # 使用公共函数处理模型表
+            process_model_tables(enabled_tables, "yearly", wait_for_monthly, yearly_completed, dag)
         except Exception as e:
             logger.error(f"获取 yearly 模型表时出错: {str(e)}")
             # 出错时也要确保完成标记被触发

+ 1 - 1
dags/dag_data_resource.py

@@ -101,7 +101,7 @@ with DAG(
     schedule_interval="@daily", 
     catchup=False,
     # 添加DAG级别的并行度控制,每个DAG运行最多同时执行2个任务
-    max_active_tasks=8
+    # max_active_tasks=8
     # 无论有多少个DAG运行实例(比如昨天、今天的运行),这个DAG定义的所有实例总共最多有5个任务同时运行
     # concurrency=5
 ) as dag:

+ 287 - 31
dags/utils.py

@@ -421,31 +421,80 @@ def get_model_dependency_graph(table_names: list) -> dict:
     driver = GraphDatabase.driver(uri, auth=auth)
     try:
         with driver.session() as session:
-            for table_name in table_names:
-                # 修改查询,移除对节点类型的限制,但保留对表名集合的过滤
-                query = """
-                    MATCH (t {en_name: $table_name})-[:DERIVED_FROM]->(up)
-                    WHERE up.en_name IN $all_tables
-                    RETURN up.en_name AS upstream
-                """
-                logger.info(f"执行Neo4j查询: 查找 {table_name} 在当前批次中的上游依赖")
-                result = session.run(query, table_name=table_name, all_tables=table_names)
-                deps = [record['upstream'] for record in result if 'upstream' in record]
-                logger.info(f"表 {table_name} 的上游依赖(当前批次内): {deps}")
+            # 使用一次性查询获取所有表之间的依赖关系
+            # 注意:这里查询的是 A-[:DERIVED_FROM]->B 关系,表示A依赖B
+            
+            # 记录原始查询参数用于调试
+            logger.info(f"查询参数 table_names: {table_names}, 类型: {type(table_names)}")
+            
+            # 第一层查询 - 更明确的查询形式
+            query = """
+                MATCH (source)-[r:DERIVED_FROM]->(target)
+                WHERE source.en_name IN $table_names AND target.en_name IN $table_names
+                RETURN source.en_name AS source, target.en_name AS target, r.script_name AS script_name
+            """
+            logger.info(f"执行Neo4j查询: 查找所有表之间的依赖关系")
+            result = session.run(query, table_names=table_names)
+            
+            # 转换结果为列表,确保结果被消费
+            result_records = list(result)
+            logger.info(f"第一层查询返回记录数: {len(result_records)}")
+            
+            # 处理依赖关系
+            found_deps = 0
+            # 初始化依赖字典
+            dependency_dict = {name: [] for name in table_names}
+            
+            # 这里是问题所在 - 需要正确处理记录
+            for record in result_records:
+                # 直接将记录转换为字典,避免访问问题
+                record_dict = dict(record)
                 
-                # 同时查询所有上游依赖(不限于当前批次),用于日志记录
-                all_deps_query = """
-                    MATCH (t {en_name: $table_name})-[:DERIVED_FROM]->(up)
-                    RETURN up.en_name AS upstream
-                """
-                all_deps_result = session.run(all_deps_query, table_name=table_name)
-                all_deps = [record['upstream'] for record in all_deps_result if 'upstream' in record]
-                logger.info(f"表 {table_name} 的所有上游依赖: {all_deps}")
+                # 从字典中获取值
+                source = record_dict.get('source')
+                target = record_dict.get('target') 
+                script_name = record_dict.get('script_name', 'unknown_script')
+                
+                # 确保字段存在且有值
+                if source and target:
+                    logger.info(f"发现依赖关系: {source} -[:DERIVED_FROM]-> {target}, 脚本: {script_name}")
+                    
+                    # 添加依赖关系到字典
+                    if source in dependency_dict:
+                        dependency_dict[source].append(target)
+                        found_deps += 1
+                        
+                        # 添加边到图 - 把被依赖方指向依赖方,表示执行顺序(被依赖方先执行)
+                        G.add_edge(target, source)
+                        logger.info(f"添加执行顺序边: {target} -> {source} (因为{source}依赖{target})")
+            
+            logger.info(f"总共发现 {found_deps} 个依赖关系")
+            
+            # 如果没有找到依赖关系,尝试检查所有可能的表对关系
+            if found_deps == 0:
+                logger.warning("仍未找到依赖关系,尝试检查所有表对之间的关系")
+                logger.info("第三层查询: 开始表对之间的循环检查")
+                logger.info(f"要检查的表对数量: {len(table_names) * (len(table_names) - 1)}")
                 
-                # 添加依赖边
-                for dep in deps:
-                    logger.info(f"添加依赖边: {dep} -> {table_name}")
-                    G.add_edge(dep, table_name)
+                pair_count = 0
+                for source_table in table_names:
+                    for target_table in table_names:
+                        if source_table != target_table:
+                            pair_count += 1
+                            logger.info(f"检查表对[{pair_count}]: {source_table} -> {target_table}")
+                            
+                            check_result = check_table_relationship(source_table, target_table)
+                            
+                            # 检查forward方向的关系
+                            if 'forward' in check_result and check_result['forward']['exists']:
+                                script_name = check_result['forward'].get('script_name', 'unknown_script')
+                                logger.info(f"表对检查发现关系: {source_table} -[:DERIVED_FROM]-> {target_table}, 脚本: {script_name}")
+                                
+                                dependency_dict[source_table].append(target_table)
+                                G.add_edge(target_table, source_table)
+                                found_deps += 1
+                
+                logger.info(f"表对检查后找到 {found_deps} 个依赖关系")
     finally:
         driver.close()
     
@@ -457,15 +506,14 @@ def get_model_dependency_graph(table_names: list) -> dict:
     except Exception as e:
         logger.error(f"检查循环依赖失败: {str(e)}")
     
-    # 转换为字典格式返回
-    dependency_dict = {}
+    # 将图转换为字典格式
+    final_dependency_dict = {}
     for table_name in table_names:
-        predecessors = list(G.predecessors(table_name))
-        dependency_dict[table_name] = predecessors
-        logger.info(f"最终依赖关系 - 表 {table_name} 依赖于: {predecessors}")
+        final_dependency_dict[table_name] = dependency_dict.get(table_name, [])
+        logger.info(f"最终依赖关系 - 表 {table_name} 依赖于: {final_dependency_dict[table_name]}")
     
-    logger.info(f"完整依赖图: {dependency_dict}")
-    return dependency_dict
+    logger.info(f"完整依赖图: {final_dependency_dict}")
+    return final_dependency_dict
 
 
 def generate_optimized_execution_order(table_names: list) -> list:
@@ -630,4 +678,212 @@ def check_table_relationship(table1, table2):
     finally:
         driver.close()
         
-    return relationship_info
+    return relationship_info
+
+def build_model_dependency_dag(table_names, model_tables):
+    """
+    基于表名列表构建模型依赖DAG,返回优化后的执行顺序和依赖关系图
+    
+    参数:
+        table_names: 表名列表
+        model_tables: 表配置列表
+        
+    返回:
+        tuple: (优化后的表执行顺序, 依赖关系图)
+    """
+    # 使用优化函数生成执行顺序,可以处理循环依赖
+    optimized_table_order = generate_optimized_execution_order(table_names)
+    logger.info(f"生成优化执行顺序, 共 {len(optimized_table_order)} 个表")
+    
+    # 获取依赖图
+    dependency_graph = get_model_dependency_graph(table_names)
+    logger.info(f"构建了 {len(dependency_graph)} 个表的依赖关系图")
+    
+    return optimized_table_order, dependency_graph
+
+
+def create_task_dict(optimized_table_order, model_tables, dag, execution_type, **task_options):
+    """
+    根据优化后的表执行顺序创建任务字典
+    
+    参数:
+        optimized_table_order: 优化后的表执行顺序
+        model_tables: 表配置列表
+        dag: Airflow DAG对象
+        execution_type: 执行类型(daily, monthly等)
+        task_options: 任务创建的额外选项
+        
+    返回:
+        dict: 任务字典 {表名: 任务对象}
+    """
+    from airflow.operators.python import PythonOperator
+    
+    task_dict = {}
+    for table_name in optimized_table_order:
+        # 获取表的配置信息
+        table_config = next((t for t in model_tables if t['table_name'] == table_name), None)
+        if table_config:
+            try:
+                # 构建基础参数
+                task_params = {
+                    "task_id": f"process_{execution_type}_{table_name}",
+                    "python_callable": run_model_script,
+                    "op_kwargs": {"table_name": table_name, "execution_mode": table_config['execution_mode']},
+                    "dag": dag
+                }
+                
+                # 添加额外选项
+                if task_options:
+                    # 如果有表特定的选项,使用它们
+                    if table_name in task_options:
+                        task_params.update(task_options[table_name])
+                    # 如果有全局选项,使用它们
+                    elif 'default' in task_options:
+                        task_params.update(task_options['default'])
+                
+                task = PythonOperator(**task_params)
+                task_dict[table_name] = task
+                logger.info(f"创建模型处理任务: {task_params['task_id']}")
+            except Exception as e:
+                logger.error(f"创建任务 process_{execution_type}_{table_name} 时出错: {str(e)}")
+                raise
+    return task_dict
+
+
+def build_task_dependencies(task_dict, dependency_graph):
+    """
+    根据依赖图设置任务间的依赖关系
+    
+    参数:
+        task_dict: 任务字典
+        dependency_graph: 依赖关系图
+    
+    返回:
+        tuple: (tasks_with_upstream, tasks_with_downstream, dependency_count)
+    """
+    tasks_with_upstream = set()  # 用于跟踪已经有上游任务的节点
+    dependency_count = 0
+    
+    for target, upstream_list in dependency_graph.items():
+        if target in task_dict:
+            for upstream in upstream_list:
+                if upstream in task_dict:
+                    logger.info(f"建立任务依赖: {upstream} >> {target}")
+                    task_dict[upstream] >> task_dict[target]
+                    tasks_with_upstream.add(target)  # 记录此任务已有上游
+                    dependency_count += 1
+    
+    # 找出有下游任务的节点
+    tasks_with_downstream = set()
+    for target, upstream_list in dependency_graph.items():
+        if target in task_dict:  # 目标任务在当前DAG中
+            for upstream in upstream_list:
+                if upstream in task_dict:  # 上游任务也在当前DAG中
+                    tasks_with_downstream.add(upstream)  # 这个上游任务有下游
+    
+    logger.info(f"总共建立了 {dependency_count} 个任务之间的依赖关系")
+    logger.info(f"已有上游任务的节点: {tasks_with_upstream}")
+    
+    return tasks_with_upstream, tasks_with_downstream, dependency_count
+
+
+def connect_start_and_end_tasks(task_dict, tasks_with_upstream, tasks_with_downstream, 
+                               wait_task, completed_task, dag_type):
+    """
+    连接开始节点到等待任务,末端节点到完成标记
+    
+    参数:
+        task_dict: 任务字典
+        tasks_with_upstream: 有上游任务的节点集合
+        tasks_with_downstream: 有下游任务的节点集合
+        wait_task: 等待任务
+        completed_task: 完成标记任务
+        dag_type: DAG类型名称(用于日志)
+    
+    返回:
+        tuple: (start_tasks, end_tasks)
+    """
+    # 连接开始节点
+    start_tasks = []
+    for table_name, task in task_dict.items():
+        if table_name not in tasks_with_upstream:
+            start_tasks.append(table_name)
+            logger.info(f"任务 {table_name} 没有上游任务,应该连接到{dag_type}等待任务")
+    
+    logger.info(f"需要连接到{dag_type}等待任务的任务: {start_tasks}")
+    
+    for task_name in start_tasks:
+        wait_task >> task_dict[task_name]
+        logger.info(f"连接 {wait_task.task_id} >> {task_name}")
+    
+    # 连接末端节点
+    end_tasks = []
+    for table_name, task in task_dict.items():
+        if table_name not in tasks_with_downstream:
+            end_tasks.append(table_name)
+            logger.info(f"任务 {table_name} 没有下游任务,是末端任务")
+    
+    logger.info(f"需要连接到{dag_type}完成标记的末端任务: {end_tasks}")
+    
+    for end_task in end_tasks:
+        task_dict[end_task] >> completed_task
+        logger.info(f"连接 {end_task} >> {completed_task.task_id}")
+    
+    # 处理特殊情况
+    logger.info("处理特殊情况")
+    if not start_tasks:
+        logger.warning(f"没有找到开始任务,将{dag_type}等待任务直接连接到完成标记")
+        wait_task >> completed_task
+    
+    if not end_tasks:
+        logger.warning(f"没有找到末端任务,将所有任务连接到{dag_type}完成标记")
+        for table_name, task in task_dict.items():
+            task >> completed_task
+            logger.info(f"直接连接任务到完成标记: {table_name} >> {completed_task.task_id}")
+    
+    return start_tasks, end_tasks
+
+
+def process_model_tables(enabled_tables, dag_type, wait_task, completed_task, dag, **task_options):
+    """
+    处理模型表并构建DAG
+    
+    参数:
+        enabled_tables: 已启用的表列表
+        dag_type: DAG类型 (daily, monthly等)
+        wait_task: 等待任务
+        completed_task: 完成标记任务
+        dag: Airflow DAG对象
+        task_options: 创建任务的额外选项
+    """
+    model_tables = [t for t in enabled_tables if is_data_model_table(t['table_name'])]
+    logger.info(f"获取到 {len(model_tables)} 个启用的 {dag_type} 模型表")
+    
+    if not model_tables:
+        # 如果没有模型表需要处理,直接将等待任务与完成标记相连接
+        logger.info(f"没有找到需要处理的{dag_type}模型表,DAG将直接标记为完成")
+        wait_task >> completed_task
+        return
+    
+    # 获取表名列表
+    table_names = [t['table_name'] for t in model_tables]
+    
+    try:
+        # 构建模型依赖DAG
+        optimized_table_order, dependency_graph = build_model_dependency_dag(table_names, model_tables)
+        
+        # 创建任务字典
+        task_dict = create_task_dict(optimized_table_order, model_tables, dag, dag_type, **task_options)
+        
+        # 建立任务依赖关系
+        tasks_with_upstream, tasks_with_downstream, _ = build_task_dependencies(task_dict, dependency_graph)
+        
+        # 连接开始节点和末端节点
+        connect_start_and_end_tasks(task_dict, tasks_with_upstream, tasks_with_downstream, 
+                                  wait_task, completed_task, dag_type)
+        
+    except Exception as e:
+        logger.error(f"处理{dag_type}模型表时出错: {str(e)}")
+        # 出错时也要确保完成标记被触发
+        wait_task >> completed_task
+        raise

+ 97 - 0
dataops/scripts/book_sale_amt_2weekly_process.py

@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import sys
+import os
+from datetime import datetime, timedelta
+
+# 配置日志记录器
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+logger = logging.getLogger("book_sale_amt_2weekly_process")
+
+def process_2weekly_book_sales():
+    """处理双周图书销售额数据的函数"""
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    logger.info(f"开始执行双周图书销售额处理 - 脚本: {script_name}")
+    
+    try:
+        # 模拟数据处理过程
+        logger.info("从数据源获取原始销售数据...")
+        # 实际应用中这里会连接到数据库或其他数据源
+        
+        logger.info("按双周期汇总销售额...")
+        # 模拟处理步骤
+        today = datetime.now()
+        # 计算当前所在的周数
+        week_number = int(today.strftime("%U"))
+        # 计算当前双周期
+        biweekly_number = (week_number // 2) + 1
+        # 计算双周期的开始和结束日期
+        start_day = datetime.strptime(f"{today.year}-W{biweekly_number*2-1}-1", "%Y-W%W-%w")
+        end_day = start_day + timedelta(days=13)  # 两周共14天
+        date_range = f"{start_day.strftime('%Y-%m-%d')} 至 {end_day.strftime('%Y-%m-%d')}"
+        
+        logger.info(f"正在处理第 {biweekly_number} 个双周期 ({date_range}) 的数据")
+        
+        logger.info("汇总统计双周期内每天的销售数据...")
+        logger.info("计算与上一个双周期的对比...")
+        logger.info("计算热销书籍排行榜...")
+        logger.info("生成双周期销售趋势分析...")
+        
+        logger.info("数据处理完成,准备保存结果...")
+        # 实际应用中这里会将结果保存到数据库
+        
+        return True
+    except Exception as e:
+        logger.error(f"处理双周图书销售额时出错: {str(e)}")
+        return False
+
+def run(table_name, execution_mode, **kwargs):
+    """
+    统一入口函数,符合Airflow动态脚本调用规范
+    
+    参数:
+        table_name (str): 要处理的表名
+        execution_mode (str): 执行模式 (append/full_refresh)
+        **kwargs: 其他可能的参数
+    
+    返回:
+        bool: 执行成功返回True,否则返回False
+    """
+    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
+    
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    # 记录详细的执行信息
+    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
+    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    
+    # 根据执行模式判断处理逻辑
+    if execution_mode == "full_refresh":
+        logger.info("执行完全刷新模式 - 将处理所有历史数据")
+        logger.info("获取过去12个双周期的历史数据进行分析...")
+    else:  # append
+        logger.info("执行增量模式 - 只处理最新双周期数据")
+    
+    # 调用实际处理函数
+    result = process_2weekly_book_sales()
+    
+    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    logger.info(f"处理结果: {'成功' if result else '失败'}")
+    
+    return result
+
+if __name__ == "__main__":
+    # 直接执行时调用统一入口函数
+    run(table_name="book_sale_amt_2weekly", execution_mode="append") 

+ 92 - 0
dataops/scripts/book_sale_amt_2yearly_process.py

@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import sys
+import os
+from datetime import datetime, timedelta
+
+# 配置日志记录器
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+logger = logging.getLogger("book_sale_amt_2yearly_process")
+
+def process_2yearly_book_sales():
+    """处理两年度图书销售额数据的函数"""
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    logger.info(f"开始执行两年度图书销售额处理 - 脚本: {script_name}")
+    
+    try:
+        # 模拟数据处理过程
+        logger.info("从数据源获取原始销售数据...")
+        # 实际应用中这里会连接到数据库或其他数据源
+        
+        logger.info("按两年周期汇总销售额...")
+        # 模拟处理步骤
+        current_year = int(datetime.now().strftime("%Y"))
+        # 计算当前两年周期
+        cycle_start = current_year - (current_year % 2)
+        cycle_end = cycle_start + 1
+        logger.info(f"正在处理 {cycle_start}-{cycle_end} 两年周期的数据")
+        
+        logger.info("计算与上一个两年周期的对比...")
+        logger.info("计算每年在两年周期中的占比...")
+        logger.info("生成两年周期销售趋势分析...")
+        logger.info("生成中长期销售预测...")
+        
+        logger.info("数据处理完成,准备保存结果...")
+        # 实际应用中这里会将结果保存到数据库
+        
+        return True
+    except Exception as e:
+        logger.error(f"处理两年度图书销售额时出错: {str(e)}")
+        return False
+
+def run(table_name, execution_mode, **kwargs):
+    """
+    统一入口函数,符合Airflow动态脚本调用规范
+    
+    参数:
+        table_name (str): 要处理的表名
+        execution_mode (str): 执行模式 (append/full_refresh)
+        **kwargs: 其他可能的参数
+    
+    返回:
+        bool: 执行成功返回True,否则返回False
+    """
+    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
+    
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    # 记录详细的执行信息
+    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
+    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    
+    # 根据执行模式判断处理逻辑
+    if execution_mode == "full_refresh":
+        logger.info("执行完全刷新模式 - 将处理所有历史数据")
+        # 两年周期数据通常需要处理多个周期的历史数据进行比较
+        logger.info("获取过去4个两年周期的历史数据进行分析...")
+    else:  # append
+        logger.info("执行增量模式 - 只处理最新两年周期数据")
+    
+    # 调用实际处理函数
+    result = process_2yearly_book_sales()
+    
+    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    logger.info(f"处理结果: {'成功' if result else '失败'}")
+    
+    return result
+
+if __name__ == "__main__":
+    # 直接执行时调用统一入口函数
+    run(table_name="book_sale_amt_2yearly", execution_mode="append") 

+ 93 - 0
dataops/scripts/book_sale_amt_daily_clean.py

@@ -0,0 +1,93 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import sys
+import os
+from datetime import datetime, timedelta
+
+# 配置日志记录器
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+logger = logging.getLogger("book_sale_amt_daily_clean")
+
+def clean_daily_book_sales():
+    """清洗日度图书销售额数据的函数"""
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    logger.info(f"开始执行日度图书销售额数据清洗 - 脚本: {script_name}")
+    
+    try:
+        # 模拟数据处理过程
+        logger.info("从数据源获取原始销售数据...")
+        # 实际应用中这里会连接到数据库或其他数据源
+        
+        logger.info("执行数据清洗流程...")
+        
+        # 模拟处理步骤
+        today = datetime.now()
+        yesterday = today - timedelta(days=1)
+        date_str = yesterday.strftime('%Y-%m-%d')
+        
+        logger.info(f"正在清洗 {date_str} 的数据")
+        
+        logger.info("检查数据完整性...")
+        logger.info("检测并处理异常值...")
+        logger.info("填充缺失数据...")
+        logger.info("标准化数据格式...")
+        logger.info("去除重复记录...")
+        
+        logger.info("数据清洗完成,准备保存结果...")
+        # 实际应用中这里会将结果保存到数据库
+        
+        return True
+    except Exception as e:
+        logger.error(f"清洗日度图书销售额数据时出错: {str(e)}")
+        return False
+
+def run(table_name, execution_mode, **kwargs):
+    """
+    统一入口函数,符合Airflow动态脚本调用规范
+    
+    参数:
+        table_name (str): 要处理的表名
+        execution_mode (str): 执行模式 (append/full_refresh)
+        **kwargs: 其他可能的参数
+    
+    返回:
+        bool: 执行成功返回True,否则返回False
+    """
+    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
+    
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    # 记录详细的执行信息
+    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
+    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    
+    # 根据执行模式判断处理逻辑
+    if execution_mode == "full_refresh":
+        logger.info("执行完全刷新模式 - 将处理所有历史数据")
+        logger.info("获取过去30天的历史数据进行清洗...")
+    else:  # append
+        logger.info("执行增量模式 - 只清洗最新一天的数据")
+    
+    # 调用实际处理函数
+    result = clean_daily_book_sales()
+    
+    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    logger.info(f"处理结果: {'成功' if result else '失败'}")
+    
+    return result
+
+if __name__ == "__main__":
+    # 直接执行时调用统一入口函数
+    run(table_name="book_sale_amt_daily", execution_mode="append") 

+ 90 - 0
dataops/scripts/book_sale_amt_half_yearly_process.py

@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import sys
+import os
+from datetime import datetime, timedelta
+
+# 配置日志记录器
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+logger = logging.getLogger("book_sale_amt_half_yearly_process")
+
+def process_half_yearly_book_sales():
+    """处理半年度图书销售额数据的函数"""
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    logger.info(f"开始执行半年度图书销售额处理 - 脚本: {script_name}")
+    
+    try:
+        # 模拟数据处理过程
+        logger.info("从数据源获取原始销售数据...")
+        # 实际应用中这里会连接到数据库或其他数据源
+        
+        logger.info("按半年汇总销售额...")
+        # 模拟处理步骤
+        current_year = datetime.now().strftime("%Y")
+        current_month = datetime.now().month
+        half_year = "上半年" if current_month <= 6 else "下半年"
+        logger.info(f"正在处理 {current_year} 年 {half_year} 的数据")
+        
+        logger.info("计算同比增长率...")
+        logger.info("计算各月份在半年度中的占比...")
+        logger.info("生成半年度销售趋势分析...")
+        
+        logger.info("数据处理完成,准备保存结果...")
+        # 实际应用中这里会将结果保存到数据库
+        
+        return True
+    except Exception as e:
+        logger.error(f"处理半年度图书销售额时出错: {str(e)}")
+        return False
+
+def run(table_name, execution_mode, **kwargs):
+    """
+    统一入口函数,符合Airflow动态脚本调用规范
+    
+    参数:
+        table_name (str): 要处理的表名
+        execution_mode (str): 执行模式 (append/full_refresh)
+        **kwargs: 其他可能的参数
+    
+    返回:
+        bool: 执行成功返回True,否则返回False
+    """
+    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
+    
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    # 记录详细的执行信息
+    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
+    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    
+    # 根据执行模式判断处理逻辑
+    if execution_mode == "full_refresh":
+        logger.info("执行完全刷新模式 - 将处理所有历史数据")
+        # 半年度数据通常需要处理多个半年度的历史数据进行比较
+        logger.info("获取过去3年的半年度历史数据进行分析...")
+    else:  # append
+        logger.info("执行增量模式 - 只处理最新半年度数据")
+    
+    # 调用实际处理函数
+    result = process_half_yearly_book_sales()
+    
+    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    logger.info(f"处理结果: {'成功' if result else '失败'}")
+    
+    return result
+
+if __name__ == "__main__":
+    # 直接执行时调用统一入口函数
+    run(table_name="book_sale_amt_half_yearly", execution_mode="append")