Просмотр исходного кода

解决load_data.py多个python不兼容的问题

wangxq 1 неделя назад
Родитель
Сommit
9351e05714
2 измененных файлов с 88 добавлено и 56 удалено
  1. 85 55
      dataops_scripts/load_data.py
  2. 3 1
      requirements.txt

+ 85 - 55
dataops_scripts/load_data.py

@@ -277,115 +277,145 @@ def load_data_from_source(table_name, exec_date=None, update_mode=None, script_n
         logger.info(f"调度频率为空,使用默认值: {schedule_frequency}")
 
     try:
+        # 获取源数据库和目标数据库信息
         source_db_info = get_source_database_info(table_name, script_name)
         target_db_info = get_target_database_info()
 
+        # 创建数据库引擎
         source_engine = get_sqlalchemy_engine(source_db_info)
         target_engine = get_sqlalchemy_engine(target_db_info)
 
         if not source_engine or not target_engine:
             raise Exception("无法创建数据库引擎,无法加载数据")
 
+        # 获取源表名
         source_table = source_db_info.get("source_table", table_name) or table_name
 
+        # 确保目标表存在
         if not create_table_if_not_exists(source_engine, target_engine, source_table, table_name):
             raise Exception(f"无法创建目标表 {table_name},无法加载数据")
 
+        # 根据更新模式处理数据
         if update_mode == "full_refresh":
+            # 执行全量刷新,清空表
             logger.info(f"执行全量刷新,清空表 {table_name}")
-            with target_engine.connect() as conn:
-                conn.execute(text(f"TRUNCATE TABLE {table_name}"))
+            with target_engine.begin() as conn:  # 使用begin()自动管理事务
+                conn.execute(f"TRUNCATE TABLE {table_name}")
             logger.info(f"成功清空表 {table_name}")
+
+            # 构建全量查询
+            query = f"SELECT * FROM {source_table}"
         else:
+            # 增量更新,需要获取目标日期列和日期范围
             target_dt_column = get_target_dt_column(table_name, script_name)
             if not target_dt_column:
                 logger.error(f"无法获取表 {table_name} 的目标日期列,无法执行增量加载")
                 return False
 
             try:
+                # 根据是否手动DAG触发决定日期范围
                 if is_manual_dag_trigger:
+                    # 手动触发
                     start_date, end_date = get_date_range(exec_date, schedule_frequency)
                     logger.info(f"手动DAG触发,日期范围: {start_date} 到 {end_date}")
+                    
+                    # 执行删除操作
                     delete_sql = f"""
                         DELETE FROM {table_name}
                         WHERE {target_dt_column} >= '{start_date}'
                         AND {target_dt_column} < '{end_date}'
                     """
-                    with target_engine.connect() as conn:
-                        conn.execute(text(delete_sql))
-                        logger.info(f"成功删除表 {table_name} 中 {target_dt_column} 从 {start_date} 到 {end_date} 的数据")
+                    with target_engine.begin() as conn:  # 使用begin()自动管理事务
+                        conn.execute(delete_sql)
+                    logger.info(f"成功删除表 {table_name} 中 {target_dt_column} 从 {start_date} 到 {end_date} 的数据")
                 else:
+                    # 自动调度
                     start_datetime, end_datetime = get_one_day_range(exec_date)
                     start_date = start_datetime.strftime('%Y-%m-%d %H:%M:%S')
                     end_date = end_datetime.strftime('%Y-%m-%d %H:%M:%S')
                     logger.info(f"自动调度,日期范围: {start_date} 到 {end_date}")
+                    
+                    # 执行删除操作
                     delete_sql = f"""
                         DELETE FROM {table_name}
                         WHERE create_time >= '{start_date}'
                         AND create_time < '{end_date}'
                     """
                     try:
-                        with target_engine.connect() as conn:
-                            conn.execute(text(delete_sql))
-                            logger.info(f"成功删除表 {table_name} 中 create_time 从 {start_date} 到 {end_date} 的数据")
+                        with target_engine.begin() as conn:  # 使用begin()自动管理事务
+                            conn.execute(delete_sql)
+                        logger.info(f"成功删除表 {table_name} 中 create_time 从 {start_date} 到 {end_date} 的数据")
                     except Exception as del_err:
                         logger.error(f"删除数据时出错: {str(del_err)}")
                         logger.warning("继续执行数据加载")
+                
+                # 构建增量查询
+                if not is_manual_dag_trigger:
+                    # 对于自动调度,重新计算日期范围用于查询
+                    start_date, end_date = get_date_range(exec_date, schedule_frequency)
+                
+                # 检查源表是否含有目标日期列
+                source_inspector = inspect(source_engine)
+                source_columns = [col['name'].lower() for col in source_inspector.get_columns(source_table)]
+                
+                if target_dt_column.lower() in source_columns:
+                    # 源表含有目标日期列,构建包含日期条件的查询
+                    query = f"""
+                        SELECT * FROM {source_table}
+                        WHERE {target_dt_column} >= '{start_date}'
+                        AND {target_dt_column} < '{end_date}'
+                    """
+                else:
+                    # 源表不含目标日期列,构建全量查询
+                    logger.warning(f"源表 {source_table} 没有目标日期列 {target_dt_column},将加载全部数据")
+                    query = f"SELECT * FROM {source_table}"
+                
             except Exception as date_err:
                 logger.error(f"计算日期范围时出错: {str(date_err)}")
                 logger.error(traceback.format_exc())
                 return False
 
-        # 第6步:加载数据(这是你最关键的修复点)
-        logger.info(f"执行查询构建")
-
-        if update_mode == "full_refresh":
-            query = f"SELECT * FROM {source_table}"
-        else:
-            start_date, end_date = get_date_range(exec_date, schedule_frequency)
-            source_inspector = inspect(source_engine)
-            source_columns = [col['name'].lower() for col in source_inspector.get_columns(source_table)]
-            if target_dt_column.lower() in source_columns:
-                query = f"""
-                    SELECT * FROM {source_table}
-                    WHERE {target_dt_column} >= '{start_date}'
-                    AND {target_dt_column} < '{end_date}'
-                """
-            else:
-                logger.warning(f"源表 {source_table} 没有目标日期列 {target_dt_column},将加载全部数据")
-                query = f"SELECT * FROM {source_table}"
-
+        # 执行查询加载数据
         logger.info(f"执行查询: {query}")
-        # with source_engine.connect() as conn:
-        #     df = pd.read_sql(query, conn)
-        #df = pd.read_sql(query, source_engine)
-        from sqlalchemy.orm import sessionmaker
-
-        Session = sessionmaker(bind=source_engine)
-        session = Session()
-
+        
         try:
-            df = pd.read_sql(query, session.connection())
-        finally:
-            session.close()
-
-        if df.empty:
-            logger.warning(f"查询结果为空,没有数据需要加载")
+            # 直接使用SQLAlchemy执行查询,然后手动创建DataFrame
+            rows = []
+            column_names = []
+            with source_engine.connect() as connection:
+                result_proxy = connection.execute(text(query))
+                rows = result_proxy.fetchall()
+                column_names = result_proxy.keys()
+            
+            df = pd.DataFrame(rows, columns=column_names)
+            
+            # 检查结果是否为空
+            if df.empty:
+                logger.warning(f"查询结果为空,没有数据需要加载")
+                return True
+            
+            # 添加create_time列(如果不存在)
+            if 'create_time' not in df.columns:
+                df['create_time'] = datetime.now()
+            
+            # 写入数据到目标表
+            logger.info(f"开始写入数据到目标表 {table_name},共 {len(df)} 行")
+            with target_engine.connect() as connection:
+                df.to_sql(
+                    name=table_name,
+                    con=connection,
+                    if_exists='append',
+                    index=False,
+                    schema=target_db_info.get("schema", "public")
+                )
+            
+            logger.info(f"成功写入数据到目标表 {table_name}")
             return True
-
-        if 'create_time' not in df.columns:
-            df['create_time'] = datetime.now()
-
-        logger.info(f"开始写入数据到目标表 {table_name},共 {len(df)} 行")
-        df.to_sql(
-            name=table_name,
-            con=target_engine,
-            if_exists='append',
-            index=False,
-            schema=target_db_info.get("schema", "public")
-        )
-        logger.info(f"成功写入数据到目标表 {table_name}")
-        return True
+            
+        except Exception as query_err:
+            logger.error(f"执行查询或写入数据时出错: {str(query_err)}")
+            logger.error(traceback.format_exc())
+            raise Exception(f"数据查询或写入失败: {str(query_err)}")
 
     except Exception as e:
         logger.error(f"执行数据加载过程时出错: {str(e)}")

+ 3 - 1
requirements.txt

@@ -8,4 +8,6 @@ xlrd>=2.0.1
 openpyxl>=3.1.5
 apache-airflow-providers-postgres>=6.1.3
 apache-airflow-providers-common-sql>=1.26.0
-
+numpy==1.26.4
+SQLAlchemy==1.4.54
+pandas==2.1.1