Selaa lähdekoodia

修正了手工传参的大小写问题

wangxq 1 kuukausi sitten
vanhempi
commit
832dbc08c2

+ 75 - 2
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -42,7 +42,7 @@ def get_enabled_tables():
         conn.close()
 
 def check_table_directly_subscribed(table_name):
-    """检查表是否在schedule_status表中直接订阅"""
+    """检查表是否在schedule_status表中直接调度"""
     conn = get_pg_conn()
     cursor = conn.cursor()
     try:
@@ -60,6 +60,64 @@ def check_table_directly_subscribed(table_name):
         cursor.close()
         conn.close()
 
+
+def should_execute_today(table_name, frequency, exec_date):
+    """
+    判断指定频率的表在给定执行日期是否应该执行
+    
+    参数:
+        table_name (str): 表名,用于日志记录
+        frequency (str): 调度频率,如'daily'、'weekly'、'monthly'、'yearly',为None时默认为'daily'
+        exec_date (str): 执行日期,格式为'YYYY-MM-DD'
+    
+    返回:
+        bool: 如果该表应该在执行日期执行,则返回True,否则返回False
+    """
+    # 将执行日期字符串转换为pendulum日期对象
+    try:
+        exec_date_obj = pendulum.parse(exec_date)
+    except Exception as e:
+        logger.error(f"解析执行日期 {exec_date} 出错: {str(e)},使用当前日期")
+        exec_date_obj = pendulum.today()
+    
+    # 计算下一个日期,用于判断是否是月初、周初等
+    next_date = exec_date_obj.add(days=1)
+    
+    # 如果频率为None或空字符串,默认为daily
+    if not frequency:
+        logger.info(f"表 {table_name} 未指定调度频率,默认为daily")
+        return True
+    
+    frequency = frequency.lower() if isinstance(frequency, str) else 'daily'
+    
+    if frequency == 'daily':
+        # 日任务每天都执行
+        return True
+    elif frequency == 'weekly':
+        # 周任务只在周日执行(因为exec_date+1是周一时才执行)
+        is_sunday = next_date.day_of_week == 1  # 1表示周一
+        logger.info(f"表 {table_name} 是weekly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否周日: {is_sunday}")
+        return is_sunday
+    elif frequency == 'monthly':
+        # 月任务只在每月最后一天执行(因为exec_date+1是月初时才执行)
+        is_month_end = next_date.day == 1
+        logger.info(f"表 {table_name} 是monthly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否月末: {is_month_end}")
+        return is_month_end
+    elif frequency == 'quarterly':
+        # 季度任务只在每季度最后一天执行(因为exec_date+1是季度初时才执行)
+        is_quarter_end = next_date.day == 1 and next_date.month in [1, 4, 7, 10]
+        logger.info(f"表 {table_name} 是quarterly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否季末: {is_quarter_end}")
+        return is_quarter_end
+    elif frequency == 'yearly':
+        # 年任务只在每年最后一天执行(因为exec_date+1是年初时才执行)
+        is_year_end = next_date.day == 1 and next_date.month == 1
+        logger.info(f"表 {table_name} 是yearly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否年末: {is_year_end}")
+        return is_year_end
+    else:
+        # 未知频率,默认执行
+        logger.warning(f"表 {table_name} 使用未知的调度频率: {frequency},默认执行")
+        return True
+
 def get_table_info_from_neo4j(table_name):
     """从Neo4j获取表的详细信息"""
     driver = get_neo4j_driver()
@@ -465,8 +523,23 @@ def prepare_pipeline_dag_schedule(**kwargs):
     
     logger.info(f"成功获取 {len(tables_info)} 个表的详细信息")
     
+    # 2.1 根据调度频率过滤表(新增的步骤)
+    filtered_tables_info = []
+    for table_info in tables_info:
+        table_name = table_info['target_table']
+        frequency = table_info.get('default_update_frequency')
+        
+        if should_execute_today(table_name, frequency, exec_date):
+            filtered_tables_info.append(table_info)
+            logger.info(f"表 {table_name} (频率: {frequency}) 将在今天{exec_date}执行")
+        else:
+            logger.info(f"表 {table_name} (频率: {frequency}) 今天{exec_date}不执行,已过滤")
+    
+    logger.info(f"按调度频率过滤后,今天{exec_date}需要执行的表有 {len(filtered_tables_info)} 个")
+
+
     # 3. 处理依赖关系,添加被动调度的表
