瀏覽代碼

增加manual_trigger_dag.py对logical_date的转换功能.

wangxq 3 周之前
父節點
當前提交
a383754a13
共有 1 個文件被更改,包括 67 次插入9 次删除
  1. 67 9
      dags/dataops_productline_manual_trigger_dag.py

+ 67 - 9
dags/dataops_productline_manual_trigger_dag.py

@@ -15,6 +15,9 @@
   - 'python_script':执行物理Python脚本文件
   - 'python_script':执行物理Python脚本文件
   - 'python':从data_transform_scripts表获取Python脚本内容并执行
   - 'python':从data_transform_scripts表获取Python脚本内容并执行
   - 'sql':从data_transform_scripts表获取SQL脚本内容并执行
   - 'sql':从data_transform_scripts表获取SQL脚本内容并执行
+- 支持 Logical date 参数:
+  - 可以在Airflow UI中选择特定的Logical date
+  - 选择的日期将被转换为执行日期并传递给脚本
 
 
 参数:
 参数:
 - script_name:[可选] 目标脚本名称
 - script_name:[可选] 目标脚本名称
@@ -61,6 +64,8 @@ import networkx as nx
 import json
 import json
 from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG
 from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG
 import traceback
 import traceback
+import pendulum
+import pytz
 
 
 # 设置logger
 # 设置logger
 logger = logging.getLogger(__name__)
 logger = logging.getLogger(__name__)
@@ -76,6 +81,22 @@ default_args = {
     'retry_delay': timedelta(minutes=1),
     'retry_delay': timedelta(minutes=1),
 }
 }
 
 
+def get_cn_exec_date(logical_date):
+    """
+    获取逻辑执行日期
+    
+    参数:
+        logical_date: 逻辑执行日期,UTC时间
+
+    返回:
+        logical_exec_date: 逻辑执行日期,北京时间
+        local_logical_date: 北京时区的logical_date
+    """
+    # 获取逻辑执行日期
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
+    return exec_date, local_logical_date
+
 def get_pg_conn():
 def get_pg_conn():
     """获取PostgreSQL连接"""
     """获取PostgreSQL连接"""
     return psycopg2.connect(**PG_CONFIG)
     return psycopg2.connect(**PG_CONFIG)
@@ -124,6 +145,12 @@ def get_dag_params(**context):
     dependency_level = params.get('dependency_level')
     dependency_level = params.get('dependency_level')
     logger.info(f"获取的依赖级别值: {dependency_level}")
     logger.info(f"获取的依赖级别值: {dependency_level}")
 
 
+    # 获取 logical_date
+    dag_run = context.get('dag_run')
+    logical_date = dag_run.logical_date if dag_run else datetime.now()
+    exec_date, local_logical_date = get_cn_exec_date(logical_date)
+    logger.info(f"【时间参数】get_dag_params: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
+
     # 验证参数组合
     # 验证参数组合
     if not script_name and not target_table:
     if not script_name and not target_table:
         raise ValueError("必须至少提供script_name或target_table参数之一")
         raise ValueError("必须至少提供script_name或target_table参数之一")
@@ -133,8 +160,8 @@ def get_dag_params(**context):
         logger.warning(f"无效的依赖级别参数: {dependency_level},使用默认值'dependency'")
         logger.warning(f"无效的依赖级别参数: {dependency_level},使用默认值'dependency'")
         dependency_level = 'dependency'
         dependency_level = 'dependency'
     
     
-    logger.info(f"最终使用的参数 - 脚本名称: {script_name}, 目标表: {target_table}, 依赖级别: {dependency_level}")
-    return script_name, target_table, dependency_level
+    logger.info(f"最终使用的参数 - 脚本名称: {script_name}, 目标表: {target_table}, 依赖级别: {dependency_level}, 执行日期: {exec_date}")
+    return script_name, target_table, dependency_level, exec_date, logical_date
 
 
 def get_table_label(table_name):
 def get_table_label(table_name):
     """确定表的标签类型(DataModel or DataResource)"""
     """确定表的标签类型(DataModel or DataResource)"""
@@ -561,6 +588,8 @@ def execute_python_script(script_info):
     target_table_label = script_info.get('target_table_label')
     target_table_label = script_info.get('target_table_label')
     source_tables = script_info.get('source_tables', [])
     source_tables = script_info.get('source_tables', [])
     frequency = script_info.get('frequency', 'daily')
     frequency = script_info.get('frequency', 'daily')
+    # 使用传入的执行日期,如果不存在则使用当前日期
+    exec_date = script_info.get('exec_date', datetime.now().strftime('%Y-%m-%d'))
     
     
     # 记录开始执行
     # 记录开始执行
     logger.info(f"===== 开始执行物理Python脚本文件: {script_name} =====")
     logger.info(f"===== 开始执行物理Python脚本文件: {script_name} =====")
@@ -569,6 +598,7 @@ def execute_python_script(script_info):
     logger.info(f"表标签: {target_table_label}")
     logger.info(f"表标签: {target_table_label}")
     logger.info(f"源表: {source_tables}")
     logger.info(f"源表: {source_tables}")
     logger.info(f"频率: {frequency}")
     logger.info(f"频率: {frequency}")
+    logger.info(f"执行日期: {exec_date}")
     
     
     # 检查脚本文件是否存在
     # 检查脚本文件是否存在
     exists, script_path = check_script_exists(script_name)
     exists, script_path = check_script_exists(script_name)
@@ -593,7 +623,7 @@ def execute_python_script(script_info):
                 "table_name": target_table,
                 "table_name": target_table,
                 "execution_mode": execution_mode,
                 "execution_mode": execution_mode,
                 "frequency": frequency,
                 "frequency": frequency,
-                "exec_date": datetime.now().strftime('%Y-%m-%d')
+                "exec_date": exec_date  # 使用传入的执行日期而不是当前日期
             }
             }
             
             
             # 如果是structure类型,添加特殊参数
             # 如果是structure类型,添加特殊参数
