Sfoglia il codice sorgente

更新执行时长的功能没有成功

wangxq 2 mesi fa
parent
commit
09b2edddcd
2 file modificati, con 523 aggiunte e 63 eliminazioni:
  1. dags/common.py (+248 −61)
  2. dags/dag_dataops_pipeline_data_scheduler.py (+275 −2)

dags/common.py (+248 −61)

@@ -8,6 +8,8 @@ import networkx as nx
 import os
 from datetime import datetime, timedelta
 from config import PG_CONFIG, NEO4J_CONFIG, SCRIPTS_BASE_PATH
+import functools
+import time
 
 # 创建统一的日志记录器
 logger = logging.getLogger("airflow.task")
@@ -24,43 +26,166 @@ def get_neo4j_driver():
 
 def update_task_start_time(exec_date, target_table, script_name, start_time):
     """更新任务开始时间"""
+    logger.info(f"===== 更新任务开始时间 =====")
+    logger.info(f"参数: exec_date={exec_date} ({type(exec_date).__name__}), target_table={target_table}, script_name={script_name}")
+    
     conn = get_pg_conn()
     cursor = conn.cursor()
     try:
+        # 首先检查记录是否存在
         cursor.execute("""
+            SELECT COUNT(*) 
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s AND target_table = %s AND script_name = %s
+        """, (exec_date, target_table, script_name))
+        count = cursor.fetchone()[0]
+        logger.info(f"查询到符合条件的记录数: {count}")
+        
+        if count == 0:
+            logger.warning(f"未找到匹配的记录: exec_date={exec_date}, target_table={target_table}, script_name={script_name}")
+            logger.info("尝试记录在airflow_dag_schedule表中找到的记录:")
+            cursor.execute("""
+                SELECT exec_date, target_table, script_name
+                FROM airflow_dag_schedule
+                LIMIT 5
+            """)
+            sample_records = cursor.fetchall()
+            for record in sample_records:
+                logger.info(f"样本记录: exec_date={record[0]} ({type(record[0]).__name__}), target_table={record[1]}, script_name={record[2]}")
+        
+        # 执行更新
+        sql = """
             UPDATE airflow_dag_schedule 
             SET exec_start_time = %s
             WHERE exec_date = %s AND target_table = %s AND script_name = %s
-        """, (start_time, exec_date, target_table, script_name))
+        """
+        logger.info(f"执行SQL: {sql}")
+        logger.info(f"参数: start_time={start_time}, exec_date={exec_date}, target_table={target_table}, script_name={script_name}")
+        
+        cursor.execute(sql, (start_time, exec_date, target_table, script_name))
+        affected_rows = cursor.rowcount
+        logger.info(f"更新影响的行数: {affected_rows}")
+        
         conn.commit()
+        logger.info("事务已提交")
     except Exception as e:
         logger.error(f"更新任务开始时间失败: {str(e)}")
+        import traceback
+        logger.error(f"错误堆栈: {traceback.format_exc()}")
         conn.rollback()
+        logger.info("事务已回滚")
+        raise
     finally:
         cursor.close()
         conn.close()
+        logger.info("数据库连接已关闭")
+        logger.info("===== 更新任务开始时间完成 =====")
 
 def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
     """更新任务完成信息"""
+    logger.info(f"===== 更新任务完成信息 =====")
+    logger.info(f"参数: exec_date={exec_date} ({type(exec_date).__name__}), target_table={target_table}, script_name={script_name}")
+    logger.info(f"参数: success={success} ({type(success).__name__}), end_time={end_time}, duration={duration}")
+    
     conn = get_pg_conn()
     cursor = conn.cursor()
     try:
+        # 首先检查记录是否存在
         cursor.execute("""
