Procházet zdrojové kódy

删除了对airflow_dag_schedule的依赖,但是对json和json.ready的依赖没有删除干净

wangxq před 1 měsícem
rodič
revize
93b2c15da7
1 změnil soubory, kde provedl 86 přidání a 125 odebrání
  1. 86 125
      dags/dag_dataops_pipeline_data_scheduler.py

+ 86 - 125
dags/dag_dataops_pipeline_data_scheduler.py

@@ -20,7 +20,6 @@ from decimal import Decimal
 from common import (
     get_pg_conn, 
     get_neo4j_driver,
-    execute_with_monitoring,
     get_today_date
 )
 from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
@@ -52,11 +51,11 @@ def validate_database_connection():
         version = cursor.fetchone()
         log_debug(f"数据库连接正常,PostgreSQL版本: {version[0]}")
         
-        # 检查airflow_dag_schedule表是否存在
+        # 检查airflow_exec_plans表是否存在
         cursor.execute("""
             SELECT EXISTS (
                SELECT FROM information_schema.tables 
-               WHERE table_name = 'airflow_dag_schedule'
+               WHERE table_name = 'airflow_exec_plans'
             )
         """)
         table_exists = cursor.fetchone()[0]
@@ -65,24 +64,24 @@ def validate_database_connection():
             cursor.execute("""
                 SELECT column_name, data_type 
                 FROM information_schema.columns 
-                WHERE table_name = 'airflow_dag_schedule'
+                WHERE table_name = 'airflow_exec_plans'
             """)
             columns = cursor.fetchall()
-            log_debug(f"airflow_dag_schedule表存在,列信息:")
+            log_debug(f"airflow_exec_plans表存在,列信息:")
             for col in columns:
                 log_debug(f"  - {col[0]}: {col[1]}")
             
             # 查询最新记录数量
-            cursor.execute("SELECT COUNT(*) FROM airflow_dag_schedule")
+            cursor.execute("SELECT COUNT(*) FROM airflow_exec_plans")
             count = cursor.fetchone()[0]
-            log_debug(f"airflow_dag_schedule表中有 {count} 条记录")
+            log_debug(f"airflow_exec_plans表中有 {count} 条记录")
             
             # 检查最近的执行记录
             cursor.execute("""
-                SELECT exec_date, COUNT(*) as record_count
-                FROM airflow_dag_schedule
-                GROUP BY exec_date
-                ORDER BY exec_date DESC
+                SELECT ds, COUNT(*) as record_count
+                FROM airflow_exec_plans
+                GROUP BY ds
+                ORDER BY ds DESC
                 LIMIT 3
             """)
             recent_dates = cursor.fetchall()
@@ -90,7 +89,7 @@ def validate_database_connection():
             for date_info in recent_dates:
                 log_debug(f"  - {date_info[0]}: {date_info[1]} 条记录")
         else:
-            log_debug("airflow_dag_schedule表不存在!")
+            log_debug("airflow_exec_plans表不存在!")
         
         cursor.close()
         conn.close()
