Explorar el Código

测试 load_file、load_data 修改没有写入 create_time 的 bug。

wangxq hace 1 mes
padre
commit
e22e6dc35f

+ 2 - 3
dags/dataops_productline_prepare_dag.py

@@ -770,9 +770,8 @@ def check_execution_plan_in_db(**kwargs):
                 logger.info(f"数据库中已存在此次运行的执行计划")
                 return True
             else:
-                logger.info(f"数据库中不存在此次运行的执行计划")
-                
-                # 删除历史执行计划,只保留最近N条
+                logger.info(f"数据库中不存在此次运行的执行计划")                
+                # 删除历史执行计划,只保留最近N条。
                 if EXECUTION_PLAN_KEEP_COUNT > 0:
                     cursor.execute(f"""
                         WITH to_keep AS (

+ 1 - 1
dataops_scripts/execution_sql.py

@@ -117,7 +117,7 @@ def get_script_content(target_table, script_name):
         result = cursor.fetchone()
         
         if result is None:
-            logger.error(f"未找到目标表 '{target_table}' 和脚本名 '{script_name}' 对应的脚本")
+            logger.error(f"未找到目标表 '{target_table}' 和脚本名 '{script_name}' 对应的脚本.")
             return None, None
         
         # 获取脚本内容和目标日期列    

+ 46 - 10
dataops_scripts/load_data.py

@@ -168,7 +168,7 @@ def get_sqlalchemy_engine(db_info):
         db_type = db_info.get("db_type", "").lower()
         
         if db_type == "postgresql":
-            url = f"postgresql://{db_info['username']}:{db_info['password']}@{db_info['host']}:{db_info['port']}/{db_info['database']}"
+            url = f"postgresql+psycopg2://{db_info['username']}:{db_info['password']}@{db_info['host']}:{db_info['port']}/{db_info['database']}"
         elif db_type == "mysql":
             url = f"mysql+pymysql://{db_info['username']}:{db_info['password']}@{db_info['host']}:{db_info['port']}/{db_info['database']}"
         elif db_type == "oracle":
@@ -179,7 +179,7 @@ def get_sqlalchemy_engine(db_info):
             logger.error(f"不支持的数据库类型: {db_type}")
             return None
         
-        # 创建数据库引擎
+        # 创建数据库引擎,让SQLAlchemy处理数据库差异
         engine = create_engine(url)
         return engine
     except Exception as e:
@@ -218,13 +218,20 @@ def create_table_if_not_exists(source_engine, target_engine, source_table, targe
         # 目标表不存在,从源表获取表结构
         source_inspector = inspect(source_engine)
         
-        if not source_inspector.has_table(source_table):
+        # 处理表名中可能包含的schema信息
+        source_schema = None
+        if '.' in source_table:
+            source_schema, source_table = source_table.split('.', 1)
+        
+        if not source_inspector.has_table(source_table, schema=source_schema):
             error_msg = f"源表 {source_table} 不存在"
+            if source_schema:
+                error_msg = f"源表 {source_schema}.{source_table} 不存在"
             logger.error(error_msg)
             raise Exception(error_msg)
         
         # 获取源表的列信息
-        source_columns = source_inspector.get_columns(source_table)
+        source_columns = source_inspector.get_columns(source_table, schema=source_schema)
         
         if not source_columns:
             error_msg = f"源表 {source_table} 没有列信息"
@@ -234,17 +241,37 @@ def create_table_if_not_exists(source_engine, target_engine, source_table, targe
         # 创建元数据对象
         metadata = MetaData()
         
-        # 定义目标表结构
+        # 检查源表中是否已存在create_time和update_time字段
+        existing_column_names = [col['name'].lower() for col in source_columns]
+        has_create_time = 'create_time' in existing_column_names
+        has_update_time = 'update_time' in existing_column_names
+        
+        # 构建列定义列表
+        columns = [Column(col['name'], col['type']) for col in source_columns]
+        
+        # 如果不存在create_time字段,则添加
+        if not has_create_time:
+            from sqlalchemy import TIMESTAMP
+            columns.append(Column('create_time', TIMESTAMP, nullable=True))
+            logger.info(f"为表 {target_table} 添加 create_time 字段")
+        
+        # 如果不存在update_time字段,则添加
+        if not has_update_time:
+            from sqlalchemy import TIMESTAMP
+            columns.append(Column('update_time', TIMESTAMP, nullable=True))
+            logger.info(f"为表 {target_table} 添加 update_time 字段")
+        
+        # 定义目标表结构,让SQLAlchemy处理数据类型映射
         table_def = Table(
             target_table,
             metadata,
-            *[Column(col['name'], col['type']) for col in source_columns],
+            *columns,
             schema=schema
         )
         
         # 在目标数据库中创建表
         metadata.create_all(target_engine)
-        logger.info(f"成功在目标数据库中创建表 {target_table}")
+        logger.info(f"成功在目标数据库中创建表 {schema}.{target_table}")
         
         return True
     except Exception as e:
@@ -394,9 +421,18 @@ def load_data_from_source(table_name, exec_date=None, update_mode=None, script_n
                 logger.warning(f"查询结果为空,没有数据需要加载")
                 return True
             
-            # 添加create_time列(如果不存在)
-            if 'create_time' not in df.columns:
-                df['create_time'] = datetime.now()
+            # 获取当前时间戳
+            current_time = datetime.now()
+            
+            # 设置create_time列为当前时间 - 记录数据加载时间
+            df['create_time'] = current_time
+            logger.info(f"设置 create_time 字段为当前时间: {current_time}")
+            
+            # update_time字段保持为NULL,因为这是数据加载而非数据更新
+            # update_time只有在数据被修改时才应该被设置
+            if 'update_time' not in df.columns:
+                df['update_time'] = None  # 显式设置为NULL
+                logger.info(f"为数据添加 update_time 字段,初始值为: NULL (数据加载时不设置更新时间)")
             
             # 写入数据到目标表
             logger.info(f"开始写入数据到目标表 {table_name},共 {len(df)} 行")

+ 27 - 15
dataops_scripts/load_file.py

@@ -13,18 +13,15 @@ import re
 import argparse
 import psycopg2.extras
 
-from script_utils import get_config_param
-SCHEDULE_TABLE_SCHEMA = get_config_param("SCHEDULE_TABLE_SCHEMA")
-
 # 修改Python导入路径,确保能找到同目录下的script_utils模块
 current_dir = os.path.dirname(os.path.abspath(__file__))
 if current_dir not in sys.path:
     sys.path.insert(0, current_dir)
-
 # 先导入整个模块,确保script_utils对象在全局作用域可用
 import script_utils
 # 再导入具体的方法
-from script_utils import get_pg_config, get_upload_paths, logger as utils_logger
+from script_utils import get_pg_config, get_upload_paths, get_config_param, logger as utils_logger
+SCHEDULE_TABLE_SCHEMA = get_config_param("SCHEDULE_TABLE_SCHEMA")
 
 # 配置日志记录器
 logging.basicConfig(
@@ -46,16 +43,7 @@ try:
     logger.info(f"通过script_utils获取配置成功: 上传路径={STRUCTURE_UPLOAD_BASE_PATH}, 归档路径={STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH}")
 except Exception as e:
     logger.error(f"获取配置失败,使用默认值: {str(e)}")
-    # 默认配置
-    PG_CONFIG = {
-        "host": "localhost",
-        "port": 5432,
-        "user": "postgres",
-        "password": "postgres",
-        "database": "dataops",
-    }
-    STRUCTURE_UPLOAD_BASE_PATH = '/tmp/uploads'
-    STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH = '/tmp/uploads/archive'
+    raise
 
 def get_pg_conn():
     """获取PostgreSQL连接"""
@@ -262,6 +250,30 @@ def load_dataframe_to_table(df, file_path, table_name):
         # 将空字符串替换为None
         for col in df_mapped.columns:
             df_mapped[col] = df_mapped[col].map(lambda x: None if isinstance(x, str) and x == '' else x)
+        
+        # 记录处理前的行数
+        original_row_count = len(df_mapped)
+        
+        # 丢弃全部为空值的行
+        df_mapped = df_mapped.dropna(how='all')
+        final_row_count = len(df_mapped)
+        
+        # 计算删除的空行数量
+        empty_rows_count = original_row_count - final_row_count
+        
+        if empty_rows_count > 0:
+            logger.warning(f"发现并删除了 {empty_rows_count} 行全空行")
+            logger.info(f"丢弃全空行后剩余数据行数: {final_row_count}")
+        else:
+            logger.info(f"数据行数: {final_row_count}")
+
+        # 检查目标表是否有create_time字段,如果有则添加当前时间
+        if 'create_time' in table_columns:
+            current_time = datetime.now()
+            df_mapped['create_time'] = current_time
+            logger.info(f"目标表有 create_time 字段,设置值为: {current_time}")
+        else:
+            logger.warning(f"目标表 '{table_name}' 没有 create_time 字段,跳过添加时间戳")
 
         # 连接数据库
         conn = get_pg_conn()

+ 4 - 12
dataops_scripts/script_utils.py

@@ -144,14 +144,6 @@ def get_pg_config():
         
         if pg_config is None:
             logger.warning("配置模块中未找到PG_CONFIG")
-            # 返回默认配置
-            return {
-                "host": "localhost",
-                "port": 5432,
-                "user": "postgres",
-                "password": "postgres",
-                "database": "dataops"
-            }
         
         logger.info(f"已获取PostgreSQL配置: {pg_config}")
         return pg_config
@@ -175,15 +167,15 @@ def get_upload_paths():
     """
     try:
         config_module = load_config_module()
-        upload_path = getattr(config_module, "STRUCTURE_UPLOAD_BASE_PATH", "/data/csv")
-        archive_path = getattr(config_module, "STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH", "/data/archive")
+        upload_path = getattr(config_module, "STRUCTURE_UPLOAD_BASE_PATH")
+        archive_path = getattr(config_module, "STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH")
         
-        logger.info(f"获取上传路径: {upload_path}, 归档路径: {archive_path}")
+        logger.info(f"获取上传路径: {upload_path}, 归档路径: {archive_path}")
         return upload_path, archive_path
     except Exception as e:
         logger.error(f"获取上传路径时出错: {str(e)}")
         # 返回默认路径
-        return "/data/csv", "/data/archive"
+        return "/data/upload", "/data/archive"
 
 def get_date_range(exec_date, frequency):
     """

+ 1 - 0
requirements.txt

@@ -1,5 +1,6 @@
 apache-airflow==2.10.5
 psycopg2-binary>=2.9.10
+PyMySQL>=1.1.1
 neo4j>=5.19.0
 pendulum>=3.0.0
 networkx>=3.4.2