+            SELECT COUNT(*) 
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s AND target_table = %s AND script_name = %s
+        """, (exec_date, target_table, script_name))
+        count = cursor.fetchone()[0]
+        logger.info(f"查询到符合条件的记录数: {count}")
+        
+        if count == 0:
+            logger.warning(f"未找到匹配的记录: exec_date={exec_date}, target_table={target_table}, script_name={script_name}")
+            # 查询表中前几条记录作为参考
+            cursor.execute("""
+                SELECT exec_date, target_table, script_name
+                FROM airflow_dag_schedule
+                LIMIT 5
+            """)
+            sample_records = cursor.fetchall()
+            logger.info("airflow_dag_schedule表中的样本记录:")
+            for record in sample_records:
+                logger.info(f"样本记录: exec_date={record[0]} ({type(record[0]).__name__}), target_table={record[1]}, script_name={record[2]}")
+        
+        # 确保success是布尔类型
+        if not isinstance(success, bool):
+            original_success = success
+            success = bool(success)
+            logger.warning(f"success参数不是布尔类型,原始值: {original_success},转换为: {success}")
+        
+        # 执行更新
+        sql = """
             UPDATE airflow_dag_schedule 
             SET exec_result = %s, exec_end_time = %s, exec_duration = %s
             WHERE exec_date = %s AND target_table = %s AND script_name = %s
-        """, (success, end_time, duration, exec_date, target_table, script_name))
+        """
+        logger.info(f"执行SQL: {sql}")
+        logger.info(f"参数: success={success}, end_time={end_time}, duration={duration}, exec_date={exec_date}, target_table={target_table}, script_name={script_name}")
+        
+        cursor.execute(sql, (success, end_time, duration, exec_date, target_table, script_name))
+        affected_rows = cursor.rowcount
+        logger.info(f"更新影响的行数: {affected_rows}")
+        
+        if affected_rows == 0:
+            logger.warning("更新操作没有影响任何行,可能是因为条件不匹配")
+            # 尝试用不同格式的exec_date查询
+            if isinstance(exec_date, str):
+                try:
+                    # 尝试解析日期字符串
+                    from datetime import datetime
+                    parsed_date = datetime.strptime(exec_date, "%Y-%m-%d").date()
+                    logger.info(f"尝试使用解析后的日期格式: {parsed_date}")
+                    
+                    cursor.execute("""
+                        SELECT COUNT(*) 
+                        FROM airflow_dag_schedule 
+                        WHERE exec_date = %s AND target_table = %s AND script_name = %s
+                    """, (parsed_date, target_table, script_name))
+                    parsed_count = cursor.fetchone()[0]
+                    logger.info(f"使用解析日期后查询到的记录数: {parsed_count}")
+                    
+                    if parsed_count > 0:
+                        # 尝试用解析的日期更新
+                        cursor.execute("""
+                            UPDATE airflow_dag_schedule 
+                            SET exec_result = %s, exec_end_time = %s, exec_duration = %s
+                            WHERE exec_date = %s AND target_table = %s AND script_name = %s
+                        """, (success, end_time, duration, parsed_date, target_table, script_name))
+                        new_affected_rows = cursor.rowcount
+                        logger.info(f"使用解析日期后更新影响的行数: {new_affected_rows}")
+                except Exception as parse_e:
+                    logger.error(f"尝试解析日期格式时出错: {str(parse_e)}")
+        
         conn.commit()
+        logger.info("事务已提交")
     except Exception as e:
         logger.error(f"更新任务完成信息失败: {str(e)}")
+        import traceback
+        logger.error(f"错误堆栈: {traceback.format_exc()}")
         conn.rollback()
+        logger.info("事务已回滚")
+        raise
     finally:
         cursor.close()
         conn.close()
+        logger.info("数据库连接已关闭")
+        logger.info("===== 更新任务完成信息完成 =====")
 
 def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
     """执行脚本并监控执行情况"""
 
+    # 添加详细日志
+    logger.info(f"===== 开始监控执行 =====")
+    logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
+    logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
+    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
+    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
+
     # 检查script_name是否为空
     if not script_name:
         logger.error(f"表 {target_table} 的script_name为空,无法执行")
@@ -70,89 +195,151 @@ def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_da
         return False
     # 记录执行开始时间
     start_time = datetime.now()
-    update_task_start_time(exec_date, target_table, script_name, start_time)
+    
+    # 尝试更新开始时间并记录结果
+    try:
+        update_task_start_time(exec_date, target_table, script_name, start_time)
+        logger.info(f"成功更新任务开始时间: {start_time}")
+    except Exception as e:
+        logger.error(f"更新任务开始时间失败: {str(e)}")
     
     try:
         # 执行实际脚本