@@ -163,26 +162,45 @@ def execute_python_script(target_table, script_name, script_exec_mode, exec_date
     start_time = datetime.now()
     
     try:
-        # 执行实际脚本
-        logger.info(f"开始执行脚本: {script_name}")
-        result = execute_python_script(script_name, target_table, script_exec_mode)
-        logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
-        
-        # 确保result是布尔值
-        if result is None:
-            logger.warning(f"脚本返回值为None,转换为False")
-            result = False
-        elif not isinstance(result, bool):
-            original_result = result
-            result = bool(result)
-            logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
-        
-        # 记录结束时间和结果
-        end_time = datetime.now()
-        duration = (end_time - start_time).total_seconds()
-        logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
+        # 导入和执行脚本模块
+        import importlib.util
+        import sys
         
-        return result
+        script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
+        
+        if not os.path.exists(script_path):
+            logger.error(f"脚本文件不存在: {script_path}")
+            return False
+            
+        # 动态导入模块
+        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
+        module = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(module)
+        
+        # 检查并调用标准入口函数run
+        if hasattr(module, "run"):
+            logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
+            result = module.run(table_name=target_table, execution_mode=script_exec_mode)
+            logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
+            
+            # 确保result是布尔值
+            if result is None:
+                logger.warning(f"脚本返回值为None,转换为False")
+                result = False
+            elif not isinstance(result, bool):
+                original_result = result
+                result = bool(result)
+                logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
+            
+            # 记录结束时间和结果
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
+            
+            return result
+        else:
+            logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
+            return False
     except Exception as e:
         # 处理异常
         logger.error(f"执行任务出错: {str(e)}")
@@ -190,6 +208,8 @@ def execute_python_script(target_table, script_name, script_exec_mode, exec_date
         duration = (end_time - start_time).total_seconds()
         logger.error(f"脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
         logger.info(f"===== 脚本执行异常结束 =====")
+        import traceback
+        logger.error(traceback.format_exc())
         
         # 确保不会阻塞DAG
         return False
@@ -413,11 +433,6 @@ def filter_invalid_tables(tables_info):
     
     return valid_tables
 
-def write_to_airflow_dag_schedule(exec_date, tables_info):
-    """将表信息写入airflow_dag_schedule表"""
-    logger.info(f"模拟写入 {len(tables_info)} 条记录到 airflow_dag_schedule 表 (已移除数据库操作)")
-    return len(tables_info)
-
 def prepare_dag_schedule(**kwargs):
     """准备DAG调度任务的主函数"""
     exec_date = kwargs.get('ds') or get_today_date()
@@ -448,14 +463,9 @@ def prepare_dag_schedule(**kwargs):
     valid_tables = filter_invalid_tables(enriched_tables)
     logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
     
-    # 5. 写入airflow_dag_schedule表
-    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)
-    
-    # 6. 检查插入操作是否成功,如果失败则抛出异常
-    if inserted_count == 0 and valid_tables:
-        error_msg = f"插入操作失败,无记录被插入到airflow_dag_schedule表,但有{len(valid_tables)}个有效表需要处理"
-        logger.error(error_msg)
-        raise Exception(error_msg)
+    # 已删除对 airflow_dag_schedule 表的写入操作
+    # 只记录准备了多少个表
+    logger.info(f"处理了 {len(valid_tables)} 个有效表")
     
     # 7. 生成执行计划数据
     resource_tasks = []
@@ -521,92 +531,53 @@ def prepare_dag_schedule(**kwargs):
     kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
     logger.info(f"准备了执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
     
-    return inserted_count
+    return len(valid_tables)
 
 def check_execution_plan_file(**kwargs):
     """
-    检查执行计划文件是否存在且有效
+    检查执行计划是否存在且有效
     返回False将阻止所有下游任务执行
     """
-    logger.info("检查执行计划文件是否存在且有效")
-    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-    ready_path = f"{plan_path}.ready"
+    logger.info("检查数据库中的执行计划是否存在且有效")
+    exec_date = kwargs.get('ds') or get_today_date()
     
-    # 检查文件是否存在
-    if not os.path.exists(plan_path):
-        logger.error(f"执行计划文件不存在: {plan_path}")
-        return False
+    # 从数据库获取执行计划
+    execution_plan = get_execution_plan_from_db(exec_date)
     
-    # 检查ready标记是否存在
-    if not os.path.exists(ready_path):
-        logger.error(f"执行计划ready标记文件不存在: {ready_path}")
+    # 检查是否成功获取到执行计划
+    if not execution_plan:
+        logger.error(f"未找到执行日期 {exec_date} 的执行计划")
         return False
     
-    # 检查文件是否可读且内容有效
-    try:
-        with open(plan_path, 'r') as f:
-            data = json.load(f)
-            
-            # 检查必要字段
-            if "exec_date" not in data:
-                logger.error("执行计划缺少exec_date字段")
-                return False
-                
-            if not isinstance(data.get("resource_tasks", []), list):
-                logger.error("执行计划的resource_tasks字段无效")
-                return False
-                
-            if not isinstance(data.get("model_tasks", []), list):
-                logger.error("执行计划的model_tasks字段无效")
-                return False
-            
-            # 检查是否有任务数据
-            resource_tasks = data.get("resource_tasks", [])
-            model_tasks = data.get("model_tasks", [])
-            if not resource_tasks and not model_tasks:
-                logger.warning("执行计划不包含任何任务,但文件格式有效")
-                # 注意:即使没有任务,我们仍然允许流程继续
-            
-            logger.info(f"执行计划文件验证成功: 包含 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
-            return True
-            
-    except json.JSONDecodeError as je:
-        logger.error(f"执行计划文件不是有效的JSON: {str(je)}")
+    # 检查执行计划是否包含必要字段
+    if "exec_date" not in execution_plan:
+        logger.error("执行计划缺少exec_date字段")
         return False
-    except Exception as e:
-        logger.error(f"检查执行计划文件时出错: {str(e)}")
+        
+    if not isinstance(execution_plan.get("resource_tasks", []), list):
+        logger.error("执行计划的resource_tasks字段无效")
+        return False
+        
+    if not isinstance(execution_plan.get("model_tasks", []), list):
+        logger.error("执行计划的model_tasks字段无效")
         return False
+    
+    # 检查是否有任务数据
+    resource_tasks = execution_plan.get("resource_tasks", [])
+    model_tasks = execution_plan.get("model_tasks", [])
+    
+    if not resource_tasks and not model_tasks:
+        logger.warning("执行计划不包含任何任务")
+        # 如果没有任务,则阻止下游任务执行
+        return False
+    
+    logger.info(f"执行计划验证成功: 包含 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
+    return True
 
 #############################################
 # 第二阶段: 数据处理阶段(Data Processing Phase)的函数
 #############################################
 
-def get_latest_date():
-    """获取数据库中包含记录的最近日期"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT DISTINCT exec_date
-            FROM airflow_dag_schedule 
-            ORDER BY exec_date DESC
-            LIMIT 1
-        """)
-        result = cursor.fetchone()
-        if result:
-            latest_date = result[0]
-            logger.info(f"找到最近的包含记录的日期: {latest_date}")
-            return latest_date
-        else:
-            logger.warning("未找到包含记录的日期,将使用当前日期")
-            return get_today_date()
-    except Exception as e:
-        logger.error(f"查找最近日期时出错: {str(e)}")
-        return get_today_date()
-    finally:
-        cursor.close()
-        conn.close()
-
 def get_all_tasks(exec_date):
     """
     获取所有需要执行的任务(DataResource和DataModel)
@@ -669,7 +640,7 @@ def create_execution_plan(**kwargs):
         # 如果找不到执行计划,则从数据库获取
         if not execution_plan:
             # 获取执行日期
-            exec_date = get_latest_date()
+            exec_date = kwargs.get('ds') or get_today_date()
             logger.info(f"未找到执行计划,从数据库获取。使用执行日期: {exec_date}")
             
             # 获取所有任务
@@ -897,14 +868,6 @@ def get_execution_stats(exec_date):
         logger.error(traceback.format_exc())
         return {}
 
-def update_missing_results(exec_date):
-    """
-    更新缺失的执行结果信息
-    此函数已不再操作数据库,仅返回0表示无需更新
-    """
-    logger.info(f"模拟更新缺失的执行结果信息 (已移除数据库操作)")
-    return 0
-
 def generate_execution_report(exec_date, stats):
     """生成执行报告"""
     # 构建报告
@@ -1265,7 +1228,6 @@ with DAG(
                         "script_name": script_name,
                         "script_exec_mode": exec_mode,
                         # 确保使用字符串而不是可能是默认(非字符串)格式的执行日期
-                        # 这样 execute_with_monitoring 函数才能正确更新数据库
                         "exec_date": str(exec_date)
                     },
                     retries=TASK_RETRY_CONFIG["retries"],
@@ -1333,7 +1295,6 @@ with DAG(
                         "script_name": script_name,
                         "script_exec_mode": exec_mode,
                         # 确保使用字符串而不是可能是默认(非字符串)格式的执行日期
-                        # 这样 execute_with_monitoring 函数才能正确更新数据库
                         "exec_date": str(exec_date)
                     },
                     retries=TASK_RETRY_CONFIG["retries"],