ソースを参照

发现monthly依赖处理逻辑的问题,现在准备统一修改

wangxq 2 ヶ月 前
コミット
23a8f2f9ad

+ 69 - 17
dags/dag_data_model_monthly.py

@@ -7,10 +7,52 @@ from utils import get_enabled_tables, is_data_model_table, run_model_script, get
 from config import NEO4J_CONFIG
 import pendulum
 import logging
+import networkx as nx
 
 # 创建日志记录器
 logger = logging.getLogger(__name__)
 
+def generate_optimized_execution_order(table_names: list) -> list:
+    """
+    生成优化的执行顺序,可处理循环依赖    
+    参数:
+        table_names: 表名列表    
+    返回:
+        list: 优化后的执行顺序列表
+    """
+    # 创建依赖图
+    G = nx.DiGraph()
+    
+    # 添加所有节点
+    for table_name in table_names:
+        G.add_node(table_name)
+    
+    # 添加依赖边
+    dependency_dict = get_model_dependency_graph(table_names)
+    for target, upstreams in dependency_dict.items():
+        for upstream in upstreams:
+            if upstream in table_names:  # 确保只考虑目标表集合中的表
+                G.add_edge(upstream, target)
+    
+    # 检测循环依赖
+    cycles = list(nx.simple_cycles(G))
+    if cycles:
+        logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
+        # 打破循环依赖(简单策略:移除每个循环中的一条边)
+        for cycle in cycles:
+            # 移除循环中的最后一条边
+            G.remove_edge(cycle[-1], cycle[0])
+            logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
+    
+    # 生成拓扑排序
+    try:
+        execution_order = list(nx.topological_sort(G))
+        return execution_order
+    except Exception as e:
+        logger.error(f"生成执行顺序失败: {str(e)}")
+        # 返回原始列表作为备选
+        return table_names
+
 def is_first_day():
     return True
     # 生产环境中应使用实际判断
@@ -51,9 +93,15 @@ with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_int
                 logger.info("没有找到需要处理的月模型表,DAG将直接标记为完成")
                 wait_for_weekly >> monthly_completed
             else:
-                # 获取依赖图
+                # 获取表名列表
+                table_names = [t['table_name'] for t in model_tables]
+                
+                # 使用优化函数生成执行顺序,可以处理循环依赖
+                optimized_table_order = generate_optimized_execution_order(table_names)
+                logger.info(f"生成优化执行顺序, 共 {len(optimized_table_order)} 个表")
+                
+                # 获取依赖图 (仍然需要用于设置任务依赖关系)
                 try:
-                    table_names = [t['table_name'] for t in model_tables]
                     dependency_graph = get_model_dependency_graph(table_names)
                     logger.info(f"构建了 {len(dependency_graph)} 个表的依赖关系图")
                 except Exception as e:
@@ -64,20 +112,23 @@ with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_int
 
                 # 构建 task 对象
                 task_dict = {}
-                for item in model_tables:
-                    try:
-                        task = PythonOperator(
-                            task_id=f"process_monthly_{item['table_name']}",
-                            python_callable=run_model_script,
-                            op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
-                        )
-                        task_dict[item['table_name']] = task
-                        logger.info(f"创建模型处理任务: process_monthly_{item['table_name']}")
-                    except Exception as e:
-                        logger.error(f"创建任务 process_monthly_{item['table_name']} 时出错: {str(e)}")
-                        # 出错时也要确保完成标记被触发
-                        wait_for_weekly >> monthly_completed
-                        raise
+                for table_name in optimized_table_order:
+                    # 获取表的配置信息
+                    table_config = next((t for t in model_tables if t['table_name'] == table_name), None)
+                    if table_config:
+                        try:
+                            task = PythonOperator(
+                                task_id=f"process_monthly_{table_name}",
+                                python_callable=run_model_script,
+                                op_kwargs={"table_name": table_name, "execution_mode": table_config['execution_mode']},
+                            )
+                            task_dict[table_name] = task
+                            logger.info(f"创建模型处理任务: process_monthly_{table_name}")
+                        except Exception as e:
+                            logger.error(f"创建任务 process_monthly_{table_name} 时出错: {str(e)}")
+                            # 出错时也要确保完成标记被触发
+                            wait_for_weekly >> monthly_completed
+                            raise
 
                 # 建立任务依赖(基于 DERIVED_FROM 图)
                 dependency_count = 0
@@ -101,7 +152,8 @@ with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_int
                 if top_level_tasks:
                     logger.info(f"发现 {len(top_level_tasks)} 个顶层任务: {', '.join(top_level_tasks)}")
                     for name in top_level_tasks:
-                        wait_for_weekly >> task_dict[name]
+                        if name in task_dict:
+                            wait_for_weekly >> task_dict[name]
                 else:
                     logger.warning("没有找到顶层任务,请检查依赖关系图是否正确")
                     # 如果没有顶层任务,直接将等待任务与完成标记相连接

+ 70 - 18
dags/dag_data_model_weekly.py

