Ver Fonte

load_data.py遇到一个错误

wangxq há 1 semana atrás
pai
commit
c8a6a0cfb9

+ 34 - 7
dags/dataops_productline_execute_dag.py

@@ -118,6 +118,9 @@ def execute_python_script(script_id, script_name, target_table, update_mode, sch
     
     返回:
         bool: 脚本执行结果
+        
+    异常:
+        Exception: 当脚本执行失败时抛出异常,确保任务在Airflow中标记为失败状态
     """
     # 获取执行日期
     logical_date = kwargs.get('logical_date', datetime.now())
@@ -190,6 +193,12 @@ def execute_python_script(script_id, script_name, target_table, update_mode, sch
                 result = bool(result)
                 logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
             
+            # 检查结果,如果为False则抛出异常
+            if not result:
+                error_msg = f"脚本 {script_name} 执行失败,返回值为False"
+                logger.error(error_msg)
+                raise Exception(error_msg)
+            
             # 记录结束时间和结果
             end_time = datetime.now()
             duration = (end_time - start_time).total_seconds()
@@ -209,8 +218,8 @@ def execute_python_script(script_id, script_name, target_table, update_mode, sch
         import traceback
         logger.error(traceback.format_exc())
         
-        # 确保不会阻塞DAG
-        return False
+        # 重新抛出异常,确保任务在Airflow中标记为失败状态
+        raise
 
 # 使用execute_sql函数代替之前的execute_sql_script
 def execute_sql(script_id, script_name, target_table, update_mode, schedule_frequency, **kwargs):
@@ -227,6 +236,9 @@ def execute_sql(script_id, script_name, target_table, update_mode, schedule_freq
     
     返回:
         bool: 脚本执行结果
+        
+    异常:
+        Exception: 当脚本执行失败时抛出异常,确保任务在Airflow中标记为失败状态
     """
     # 获取执行日期
     logical_date = kwargs.get('logical_date', datetime.now())
@@ -305,6 +317,12 @@ def execute_sql(script_id, script_name, target_table, update_mode, schedule_freq
                 original_result = result
                 result = bool(result)
                 logger.warning(f"SQL脚本返回非布尔值 {original_result},转换为布尔值: {result}")
+            
+            # 检查结果,如果为False则抛出异常
+            if not result:
+                error_msg = f"SQL脚本 {script_name} 执行失败,返回值为False"
+                logger.error(error_msg)
+                raise Exception(error_msg)
 
             # 记录结束时间和结果
             end_time = datetime.now()
@@ -326,8 +344,8 @@ def execute_sql(script_id, script_name, target_table, update_mode, schedule_freq
         import traceback
         logger.error(traceback.format_exc())
         
-        # 确保不会阻塞DAG
-        return False
+        # 重新抛出异常,确保任务在Airflow中标记为失败状态
+        raise
 
 # 使用execute_python函数代替之前的execute_python_script
 def execute_python(script_id, script_name, target_table, update_mode, schedule_frequency, **kwargs):
@@ -344,6 +362,9 @@ def execute_python(script_id, script_name, target_table, update_mode, schedule_f
     
     返回:
         bool: 脚本执行结果
+        
+    异常:
+        Exception: 当脚本执行失败时抛出异常,确保任务在Airflow中标记为失败状态
     """
     # 获取执行日期
     logical_date = kwargs.get('logical_date', datetime.now())
@@ -422,6 +443,12 @@ def execute_python(script_id, script_name, target_table, update_mode, schedule_f
                 original_result = result
                 result = bool(result)
                 logger.warning(f"Python脚本返回非布尔值 {original_result},转换为布尔值: {result}")
+            
+            # 检查结果,如果为False则抛出异常
+            if not result:
+                error_msg = f"Python脚本 {script_name} 执行失败,返回值为False"
+                logger.error(error_msg)
+                raise Exception(error_msg)
 
             # 记录结束时间和结果
             end_time = datetime.now()
@@ -443,8 +470,8 @@ def execute_python(script_id, script_name, target_table, update_mode, schedule_f
         import traceback
         logger.error(traceback.format_exc())
         
-        # 确保不会阻塞DAG
-        return False
+        # 重新抛出异常,确保任务在Airflow中标记为失败状态
+        raise
 
 #############################################
 # 执行计划获取和处理函数
@@ -979,7 +1006,7 @@ def create_execution_plan(**kwargs):
         # 如果执行计划中没有execution_order或为空,使用NetworkX优化
         if not execution_order:
             logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
-            execution_order = optimize_script_execution_order(scripts, script_dependencies)
+            execution_order = optimize_script_execution_order(scripts, script_dependencies, nx.DiGraph())
             execution_plan["execution_order"] = execution_order
         
         # 保存完整的执行计划到XCom

+ 2 - 1
dags/dataops_productline_prepare_dag.py

@@ -1184,7 +1184,8 @@ def prepare_productline_dag_schedule(**kwargs):
         logger.info(f"开始准备执行日期 {exec_date} 的创建执行计划的调度任务")
         
         # 检查是否需要创建新的执行计划
-        need_create_plan = False
+        # 出于测试目的,直接设置为True
+        need_create_plan = True
         
         # 条件1: 数据库中不存在当天的执行计划
         try:

+ 90 - 129
dataops_scripts/load_data.py

@@ -45,6 +45,9 @@ def get_source_database_info(table_name, script_name=None):
         
     返回:
         dict: 数据库连接信息字典
+        
+    异常:
+        Exception: 当无法获取数据库连接信息时抛出异常
     """
     logger.info(f"获取表 {table_name} 的源数据库连接信息")
     
@@ -53,7 +56,7 @@ def get_source_database_info(table_name, script_name=None):
         with driver.session() as session:
             # 首先查询表对应的源节点
             query = """
-                MATCH (target {en_name: $table_name})-[rel]->(source)
+                MATCH (target {en_name: $table_name})-[rel]->(source:DataSource)
                 RETURN source.en_name AS source_name, 
                        source.database AS database,
                        source.host AS host,
@@ -70,8 +73,9 @@ def get_source_database_info(table_name, script_name=None):
             record = result.single()
             
             if not record:
-                logger.warning(f"未找到表 {table_name} 对应的源节点")
-                return None
+                error_msg = f"未找到表 {table_name} 对应的源节点"
+                logger.error(error_msg)
+                raise Exception(error_msg)
             
             # 获取源节点的数据库连接信息
             database_info = {
@@ -89,15 +93,16 @@ def get_source_database_info(table_name, script_name=None):
             
             # 检查是否包含数据库连接信息
             if not database_info.get("database"):
-                logger.warning(f"源节点 {database_info['source_name']} 没有数据库连接信息")
-                return None
+                error_msg = f"源节点 {database_info['source_name']} 没有数据库连接信息"
+                logger.error(error_msg)
+                raise Exception(error_msg)
             
             logger.info(f"成功获取表 {table_name} 的源数据库连接信息: {database_info['host']}:{database_info['port']}/{database_info['database']}")
             return database_info
     except Exception as e:
         logger.error(f"获取源数据库连接信息时出错: {str(e)}")
         logger.error(traceback.format_exc())
-        return None
+        raise Exception(f"获取源数据库连接信息失败: {str(e)}")
     finally:
         driver.close()
 
@@ -195,6 +200,9 @@ def create_table_if_not_exists(source_engine, target_engine, source_table, targe
         
     返回:
         bool: 操作是否成功
+        
+    异常:
+        Exception: 当源表不存在或无法创建目标表时抛出异常
     """
     logger.info(f"检查目标表 {target_table} 是否存在,不存在则创建")
     
@@ -211,15 +219,17 @@ def create_table_if_not_exists(source_engine, target_engine, source_table, targe
         source_inspector = inspect(source_engine)
         
         if not source_inspector.has_table(source_table):
-            logger.error(f"源表 {source_table} 不存在")
-            return False
+            error_msg = f"源表 {source_table} 不存在"
+            logger.error(error_msg)
+            raise Exception(error_msg)
         
         # 获取源表的列信息
         source_columns = source_inspector.get_columns(source_table)
         
         if not source_columns:
-            logger.error(f"源表 {source_table} 没有列信息")
-            return False
+            error_msg = f"源表 {source_table} 没有列信息"
+            logger.error(error_msg)
+            raise Exception(error_msg)
         
         # 创建元数据对象
         metadata = MetaData()
@@ -240,29 +250,14 @@ def create_table_if_not_exists(source_engine, target_engine, source_table, targe
     except Exception as e:
         logger.error(f"创建表时出错: {str(e)}")
         logger.error(traceback.format_exc())
-        return False
+        raise Exception(f"创建表失败: {str(e)}")
 
 def load_data_from_source(table_name, exec_date=None, update_mode=None, script_name=None, 
                           schedule_frequency=None, is_manual_dag_trigger=False, **kwargs):
     """
     从源数据库加载数据到目标数据库
-    
-    参数:
-        table_name (str): 表名
-        exec_date: 执行日期
-        update_mode (str): 更新模式,append或full_refresh
-        script_name (str): 脚本名称
-        schedule_frequency (str): 调度频率,daily、weekly、monthly等
-        is_manual_dag_trigger (bool): 是否手动DAG触发
-        **kwargs: 其他参数
-    
-    返回:
-        bool: 操作是否成功
     """
-    # 记录开始时间
     start_time = datetime.now()
-    
-    # 使用logger.info输出所有参数
     logger.info(f"===== 开始从源加载数据 =====")
     logger.info(f"表名: {table_name}")
     logger.info(f"执行日期: {exec_date}")
@@ -270,95 +265,65 @@ def load_data_from_source(table_name, exec_date=None, update_mode=None, script_n
     logger.info(f"脚本名称: {script_name}")
     logger.info(f"调度频率: {schedule_frequency}")
     logger.info(f"是否手动DAG触发: {is_manual_dag_trigger}")
-    
-    # 记录其他参数
     for key, value in kwargs.items():
         logger.info(f"其他参数 - {key}: {value}")
-    
-    # 如果exec_date为None,使用当前日期
+
     if exec_date is None:
         exec_date = datetime.now().strftime('%Y-%m-%d')
         logger.info(f"执行日期为空,使用当前日期: {exec_date}")
-    
-    # 如果schedule_frequency为None,默认使用daily
+
     if schedule_frequency is None:
         schedule_frequency = "daily"
         logger.info(f"调度频率为空,使用默认值: {schedule_frequency}")
-    
+
     try:
-        # 1. 获取源数据库连接信息
         source_db_info = get_source_database_info(table_name, script_name)
-        
-        if not source_db_info:
-            logger.error(f"无法获取表 {table_name} 的源数据库连接信息,无法加载数据")
-            return False
-        
-        # 2. 获取目标数据库连接信息
         target_db_info = get_target_database_info()
-        
-        # 3. 创建数据库引擎
+
         source_engine = get_sqlalchemy_engine(source_db_info)
         target_engine = get_sqlalchemy_engine(target_db_info)
-        
+
         if not source_engine or not target_engine:
-            logger.error("无法创建数据库引擎,无法加载数据")
-            return False
-        
-        # 获取源表名
-        source_table = source_db_info.get("source_table", table_name)
-        
-        # 4. 检查目标表是否存在,不存在则创建
+            raise Exception("无法创建数据库引擎,无法加载数据")
+
+        source_table = source_db_info.get("source_table", table_name) or table_name
+
         if not create_table_if_not_exists(source_engine, target_engine, source_table, table_name):
-            logger.error(f"无法创建目标表 {table_name},无法加载数据")
-            return False
-        
-        # 5. 根据update_mode处理数据
+            raise Exception(f"无法创建目标表 {table_name},无法加载数据")
+
         if update_mode == "full_refresh":
-            # 清空目标表
             logger.info(f"执行全量刷新,清空表 {table_name}")
             with target_engine.connect() as conn:
-                truncate_sql = f"TRUNCATE TABLE {table_name}"
-                conn.execute(text(truncate_sql))
+                conn.execute(text(f"TRUNCATE TABLE {table_name}"))
             logger.info(f"成功清空表 {table_name}")
-        else:  # append 模式
-            # 获取目标日期列
+        else:
             target_dt_column = get_target_dt_column(table_name, script_name)
-            
             if not target_dt_column:
                 logger.error(f"无法获取表 {table_name} 的目标日期列,无法执行增量加载")
                 return False
-            
-            # 计算日期范围
+
             try:
                 if is_manual_dag_trigger:
-                    # 手动DAG触发,使用target_dt_column进行数据清理
                     start_date, end_date = get_date_range(exec_date, schedule_frequency)
                     logger.info(f"手动DAG触发,日期范围: {start_date} 到 {end_date}")
-                    
-                    # 删除目标日期范围内的数据
                     delete_sql = f"""
                         DELETE FROM {table_name}
                         WHERE {target_dt_column} >= '{start_date}'
                         AND {target_dt_column} < '{end_date}'
                     """
-                    
                     with target_engine.connect() as conn:
                         conn.execute(text(delete_sql))
                         logger.info(f"成功删除表 {table_name} 中 {target_dt_column} 从 {start_date} 到 {end_date} 的数据")
                 else:
-                    # 自动调度,使用create_time进行数据清理
                     start_datetime, end_datetime = get_one_day_range(exec_date)
                     start_date = start_datetime.strftime('%Y-%m-%d %H:%M:%S')
                     end_date = end_datetime.strftime('%Y-%m-%d %H:%M:%S')
                     logger.info(f"自动调度,日期范围: {start_date} 到 {end_date}")
-                    
-                    # 删除create_time在当天的数据
                     delete_sql = f"""
                         DELETE FROM {table_name}
                         WHERE create_time >= '{start_date}'
                         AND create_time < '{end_date}'
                     """
-                    
                     try:
                         with target_engine.connect() as conn:
                             conn.execute(text(delete_sql))
@@ -370,74 +335,70 @@ def load_data_from_source(table_name, exec_date=None, update_mode=None, script_n
                 logger.error(f"计算日期范围时出错: {str(date_err)}")
                 logger.error(traceback.format_exc())
                 return False
-        
-        # 6. 加载数据
-        try:
-            # 构建查询条件
-            if update_mode == "full_refresh":
-                # 全量加载,不需要条件
-                query = f"SELECT * FROM {source_table}"
+
+        # 第6步:加载数据(这是你最关键的修复点)
+        logger.info(f"执行查询构建")
+
+        if update_mode == "full_refresh":
+            query = f"SELECT * FROM {source_table}"
+        else:
+            start_date, end_date = get_date_range(exec_date, schedule_frequency)
+            source_inspector = inspect(source_engine)
+            source_columns = [col['name'].lower() for col in source_inspector.get_columns(source_table)]
+            if target_dt_column.lower() in source_columns:
+                query = f"""
+                    SELECT * FROM {source_table}
+                    WHERE {target_dt_column} >= '{start_date}'
+                    AND {target_dt_column} < '{end_date}'
+                """
             else:
-                # 增量加载,使用目标日期列过滤
-                # 获取日期范围
-                start_date, end_date = get_date_range(exec_date, schedule_frequency)
-                
-                # 检查源表是否有目标日期列
-                source_inspector = inspect(source_engine)
-                source_columns = [col['name'].lower() for col in source_inspector.get_columns(source_table)]
-                
-                if target_dt_column.lower() in source_columns:
-                    # 源表有目标日期列,使用它过滤数据
-                    query = f"""
-                        SELECT * FROM {source_table}
-                        WHERE {target_dt_column} >= '{start_date}'
-                        AND {target_dt_column} < '{end_date}'
-                    """
-                else:
-                    # 源表没有目标日期列,加载全部数据
-                    logger.warning(f"源表 {source_table} 没有目标日期列 {target_dt_column},将加载全部数据")
-                    query = f"SELECT * FROM {source_table}"
-            
-            # 读取源数据
-            logger.info(f"执行查询: {query}")
-            df = pd.read_sql(query, source_engine)
-            
-            # 检查数据是否为空
-            if df.empty:
-                logger.warning(f"查询结果为空,没有数据需要加载")
-                return True
-            
-            # 添加create_time列(如果没有)
-            if 'create_time' not in df.columns:
-                df['create_time'] = datetime.now()
-            
-            # 写入目标数据库
-            logger.info(f"开始写入数据到目标表 {table_name},共 {len(df)} 行")
-            df.to_sql(
-                name=table_name,
-                con=target_engine,
-                if_exists='append',
-                index=False,
-                schema=target_db_info.get("schema", "public")
-            )
-            
-            logger.info(f"成功写入数据到目标表 {table_name}")
+                logger.warning(f"源表 {source_table} 没有目标日期列 {target_dt_column},将加载全部数据")
+                query = f"SELECT * FROM {source_table}"
+
+        logger.info(f"执行查询: {query}")
+        # with source_engine.connect() as conn:
+        #     df = pd.read_sql(query, conn)
+        #df = pd.read_sql(query, source_engine)
+        from sqlalchemy.orm import sessionmaker
+
+        Session = sessionmaker(bind=source_engine)
+        session = Session()
+
+        try:
+            df = pd.read_sql(query, session.connection())
+        finally:
+            session.close()
+
+        if df.empty:
+            logger.warning(f"查询结果为空,没有数据需要加载")
             return True
-        except Exception as load_err:
-            logger.error(f"加载数据时出错: {str(load_err)}")
-            logger.error(traceback.format_exc())
-            return False
+
+        if 'create_time' not in df.columns:
+            df['create_time'] = datetime.now()
+
+        logger.info(f"开始写入数据到目标表 {table_name},共 {len(df)} 行")
+        df.to_sql(
+            name=table_name,
+            con=target_engine,
+            if_exists='append',
+            index=False,
+            schema=target_db_info.get("schema", "public")
+        )
+        logger.info(f"成功写入数据到目标表 {table_name}")
+        return True
+
     except Exception as e:
         logger.error(f"执行数据加载过程时出错: {str(e)}")
         logger.error(traceback.format_exc())
-        return False
+        raise Exception(f"数据加载失败: {str(e)}")
+
     finally:
-        # 记录结束时间和耗时
         end_time = datetime.now()
         duration = (end_time - start_time).total_seconds()
         logger.info(f"数据加载过程结束,耗时: {int(duration // 60)}分钟 {int(duration % 60)}秒")
         logger.info(f"===== 数据加载结束 =====")
 
+
 def run(table_name, update_mode, schedule_frequency=None, script_name=None, exec_date=None, is_manual_dag_trigger=False, **kwargs):
     """
     统一入口函数,符合Airflow动态脚本调用规范
@@ -452,7 +413,7 @@ def run(table_name, update_mode, schedule_frequency=None, script_name=None, exec
         **kwargs: 其他可能的参数
     
     返回:
-        bool: 执行成功返回True,否则返回False
+        bool: 执行成功返回True,否则抛出异常
     """
     logger.info(f"开始执行脚本...")
     
@@ -474,7 +435,7 @@ def run(table_name, update_mode, schedule_frequency=None, script_name=None, exec
         logger.info(f"额外参数 - {key}: {value}")
     logger.info(f"========================")
     
-    # 实际调用内部处理函数
+    # 实际调用内部处理函数,不再捕获异常,让异常直接传递给上层调用者
     return load_data_from_source(
         table_name=table_name, 
         exec_date=exec_date,