Quellcode durchsuchen

pipeline_data修改了一半

wangxq vor 1 Monat
Ursprung
Commit
db9d2718b0
1 geänderte Dateien mit 227 neuen und 464 gelöschten Zeilen
  1. 227 464
      dags/dag_dataops_pipeline_data_scheduler.py

+ 227 - 464
dags/dag_dataops_pipeline_data_scheduler.py

@@ -130,6 +130,70 @@ class DecimalEncoder(json.JSONEncoder):
         # 让父类处理其他类型
         return super(DecimalEncoder, self).default(obj)
 
+#############################################
+# 新的工具函数
+#############################################
+
+def execute_python_script(target_table, script_name, script_exec_mode, exec_date, **kwargs):
+    """
+    执行Python脚本并返回执行结果
+    
+    参数:
+        target_table: 目标表名
+        script_name: 脚本名称 
+        script_exec_mode: 脚本执行模式
+        exec_date: 执行日期
+        
+    返回:
+        bool: 脚本执行结果
+    """
+    # 添加详细日志
+    logger.info(f"===== 开始执行脚本 =====")
+    logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
+    logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
+    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
+    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
+
+    # 检查script_name是否为空
+    if not script_name:
+        logger.error(f"表 {target_table} 的script_name为空,无法执行")
+        return False
+        
+    # 记录执行开始时间
+    start_time = datetime.now()
+    
+    try:
+        # 执行实际脚本
+        logger.info(f"开始执行脚本: {script_name}")
+        result = execute_python_script(script_name, target_table, script_exec_mode)
+        logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
+        
+        # 确保result是布尔值
+        if result is None:
+            logger.warning(f"脚本返回值为None,转换为False")
+            result = False
+        elif not isinstance(result, bool):
+            original_result = result
+            result = bool(result)
+            logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
+        
+        # 记录结束时间和结果
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
+        
+        return result
+    except Exception as e:
+        # 处理异常
+        logger.error(f"执行任务出错: {str(e)}")
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        logger.error(f"脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
+        logger.info(f"===== 脚本执行异常结束 =====")
+        
+        # 确保不会阻塞DAG
+        return False
+
 #############################################
 # 第一阶段: 准备阶段(Prepare Phase)的函数
 #############################################
@@ -351,50 +415,8 @@ def filter_invalid_tables(tables_info):
 
 def write_to_airflow_dag_schedule(exec_date, tables_info):
     """将表信息写入airflow_dag_schedule表"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    
-    try:
-        # 清理当日数据,避免重复
-        cursor.execute("""
-            DELETE FROM airflow_dag_schedule WHERE exec_date = %s
-        """, (exec_date,))
-        logger.info(f"已清理执行日期 {exec_date} 的现有数据")
-        
-        # 批量插入新数据
-        inserted_count = 0
-        for table in tables_info:
-            cursor.execute("""
-                INSERT INTO airflow_dag_schedule (
-                    exec_date, source_table, target_table, target_table_label,
-                    target_table_status, is_directly_schedule, default_update_frequency,
-                    script_name, script_type, script_exec_mode
-                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
-            """, (
-                exec_date,
-                table.get('source_table'),
-                table['target_table'],
-                table.get('target_table_label'),
-                table.get('target_table_status', True),
-                table.get('is_directly_schedule', False),
-                table.get('default_update_frequency'),
-                table.get('script_name'),
-                table.get('script_type', 'python'),
-                table.get('script_exec_mode', 'append')
-            ))
-            inserted_count += 1
-        
-        conn.commit()
-        logger.info(f"成功插入 {inserted_count} 条记录到 airflow_dag_schedule 表")
-        return inserted_count
-    except Exception as e:
-        logger.error(f"写入 airflow_dag_schedule 表时出错: {str(e)}")
-        conn.rollback()
-        # 不要返回0,而是重新抛出异常,确保错误被正确传播
-        raise
-    finally:
-        cursor.close()
-        conn.close()
+    logger.info(f"模拟写入 {len(tables_info)} 条记录到 airflow_dag_schedule 表 (已移除数据库操作)")
+    return len(tables_info)
 
 def prepare_dag_schedule(**kwargs):
     """准备DAG调度任务的主函数"""
@@ -586,71 +608,23 @@ def get_latest_date():
         conn.close()
 
 def get_all_tasks(exec_date):
-    """获取所有需要执行的任务(DataResource和DataModel)"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        # 查询数据表中记录总数
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s
-        """, (exec_date,))
-        total_count = cursor.fetchone()[0]
-        logger.info(f"执行日期 {exec_date} 在airflow_dag_schedule表中共有 {total_count} 条记录")
-        
-        # 查询所有资源表任务
-        cursor.execute("""
-            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
-        """, (exec_date,))
-        resource_results = cursor.fetchall()
-        logger.info(f"查询到 {len(resource_results)} 条DataResource记录")
-        
-        # 查询所有模型表任务
-        cursor.execute("""
-            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
-        """, (exec_date,))
-        model_results = cursor.fetchall()
-        logger.info(f"查询到 {len(model_results)} 条DataModel记录")
-        
-        # 整理资源表信息
-        resource_tasks = []
-        for row in resource_results:
-            source_table, target_table, target_table_label, script_name, script_exec_mode = row
-            if script_name:  # 确保脚本名称不为空
-                resource_tasks.append({
-                    "source_table": source_table,
-                    "target_table": target_table,
-                    "target_table_label": target_table_label,
-                    "script_name": script_name,
-                    "script_exec_mode": script_exec_mode or "append"
-                })
-        
-        # 整理模型表信息
-        model_tasks = []
-        for row in model_results:
-            source_table, target_table, target_table_label, script_name, script_exec_mode = row
-            if script_name:  # 确保脚本名称不为空
-                model_tasks.append({
-                    "source_table": source_table,
-                    "target_table": target_table,
-                    "target_table_label": target_table_label,
-                    "script_name": script_name,
-                    "script_exec_mode": script_exec_mode or "append"
-                })
-        
-        logger.info(f"获取到 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
-        return resource_tasks, model_tasks
-    except Exception as e:
-        logger.error(f"获取任务信息时出错: {str(e)}")
+    """
+    获取所有需要执行的任务(DataResource和DataModel)
+    直接从执行计划获取任务信息,不再查询数据库
+    """
+    # 从数据库获取执行计划
+    execution_plan = get_execution_plan_from_db(exec_date)
+    
+    if not execution_plan:
+        logger.warning(f"未找到执行日期 {exec_date} 的执行计划")
         return [], []
-    finally:
-        cursor.close()
-        conn.close()
+    
+    # 提取资源任务和模型任务
+    resource_tasks = execution_plan.get("resource_tasks", [])
+    model_tasks = execution_plan.get("model_tasks", [])
+    
+    logger.info(f"获取到 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
+    return resource_tasks, model_tasks
 
 def get_table_dependencies(table_names):
     """获取表之间的依赖关系"""
@@ -759,101 +733,20 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date):
         logger.info(f"将exec_date转换为字符串: {exec_date}")
     
     try:
-        # 正常调用执行监控函数
-        logger.info(f"调用execute_with_monitoring: target_table={target_table}, script_name={script_name}, exec_date={exec_date}")
-        result = execute_with_monitoring(
+        # 使用新的函数执行脚本,不依赖数据库
+        logger.info(f"调用execute_python_script: target_table={target_table}, script_name={script_name}")
+        result = execute_python_script(
             target_table=target_table,
             script_name=script_name,
             script_exec_mode=script_exec_mode,
             exec_date=exec_date
         )
         logger.info(f"资源表 {target_table} 处理完成,结果: {result}")
-        
-        # 验证状态是否正确更新到数据库
-        try:
-            conn = get_pg_conn()
-            cursor = conn.cursor()
-            cursor.execute("""
-                SELECT exec_result, exec_start_time, exec_end_time, exec_duration
-                FROM airflow_dag_schedule
-                WHERE exec_date = %s AND target_table = %s AND script_name = %s
-            """, (exec_date, target_table, script_name))
-            db_record = cursor.fetchone()
-            
-            if db_record:
-                logger.info(f"数据库中任务状态: exec_result={db_record[0]}, start_time={db_record[1]}, end_time={db_record[2]}, duration={db_record[3]}")
-                
-                # 如果数据库中没有结果但执行已完成,尝试再次更新
-                if db_record[0] is None and db_record[1] is not None:
-                    now = datetime.now()
-                    duration = (now - db_record[1]).total_seconds() if db_record[1] else 0
-                    
-                    logger.info(f"数据库中结果为空但已有开始时间,尝试手动更新结果: {result}")
-                    cursor.execute("""
-                        UPDATE airflow_dag_schedule
-                        SET exec_result = %s, exec_end_time = %s, exec_duration = %s
-                        WHERE exec_date = %s AND target_table = %s AND script_name = %s
-                    """, (result, now, duration, exec_date, target_table, script_name))
-                    conn.commit()
-                    logger.info(f"手动更新结果行数: {cursor.rowcount}")
-            else:
-                logger.warning(f"在数据库中未找到任务记录")
-            
-            cursor.close()
-            conn.close()
-        except Exception as verify_e:
-            logger.error(f"验证任务状态时出错: {str(verify_e)}")
-        
         return result
     except Exception as e:
         logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
         import traceback
         logger.error(traceback.format_exc())
-        
-        # 尝试直接在这里更新错误状态
-        try:
-            conn = get_pg_conn()
-            cursor = conn.cursor()
-            now = datetime.now()
-            
-            # 检查是否已有开始时间
-            cursor.execute("""
-                SELECT exec_start_time 
-                FROM airflow_dag_schedule 
-                WHERE exec_date = %s AND target_table = %s AND script_name = %s
-            """, (exec_date, target_table, script_name))
-            start_time_record = cursor.fetchone()
-            
-            if start_time_record and start_time_record[0]:
-                start_time = start_time_record[0]
-                duration = (now - start_time).total_seconds()
-            else:
-                # 如果没有开始时间,则使用0秒持续时间并记录开始时间
-                duration = 0
-                cursor.execute("""
-                    UPDATE airflow_dag_schedule
-                    SET exec_start_time = %s
-                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
-                """, (now, exec_date, target_table, script_name))
-                logger.info("未找到开始时间,手动设置开始时间")
-                
-            # 更新为失败状态
-            cursor.execute("""
-                UPDATE airflow_dag_schedule 
-                SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
-                WHERE exec_date = %s AND target_table = %s AND script_name = %s
-            """, (now, duration, exec_date, target_table, script_name))
-            conn.commit()
-            logger.info(f"手动更新任务失败状态: {target_table}, 影响行数: {cursor.rowcount}")
-        except Exception as db_e:
-            logger.error(f"尝试手动更新失败状态时出错: {str(db_e)}")
-        finally:
-            if 'cursor' in locals() and cursor:
-                cursor.close()
-            if 'conn' in locals() and conn:
-                conn.close()
-        
-        # 确保即使出错也返回结果,不会阻塞DAG
         logger.info(f"===== 结束执行 {task_id} (失败) =====")
         return False
     finally:
@@ -881,100 +774,20 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
         logger.info(f"将exec_date转换为字符串: {exec_date}")
     
     try:
-        logger.info(f"调用execute_with_monitoring: target_table={target_table}, script_name={script_name}, exec_date={exec_date}")
-        result = execute_with_monitoring(
+        # 使用新的函数执行脚本,不依赖数据库
+        logger.info(f"调用execute_python_script: target_table={target_table}, script_name={script_name}")
+        result = execute_python_script(
             target_table=target_table,
             script_name=script_name,
             script_exec_mode=script_exec_mode,
             exec_date=exec_date
         )
         logger.info(f"模型表 {target_table} 处理完成,结果: {result}")
-        
-        # 验证状态是否正确更新到数据库
-        try:
-            conn = get_pg_conn()
-            cursor = conn.cursor()
-            cursor.execute("""
-                SELECT exec_result, exec_start_time, exec_end_time, exec_duration
-                FROM airflow_dag_schedule
-                WHERE exec_date = %s AND target_table = %s AND script_name = %s
-            """, (exec_date, target_table, script_name))
-            db_record = cursor.fetchone()
-            
-            if db_record:
-                logger.info(f"数据库中任务状态: exec_result={db_record[0]}, start_time={db_record[1]}, end_time={db_record[2]}, duration={db_record[3]}")
-                
-                # 如果数据库中没有结果但执行已完成,尝试再次更新
-                if db_record[0] is None and db_record[1] is not None:
-                    now = datetime.now()
-                    duration = (now - db_record[1]).total_seconds() if db_record[1] else 0
-                    
-                    logger.info(f"数据库中结果为空但已有开始时间,尝试手动更新结果: {result}")
-                    cursor.execute("""
-                        UPDATE airflow_dag_schedule
-                        SET exec_result = %s, exec_end_time = %s, exec_duration = %s
-                        WHERE exec_date = %s AND target_table = %s AND script_name = %s
-                    """, (result, now, duration, exec_date, target_table, script_name))
-                    conn.commit()
-                    logger.info(f"手动更新结果行数: {cursor.rowcount}")
-            else:
-                logger.warning(f"在数据库中未找到任务记录")
-            
-            cursor.close()
-            conn.close()
-        except Exception as verify_e:
-            logger.error(f"验证任务状态时出错: {str(verify_e)}")
-        
         return result
     except Exception as e:
         logger.error(f"处理模型表 {target_table} 时出错: {str(e)}")
         import traceback
         logger.error(traceback.format_exc())
-        
-        # 尝试直接在这里更新错误状态
-        try:
-            conn = get_pg_conn()
-            cursor = conn.cursor()
-            now = datetime.now()
-            
-            # 检查是否已有开始时间
-            cursor.execute("""
-                SELECT exec_start_time 
-                FROM airflow_dag_schedule 
-                WHERE exec_date = %s AND target_table = %s AND script_name = %s
-            """, (exec_date, target_table, script_name))
-            start_time_record = cursor.fetchone()
-            
-            if start_time_record and start_time_record[0]:
-                start_time = start_time_record[0]
-                duration = (now - start_time).total_seconds()
-            else:
-                # 如果没有开始时间,则使用0秒持续时间并记录开始时间
-                duration = 0
-                cursor.execute("""
-                    UPDATE airflow_dag_schedule
-                    SET exec_start_time = %s
-                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
-                """, (now, exec_date, target_table, script_name))
-                logger.info("未找到开始时间,手动设置开始时间")
-                
-            # 更新为失败状态
-            cursor.execute("""
-                UPDATE airflow_dag_schedule 
-                SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
-                WHERE exec_date = %s AND target_table = %s AND script_name = %s
-            """, (now, duration, exec_date, target_table, script_name))
-            conn.commit()
-            logger.info(f"手动更新任务失败状态: {target_table}, 影响行数: {cursor.rowcount}")
-        except Exception as db_e:
-            logger.error(f"尝试手动更新失败状态时出错: {str(db_e)}")
-        finally:
-            if 'cursor' in locals() and cursor:
-                cursor.close()
-            if 'conn' in locals() and conn:
-                conn.close()
-        
-        # 确保即使出错也返回结果,不会阻塞DAG
         logger.info(f"===== 结束执行 {task_id} (失败) =====")
         return False
     finally:
@@ -985,92 +798,82 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
 #############################################
 
 def get_execution_stats(exec_date):
-    """获取当日执行统计信息"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
+    """
+    获取执行统计信息,使用Airflow的API获取执行状态
+    不再依赖airflow_dag_schedule表
+    """
+    from airflow.models import DagRun, TaskInstance
+    from airflow.utils.state import State
+    
+    logger.info(f"获取执行日期 {exec_date} 的执行统计信息")
+    
+    # 当前DAG ID
+    dag_id = "dag_dataops_pipeline_data_scheduler"
+    
     try:
-        # 查询总任务数
-        cursor.execute("""
-            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
-        """, (exec_date,))
-        result = cursor.fetchone()
-        total_tasks = result[0] if result else 0
+        # 查找对应的DAG运行
+        dag_runs = DagRun.find(dag_id=dag_id, execution_date=exec_date)
         
-        # 查询每种类型的任务数
-        cursor.execute("""
-            SELECT target_table_label, COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s 
-            GROUP BY target_table_label
-        """, (exec_date,))
-        type_counts = {row[0]: row[1] for row in cursor.fetchall()}
+        if not dag_runs:
+            logger.warning(f"未找到DAG {dag_id} 在 {exec_date} 的运行记录")
+            return {
+                "exec_date": exec_date,
+                "total_tasks": 0,
+                "type_counts": {},
+                "success_count": 0,
+                "fail_count": 0,
+                "pending_count": 0,
+                "success_rate": 0,
+                "avg_duration": None,
+                "min_duration": None,
+                "max_duration": None,
+                "failed_tasks": []
+            }
         
-        # 查询执行结果统计
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS TRUE
-        """, (exec_date,))
-        result = cursor.fetchone()
-        success_count = result[0] if result else 0
+        dag_run = dag_runs[0]
         
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS FALSE
-        """, (exec_date,))
-        result = cursor.fetchone()
-        fail_count = result[0] if result else 0
+        # 获取所有任务实例
+        task_instances = TaskInstance.find(dag_id=dag_id, execution_date=dag_run.execution_date)
         
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS NULL
-        """, (exec_date,))
-        result = cursor.fetchone()
-        pending_count = result[0] if result else 0
+        # 统计任务状态
+        total_tasks = len(task_instances)
+        success_count = len([ti for ti in task_instances if ti.state == State.SUCCESS])
+        fail_count = len([ti for ti in task_instances if ti.state in (State.FAILED, State.UPSTREAM_FAILED)])
+        pending_count = total_tasks - success_count - fail_count
         
-        # 计算执行时间统计
-        cursor.execute("""
-            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_duration IS NOT NULL
-        """, (exec_date,))
-        time_stats = cursor.fetchone()
-        
-        # 确保时间统计不为None
-        if time_stats and time_stats[0] is not None:
-            avg_duration = float(time_stats[0])
-            min_duration = float(time_stats[1]) if time_stats[1] is not None else None
-            max_duration = float(time_stats[2]) if time_stats[2] is not None else None
-        else:
-            avg_duration = None
-            min_duration = None
-            max_duration = None
+        # 计算成功率
+        success_rate = (success_count / total_tasks * 100) if total_tasks > 0 else 0
         
-        # 查询失败任务详情
-        cursor.execute("""
-            SELECT target_table, script_name, target_table_label, exec_duration
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS FALSE
-        """, (exec_date,))
-        failed_tasks = []
-        for row in cursor.fetchall():
-            task_dict = {
-                "target_table": row[0],
-                "script_name": row[1],
-                "target_table_label": row[2],
-            }
-            if row[3] is not None:
-                task_dict["exec_duration"] = float(row[3])
-            else:
-                task_dict["exec_duration"] = None
-            failed_tasks.append(task_dict)
+        # 计算执行时间
+        durations = []
+        for ti in task_instances:
+            if ti.start_date and ti.end_date:
+                duration = (ti.end_date - ti.start_date).total_seconds()
+                durations.append(duration)
+        
+        avg_duration = sum(durations) / len(durations) if durations else None
+        min_duration = min(durations) if durations else None
+        max_duration = max(durations) if durations else None
         
-        # 计算成功率,避免除零错误
-        success_rate = 0
-        if total_tasks > 0:
-            success_rate = (success_count / total_tasks) * 100
+        # 分类统计信息
+        type_counts = {
+            "resource": len([ti for ti in task_instances if ti.task_id.startswith("resource_")]),
+            "model": len([ti for ti in task_instances if ti.task_id.startswith("model_")])
+        }
+        
+        # 获取失败任务详情
+        failed_tasks = []
+        for ti in task_instances:
+            if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
+                task_dict = {
+                    "task_id": ti.task_id,
+                    "state": ti.state,
+                }
+                
+                if ti.start_date and ti.end_date:
+                    task_dict["exec_duration"] = (ti.end_date - ti.start_date).total_seconds()
+                
+                failed_tasks.append(task_dict)
         
         # 汇总统计信息
         stats = {
@@ -1090,64 +893,17 @@ def get_execution_stats(exec_date):
         return stats
     except Exception as e:
         logger.error(f"获取执行统计信息时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
         return {}
-    finally:
-        cursor.close()
-        conn.close()
 
 def update_missing_results(exec_date):
-    """更新缺失的执行结果信息"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        # 查询所有缺失执行结果的任务
-        cursor.execute("""
-            SELECT target_table, script_name
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS NULL
-        """, (exec_date,))
-        missing_results = cursor.fetchall()
-        
-        update_count = 0
-        for row in missing_results:
-            target_table, script_name = row
-            
-            # 如果有开始时间但没有结束时间,假设执行失败
-            cursor.execute("""
-                SELECT exec_start_time
-                FROM airflow_dag_schedule
-                WHERE exec_date = %s AND target_table = %s AND script_name = %s
-            """, (exec_date, target_table, script_name))
-            
-            start_time = cursor.fetchone()
-            
-            if start_time and start_time[0]:
-                # 有开始时间但无结果,标记为失败
-                now = datetime.now()
-                duration = (now - start_time[0]).total_seconds()
-                
-                cursor.execute("""
-                    UPDATE airflow_dag_schedule
-                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
-                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
-                """, (now, duration, exec_date, target_table, script_name))
-                
-                logger.warning(f"任务 {target_table} 的脚本 {script_name} 标记为失败,开始时间: {start_time[0]}")
-                update_count += 1
-            else:
-                # 没有开始时间且无结果,假设未执行
-                logger.warning(f"任务 {target_table} 的脚本 {script_name} 未执行")
-        
-        conn.commit()
-        logger.info(f"更新了 {update_count} 个缺失结果的任务")
-        return update_count
-    except Exception as e:
-        logger.error(f"更新缺失执行结果时出错: {str(e)}")
-        conn.rollback()
-        return 0
-    finally:
-        cursor.close()
-        conn.close()
+    """
+    更新缺失的执行结果信息
+    此函数已不再操作数据库,仅返回0表示无需更新
+    """
+    logger.info(f"模拟更新缺失的执行结果信息 (已移除数据库操作)")
+    return 0
 
 def generate_execution_report(exec_date, stats):
     """生成执行报告"""
@@ -1184,9 +940,8 @@ def generate_execution_report(exec_date, stats):
     if failed_tasks:
         report.append("\n--- 失败任务详情 ---")
         for i, task in enumerate(failed_tasks, 1):
-            report.append(f"{i}. 表名: {task['target_table']}")
-            report.append(f"   脚本: {task['script_name']}")
-            report.append(f"   类型: {task['target_table_label']}")
+            report.append(f"{i}. 任务ID: {task['task_id']}")
+            report.append(f"   状态: {task['state']}")
             exec_duration = task.get('exec_duration')
             if exec_duration is not None:
                 report.append(f"   执行时间: {exec_duration:.2f} 秒")
@@ -1204,71 +959,79 @@ def generate_execution_report(exec_date, stats):
     return report_str
 
 def summarize_execution(**kwargs):
-    """汇总执行情况的主函数"""
+    """简化的汇总执行情况函数,只判断整个作业是否成功"""
     try:
         exec_date = kwargs.get('ds') or get_today_date()
-        logger.info(f"开始汇总执行日期 {exec_date} 的统一执行情况")
+        logger.info(f"开始汇总执行日期 {exec_date} 的执行情况")
         
-        # 1. 更新缺失的执行结果
-        try:
-            update_count = update_missing_results(exec_date)
-            logger.info(f"更新了 {update_count} 个缺失的执行结果")
-        except Exception as e:
-            logger.error(f"更新缺失执行结果时出错: {str(e)}")
-            update_count = 0
+        # 获取任务实例对象
+        task_instance = kwargs.get('ti')
+        dag_id = task_instance.dag_id
         
-        # 2. 获取执行统计信息
-        try:
-            stats = get_execution_stats(exec_date)
-            if not stats:
-                logger.warning("未能获取执行统计信息,将使用默认值")
-                stats = {
-                    "exec_date": exec_date,
-                    "total_tasks": 0,
-                    "type_counts": {},
-                    "success_count": 0,
-                    "fail_count": 0,
-                    "pending_count": 0,
-                    "success_rate": 0,
-                    "avg_duration": None,
-                    "min_duration": None,
-                    "max_duration": None,
-                    "failed_tasks": []
-                }
-        except Exception as e:
-            logger.error(f"获取执行统计信息时出错: {str(e)}")
+        # 获取DAG运行状态信息
+        from airflow.models import DagRun
+        from airflow.utils.state import State
+        
+        # 查找对应的DAG运行
+        dag_runs = DagRun.find(dag_id=dag_id, execution_date=task_instance.execution_date)
+        
+        if not dag_runs or len(dag_runs) == 0:
+            logger.warning(f"未找到DAG {dag_id} 在执行日期 {exec_date} 的运行记录")
+            state = "UNKNOWN"
+            success = False
+        else:
+            # 获取状态
+            dag_run = dag_runs[0]  # 取第一个匹配的DAG运行
+            state = dag_run.state
+            logger.info(f"DAG {dag_id} 的状态为: {state}")
+            
+            # 判断是否成功
+            success = (state == State.SUCCESS)
+        
+        # 获取更详细的执行统计信息
+        stats = get_execution_stats(exec_date)
+        
+        # 创建简单的报告
+        if success:
+            report = f"DAG {dag_id} 在 {exec_date} 的执行成功完成。"
+            if stats:
+                report += f" 总共有 {stats.get('total_tasks', 0)} 个任务," \
+                          f"其中成功 {stats.get('success_count', 0)} 个," \
+                          f"失败 {stats.get('fail_count', 0)} 个。"
+        else:
+            report = f"DAG {dag_id} 在 {exec_date} 的执行未成功完成,状态为: {state}。"
+            if stats and stats.get('failed_tasks'):
+                report += f" 有 {len(stats.get('failed_tasks', []))} 个任务失败。"
+        
+        # 记录执行结果
+        logger.info(report)
+        
+        # 如果 stats 为空,创建一个简单的状态信息
+        if not stats:
             stats = {
                 "exec_date": exec_date,
-                "total_tasks": 0,
-                "type_counts": {},
-                "success_count": 0,
-                "fail_count": 0,
-                "pending_count": 0,
-                "success_rate": 0,
-                "avg_duration": None,
-                "min_duration": None,
-                "max_duration": None,
-                "failed_tasks": []
+                "success": success,
+                "dag_id": dag_id,
+                "dag_run_state": state
             }
         
-        # 3. 生成执行报告
-        try:
-            report = generate_execution_report(exec_date, stats)
-        except Exception as e:
-            logger.error(f"生成执行报告时出错: {str(e)}")
-            report = f"生成执行报告时出错: {str(e)}\n基础统计: 总任务数: {stats.get('total_tasks', 0)}, 成功: {stats.get('success_count', 0)}, 失败: {stats.get('fail_count', 0)}"
+        # 添加success状态到stats
+        stats["success"] = success
         
-        # 将报告和统计信息传递给下一个任务
-        try:
-            kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
-            kwargs['ti'].xcom_push(key='execution_report', value=report)
-        except Exception as e:
-            logger.error(f"保存报告到XCom时出错: {str(e)}")
+        # 将结果推送到XCom
+        task_instance.xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
+        task_instance.xcom_push(key='execution_report', value=report)
+        task_instance.xcom_push(key='execution_success', value=success)
+        
+        # 生成简化的执行报告
+        simple_report = generate_execution_report(exec_date, stats)
         
-        return report
+        return simple_report
     except Exception as e:
         logger.error(f"汇总执行情况时出现未处理的错误: {str(e)}")
-        # 返回一个简单的错误报告,确保任务不会失败
+        import traceback
+        logger.error(traceback.format_exc())
+        # 返回一个简单的错误报告
         return f"执行汇总时出现错误: {str(e)}"
 
 # 添加新函数,用于从数据库获取执行计划