@@ -8,10 +8,52 @@ from utils import get_enabled_tables, is_data_model_table, run_model_script, get
 from config import NEO4J_CONFIG
 import pendulum
 import logging
+import networkx as nx
 
 # 创建日志记录器
 logger = logging.getLogger(__name__)
 
+def generate_optimized_execution_order(table_names: list) -> list:
+    """
+    生成优化的执行顺序,可处理循环依赖    
+    参数:
+        table_names: 表名列表    
+    返回:
+        list: 优化后的执行顺序列表
+    """
+    # 创建依赖图
+    G = nx.DiGraph()
+    
+    # 添加所有节点
+    for table_name in table_names:
+        G.add_node(table_name)
+    
+    # 添加依赖边
+    dependency_dict = get_model_dependency_graph(table_names)
+    for target, upstreams in dependency_dict.items():
+        for upstream in upstreams:
+            if upstream in table_names:  # 确保只考虑目标表集合中的表
+                G.add_edge(upstream, target)
+    
+    # 检测循环依赖
+    cycles = list(nx.simple_cycles(G))
+    if cycles:
+        logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
+        # 打破循环依赖(简单策略:移除每个循环中的一条边)
+        for cycle in cycles:
+            # 移除循环中的最后一条边
+            G.remove_edge(cycle[-1], cycle[0])
+            logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
+    
+    # 生成拓扑排序
+    try:
+        execution_order = list(nx.topological_sort(G))
+        return execution_order
+    except Exception as e:
+        logger.error(f"生成执行顺序失败: {str(e)}")
+        # 返回原始列表作为备选
+        return table_names
+
 def is_monday():
     return True
     #return pendulum.now().day_of_week == 0