-        success = execute_script(script_name, target_table, script_exec_mode)
+        logger.info(f"开始执行脚本: {script_name}")
+        result = execute_script(script_name, target_table, script_exec_mode)
+        logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
+        
+        # 确保result是布尔值
+        if result is None:
+            logger.warning(f"脚本返回值为None,转换为False")
+            result = False
+        elif not isinstance(result, bool):
+            original_result = result
+            result = bool(result)
+            logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
         
         # 记录结束时间和结果
         end_time = datetime.now()
         duration = (end_time - start_time).total_seconds()
-        update_task_completion(exec_date, target_table, script_name, success, end_time, duration)
         
-        return success
+        # 尝试更新完成状态并记录结果
+        try:
+            logger.info(f"尝试更新完成状态: result={result}, end_time={end_time}, duration={duration}")
+            update_task_completion(exec_date, target_table, script_name, result, end_time, duration)
+            logger.info(f"成功更新任务完成状态,结果: {result}")
+        except Exception as e:
+            logger.error(f"更新任务完成状态失败: {str(e)}")
+        
+        logger.info(f"===== 监控执行完成 =====")
+        return result
     except Exception as e:
         # 处理异常
         logger.error(f"执行任务出错: {str(e)}")
         end_time = datetime.now()
         duration = (end_time - start_time).total_seconds()
-        update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
-        raise e
-
-def execute_script(script_name, table_name, execution_mode):
-    """执行脚本并返回结果"""
-    if not script_name:
-        logger.error("未提供脚本名称,无法执行")
-        return False
-    
-    try:
-        # 检查脚本路径
-        script_path = Path(SCRIPTS_BASE_PATH) / script_name
-        logger.info(f"准备执行脚本,完整路径: {script_path}")
         
-        # 检查脚本路径是否存在
-        if not os.path.exists(script_path):
-            logger.error(f"脚本文件不存在: {script_path}")
-            logger.error(f"请确认脚本文件已部署到正确路径: {SCRIPTS_BASE_PATH}")
-            
-            # 尝试列出脚本目录中的文件
-            try:
-                script_dir = Path(SCRIPTS_BASE_PATH)
-                if os.path.exists(script_dir):
-                    files = os.listdir(script_dir)
-                    logger.info(f"可用脚本文件: {files}")
-                else:
-                    logger.error(f"脚本目录不存在: {script_dir}")
-            except Exception as le:
-                logger.error(f"尝试列出脚本目录内容时出错: {str(le)}")
-                
-            return False
-            
-        logger.info(f"脚本文件存在,开始导入: {script_path}")
+        # 尝试更新失败状态并记录结果
+        try:
+            logger.info(f"尝试更新失败状态: end_time={end_time}, duration={duration}")
+            update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
+            logger.info(f"成功更新任务失败状态")
+        except Exception as update_e:
+            logger.error(f"更新任务失败状态失败: {str(update_e)}")
         
-        # 动态导入模块
+        logger.info(f"===== 监控执行异常结束 =====")
+        raise e
+
+def ensure_boolean_result(func):
+    """装饰器:确保函数返回布尔值"""
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
         try:
-            spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
-            if spec is None:
-                logger.error(f"无法加载脚本规范: {script_path}")
+            result = func(*args, **kwargs)
+            logger.debug(f"脚本原始返回值: {result} (类型: {type(result).__name__})")
+            
+            # 处理None值
+            if result is None:
+                logger.warning(f"脚本函数 {func.__name__} 返回了None,默认设置为False")
                 return False
                 
-            module = importlib.util.module_from_spec(spec)
-            spec.loader.exec_module(module)
-            logger.info(f"成功导入脚本模块: {script_name}")
-        except ImportError as ie:
-            logger.error(f"导入脚本时出错: {str(ie)}")
-            import traceback
-            logger.error(traceback.format_exc())
+            # 处理非布尔值
+            if not isinstance(result, bool):
+                try:
+                    # 尝试转换为布尔值
+                    bool_result = bool(result)
+                    logger.warning(f"脚本函数 {func.__name__} 返回非布尔值 {result},已转换为布尔值 {bool_result}")
+                    return bool_result
+                except Exception as e:
+                    logger.error(f"无法将脚本返回值 {result} 转换为布尔值: {str(e)}")
+                    return False
+            
+            return result
+        except Exception as e:
+            logger.error(f"脚本函数 {func.__name__} 执行出错: {str(e)}")
             return False