@@ -642,6 +672,8 @@ def execute_sql(script_info):
     execution_mode = script_info.get('execution_mode', 'append')
     execution_mode = script_info.get('execution_mode', 'append')
     target_table_label = script_info.get('target_table_label')
     target_table_label = script_info.get('target_table_label')
     frequency = script_info.get('frequency', 'daily')
     frequency = script_info.get('frequency', 'daily')
+    # 使用传入的执行日期,如果不存在则使用当前日期
+    exec_date = script_info.get('exec_date', datetime.now().strftime('%Y-%m-%d'))
     
     
     # 记录开始执行
     # 记录开始执行
     logger.info(f"===== 开始执行SQL脚本: {script_name} =====")
     logger.info(f"===== 开始执行SQL脚本: {script_name} =====")
@@ -649,6 +681,7 @@ def execute_sql(script_info):
     logger.info(f"执行模式: {execution_mode}")
     logger.info(f"执行模式: {execution_mode}")
     logger.info(f"表标签: {target_table_label}")
     logger.info(f"表标签: {target_table_label}")
     logger.info(f"频率: {frequency}")
     logger.info(f"频率: {frequency}")
+    logger.info(f"执行日期: {exec_date}")
     
     
     try:
     try:
         # 记录执行开始时间
         # 记录执行开始时间
@@ -674,7 +707,7 @@ def execute_sql(script_info):
                 "script_type": "sql",
                 "script_type": "sql",
                 "target_table": target_table,
                 "target_table": target_table,
                 "script_name": script_name,
                 "script_name": script_name,
-                "exec_date": datetime.now().strftime('%Y-%m-%d'),
+                "exec_date": exec_date,  # 使用传入的执行日期而不是当前日期
                 "frequency": frequency,
                 "frequency": frequency,
                 "target_table_label": target_table_label,
                 "target_table_label": target_table_label,
                 "execution_mode": execution_mode
                 "execution_mode": execution_mode
@@ -726,6 +759,8 @@ def execute_python(script_info):
     execution_mode = script_info.get('execution_mode', 'append')
     execution_mode = script_info.get('execution_mode', 'append')
     target_table_label = script_info.get('target_table_label')
     target_table_label = script_info.get('target_table_label')
     frequency = script_info.get('frequency', 'daily')
     frequency = script_info.get('frequency', 'daily')
+    # 使用传入的执行日期,如果不存在则使用当前日期
+    exec_date = script_info.get('exec_date', datetime.now().strftime('%Y-%m-%d'))
     
     
     # 记录开始执行
     # 记录开始执行
     logger.info(f"===== 开始执行Python脚本(data_transform_scripts): {script_name} =====")
     logger.info(f"===== 开始执行Python脚本(data_transform_scripts): {script_name} =====")
@@ -733,6 +768,7 @@ def execute_python(script_info):
     logger.info(f"执行模式: {execution_mode}")
     logger.info(f"执行模式: {execution_mode}")
     logger.info(f"表标签: {target_table_label}")
     logger.info(f"表标签: {target_table_label}")
     logger.info(f"频率: {frequency}")
     logger.info(f"频率: {frequency}")
+    logger.info(f"执行日期: {exec_date}")
     
     
     try:
     try:
         # 记录执行开始时间
         # 记录执行开始时间
@@ -758,7 +794,7 @@ def execute_python(script_info):
                 "script_type": "python",
                 "script_type": "python",
                 "target_table": target_table,
                 "target_table": target_table,
                 "script_name": script_name,
                 "script_name": script_name,
-                "exec_date": datetime.now().strftime('%Y-%m-%d'),
+                "exec_date": exec_date,  # 使用传入的执行日期而不是当前日期
                 "frequency": frequency,
                 "frequency": frequency,
                 "target_table_label": target_table_label,
                 "target_table_label": target_table_label,
                 "execution_mode": execution_mode
                 "execution_mode": execution_mode