@@ -51,9 +93,15 @@ with DAG("dag_data_model_weekly", start_date=datetime(2024, 1, 1), schedule_inte
                 logger.info("没有找到需要处理的周模型表,DAG将直接标记为完成")
                 wait_for_daily >> weekly_completed
             else:
-                # 获取依赖图
+                # 获取表名列表
+                table_names = [t['table_name'] for t in model_tables]
+                
+                # 使用优化函数生成执行顺序,可以处理循环依赖
+                optimized_table_order = generate_optimized_execution_order(table_names)
+                logger.info(f"生成优化执行顺序, 共 {len(optimized_table_order)} 个表")
+                
+                # 获取依赖图 (仍然需要用于设置任务依赖关系)
                 try:
-                    table_names = [t['table_name'] for t in model_tables]
                     dependency_graph = get_model_dependency_graph(table_names)
                     logger.info(f"构建了 {len(dependency_graph)} 个表的依赖关系图")
                 except Exception as e:
@@ -64,21 +112,24 @@ with DAG("dag_data_model_weekly", start_date=datetime(2024, 1, 1), schedule_inte
 
                 # 构建 task 对象
                 task_dict = {}
-                for item in model_tables:
-                    try:
-                        task = PythonOperator(
-                            task_id=f"process_weekly_{item['table_name']}",
-                            python_callable=run_model_script,
-                            op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
-                        )
-                        task_dict[item['table_name']] = task
-                        logger.info(f"创建模型处理任务: process_weekly_{item['table_name']}")
-                    except Exception as e:
-                        logger.error(f"创建任务 process_weekly_{item['table_name']} 时出错: {str(e)}")
-                        # 出错时也要确保完成标记被触发
-                        wait_for_daily >> weekly_completed
-                        raise
-
+                for table_name in optimized_table_order:
+                    # 获取表的配置信息
+                    table_config = next((t for t in model_tables if t['table_name'] == table_name), None)
+                    if table_config:
+                        try:
+                            task = PythonOperator(
+                                task_id=f"process_weekly_{table_name}",
+                                python_callable=run_model_script,
+                                op_kwargs={"table_name": table_name, "execution_mode": table_config['execution_mode']},
+                            )
+                            task_dict[table_name] = task
+                            logger.info(f"创建模型处理任务: process_weekly_{table_name}")
+                        except Exception as e:
+                            logger.error(f"创建任务 process_weekly_{table_name} 时出错: {str(e)}")
+                            # 出错时也要确保完成标记被触发
+                            wait_for_daily >> weekly_completed
+                            raise
+                
                 # 建立任务依赖(基于 DERIVED_FROM 图)
                 dependency_count = 0
                 for target, upstream_list in dependency_graph.items():
@@ -101,7 +152,8 @@ with DAG("dag_data_model_weekly", start_date=datetime(2024, 1, 1), schedule_inte
                 if top_level_tasks:
                     logger.info(f"发现 {len(top_level_tasks)} 个顶层任务: {', '.join(top_level_tasks)}")
                     for name in top_level_tasks:
-                        wait_for_daily >> task_dict[name]
+                        if name in task_dict:
+                            wait_for_daily >> task_dict[name]
                 else:
                     logger.warning("没有找到顶层任务,请检查依赖关系图是否正确")
                     # 如果没有顶层任务,直接将等待任务与完成标记相连接

+ 87 - 0
dataops/scripts/book_sale_amt_weekly_process.py

@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import sys
+import os
+from datetime import datetime, timedelta
+
+# 配置日志记录器
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+logger = logging.getLogger("book_sale_amt_weekly_process")
+
+def process_weekly_book_sales():
+    """处理周度图书销售额数据的函数"""
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    logger.info(f"开始执行周度图书销售额处理 - 脚本: {script_name}")
+    
+    try:
+        # 模拟数据处理过程
+        logger.info("从数据源获取原始销售数据...")
+        # 实际应用中这里会连接到数据库或其他数据源
+        
+        logger.info("按周汇总销售额...")
+        # 模拟处理步骤
+        today = datetime.now()
+        # 计算本周的开始日期(周一)和结束日期(周日)
+        start_of_week = today - timedelta(days=today.weekday())
+        end_of_week = start_of_week + timedelta(days=6)
+        week_range = f"{start_of_week.strftime('%Y-%m-%d')} 至 {end_of_week.strftime('%Y-%m-%d')}"
+        
+        logger.info(f"正在处理周期 {week_range} 的数据")
+        
+        logger.info("数据处理完成,准备保存结果...")
+        # 实际应用中这里会将结果保存到数据库
+        
+        return True
+    except Exception as e:
+        logger.error(f"处理周度图书销售额时出错: {str(e)}")
+        return False
+
+def run(table_name, execution_mode, **kwargs):
+    """
+    统一入口函数,符合Airflow动态脚本调用规范
+    
+    参数:
+        table_name (str): 要处理的表名
+        execution_mode (str): 执行模式 (append/full_refresh)
+        **kwargs: 其他可能的参数
+    
+    返回:
+        bool: 执行成功返回True,否则返回False
+    """
+    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
+    
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    # 记录详细的执行信息
+    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
+    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    
+    # 根据执行模式判断处理逻辑
+    if execution_mode == "full_refresh":
+        logger.info("执行完全刷新模式 - 将处理所有历史数据")
+    else:  # append
+        logger.info("执行增量模式 - 只处理最新数据")
+    
+    # 调用实际处理函数
+    result = process_weekly_book_sales()
+    
+    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    logger.info(f"处理结果: {'成功' if result else '失败'}")
+    
+    return result
+
+if __name__ == "__main__":
+    # 直接执行时调用统一入口函数
+    run(table_name="book_sale_amt_weekly", execution_mode="append") 

+ 88 - 0
dataops/scripts/book_sale_amt_yearly_process.py

@@ -0,0 +1,88 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import logging
+import sys
+import os
+from datetime import datetime, timedelta
+
+# 配置日志记录器
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+logger = logging.getLogger("book_sale_amt_yearly_process")
+
+def process_yearly_book_sales():
+    """处理年度图书销售额数据的函数"""
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    logger.info(f"开始执行年度图书销售额处理 - 脚本: {script_name}")
+    
+    try:
+        # 模拟数据处理过程
+        logger.info("从数据源获取原始销售数据...")
+        # 实际应用中这里会连接到数据库或其他数据源
+        
+        logger.info("按年汇总销售额...")
+        # 模拟处理步骤
+        current_year = datetime.now().strftime("%Y")
+        logger.info(f"正在处理 {current_year} 年的数据")
+        
+        logger.info("计算同比增长率...")
+        logger.info("计算各月份占比...")
+        logger.info("生成年度销售趋势分析...")
+        
+        logger.info("数据处理完成,准备保存结果...")
+        # 实际应用中这里会将结果保存到数据库
+        
+        return True
+    except Exception as e:
+        logger.error(f"处理年度图书销售额时出错: {str(e)}")
+        return False
+
+def run(table_name, execution_mode, **kwargs):
+    """
+    统一入口函数,符合Airflow动态脚本调用规范
+    
+    参数:
+        table_name (str): 要处理的表名
+        execution_mode (str): 执行模式 (append/full_refresh)
+        **kwargs: 其他可能的参数
+    
+    返回:
+        bool: 执行成功返回True,否则返回False
+    """
+    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
+    
+    # 获取当前脚本的文件名
+    script_name = os.path.basename(__file__)
+    
+    # 记录详细的执行信息
+    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
+    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    
+    # 根据执行模式判断处理逻辑
+    if execution_mode == "full_refresh":
+        logger.info("执行完全刷新模式 - 将处理所有历史数据")
+        # 年度数据通常需要处理多年的历史数据进行比较
+        logger.info("获取过去5年的历史数据进行分析...")
+    else:  # append
+        logger.info("执行增量模式 - 只处理最新年度数据")
+    
+    # 调用实际处理函数
+    result = process_yearly_book_sales()
+    
+    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    logger.info(f"处理结果: {'成功' if result else '失败'}")
+    
+    return result
+
+if __name__ == "__main__":
+    # 直接执行时调用统一入口函数
+    run(table_name="book_sale_amt_yearly", execution_mode="append")