-        except SyntaxError as se:
-            logger.error(f"脚本语法错误: {str(se)}")
-            logger.error(f"错误位置: {se.filename}, 行 {se.lineno}, 列 {se.offset}")
+    return wrapper
+
+def execute_script(script_path=None, script_name=None, script_exec_mode=None, table_name=None, execution_mode=None, args=None):
+    """
+    执行指定的脚本,并返回执行结果
+    支持两种调用方式:
+    1. execute_script(script_path, script_name, script_exec_mode, args={})
+    2. execute_script(script_name, table_name, execution_mode)
+    """
+    # 确定调用方式并统一参数
+    if script_path and script_name and script_exec_mode is not None:
+        # 第一种调用方式
+        if args is None:
+            args = {}
+    elif script_name and table_name and execution_mode is not None:
+        # 第二种调用方式
+        script_path = os.path.join(SCRIPTS_BASE_PATH, f"{script_name}.py")
+        script_exec_mode = execution_mode
+        args = {"table_name": table_name}
+    else:
+        logger.error("参数不正确,无法执行脚本")
+        return False
+
+    try:
+        # 确保脚本路径存在
+        if not os.path.exists(script_path):
+            logger.error(f"脚本路径 {script_path} 不存在")
             return False
+
+        # 加载脚本模块
+        spec = importlib.util.spec_from_file_location("script_module", script_path)
+        module = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(module)
         
-        # 验证run函数存在
+        # 检查并记录所有可用的函数
+        module_functions = [f for f in dir(module) if callable(getattr(module, f)) and not f.startswith('_')]
+        logger.debug(f"模块 {script_name} 中的可用函数: {module_functions}")
+
+        # 获取脚本的运行函数
         if not hasattr(module, "run"):
-            available_funcs = [func for func in dir(module) if callable(getattr(module, func)) and not func.startswith("_")]
-            logger.error(f"脚本 {script_name} 未定义标准入口函数 run(),无法执行")
-            logger.error(f"可用函数: {available_funcs}")
+            logger.error(f"脚本 {script_name} 没有run函数")
             return False
+
+        # 装饰run函数,确保返回布尔值
+        original_run = module.run
+        module.run = ensure_boolean_result(original_run)
         
-        # 执行run函数
-        logger.info(f"执行脚本 {script_name} 的run函数,参数: table_name={table_name}, execution_mode={execution_mode}")
-        result = module.run(table_name=table_name, execution_mode=execution_mode)
-        logger.info(f"脚本 {script_name} 执行结果: {result}")
-        return result
+        logger.info(f"开始执行脚本 {script_name},执行模式: {script_exec_mode}, 参数: {args}")
+        start_time = time.time()
+        
+        # 执行脚本
+        if table_name is not None:
+            # 第二种调用方式的参数格式
+            exec_result = module.run(table_name=table_name, execution_mode=script_exec_mode)
+        else:
+            # 第一种调用方式的参数格式
+            exec_result = module.run(script_exec_mode, args)
+        
+        end_time = time.time()
+        duration = end_time - start_time
+        
+        logger.info(f"脚本 {script_name} 执行完成,结果: {exec_result}, 耗时: {duration:.2f}秒")
+        return exec_result
     except Exception as e:
         logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
         import traceback

dags/dag_dataops_pipeline_data_scheduler.py (+275 −2)

@@ -28,6 +28,87 @@ from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
 # 创建日志记录器
 logger = logging.getLogger(__name__)
 