-    enriched_tables = process_dependencies(tables_info)
+    enriched_tables = process_dependencies(filtered_tables_info)
     logger.info(f"处理依赖后,总共有 {len(enriched_tables)} 个表")
     
     # 4. 过滤无效表及其依赖

+ 15 - 28
dags/dag_manual_dependency_trigger.py

@@ -10,15 +10,15 @@
   - 'source':查找并执行完整依赖链到Source层
 
 参数:
-- TABLE_NAME:目标表名
-- DEPENDENCY_LEVEL/UPPER_LEVEL_STOP:依赖级别
+- table_name:目标表名
+- dependency_level:依赖级别
 
 使用示例:
 ```
 {
   "conf": {
-    "TABLE_NAME": "book_sale_amt_2yearly",
-    "DEPENDENCY_LEVEL": "resource"
+    "table_name": "book_sale_amt_2yearly",
+    "dependency_level": "resource"
   }
 }
 ```
@@ -88,19 +88,15 @@ def get_execution_mode(table_name):
 def get_dag_params(**context):
     """获取DAG运行参数"""
     params = context.get('params', {})
-    table_name = params.get('TABLE_NAME')
+    table_name = params.get('table_name')
     
     # 记录原始参数信息
     logger.info(f"接收到的原始参数: {params}")
     
-    # 同时检查DEPENDENCY_LEVEL和UPPER_LEVEL_STOP参数,兼容两种参数名
-    dependency_level = params.get('DEPENDENCY_LEVEL')
-    logger.info(f"从DEPENDENCY_LEVEL获取的值: {dependency_level}")
-    
-    if dependency_level is None:
-        dependency_level = params.get('UPPER_LEVEL_STOP', 'resource')  # 兼容旧参数名
-        logger.info(f"从UPPER_LEVEL_STOP获取的值: {dependency_level}")
-    
+    # 获取依赖级别参数
+    dependency_level = params.get('dependency_level')
+    logger.info(f"获取的依赖级别值: {dependency_level}")
+
     if not table_name:
         raise ValueError("必须提供TABLE_NAME参数")
     
@@ -409,11 +405,9 @@ def build_dependency_chain_nx(start_table, dependency_level='resource'):
 
 def execute_scripts(scripts_list):
     """
-    执行指定的脚本列表
-    
+    执行指定的脚本列表    
     参数:
-        scripts_list (list): 要执行的脚本信息列表,每项包含table_name, script_name, execution_mode
-        
+        scripts_list (list): 要执行的脚本信息列表,每项包含table_name, script_name, execution_mode        
     返回:
         bool: 全部执行成功返回True,任一失败返回False
     """
@@ -557,7 +551,7 @@ def process_resources(**context):
     table_name = None
     if dependency_level == 'self':
         params = context.get('params', {})
-        table_name = params.get('TABLE_NAME') or params.get('table_name')
+        table_name = params.get('table_name')
         logger.info(f"依赖级别为'self',目标表: {table_name}")
     
     # 根据依赖级别过滤要执行的脚本
@@ -605,7 +599,7 @@ def process_models(**context):
     
     # 获取表名(在所有级别都需要)
     params = context.get('params', {})
-    table_name = params.get('TABLE_NAME') or params.get('table_name')
+    table_name = params.get('table_name')
     logger.info(f"目标表: {table_name}")
     
     # 如果依赖级别是'self',只处理起始表
@@ -638,19 +632,12 @@ with DAG(
     catchup=False,
     is_paused_upon_creation=True,  # 添加这一行,使DAG创建时不处于暂停状态
     params={
-        'TABLE_NAME': '',
-        'DEPENDENCY_LEVEL': {
+        'table_name': '',
+        'dependency_level': {
             'type': 'string',
             'enum': ['self', 'resource', 'source'],
             'default': 'resource',
             'description': '依赖级别: self-仅本表, resource-到Resource层(不执行Resource脚本), source-到Source层'
-        },
-        # 添加旧参数名,保持兼容性
-        'UPPER_LEVEL_STOP': {
-            'type': 'string',
-            'enum': ['self', 'resource', 'source'],
-            'default': 'resource',
-            'description': '依赖级别(旧参数名): self-仅本表, resource-到Resource层(不执行Resource脚本), source-到Source层'
         }
     },
 ) as dag: