瀏覽代碼

修改pipeline_prepare代码中的支持多个版本的json文件,还未删除它把执行计划写入数据库表的功能

wangxq 1 月之前
父節點
當前提交
1e4838a523
共有 2 個文件被更改,包括 286 次插入175 次删除
  1. 2 0
      dags/config.py
  2. 284 175
      dags/dag_dataops_pipeline_prepare_scheduler.py

+ 2 - 0
dags/config.py

@@ -37,3 +37,5 @@ SCRIPTS_BASE_PATH = "/opt/airflow/dataops/scripts"
 # 本地开发环境脚本路径(如果需要区分环境)
 # LOCAL_SCRIPTS_BASE_PATH = "/path/to/local/scripts"
 
+# 执行计划保留的数量
+EXECUTION_PLAN_KEEP_COUNT = 5

+ 284 - 175
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -7,6 +7,8 @@ import logging
 import networkx as nx
 import json
 import os
+import re
+import glob
 from pathlib import Path
 import hashlib
 from common import (
@@ -14,7 +16,7 @@ from common import (
     get_neo4j_driver,
     get_today_date
 )
-from config import PG_CONFIG, NEO4J_CONFIG
+from config import PG_CONFIG, NEO4J_CONFIG, EXECUTION_PLAN_KEEP_COUNT
 
 # 创建日志记录器
 logger = logging.getLogger(__name__)
@@ -358,18 +360,21 @@ def touch_data_scheduler_file():
         bool: 是否成功更新
     """
     data_scheduler_path = os.path.join(os.path.dirname(__file__), 'dag_dataops_pipeline_data_scheduler.py')
+
     
+    success = False
     try:
         if os.path.exists(data_scheduler_path):
             # 更新文件修改时间,触发Airflow重新解析
             os.utime(data_scheduler_path, None)
             logger.info(f"已触发数据调度器DAG重新解析: {data_scheduler_path}")
-            return True
+            success = True
         else:
             logger.warning(f"数据调度器DAG文件不存在: {data_scheduler_path}")
-            return False
+                
+        return success
     except Exception as e:
-        logger.error(f"触发数据调度器DAG重新解析时出错: {str(e)}")
+        logger.error(f"触发DAG重新解析时出错: {str(e)}")
         return False
 
 def get_subscription_state_hash():
@@ -393,6 +398,92 @@ def get_subscription_state_hash():
         cursor.close()
         conn.close()
 
+def has_any_execution_plans():
+    """
+    检查当前目录下是否存在任何执行计划文件
+    
+    返回:
+        bool: 如果存在任何执行计划文件返回True,否则返回False
+    """
+    dag_dir = os.path.dirname(__file__)
+    for file in os.listdir(dag_dir):
+        if file.startswith('exec_plan_') and file.endswith('.json'):
+            logger.info(f"找到现有执行计划文件: {file}")
+            return True
+    
+    logger.info("未找到任何执行计划文件")
+    return False
+
+def get_execution_plan_files():
+    """
+    获取所有执行计划文件,按日期排序
+    
+    返回:
+        list: 排序后的执行计划文件列表,格式为[(日期, json文件路径, ready文件路径)]
+    """
+    dag_dir = os.path.dirname(__file__)
+    plan_files = []
+    
+    # 查找所有执行计划文件
+    for file in os.listdir(dag_dir):
+        match = re.match(r'exec_plan_(\d{4}-\d{2}-\d{2})\.json', file)
+        if match:
+            date_str = match.group(1)
+            json_path = os.path.join(dag_dir, file)
+            ready_path = os.path.join(dag_dir, f"exec_plan_{date_str}.ready")
+            
+            if os.path.exists(ready_path):
+                plan_files.append((date_str, json_path, ready_path))
+    
+    # 按日期排序(从旧到新)
+    plan_files.sort(key=lambda x: x[0])
+    
+    return plan_files
+
+def cleanup_old_execution_plans(keep_count=None):
+    """
+    清理过期的执行计划文件,保留最新的指定数量
+    
+    参数:
+        keep_days (int): 要保留的文件天数,如果为None则使用配置
+    
+    返回:
+        int: 删除的文件数量
+    """
+    if keep_count is None:
+        keep_count = EXECUTION_PLAN_KEEP_COUNT
+    
+    # 获取所有执行计划文件
+    plan_files = get_execution_plan_files()
+    logger.info(f"找到 {len(plan_files)} 个执行计划文件,将保留最新的 {keep_count} 个")
+    
+    # 如果文件数量未超过保留数,不需要删除
+    if len(plan_files) <= keep_count:
+        logger.info(f"执行计划文件数量 ({len(plan_files)}) 未超过保留数量 ({keep_count}),无需清理")
+        return 0
+    
+    # 删除最旧的文件
+    files_to_delete = plan_files[:-keep_count]
+    deleted_count = 0
+    
+    for _, json_path, ready_path in files_to_delete:
+        try:
+            # 删除JSON文件
+            if os.path.exists(json_path):
+                os.remove(json_path)
+                deleted_count += 1
+                logger.info(f"已删除过期执行计划文件: {json_path}")
+            
+            # 删除ready文件
+            if os.path.exists(ready_path):
+                os.remove(ready_path)
+                deleted_count += 1
+                logger.info(f"已删除过期ready文件: {ready_path}")
+        except Exception as e:
+            logger.error(f"删除文件时出错: {str(e)}")
+    
+    return deleted_count
+
 def prepare_pipeline_dag_schedule(**kwargs):
     """准备Pipeline DAG调度任务的主函数"""
     # 检查是否是手动触发模式
@@ -402,212 +493,229 @@ def prepare_pipeline_dag_schedule(**kwargs):
         is_force_refresh = params.get('FORCE_REFRESH', False)
         logger.info(f"接收到强制刷新参数: FORCE_REFRESH={is_force_refresh}")
     
-    # 获取当前日期
+    # 获取执行日期
     exec_date = kwargs.get('ds') or get_today_date()
     logger.info(f"开始准备执行日期 {exec_date} 的Pipeline调度任务")
     
-    # 检查当前日期是否已存在于airflow_dag_schedule表中
-    date_exists = check_if_date_exists(exec_date)
+    # 定义执行计划文件路径 - 使用新的基于日期的命名
+    plan_base_path = os.path.join(os.path.dirname(__file__), f'exec_plan_{exec_date}')
+    plan_path = f"{plan_base_path}.json"
+    ready_path = f"{plan_base_path}.ready"
+    
+    # 检查是否需要创建新的执行计划文件
+    need_create_plan = False
+    
+    # 新的条件1: 当前目录下没有任何json文件
+    has_any_plans = has_any_execution_plans()
+    if not has_any_plans:
+        logger.info("当前目录下没有任何执行计划文件,需要创建新的执行计划")
+        need_create_plan = True
+    
+    # 新的条件2: schedule_status表中的数据发生了变更
+    if not need_create_plan:
+        # 计算当前哈希值
+        current_hash = get_subscription_state_hash()
+        # 读取上次记录的哈希值
+        hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
+        last_hash = None
+        if os.path.exists(hash_file):
+            try:
+                with open(hash_file, 'r') as f:
+                    last_hash = f.read().strip()
+            except Exception as e:
+                logger.warning(f"读取上次订阅状态哈希值失败: {str(e)}")
+        
+        # 如果哈希值不同,表示数据发生了变更
+        if current_hash != last_hash:
+            logger.info(f"检测到schedule_status表数据变更。旧哈希值: {last_hash}, 新哈希值: {current_hash}")
+            need_create_plan = True
+    
+    # 强制刷新模式覆盖以上判断
+    if is_force_refresh:
+        logger.info("强制刷新模式,将创建新的执行计划")
+        need_create_plan = True
     
-    # 如果日期已存在且不是强制刷新模式,则跳过
-    if date_exists and not is_force_refresh:
-        logger.info(f"执行日期 {exec_date} 已存在于airflow_dag_schedule表中,跳过处理")
+    # 如果不需要创建新的执行计划,直接返回
+    if not need_create_plan:
+        logger.info("无需创建新的执行计划文件")
         return 0
     
-    # 如果是强制刷新模式或日期不存在,则继续处理
-    if is_force_refresh:
-        logger.info(f"强制刷新模式,将清除现有记录并重新生成")
-        clear_existing_records(exec_date)
-    elif date_exists:
-        logger.warning(f"日期存在但未清除记录,请检查逻辑")
+    # 继续处理,创建新的执行计划
+    # 清除数据库中的现有记录
+    clear_existing_records(exec_date)
+    
+    # 1. 获取启用的表
+    enabled_tables = get_enabled_tables()
+    logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
+    
+    if not enabled_tables:
+        logger.warning("没有找到启用的表,准备工作结束")
         return 0
     
-    # 检查执行计划文件和ready文件
-    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-    ready_path = f"{plan_path}.ready"
-    files_exist = os.path.exists(plan_path) and os.path.exists(ready_path)
+    # 2. 获取表的详细信息
+    tables_info = []
+    for table_name in enabled_tables:
+        table_info = get_table_info_from_neo4j(table_name)
+        if table_info:
+            tables_info.append(table_info)
     
-    # 计算当前订阅表状态的哈希值,用于检测变化
-    current_hash = get_subscription_state_hash()
+    logger.info(f"成功获取 {len(tables_info)} 个表的详细信息")
     
-    # 读取上次记录的哈希值
-    hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
-    last_hash = None
-    if os.path.exists(hash_file):
-        try:
-            with open(hash_file, 'r') as f:
-                last_hash = f.read().strip()
-        except Exception as e:
-            logger.warning(f"读取上次订阅状态哈希值失败: {str(e)}")
-    
-    # 判断是否需要重新生成执行计划
-    need_regenerate = is_force_refresh or not files_exist or current_hash != last_hash
-    
-    if need_regenerate:
-        if is_force_refresh:
-            logger.info("强制刷新模式,将重新生成执行计划")
-        elif not files_exist:
-            logger.info("执行计划文件或ready标记文件不存在,将重新生成执行计划")
-        elif current_hash != last_hash:
-            logger.info(f"检测到订阅表状态变化。旧哈希值: {last_hash}, 新哈希值: {current_hash}")
-        
-        # 1. 获取启用的表
-        enabled_tables = get_enabled_tables()
-        logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
-        
-        if not enabled_tables:
-            logger.warning("没有找到启用的表,准备工作结束")
-            return 0
-        
-        # 2. 获取表的详细信息
-        tables_info = []
-        for table_name in enabled_tables:
-            table_info = get_table_info_from_neo4j(table_name)
-            if table_info:
-                tables_info.append(table_info)
+    # 3. 处理依赖关系,添加被动调度的表
+    enriched_tables = process_dependencies(tables_info)
+    logger.info(f"处理依赖后,总共有 {len(enriched_tables)} 个表")
+    
+    # 4. 过滤无效表及其依赖
+    valid_tables = filter_invalid_tables(enriched_tables)
+    logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
+    
+    # 5. 写入airflow_dag_schedule表
+    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)
+    
+    # 6. 检查插入操作是否成功,如果失败则抛出异常
+    if inserted_count == 0 and valid_tables:
+        error_msg = f"插入操作失败,无记录被插入到airflow_dag_schedule表,但有{len(valid_tables)}个有效表需要处理"
+        logger.error(error_msg)
+        raise Exception(error_msg)
+    
+    # 7. 保存最新执行计划,供DAG读取使用
+    try:
+        # 构建执行计划
+        resource_tasks = []
+        model_tasks = []
         
-        logger.info(f"成功获取 {len(tables_info)} 个表的详细信息")
+        for table in valid_tables:
+            if table.get('target_table_label') == 'DataResource':
+                resource_tasks.append({
+                    "source_table": table.get('source_table'),
+                    "target_table": table['target_table'],
+                    "target_table_label": "DataResource",
+                    "script_name": table.get('script_name'),
+                    "script_exec_mode": table.get('script_exec_mode', 'append')
+                })
+            elif table.get('target_table_label') == 'DataModel':
+                model_tasks.append({
+                    "source_table": table.get('source_table'),
+                    "target_table": table['target_table'],
+                    "target_table_label": "DataModel",
+                    "script_name": table.get('script_name'),
+                    "script_exec_mode": table.get('script_exec_mode', 'append')
+                })
         
-        # 3. 处理依赖关系,添加被动调度的表
-        enriched_tables = process_dependencies(tables_info)
-        logger.info(f"处理依赖后,总共有 {len(enriched_tables)} 个表")
+        # 获取依赖关系
+        model_table_names = [t['target_table'] for t in model_tasks]
+        dependencies = {}
         
-        # 4. 过滤无效表及其依赖
-        valid_tables = filter_invalid_tables(enriched_tables)
-        logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
+        driver = get_neo4j_driver()
+        try:
+            with driver.session() as session:
+                for table_name in model_table_names:
+                    query = """
+                        MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
+                        RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
+                    """
+                    result = session.run(query, table_name=table_name)
+                    
+                    deps = []
+                    for record in result:
+                        target = record.get("target")
+                        target_labels = record.get("target_labels", [])
+                        
+                        if target:
+                            table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
+                            deps.append({
+                                "table_name": target,
+                                "table_type": table_type
+                            })
+                    
+                    dependencies[table_name] = deps
+        finally:
+            driver.close()
         
-        # 5. 写入airflow_dag_schedule表
-        inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)
+        # 创建执行计划
+        execution_plan = {
+            "exec_date": exec_date,
+            "resource_tasks": resource_tasks,
+            "model_tasks": model_tasks,
+            "dependencies": dependencies
+        }
         
-        # 6. 检查插入操作是否成功,如果失败则抛出异常
-        if inserted_count == 0 and valid_tables:
-            error_msg = f"插入操作失败,无记录被插入到airflow_dag_schedule表,但有{len(valid_tables)}个有效表需要处理"
-            logger.error(error_msg)
-            raise Exception(error_msg)
+        # 创建临时文件
+        temp_plan_path = f"{plan_path}.temp"
         
-        # 7. 保存最新执行计划,供DAG读取使用
         try:
-            # 构建执行计划
-            resource_tasks = []
-            model_tasks = []
+            # 写入临时文件
+            with open(temp_plan_path, 'w') as f:
+                json.dump(execution_plan, f, indent=2)
+            logger.info(f"已保存执行计划到临时文件: {temp_plan_path}")
             
-            for table in valid_tables:
-                if table.get('target_table_label') == 'DataResource':
-                    resource_tasks.append({
-                        "source_table": table.get('source_table'),
-                        "target_table": table['target_table'],
-                        "target_table_label": "DataResource",
-                        "script_name": table.get('script_name'),
-                        "script_exec_mode": table.get('script_exec_mode', 'append')
-                    })
-                elif table.get('target_table_label') == 'DataModel':
-                    model_tasks.append({
-                        "source_table": table.get('source_table'),
-                        "target_table": table['target_table'],
-                        "target_table_label": "DataModel",
-                        "script_name": table.get('script_name'),
-                        "script_exec_mode": table.get('script_exec_mode', 'append')
-                    })
+            # 原子替换正式文件
+            os.replace(temp_plan_path, plan_path)
+            logger.info(f"已替换执行计划文件: {plan_path}")
             
-            # 获取依赖关系
-            model_table_names = [t['target_table'] for t in model_tasks]
-            dependencies = {}
+            # 创建ready文件,标记执行计划就绪,包含详细时间信息
+            now = datetime.now()
+            timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
+            with open(ready_path, 'w') as f:
+                f.write(f"Created at: {timestamp}\nFor date: {exec_date}")
+            logger.info(f"已创建ready标记文件: {ready_path}")
             
-            driver = get_neo4j_driver()
-            try:
-                with driver.session() as session:
-                    for table_name in model_table_names:
-                        query = """
-                            MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
-                            RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
-                        """
-                        result = session.run(query, table_name=table_name)
-                        
-                        deps = []
-                        for record in result:
-                            target = record.get("target")
-                            target_labels = record.get("target_labels", [])
-                            
-                            if target:
-                                table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
-                                deps.append({
-                                    "table_name": target,
-                                    "table_type": table_type
-                                })
-                        
-                        dependencies[table_name] = deps
-            finally:
-                driver.close()
+            # 更新订阅表状态哈希值
+            current_hash = get_subscription_state_hash()
+            hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
+            with open(hash_file, 'w') as f:
+                f.write(current_hash)
+            logger.info(f"已更新订阅表状态哈希值: {current_hash}")
             
-            # 创建执行计划
-            execution_plan = {
-                "exec_date": exec_date,
-                "resource_tasks": resource_tasks,
-                "model_tasks": model_tasks,
-                "dependencies": dependencies
-            }
+            # 清理过期的执行计划文件
+            deleted_count = cleanup_old_execution_plans()
+            logger.info(f"清理了 {deleted_count} 个过期执行计划文件")
             
-            # 创建临时文件
-            temp_plan_path = f"{plan_path}.temp"
+            # dag_dataops_pipeline_data_scheduler.py文件的修改日期更新
+            touch_data_scheduler_file()
             
-            try:
-                # 写入临时文件
-                with open(temp_plan_path, 'w') as f:
-                    json.dump(execution_plan, f, indent=2)
-                logger.info(f"已保存执行计划到临时文件: {temp_plan_path}")
-                
-                # 原子替换正式文件
-                os.replace(temp_plan_path, plan_path)
-                logger.info(f"已替换执行计划文件: {plan_path}")
-                
-                # 创建ready文件,标记执行计划就绪
-                with open(ready_path, 'w') as f:
-                    f.write(datetime.now().isoformat())
-                logger.info(f"已创建ready标记文件: {ready_path}")
-                
-                # 更新订阅表状态哈希值
-                with open(hash_file, 'w') as f:
-                    f.write(current_hash)
-                logger.info(f"已更新订阅表状态哈希值: {current_hash}")
-                
-                # 触发pipeline_data_scheduler DAG重新解析
-                touch_data_scheduler_file()
-                
-            except Exception as e:
-                logger.error(f"保存执行计划文件或触发DAG重新解析时出错: {str(e)}")
-                # 出错时清理临时文件
-                if os.path.exists(temp_plan_path):
-                    try:
-                        os.remove(temp_plan_path)
-                        logger.info(f"已清理临时文件: {temp_plan_path}")
-                    except Exception as rm_e:
-                        logger.error(f"清理临时文件时出错: {str(rm_e)}")
-                raise  # 重新抛出异常,确保任务失败
-                
         except Exception as e:
-            error_msg = f"保存或验证执行计划文件时出错: {str(e)}"
-            logger.error(error_msg)
-            # 强制抛出异常,确保任务失败,阻止下游DAG执行
-            raise Exception(error_msg)
-        
-        return inserted_count
-    else:
-        logger.info("订阅表状态未变化且执行计划文件存在,无需更新执行计划")
-        return 0
+            logger.error(f"保存执行计划文件或触发DAG重新解析时出错: {str(e)}")
+            # 出错时清理临时文件
+            if os.path.exists(temp_plan_path):
+                try:
+                    os.remove(temp_plan_path)
+                    logger.info(f"已清理临时文件: {temp_plan_path}")
+                except Exception as rm_e:
+                    logger.error(f"清理临时文件时出错: {str(rm_e)}")
+            raise  # 重新抛出异常,确保任务失败
+                
+    except Exception as e:
+        error_msg = f"保存或验证执行计划文件时出错: {str(e)}"
+        logger.error(error_msg)
+        # 强制抛出异常,确保任务失败,阻止下游DAG执行
+        raise Exception(error_msg)
+    
+    return inserted_count
 
 def check_execution_plan_file(**kwargs):
     """
-    检查执行计划文件是否存在且有效
+    检查当天的执行计划文件是否存在且有效
     返回False将阻止所有下游任务执行
     """
-    logger.info("检查执行计划文件是否存在且有效")
-    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+    # 获取执行日期
+    exec_date = kwargs.get('ds') or get_today_date()
+    logger.info(f"检查执行日期 {exec_date} 的执行计划文件是否存在且有效")
+    
+    # 定义执行计划文件路径
+    plan_path = os.path.join(os.path.dirname(__file__), f'exec_plan_{exec_date}.json')
+    ready_path = os.path.join(os.path.dirname(__file__), f'exec_plan_{exec_date}.ready')
     
     # 检查文件是否存在
     if not os.path.exists(plan_path):
         logger.error(f"执行计划文件不存在: {plan_path}")
         return False
     
+    # 检查ready标记是否存在
+    if not os.path.exists(ready_path):
+        logger.error(f"执行计划ready标记文件不存在: {ready_path}")
+        return False
+    
     # 检查文件是否可读且内容有效
     try:
         with open(plan_path, 'r') as f:
@@ -681,6 +789,7 @@ with DAG(
     check_plan_file = ShortCircuitOperator(
         task_id="check_execution_plan_file",
         python_callable=check_execution_plan_file,
+        provide_context=True,
         dag=dag
     )