+# 开启详细诊断日志记录
+ENABLE_DEBUG_LOGGING = True
+
+def log_debug(message):
+    """记录调试日志,但只在启用调试模式时"""
+    if ENABLE_DEBUG_LOGGING:
+        logger.info(f"[DEBUG] {message}")
+
+# 在DAG启动时输出诊断信息
+log_debug("======== 诊断信息 ========")
+log_debug(f"当前工作目录: {os.getcwd()}")
+log_debug(f"SCRIPTS_BASE_PATH: {SCRIPTS_BASE_PATH}")
+log_debug(f"导入的common模块路径: {get_pg_conn.__module__}")
+
+# 检查数据库连接
+def validate_database_connection():
+    """验证数据库连接是否正常"""
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        cursor.execute("SELECT version()")
+        version = cursor.fetchone()
+        log_debug(f"数据库连接正常,PostgreSQL版本: {version[0]}")
+        
+        # 检查airflow_dag_schedule表是否存在
+        cursor.execute("""
+            SELECT EXISTS (
+               SELECT FROM information_schema.tables 
+               WHERE table_name = 'airflow_dag_schedule'
+            )
+        """)
+        table_exists = cursor.fetchone()[0]
+        if table_exists:
+            # 检查表结构
+            cursor.execute("""
+                SELECT column_name, data_type 
+                FROM information_schema.columns 
+                WHERE table_name = 'airflow_dag_schedule'
+            """)
+            columns = cursor.fetchall()
+            log_debug(f"airflow_dag_schedule表存在,列信息:")
+            for col in columns:
+                log_debug(f"  - {col[0]}: {col[1]}")
+            
+            # 查询最新记录数量
+            cursor.execute("SELECT COUNT(*) FROM airflow_dag_schedule")
+            count = cursor.fetchone()[0]
+            log_debug(f"airflow_dag_schedule表中有 {count} 条记录")
+            
+            # 检查最近的执行记录
+            cursor.execute("""
+                SELECT exec_date, COUNT(*) as record_count
+                FROM airflow_dag_schedule
+                GROUP BY exec_date
+                ORDER BY exec_date DESC
+                LIMIT 3
+            """)
+            recent_dates = cursor.fetchall()
+            log_debug(f"最近的执行日期及记录数:")
+            for date_info in recent_dates:
+                log_debug(f"  - {date_info[0]}: {date_info[1]} 条记录")
+        else:
+            log_debug("airflow_dag_schedule表不存在!")
+        
+        cursor.close()
+        conn.close()
+        return True
+    except Exception as e:
+        log_debug(f"数据库连接验证失败: {str(e)}")
+        import traceback
+        log_debug(f"错误堆栈: {traceback.format_exc()}")
+        return False
+
+# 执行数据库连接验证
+try:
+    validate_database_connection()
+except Exception as e:
+    log_debug(f"验证数据库连接时出错: {str(e)}")
+
+log_debug("======== 诊断信息结束 ========")
+
 #############################################
 # 通用工具函数
 #############################################
@@ -719,7 +800,10 @@ def create_execution_plan(**kwargs):
 
 def process_resource(target_table, script_name, script_exec_mode, exec_date):
     """处理单个资源表"""
+    task_id = f"resource_{target_table}"
+    logger.info(f"===== 开始执行 {task_id} =====")
     logger.info(f"执行资源表 {target_table} 的脚本 {script_name}")
+    
     # 检查exec_date是否是JSON字符串
     if isinstance(exec_date, str) and exec_date.startswith('{'):
         try:
@@ -730,8 +814,14 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date):
         except Exception as e:
             logger.error(f"解析exec_date JSON时出错: {str(e)}")
     
+    # 确保exec_date是字符串
+    if not isinstance(exec_date, str):
+        exec_date = str(exec_date)
+        logger.info(f"将exec_date转换为字符串: {exec_date}")
+    
     try:
         # 正常调用执行监控函数
+        logger.info(f"调用execute_with_monitoring: target_table={target_table}, script_name={script_name}, exec_date={exec_date}")
         result = execute_with_monitoring(
             target_table=target_table,
             script_name=script_name,
@@ -739,15 +829,103 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date):
             exec_date=exec_date
         )
         logger.info(f"资源表 {target_table} 处理完成,结果: {result}")
+        
+        # 验证状态是否正确更新到数据库
+        try:
+            conn = get_pg_conn()
+            cursor = conn.cursor()
+            cursor.execute("""
+                SELECT exec_result, exec_start_time, exec_end_time, exec_duration
+                FROM airflow_dag_schedule
+                WHERE exec_date = %s AND target_table = %s AND script_name = %s
+            """, (exec_date, target_table, script_name))
+            db_record = cursor.fetchone()
+            
+            if db_record:
+                logger.info(f"数据库中任务状态: exec_result={db_record[0]}, start_time={db_record[1]}, end_time={db_record[2]}, duration={db_record[3]}")
+                
+                # 如果数据库中没有结果但执行已完成,尝试再次更新
+                if db_record[0] is None and db_record[1] is not None:
+                    now = datetime.now()
+                    duration = (now - db_record[1]).total_seconds() if db_record[1] else 0
+                    
+                    logger.info(f"数据库中结果为空但已有开始时间,尝试手动更新结果: {result}")
+                    cursor.execute("""
+                        UPDATE airflow_dag_schedule
+                        SET exec_result = %s, exec_end_time = %s, exec_duration = %s
+                        WHERE exec_date = %s AND target_table = %s AND script_name = %s
+                    """, (result, now, duration, exec_date, target_table, script_name))
+                    conn.commit()
+                    logger.info(f"手动更新结果行数: {cursor.rowcount}")
+            else:
+                logger.warning(f"在数据库中未找到任务记录")
+            
+            cursor.close()
+            conn.close()
+        except Exception as verify_e:
+            logger.error(f"验证任务状态时出错: {str(verify_e)}")
+        
         return result
     except Exception as e:
         logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        
