ソースを参照

准备修改两个文件中的时区,我发现ds和exection_time都是UTC时区的

wangxq 1 ヶ月 前
コミット
22cb341bd1

+ 165 - 146
dags/dag_dataops_pipeline_data_scheduler.py

@@ -23,6 +23,7 @@ from common import (
     get_today_date
 )
 from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
+import pytz
 
 # 创建日志记录器
 logger = logging.getLogger(__name__)
@@ -436,6 +437,10 @@ def filter_invalid_tables(tables_info):
 def prepare_dag_schedule(**kwargs):
     """准备DAG调度任务的主函数"""
     exec_date = kwargs.get('ds') or get_today_date()
+    execution_date = kwargs.get('execution_date')
+    
+    # 记录重要的时间参数
+    logger.info(f"【时间参数】prepare_dag_schedule: ds={exec_date}, execution_date={execution_date}")
     logger.info(f"开始准备执行日期 {exec_date} 的统一调度任务")
     
     # 1. 获取启用的表
@@ -528,19 +533,23 @@ def prepare_dag_schedule(**kwargs):
     }
     
     # 将执行计划保存到XCom
-    kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
+    kwargs['ti'].xcom_push(key='execution_plan', value=execution_plan)
     logger.info(f"准备了执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
     
     return len(valid_tables)
 
-def check_execution_plan_file(**kwargs):
+def check_execution_plan(**kwargs):
     """
     检查执行计划是否存在且有效
     返回False将阻止所有下游任务执行
     """
-    logger.info("检查数据库中的执行计划是否存在且有效")
+    execution_date = kwargs.get('execution_date')
     exec_date = kwargs.get('ds') or get_today_date()
     
+    # 记录重要的时间参数
+    logger.info(f"【时间参数】check_execution_plan: ds={exec_date}, execution_date={execution_date}")
+    logger.info("检查数据库中的执行计划是否存在且有效")
+    
     # 从数据库获取执行计划
     execution_plan = get_execution_plan_from_db(exec_date)
     
@@ -634,13 +643,18 @@ def get_table_dependencies(table_names):
 def create_execution_plan(**kwargs):
     """准备执行计划的函数,使用从准备阶段传递的数据"""
     try:
+        execution_date = kwargs.get('execution_date')
+        exec_date = kwargs.get('ds') or get_today_date()
+        
+        # 记录重要的时间参数
+        logger.info(f"【时间参数】create_execution_plan: ds={exec_date}, execution_date={execution_date}")
+        
         # 从XCom获取执行计划
         execution_plan = kwargs['ti'].xcom_pull(task_ids='prepare_phase.prepare_dag_schedule', key='execution_plan')
         
         # 如果找不到执行计划,则从数据库获取
         if not execution_plan:
             # 获取执行日期
-            exec_date = kwargs.get('ds') or get_today_date()
             logger.info(f"未找到执行计划,从数据库获取。使用执行日期: {exec_date}")
             
             # 获取所有任务
@@ -663,10 +677,10 @@ def create_execution_plan(**kwargs):
             }
             
             # 保存执行计划到XCom
-            kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(new_execution_plan, default=json_serial))
+            kwargs['ti'].xcom_push(key='execution_plan', value=new_execution_plan)
             logger.info(f"创建新的执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
             
-            return json.dumps(new_execution_plan, default=json_serial)
+            return new_execution_plan
         
         logger.info(f"成功获取执行计划")
         return execution_plan
@@ -680,7 +694,7 @@ def create_execution_plan(**kwargs):
             "dependencies": {}
         }
         
-        return json.dumps(empty_plan, default=json_serial)
+        return empty_plan
 
 def process_resource(target_table, script_name, script_exec_mode, exec_date):
     """处理单个资源表"""
@@ -688,16 +702,6 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date):
     logger.info(f"===== 开始执行 {task_id} =====")
     logger.info(f"执行资源表 {target_table} 的脚本 {script_name}")
     
-    # 检查exec_date是否是JSON字符串
-    if isinstance(exec_date, str) and exec_date.startswith('{'):
-        try:
-            # 尝试解析JSON字符串
-            exec_date_data = json.loads(exec_date)
-            exec_date = exec_date_data.get("exec_date")
-            logger.info(f"从JSON中提取执行日期: {exec_date}")
-        except Exception as e:
-            logger.error(f"解析exec_date JSON时出错: {str(e)}")
-    
     # 确保exec_date是字符串
     if not isinstance(exec_date, str):
         exec_date = str(exec_date)
@@ -729,16 +733,6 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
     logger.info(f"===== 开始执行 {task_id} =====")
     logger.info(f"执行模型表 {target_table} 的脚本 {script_name}")
     
-    # 检查exec_date是否是JSON字符串
-    if isinstance(exec_date, str) and exec_date.startswith('{'):
-        try:
-            # 尝试解析JSON字符串
-            exec_date_data = json.loads(exec_date)
-            exec_date = exec_date_data.get("exec_date")
-            logger.info(f"从JSON中提取执行日期: {exec_date}")
-        except Exception as e:
-            logger.error(f"解析exec_date JSON时出错: {str(e)}")
-    
     # 确保exec_date是字符串
     if not isinstance(exec_date, str):
         exec_date = str(exec_date)
@@ -775,18 +769,29 @@ def get_execution_stats(exec_date):
     """
     from airflow.models import DagRun, TaskInstance
     from airflow.utils.state import State
+    from sqlalchemy import desc
+    from airflow import settings
     
-    logger.info(f"获取执行日期 {exec_date} 的执行统计信息")
+    # 记录原始输入参数,仅供参考
+    logger.debug(f"【执行日期】get_execution_stats接收到 exec_date: {exec_date}, 类型: {type(exec_date)}")
+    logger.debug(f"获取执行日期 {exec_date} 的执行统计信息")
     
     # 当前DAG ID
     dag_id = "dag_dataops_pipeline_data_scheduler"
     
     try:
-        # 查找对应的DAG运行
-        dag_runs = DagRun.find(dag_id=dag_id, execution_date=exec_date)
+        # 直接查询最近的DAG运行,不依赖于精确的执行日期
+        logger.debug("忽略精确的执行日期,直接查询最近的DAG运行")
+        session = settings.Session()
+        
+        # 查询最近的DAG运行,按updated_at降序排序
+        dag_runs = session.query(DagRun).filter(
+            DagRun.dag_id == dag_id
+        ).order_by(desc(DagRun.updated_at)).limit(1).all()
         
         if not dag_runs:
-            logger.warning(f"未找到DAG {dag_id} 在 {exec_date} 的运行记录")
+            logger.warning(f"未找到DAG {dag_id} 的任何运行记录")
+            session.close()
             return {
                 "exec_date": exec_date,
                 "total_tasks": 0,
@@ -802,9 +807,19 @@ def get_execution_stats(exec_date):
             }
         
         dag_run = dag_runs[0]
+        logger.debug(f"找到最近的DAG运行: execution_date={dag_run.execution_date}, updated_at={dag_run.updated_at}, state={dag_run.state}")
+        
+        # 直接查询最近更新的任务实例,不再通过execution_date过滤
+        # 只通过dag_id过滤,按更新时间降序排序
+        task_instances = session.query(TaskInstance).filter(
+            TaskInstance.dag_id == dag_id
+        ).order_by(desc(TaskInstance.updated_at)).limit(100).all()  # 获取最近100条记录
+        
+        # 日志记录找到的任务实例数量
+        logger.debug(f"找到 {len(task_instances)} 个最近的任务实例")
         
-        # 获取所有任务实例
-        task_instances = TaskInstance.find(dag_id=dag_id, execution_date=dag_run.execution_date)
+        # 关闭会话
+        session.close()
         
         # 统计任务状态
         total_tasks = len(task_instances)
@@ -849,6 +864,8 @@ def get_execution_stats(exec_date):
         # 汇总统计信息
         stats = {
             "exec_date": exec_date,
+            "dag_run_execution_date": dag_run.execution_date,
+            "dag_run_updated_at": dag_run.updated_at,
             "total_tasks": total_tasks,
             "type_counts": type_counts,
             "success_count": success_count,
@@ -866,53 +883,17 @@ def get_execution_stats(exec_date):
         logger.error(f"获取执行统计信息时出错: {str(e)}")
         import traceback
         logger.error(traceback.format_exc())
-        return {}
+        # 在出错时显式抛出异常,确保任务失败
+        raise
 
 def generate_execution_report(exec_date, stats):
-    """生成执行报告"""
-    # 构建报告
+    """生成简化的执行报告,只包含基本信息"""
+    # 构建简化报告
     report = []
     report.append(f"========== 数据运维系统执行报告 ==========")
     report.append(f"执行日期: {exec_date}")
     report.append(f"总任务数: {stats['total_tasks']}")
     
-    # 任务类型分布
-    report.append("\n--- 任务类型分布 ---")
-    for label, count in stats.get('type_counts', {}).items():
-        report.append(f"{label} 任务: {count} 个")
-    
-    # 执行结果统计
-    report.append("\n--- 执行结果统计 ---")
-    report.append(f"成功任务: {stats.get('success_count', 0)} 个")
-    report.append(f"失败任务: {stats.get('fail_count', 0)} 个")
-    report.append(f"未执行任务: {stats.get('pending_count', 0)} 个")
-    report.append(f"成功率: {stats.get('success_rate', 0):.2f}%")
-    
-    # 执行时间统计
-    report.append("\n--- 执行时间统计 (秒) ---")
-    avg_duration = stats.get('avg_duration')
-    min_duration = stats.get('min_duration')
-    max_duration = stats.get('max_duration')
-    
-    report.append(f"平均执行时间: {avg_duration:.2f}" if avg_duration is not None else "平均执行时间: N/A")
-    report.append(f"最短执行时间: {min_duration:.2f}" if min_duration is not None else "最短执行时间: N/A")
-    report.append(f"最长执行时间: {max_duration:.2f}" if max_duration is not None else "最长执行时间: N/A")
-    
-    # 失败任务详情
-    failed_tasks = stats.get('failed_tasks', [])
-    if failed_tasks:
-        report.append("\n--- 失败任务详情 ---")
-        for i, task in enumerate(failed_tasks, 1):
-            report.append(f"{i}. 任务ID: {task['task_id']}")
-            report.append(f"   状态: {task['state']}")
-            exec_duration = task.get('exec_duration')
-            if exec_duration is not None:
-                report.append(f"   执行时间: {exec_duration:.2f} 秒")
-            else:
-                report.append("   执行时间: N/A")
-    
-    report.append("\n========== 报告结束 ==========")
-    
     # 将报告转换为字符串
     report_str = "\n".join(report)
     
@@ -923,79 +904,96 @@ def generate_execution_report(exec_date, stats):
 
 def summarize_execution(**kwargs):
     """简化的汇总执行情况函数,只判断整个作业是否成功"""
-    try:
-        exec_date = kwargs.get('ds') or get_today_date()
-        logger.info(f"开始汇总执行日期 {exec_date} 的执行情况")
-        
-        # 获取任务实例对象
-        task_instance = kwargs.get('ti')
-        dag_id = task_instance.dag_id
-        
-        # 获取DAG运行状态信息
-        from airflow.models import DagRun
-        from airflow.utils.state import State
-        
-        # 查找对应的DAG运行
-        dag_runs = DagRun.find(dag_id=dag_id, execution_date=task_instance.execution_date)
-        
-        if not dag_runs or len(dag_runs) == 0:
-            logger.warning(f"未找到DAG {dag_id} 在执行日期 {exec_date} 的运行记录")
-            state = "UNKNOWN"
-            success = False
-        else:
-            # 获取状态
-            dag_run = dag_runs[0]  # 取第一个匹配的DAG运行
-            state = dag_run.state
-            logger.info(f"DAG {dag_id} 的状态为: {state}")
-            
-            # 判断是否成功
-            success = (state == State.SUCCESS)
-        
-        # 获取更详细的执行统计信息
-        stats = get_execution_stats(exec_date)
-        
-        # 创建简单的报告
-        if success:
-            report = f"DAG {dag_id} 在 {exec_date} 的执行成功完成。"
-            if stats:
-                report += f" 总共有 {stats.get('total_tasks', 0)} 个任务," \
-                          f"其中成功 {stats.get('success_count', 0)} 个," \
-                          f"失败 {stats.get('fail_count', 0)} 个。"
-        else:
-            report = f"DAG {dag_id} 在 {exec_date} 的执行未成功完成,状态为: {state}。"
-            if stats and stats.get('failed_tasks'):
-                report += f" 有 {len(stats.get('failed_tasks', []))} 个任务失败。"
-        
-        # 记录执行结果
-        logger.info(report)
-        
-        # 如果 stats 为空,创建一个简单的状态信息
-        if not stats:
-            stats = {
-                "exec_date": exec_date,
-                "success": success,
-                "dag_id": dag_id,
-                "dag_run_state": state
-            }
-        
+    exec_date = kwargs.get('ds') or get_today_date()
+    execution_date = kwargs.get('execution_date')
+    
+    # 记录重要的时间参数,仅供参考
+    logger.debug(f"【时间参数】summarize_execution: ds={exec_date}, execution_date={execution_date}")
+    logger.debug(f"开始汇总执行日期 {exec_date} 的执行情况")
+    
+    # 获取任务实例对象
+    task_instance = kwargs.get('ti')
+    dag_id = task_instance.dag_id
+    
+    # 获取DAG运行状态信息 - 直接查询最近的运行
+    from airflow.models import DagRun
+    from airflow.utils.state import State
+    from sqlalchemy import desc
+    from airflow import settings
+    
+    logger.debug("直接查询最近更新的DAG运行记录,不依赖执行日期")
+    session = settings.Session()
+    
+    # 查询最近的DAG运行,按updated_at降序排序
+    dag_runs = session.query(DagRun).filter(
+        DagRun.dag_id == dag_id
+    ).order_by(desc(DagRun.updated_at)).limit(1).all()
+    
+    session.close()
+    
+    if not dag_runs or len(dag_runs) == 0:
+        logger.warning(f"未找到DAG {dag_id} 的任何运行记录")
+        state = "UNKNOWN"
+        success = False
+    else:
+        # 获取状态
+        dag_run = dag_runs[0]  # 取最近更新的DAG运行
+        state = dag_run.state
+        logger.debug(f"找到最近的DAG运行: execution_date={dag_run.execution_date}, updated_at={dag_run.updated_at}, state={state}")
+        logger.debug(f"DAG {dag_id} 的状态为: {state}")
+        
+        # 判断是否成功
+        success = (state == State.SUCCESS)
+    
+    # 获取更详细的执行统计信息 - 直接调用get_execution_stats而不关心具体日期
+    stats = get_execution_stats(exec_date)
+    
+    # 创建简单的报告
+    if success:
+        report = f"DAG {dag_id} 在 {exec_date} 的执行成功完成。"
+        if stats:
+            report += f" 总共有 {stats.get('total_tasks', 0)} 个任务," \
+                      f"其中成功 {stats.get('success_count', 0)} 个," \
+                      f"失败 {stats.get('fail_count', 0)} 个。"
+    else:
+        report = f"DAG {dag_id} 在 {exec_date} 的执行未成功完成,状态为: {state}。"
+        if stats and stats.get('failed_tasks'):
+            report += f" 有 {len(stats.get('failed_tasks', []))} 个任务失败。"
+    
+    # 记录执行结果
+    logger.info(report)
+    
+    # 如果 stats 为空或缺少total_tasks字段,创建一个完整的状态信息
+    if not stats or 'total_tasks' not in stats:
+        stats = {
+            "exec_date": exec_date,
+            "total_tasks": 0,
+            "type_counts": {},
+            "success_count": 0,
+            "fail_count": 0,
+            "pending_count": 0,
+            "success_rate": 0,
+            "avg_duration": None,
+            "min_duration": None,
+            "max_duration": None,
+            "failed_tasks": [],
+            "success": success,
+            "dag_id": dag_id,
+            "dag_run_state": state
+        }
+    else:
         # 添加success状态到stats
         stats["success"] = success
-        
-        # 将结果推送到XCom
-        task_instance.xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
-        task_instance.xcom_push(key='execution_report', value=report)
-        task_instance.xcom_push(key='execution_success', value=success)
-        
-        # 生成简化的执行报告
-        simple_report = generate_execution_report(exec_date, stats)
-        
-        return simple_report
-    except Exception as e:
-        logger.error(f"汇总执行情况时出现未处理的错误: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        # 返回一个简单的错误报告
-        return f"执行汇总时出现错误: {str(e)}"
+    
+    # 将结果推送到XCom
+    task_instance.xcom_push(key='execution_stats', value=stats)
+    task_instance.xcom_push(key='execution_report', value=report)
+    task_instance.xcom_push(key='execution_success', value=success)
+    
+    # 生成简化的执行报告
+    simple_report = generate_execution_report(exec_date, stats)
+    
+    return simple_report
 
 # 添加新函数,用于从数据库获取执行计划
 def get_execution_plan_from_db(ds):
@@ -1008,6 +1006,15 @@ def get_execution_plan_from_db(ds):
     返回:
         dict: 执行计划字典,如果找不到则返回None
     """
+    # 记录输入参数详细信息
+    if isinstance(ds, datetime):
+        if ds.tzinfo:
+            logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 带时区: {ds.tzinfo}")
+        else:
+            logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 无时区")
+    else:
+        logger.debug(f"【执行日期】get_execution_plan_from_db接收到: {ds}, 类型: {type(ds)}")
+    
     logger.info(f"尝试从数据库获取执行日期 {ds} 的执行计划")
     conn = get_pg_conn()
     cursor = conn.cursor()
@@ -1095,6 +1102,12 @@ with DAG(
     }
 ) as dag:
     
+    # 记录DAG实例化时的重要信息
+    now = datetime.now()
+    now_with_tz = now.replace(tzinfo=pytz.timezone('Asia/Shanghai'))
+    default_exec_date = get_today_date()
+    logger.info(f"【DAG初始化】当前时间: {now} / {now_with_tz}, 默认执行日期: {default_exec_date}")
+    
     #############################################
     # 阶段1: 准备阶段(Prepare Phase)
     #############################################
@@ -1113,8 +1126,8 @@ with DAG(
         
         # 验证执行计划有效性
         check_plan = ShortCircuitOperator(
-            task_id="check_execution_plan_file",
-            python_callable=check_execution_plan_file,
+            task_id="check_execution_plan",
+            python_callable=check_execution_plan,
             provide_context=True
         )
         
@@ -1179,6 +1192,12 @@ with DAG(
         exec_date = get_today_date()  # 使用当天日期作为默认值
         logger.info(f"当前DAG执行日期 ds={exec_date},尝试从数据库获取执行计划")
         
+        # 记录实际使用的执行日期的时区信息和原始格式
+        if isinstance(exec_date, datetime):
+            logger.info(f"【执行日期详情】类型: datetime, 时区: {exec_date.tzinfo}, 值: {exec_date}")
+        else:
+            logger.info(f"【执行日期详情】类型: {type(exec_date)}, 值: {exec_date}")
+        
         # 从数据库获取执行计划
         execution_plan = get_execution_plan_from_db(exec_date)
         

+ 112 - 205
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -16,7 +16,7 @@ from common import (
     get_neo4j_driver,
     get_today_date
 )
-from config import PG_CONFIG, NEO4J_CONFIG, EXECUTION_PLAN_KEEP_COUNT
+from config import PG_CONFIG, NEO4J_CONFIG
 
 # 创建日志记录器
 logger = logging.getLogger(__name__)
@@ -282,91 +282,32 @@ def get_subscription_state_hash():
         cursor.close()
         conn.close()
 
-def has_any_execution_plans():
+def check_execution_plan_in_db(exec_date):
     """
-    检查当前目录下是否存在任何执行计划文件
-    
-    返回:
-        bool: 如果存在任何执行计划文件返回True,否则返回False
-    """
-    dag_dir = os.path.dirname(__file__)
-    for file in os.listdir(dag_dir):
-        if file.startswith('exec_plan_') and file.endswith('.json'):
-            logger.info(f"找到现有执行计划文件: {file}")
-            return True
-    
-    logger.info("未找到任何执行计划文件")
-    return False
-
-def get_execution_plan_files():
-    """
-    获取所有执行计划文件,按日期排序
-    
-    返回:
-        list: 排序后的执行计划文件列表,格式为[(日期, json文件路径, ready文件路径)]
-    """
-    dag_dir = os.path.dirname(__file__)
-    plan_files = []
-    
-    # 查找所有执行计划文件
-    for file in os.listdir(dag_dir):
-        match = re.match(r'exec_plan_(\d{4}-\d{2}-\d{2})\.json', file)
-        if match:
-            date_str = match.group(1)
-            json_path = os.path.join(dag_dir, file)
-            ready_path = os.path.join(dag_dir, f"exec_plan_{date_str}.ready")
-            
-            if os.path.exists(ready_path):
-                plan_files.append((date_str, json_path, ready_path))
-    
-    # 按日期排序(从旧到新)
-    plan_files.sort(key=lambda x: x[0])
-    
-    return plan_files
-
-def cleanup_old_execution_plans(keep_count=None):
-    """
-    清理过期的执行计划文件,保留最新的指定数量
+    检查数据库中是否已存在指定日期的执行计划
     
     参数:
-        keep_days (int): 要保留的文件天数,如果为None则使用配置
-    
+        exec_date (str): 执行日期,格式为YYYY-MM-DD
+        
     返回:
-        int: 删除的文件数量
+        bool: 如果存在返回True,否则返回False
     """
-    if keep_count is None:
-        keep_count = EXECUTION_PLAN_KEEP_COUNT
-    
-    # 获取所有执行计划文件
-    plan_files = get_execution_plan_files()
-    logger.info(f"找到 {len(plan_files)} 个执行计划文件,将保留最新的 {keep_count} 个")
-    
-    # 如果文件数量未超过保留数,不需要删除
-    if len(plan_files) <= keep_count:
-        logger.info(f"执行计划文件数量 ({len(plan_files)}) 未超过保留数量 ({keep_count}),无需清理")
-        return 0
-    
-    # 删除最旧的文件
-    files_to_delete = plan_files[:-keep_count]
-    deleted_count = 0
-    
-    for _, json_path, ready_path in files_to_delete:
-        try:
-            # 删除JSON文件
-            if os.path.exists(json_path):
-                os.remove(json_path)
-                deleted_count += 1
-                logger.info(f"已删除过期执行计划文件: {json_path}")
-            
-            # 删除ready文件
-            if os.path.exists(ready_path):
-                os.remove(ready_path)
-                deleted_count += 1
-                logger.info(f"已删除过期ready文件: {ready_path}")
-        except Exception as e:
-            logger.error(f"删除文件时出错: {str(e)}")
-    
-    return deleted_count
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        cursor.execute("""
+            SELECT COUNT(*)
+            FROM airflow_exec_plans
+            WHERE ds = %s
+        """, (exec_date,))
+        count = cursor.fetchone()[0]
+        return count > 0
+    except Exception as e:
+        logger.error(f"检查数据库中执行计划时出错: {str(e)}")
+        return False
+    finally:
+        cursor.close()
+        conn.close()
 
 def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
     """
@@ -420,21 +361,16 @@ def prepare_pipeline_dag_schedule(**kwargs):
     exec_date = kwargs.get('ds') or get_today_date()
     logger.info(f"开始准备执行日期 {exec_date} 的Pipeline调度任务")
     
-    # 定义执行计划文件路径 - 使用新的基于日期的命名
-    plan_base_path = os.path.join(os.path.dirname(__file__), f'exec_plan_{exec_date}')
-    plan_path = f"{plan_base_path}.json"
-    ready_path = f"{plan_base_path}.ready"
-    
-    # 检查是否需要创建新的执行计划文件
+    # 检查是否需要创建新的执行计划
     need_create_plan = False
     
-    # 新的条件1: 当前目录下没有任何json文件
-    has_any_plans = has_any_execution_plans()
-    if not has_any_plans:
-        logger.info("当前目录下没有任何执行计划文件,需要创建新的执行计划")
+    # 条件1: 数据库中不存在当天的执行计划
+    has_plan_in_db = check_execution_plan_in_db(exec_date)
+    if not has_plan_in_db:
+        logger.info(f"数据库中不存在执行日期 {exec_date} 的执行计划,需要创建新的执行计划")
         need_create_plan = True
     
-    # 新的条件2: schedule_status表中的数据发生了变更
+    # 条件2: schedule_status表中的数据发生了变更
     if not need_create_plan:
         # 计算当前哈希值
         current_hash = get_subscription_state_hash()
@@ -460,7 +396,7 @@ def prepare_pipeline_dag_schedule(**kwargs):
     
     # 如果不需要创建新的执行计划,直接返回
     if not need_create_plan:
-        logger.info("无需创建新的执行计划文件")
+        logger.info("无需创建新的执行计划")
         return 0
     
     # 继续处理,创建新的执行计划
@@ -489,7 +425,7 @@ def prepare_pipeline_dag_schedule(**kwargs):
     valid_tables = filter_invalid_tables(enriched_tables)
     logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
     
-    # 保存最新执行计划,供DAG读取使用
+    # 构建执行计划并保存到数据库
     try:
         # 构建执行计划
         resource_tasks = []
@@ -551,148 +487,119 @@ def prepare_pipeline_dag_schedule(**kwargs):
             "dependencies": dependencies
         }
         
-        # 创建临时文件
-        temp_plan_path = f"{plan_path}.temp"
+        # 更新订阅表状态哈希值
+        current_hash = get_subscription_state_hash()
+        hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
+        with open(hash_file, 'w') as f:
+            f.write(current_hash)
+        logger.info(f"已更新订阅表状态哈希值: {current_hash}")
+        
+        # 触发数据调度器DAG重新解析
+        touch_data_scheduler_file()
         
+        # 保存执行计划到数据库表
         try:
-            # 写入临时文件
-            with open(temp_plan_path, 'w') as f:
-                json.dump(execution_plan, f, indent=2)
-            logger.info(f"已保存执行计划到临时文件: {temp_plan_path}")
-            
-            # 原子替换正式文件
-            os.replace(temp_plan_path, plan_path)
-            logger.info(f"已替换执行计划文件: {plan_path}")
-            
-            # 创建ready文件,标记执行计划就绪,包含详细时间信息
-            now = datetime.now()
-            timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
-            with open(ready_path, 'w') as f:
-                f.write(f"Created at: {timestamp}\nFor date: {exec_date}")
-            logger.info(f"已创建ready标记文件: {ready_path}")
-            
-            # 更新订阅表状态哈希值
-            current_hash = get_subscription_state_hash()
-            hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
-            with open(hash_file, 'w') as f:
-                f.write(current_hash)
-            logger.info(f"已更新订阅表状态哈希值: {current_hash}")
-            
-            # 清理过期的执行计划文件
-            deleted_count = cleanup_old_execution_plans()
-            logger.info(f"清理了 {deleted_count} 个过期执行计划文件")
+            # 获取DAG运行信息
+            dag_run = kwargs.get('dag_run')
+            if dag_run:
+                dag_id = dag_run.dag_id
+                run_id = dag_run.run_id
+                logical_date = dag_run.logical_date
+            else:
+                # 如果无法获取dag_run,使用默认值
+                dag_id = kwargs.get('dag').dag_id if 'dag' in kwargs else "dag_dataops_pipeline_prepare_scheduler"
+                run_id = f"manual_{datetime.now().strftime('%Y%m%d%H%M%S')}"
+                logical_date = datetime.now()
             
-            # dag_dataops_pipeline_data_scheduler.py文件的修改日期更新
-            touch_data_scheduler_file()
+            # 保存到数据库
+            save_result = save_execution_plan_to_db(
+                execution_plan=execution_plan,
+                dag_id=dag_id,
+                run_id=run_id,
+                logical_date=logical_date,
+                ds=exec_date
+            )
             
-            # 保存执行计划到数据库表
-            try:
-                # 获取DAG运行信息
-                dag_run = kwargs.get('dag_run')
-                if dag_run:
-                    dag_id = dag_run.dag_id
-                    run_id = dag_run.run_id
-                    logical_date = dag_run.logical_date
-                else:
-                    # 如果无法获取dag_run,使用默认值
-                    dag_id = kwargs.get('dag').dag_id if 'dag' in kwargs else "dag_dataops_pipeline_prepare_scheduler"
-                    run_id = f"manual_{datetime.now().strftime('%Y%m%d%H%M%S')}"
-                    logical_date = datetime.now()
-                
-                # 保存到数据库
-                save_result = save_execution_plan_to_db(
-                    execution_plan=execution_plan,
-                    dag_id=dag_id,
-                    run_id=run_id,
-                    logical_date=logical_date,
-                    ds=exec_date
-                )
-                
-                if save_result:
-                    logger.info("执行计划已成功保存到数据库")
-                else:
-                    logger.warning("执行计划保存到数据库失败,但文件已保存成功")
-            except Exception as db_e:
-                # 捕获数据库保存错误,但不影响主流程
-                logger.error(f"保存执行计划到数据库时出错: {str(db_e)}")
-                logger.info("继续执行,因为文件已成功保存")
+            if save_result:
+                logger.info("执行计划已成功保存到数据库")
+            else:
+                raise Exception("执行计划保存到数据库失败")
             
-        except Exception as e:
-            logger.error(f"保存执行计划文件或触发DAG重新解析时出错: {str(e)}")
-            # 出错时清理临时文件
-            if os.path.exists(temp_plan_path):
-                try:
-                    os.remove(temp_plan_path)
-                    logger.info(f"已清理临时文件: {temp_plan_path}")
-                except Exception as rm_e:
-                    logger.error(f"清理临时文件时出错: {str(rm_e)}")
-            raise  # 重新抛出异常,确保任务失败
+        except Exception as db_e:
+            # 捕获数据库保存错误
+            error_msg = f"保存执行计划到数据库时出错: {str(db_e)}"
+            logger.error(error_msg)
+            raise Exception(error_msg)
                 
     except Exception as e:
-        error_msg = f"保存或验证执行计划文件时出错: {str(e)}"
+        error_msg = f"创建或保存执行计划时出错: {str(e)}"
         logger.error(error_msg)
         # 强制抛出异常,确保任务失败,阻止下游DAG执行
         raise Exception(error_msg)
     
-    return len(valid_tables)  # 返回有效表数量代替插入记录数
+    return len(valid_tables)  # 返回有效表数量
 
-def check_execution_plan_file(**kwargs):
+def check_execution_plan_db(**kwargs):
     """
-    检查当天的执行计划文件是否存在且有效
+    检查当天的执行计划是否存在于数据库中
     返回False将阻止所有下游任务执行
     """
     # 获取执行日期
     exec_date = kwargs.get('ds') or get_today_date()
-    logger.info(f"检查执行日期 {exec_date} 的执行计划文件是否存在且有效")
+    logger.info(f"检查执行日期 {exec_date} 的执行计划是否存在于数据库中")
     
-    # 定义执行计划文件路径
-    plan_path = os.path.join(os.path.dirname(__file__), f'exec_plan_{exec_date}.json')
-    ready_path = os.path.join(os.path.dirname(__file__), f'exec_plan_{exec_date}.ready')
-    
-    # 检查文件是否存在
-    if not os.path.exists(plan_path):
-        logger.error(f"执行计划文件不存在: {plan_path}")
-        return False
-    
-    # 检查ready标记是否存在
-    if not os.path.exists(ready_path):
-        logger.error(f"执行计划ready标记文件不存在: {ready_path}")
-        return False
-    
-    # 检查文件是否可读且内容有效
+    # 检查数据库中是否存在执行计划
+    conn = get_pg_conn()
+    cursor = conn.cursor()
     try:
-        with open(plan_path, 'r') as f:
-            data = json.load(f)
+        cursor.execute("""
+            SELECT plan
+            FROM airflow_exec_plans
+            WHERE ds = %s
+            ORDER BY logical_date DESC
+            LIMIT 1
+        """, (exec_date,))
+        
+        result = cursor.fetchone()
+        if not result:
+            logger.error(f"数据库中不存在执行日期 {exec_date} 的执行计划")
+            return False
+        
+        # 检查执行计划内容是否有效
+        try:
+            # PostgreSQL的jsonb类型会被psycopg2自动转换为Python字典,无需再使用json.loads
+            plan_data = result[0]
             
             # 检查必要字段
-            if "exec_date" not in data:
+            if "exec_date" not in plan_data:
                 logger.error("执行计划缺少exec_date字段")
                 return False
                 
-            if not isinstance(data.get("resource_tasks", []), list):
+            if not isinstance(plan_data.get("resource_tasks", []), list):
                 logger.error("执行计划的resource_tasks字段无效")
                 return False
                 
-            if not isinstance(data.get("model_tasks", []), list):
+            if not isinstance(plan_data.get("model_tasks", []), list):
                 logger.error("执行计划的model_tasks字段无效")
                 return False
             
             # 检查是否有任务数据
-            resource_tasks = data.get("resource_tasks", [])
-            model_tasks = data.get("model_tasks", [])
-            if not resource_tasks and not model_tasks:
-                logger.warning("执行计划不包含任何任务,但文件格式有效")
-                # 注意:即使没有任务,我们仍然允许流程继续
+            resource_tasks = plan_data.get("resource_tasks", [])
+            model_tasks = plan_data.get("model_tasks", [])
             
-            logger.info(f"执行计划文件验证成功: 包含 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
+            logger.info(f"执行计划验证成功: 包含 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
             return True
             
-    except json.JSONDecodeError as je:
-        logger.error(f"执行计划文件不是有效的JSON: {str(je)}")
-        return False
+        except Exception as je:
+            logger.error(f"处理执行计划数据时出错: {str(je)}")
+            return False
+        
     except Exception as e:
-        logger.error(f"检查执行计划文件时出错: {str(e)}")
+        logger.error(f"检查数据库中执行计划时出错: {str(e)}")
         return False
+    finally:
+        cursor.close()
+        conn.close()
 
 # 创建DAG
 with DAG(
@@ -728,10 +635,10 @@ with DAG(
         dag=dag
     )
     
-    # 检查执行计划文件
-    check_plan_file = ShortCircuitOperator(
-        task_id="check_execution_plan_file",
-        python_callable=check_execution_plan_file,
+    # 检查执行计划是否存在于数据库中
+    check_plan_in_db = ShortCircuitOperator(
+        task_id="check_execution_plan_in_db",
+        python_callable=check_execution_plan_db,
         provide_context=True,
         dag=dag
     )
@@ -743,4 +650,4 @@ with DAG(
     )
     
     # 设置任务依赖
-    start_preparation >> prepare_task >> check_plan_file >> preparation_completed
+    start_preparation >> prepare_task >> check_plan_in_db >> preparation_completed