@@ -894,7 +930,7 @@ def prepare_dependency_chain(**context):
     准备依赖链并保存到XCom
     准备依赖链并保存到XCom
     """
     """
     # 获取脚本和表名参数
     # 获取脚本和表名参数
-    script_name, target_table, dependency_level = get_dag_params(**context)
+    script_name, target_table, dependency_level, exec_date, logical_date = get_dag_params(**context)
     
     
     # 记录依赖级别信息
     # 记录依赖级别信息
     logger.info(f"依赖级别说明:")
     logger.info(f"依赖级别说明:")
@@ -902,6 +938,7 @@ def prepare_dependency_chain(**context):
     logger.info(f"- dependency: 处理直接依赖")
     logger.info(f"- dependency: 处理直接依赖")
     logger.info(f"- full: 处理所有间接依赖")
     logger.info(f"- full: 处理所有间接依赖")
     logger.info(f"当前依赖级别: {dependency_level}")
     logger.info(f"当前依赖级别: {dependency_level}")
+    logger.info(f"执行日期: {exec_date}")
     
     
     # 准备脚本信息
     # 准备脚本信息
     script_infos = prepare_script_info(script_name, target_table, dependency_level)
     script_infos = prepare_script_info(script_name, target_table, dependency_level)
@@ -941,9 +978,11 @@ def prepare_dependency_chain(**context):
         logger.error("没有找到任何有效的依赖链")
         logger.error("没有找到任何有效的依赖链")
         return False
         return False
     
     
-    # 保存依赖链到XCom
+    # 保存依赖链和执行日期到XCom
     ti = context['ti']
     ti = context['ti']
     ti.xcom_push(key='dependency_chain', value=unique_dependencies)
     ti.xcom_push(key='dependency_chain', value=unique_dependencies)
+    ti.xcom_push(key='exec_date', value=exec_date)
+    ti.xcom_push(key='logical_date', value=logical_date.isoformat() if isinstance(logical_date, datetime) else logical_date)
     
     
     logger.info(f"成功准备了 {len(unique_dependencies)} 个脚本的依赖链")
     logger.info(f"成功准备了 {len(unique_dependencies)} 个脚本的依赖链")
     return True
     return True
@@ -952,9 +991,22 @@ def execute_script_chain(**context):
     """
     """
     执行依赖链中的所有脚本
     执行依赖链中的所有脚本
     """
     """
-    # 获取依赖链
+    # 获取依赖链和执行日期
     ti = context['ti']
     ti = context['ti']
     dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')
     dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')
+    exec_date = ti.xcom_pull(task_ids='prepare_dependency_chain', key='exec_date')
+    logical_date_str = ti.xcom_pull(task_ids='prepare_dependency_chain', key='logical_date')
+    
+    # 转换logical_date为datetime对象(如果是字符串)
+    if isinstance(logical_date_str, str):
+        try:
+            logical_date = pendulum.parse(logical_date_str)
+        except:
+            logical_date = datetime.now()
+    else:
+        logical_date = datetime.now()
+    
+    logger.info(f"【时间参数】execute_script_chain: exec_date={exec_date}, logical_date={logical_date}")
     
     
     if not dependency_chain:
     if not dependency_chain:
         logger.error("没有找到依赖链,无法执行脚本")
         logger.error("没有找到依赖链,无法执行脚本")
@@ -979,6 +1031,10 @@ def execute_script_chain(**context):
         # 根据脚本类型选择执行函数
         # 根据脚本类型选择执行函数
         executor = choose_executor(script_info)
         executor = choose_executor(script_info)
         
         
+        # 将执行日期添加到脚本信息中
+        script_info['exec_date'] = exec_date
+        script_info['logical_date'] = logical_date
+        
         # 执行脚本
         # 执行脚本
         success = executor(script_info)
         success = executor(script_info)
         
         
@@ -1010,6 +1066,7 @@ def generate_execution_report(**context):
     ti = context['ti']
     ti = context['ti']
     results = ti.xcom_pull(task_ids='execute_script_chain', key='execution_results')
     results = ti.xcom_pull(task_ids='execute_script_chain', key='execution_results')
     dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')
     dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')
+    exec_date = ti.xcom_pull(task_ids='prepare_dependency_chain', key='exec_date')
     
     
     if not results:
     if not results:
         report = "未找到执行结果,无法生成报告"
         report = "未找到执行结果,无法生成报告"
@@ -1032,7 +1089,8 @@ def generate_execution_report(**context):
     # 构建报告
     # 构建报告
     report = []
     report = []
     report.append("\n========== 脚本执行报告 ==========")
     report.append("\n========== 脚本执行报告 ==========")
-    report.append(f"执行日期: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+    report.append(f"执行日期: {exec_date}")
+    report.append(f"报告生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
     report.append(f"总脚本数: {total}")
     report.append(f"总脚本数: {total}")
     report.append(f"成功数: {success_count}")
     report.append(f"成功数: {success_count}")
     report.append(f"失败数: {fail_count}")
     report.append(f"失败数: {fail_count}")