+        # 尝试直接在这里更新错误状态
+        try:
+            conn = get_pg_conn()
+            cursor = conn.cursor()
+            now = datetime.now()
+            
+            # 检查是否已有开始时间
+            cursor.execute("""
+                SELECT exec_start_time 
+                FROM airflow_dag_schedule 
+                WHERE exec_date = %s AND target_table = %s AND script_name = %s
+            """, (exec_date, target_table, script_name))
+            start_time_record = cursor.fetchone()
+            
+            if start_time_record and start_time_record[0]:
+                start_time = start_time_record[0]
+                duration = (now - start_time).total_seconds()
+            else:
+                # 如果没有开始时间,则使用0秒持续时间并记录开始时间
+                duration = 0
+                cursor.execute("""
+                    UPDATE airflow_dag_schedule
+                    SET exec_start_time = %s
+                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
+                """, (now, exec_date, target_table, script_name))
+                logger.info("未找到开始时间,手动设置开始时间")
+                
+            # 更新为失败状态
+            cursor.execute("""
+                UPDATE airflow_dag_schedule 
+                SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
+                WHERE exec_date = %s AND target_table = %s AND script_name = %s
+            """, (now, duration, exec_date, target_table, script_name))
+            conn.commit()
+            logger.info(f"手动更新任务失败状态: {target_table}, 影响行数: {cursor.rowcount}")
+        except Exception as db_e:
+            logger.error(f"尝试手动更新失败状态时出错: {str(db_e)}")
+        finally:
+            if 'cursor' in locals() and cursor:
+                cursor.close()
+            if 'conn' in locals() and conn:
+                conn.close()
+        
         # 确保即使出错也返回结果,不会阻塞DAG
+        logger.info(f"===== 结束执行 {task_id} (失败) =====")
         return False
+    finally:
+        logger.info(f"===== 结束执行 {task_id} =====")
 
 def process_model(target_table, script_name, script_exec_mode, exec_date):
     """处理单个模型表"""
+    task_id = f"model_{target_table}"
+    logger.info(f"===== 开始执行 {task_id} =====")
     logger.info(f"执行模型表 {target_table} 的脚本 {script_name}")
+    
     # 检查exec_date是否是JSON字符串
     if isinstance(exec_date, str) and exec_date.startswith('{'):
         try:
@@ -758,7 +936,13 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
         except Exception as e:
             logger.error(f"解析exec_date JSON时出错: {str(e)}")
     
+    # 确保exec_date是字符串
+    if not isinstance(exec_date, str):
+        exec_date = str(exec_date)
+        logger.info(f"将exec_date转换为字符串: {exec_date}")
+    
     try:
+        logger.info(f"调用execute_with_monitoring: target_table={target_table}, script_name={script_name}, exec_date={exec_date}")
         result = execute_with_monitoring(
             target_table=target_table,
             script_name=script_name,
@@ -766,11 +950,96 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
             exec_date=exec_date
         )
         logger.info(f"模型表 {target_table} 处理完成,结果: {result}")
+        
+        # 验证状态是否正确更新到数据库
+        try:
+            conn = get_pg_conn()
+            cursor = conn.cursor()
+            cursor.execute("""
+                SELECT exec_result, exec_start_time, exec_end_time, exec_duration
+                FROM airflow_dag_schedule
+                WHERE exec_date = %s AND target_table = %s AND script_name = %s
+            """, (exec_date, target_table, script_name))
+            db_record = cursor.fetchone()
+            
+            if db_record:
+                logger.info(f"数据库中任务状态: exec_result={db_record[0]}, start_time={db_record[1]}, end_time={db_record[2]}, duration={db_record[3]}")
+                
+                # 如果数据库中没有结果但执行已完成,尝试再次更新
+                if db_record[0] is None and db_record[1] is not None:
+                    now = datetime.now()
+                    duration = (now - db_record[1]).total_seconds() if db_record[1] else 0
+                    
+                    logger.info(f"数据库中结果为空但已有开始时间,尝试手动更新结果: {result}")
+                    cursor.execute("""
+                        UPDATE airflow_dag_schedule
+                        SET exec_result = %s, exec_end_time = %s, exec_duration = %s
+                        WHERE exec_date = %s AND target_table = %s AND script_name = %s
+                    """, (result, now, duration, exec_date, target_table, script_name))
+                    conn.commit()
+                    logger.info(f"手动更新结果行数: {cursor.rowcount}")
+            else:
+                logger.warning(f"在数据库中未找到任务记录")
+            
+            cursor.close()
+            conn.close()
+        except Exception as verify_e:
+            logger.error(f"验证任务状态时出错: {str(verify_e)}")
+        
         return result
     except Exception as e:
         logger.error(f"处理模型表 {target_table} 时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        
+        # 尝试直接在这里更新错误状态
+        try:
+            conn = get_pg_conn()
+            cursor = conn.cursor()
+            now = datetime.now()
+            
+            # 检查是否已有开始时间
+            cursor.execute("""
+                SELECT exec_start_time 
+                FROM airflow_dag_schedule 
+                WHERE exec_date = %s AND target_table = %s AND script_name = %s
+            """, (exec_date, target_table, script_name))
+            start_time_record = cursor.fetchone()
+            
+            if start_time_record and start_time_record[0]:
+                start_time = start_time_record[0]
+                duration = (now - start_time).total_seconds()
+            else:
+                # 如果没有开始时间,则使用0秒持续时间并记录开始时间
+                duration = 0
+                cursor.execute("""
+                    UPDATE airflow_dag_schedule
+                    SET exec_start_time = %s
+                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
+                """, (now, exec_date, target_table, script_name))
+                logger.info("未找到开始时间,手动设置开始时间")
+                
+            # 更新为失败状态
+            cursor.execute("""
+                UPDATE airflow_dag_schedule 
+                SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
+                WHERE exec_date = %s AND target_table = %s AND script_name = %s
+            """, (now, duration, exec_date, target_table, script_name))
+            conn.commit()
+            logger.info(f"手动更新任务失败状态: {target_table}, 影响行数: {cursor.rowcount}")
+        except Exception as db_e:
+            logger.error(f"尝试手动更新失败状态时出错: {str(db_e)}")
+        finally:
+            if 'cursor' in locals() and cursor:
+                cursor.close()
+            if 'conn' in locals() and conn:
+                conn.close()
+        
         # 确保即使出错也返回结果,不会阻塞DAG
+        logger.info(f"===== 结束执行 {task_id} (失败) =====")
         return False
+    finally:
+        logger.info(f"===== 结束执行 {task_id} =====")
 
 #############################################
 # 第三阶段: 汇总阶段(Summary Phase)的函数
@@ -1210,7 +1479,9 @@ with DAG(
                                     "target_table": table_name,
                                     "script_name": script_name,
                                     "script_exec_mode": exec_mode,
-                                    "exec_date": exec_date
+                                    # 确保使用字符串而不是可能是默认(非字符串)格式的执行日期
+                                    # 这样 execute_with_monitoring 函数才能正确更新数据库
+                                    "exec_date": str(exec_date)
                                 },
                                 retries=TASK_RETRY_CONFIG["retries"],
                                 retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
@@ -1276,7 +1547,9 @@ with DAG(
                                     "target_table": table_name,
                                     "script_name": script_name,
                                     "script_exec_mode": exec_mode,
-                                    "exec_date": exec_date
+                                    # 确保使用字符串而不是可能是默认(非字符串)格式的执行日期
+                                    # 这样 execute_with_monitoring 函数才能正确更新数据库
+                                    "exec_date": str(exec_date)
                                 },
                                 retries=TASK_RETRY_CONFIG["retries"],
                                 retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])