Selaa lähdekoodia

load_file.py测试通过,准备修改多节点依赖。

wangxq 3 viikkoa sitten
vanhempi
commit
c50bbf0d0d

+ 4 - 0
dags/config.py

@@ -34,6 +34,10 @@ TASK_RETRY_CONFIG = {
 AIRFLOW_BASE_PATH='/opt/airflow'
 SCRIPTS_BASE_PATH = "/opt/airflow/dataops/scripts"
 
+# 上传的CSV/EXCEL文件的基准上传路径
+STRUCTURE_UPLOAD_BASE_PATH ="/data/csv"
+STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH ="/data/archive"
+
 # 本地开发环境脚本路径(如果需要区分环境)
 # LOCAL_SCRIPTS_BASE_PATH = "/path/to/local/scripts"
 

+ 1 - 1
dags/dag_dataops_pipeline_data_scheduler.py

@@ -171,7 +171,7 @@ def execute_python_script(target_table, script_name, script_exec_mode, exec_date
         # 导入和执行脚本模块
         import importlib.util
         import sys
-        
+        # SCRIPTS_BASE_PATH = "/opt/airflow/dataops/scripts"
         script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
         
         if not os.path.exists(script_path):

+ 0 - 51
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -259,57 +259,6 @@ def process_dependencies(tables_info):
     
     return list(all_tables.values())
 
-def process_resource(target_table, script_name, script_exec_mode, exec_date, **kwargs):
-    """处理单个资源表"""
-    task_id = f"resource_{target_table}"
-    logger.info(f"===== 开始执行 {task_id} =====")
-    logger.info(f"执行资源表 {target_table} 的脚本 {script_name}")
-    
-    # 确保exec_date是字符串
-    if not isinstance(exec_date, str):
-        exec_date = str(exec_date)
-        logger.info(f"将exec_date转换为字符串: {exec_date}")
-    
-    # 获取额外参数
-    target_type = kwargs.get('target_type')
-    storage_location = kwargs.get('storage_location')
-    frequency = kwargs.get('frequency')
-    
-    try:
-        # 使用新的函数执行脚本,传递相应参数
-        logger.info(f"调用execute_python_script: target_table={target_table}, script_name={script_name}")
-        
-        # 构建参数字典
-        script_params = {
-            "target_table": target_table,
-            "script_name": script_name,
-            "script_exec_mode": script_exec_mode,
-            "exec_date": exec_date
-        }
-        
-        # 添加特殊参数(如果有)
-        if target_type == "structure":
-            logger.info(f"处理structure类型的资源表,文件路径: {storage_location}")
-            script_params["target_type"] = target_type
-            script_params["storage_location"] = storage_location
-        
-        if frequency:
-            script_params["frequency"] = frequency
-        
-        # 执行脚本
-        result = execute_python_script(**script_params)
-        logger.info(f"资源表 {target_table} 处理完成,结果: {result}")
-        return result
-    except Exception as e:
-        logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        logger.info(f"===== 结束执行 {task_id} (失败) =====")
-        return False
-    finally:
-        logger.info(f"===== 结束执行 {task_id} =====")
-        
-
 def filter_invalid_tables(tables_info):
     """过滤无效表及其依赖,使用NetworkX构建依赖图"""
     # 构建表名到索引的映射

+ 471 - 97
dataops/scripts/load_file.py

@@ -3,7 +3,12 @@
 import logging
 import sys
 import os
+import pandas as pd
+import psycopg2
 from datetime import datetime
+import csv
+import glob
+import shutil
 
 # 配置日志记录器
 logging.basicConfig(
@@ -14,121 +19,490 @@ logging.basicConfig(
     ]
 )
 
-logger = logging.getLogger("load_file_mock") # 使用 mock 后缀以区分
+logger = logging.getLogger("load_file")
 
-def mock_load_file(table_name=None, execution_mode='append', exec_date=None, 
-                   target_type=None, storage_location=None, frequency=None, script_name=None, **kwargs):
-    """模拟加载文件数据,仅打印参数"""
-    # 获取当前脚本的文件名(如果没有传入)
-    if script_name is None:
-        script_name = os.path.basename(__file__)
+# 添加健壮的导入机制
+def get_config():
+    """
+    从config模块导入配置
+    
+    返回:
+        tuple: (PG_CONFIG, STRUCTURE_UPLOAD_BASE_PATH, STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH)
+    """
+    # 默认配置
+    default_pg_config = {
+        "host": "localhost",
+        "port": 5432,
+        "user": "postgres",
+        "password": "postgres",
+        "database": "dataops",
+    }
+    default_upload_path = '/tmp/uploads'
+    default_archive_path = '/tmp/uploads/archive'
+    
+    try:
+        # 动态导入,避免IDE警告
+        config = __import__('config')
+        logger.info("从config模块直接导入配置")
+        pg_config = getattr(config, 'PG_CONFIG', default_pg_config)
+        upload_path = getattr(config, 'STRUCTURE_UPLOAD_BASE_PATH', default_upload_path)
+        archive_path = getattr(config, 'STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH', default_archive_path)
+        return pg_config, upload_path, archive_path
+    except ImportError:
+        # 使用默认配置
+        logger.warning("无法导入config模块,使用默认值")
+        return default_pg_config, default_upload_path, default_archive_path
 
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 (模拟处理函数内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"target_type: {target_type}")
-    logger.info(f"storage_location: {storage_location}")
-    logger.info(f"frequency: {frequency}")
-    logger.info(f"script_name: {script_name}")
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"=========================================")
+# 导入配置
+PG_CONFIG, STRUCTURE_UPLOAD_BASE_PATH, STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH = get_config()
+logger.info(f"配置加载完成: 上传路径={STRUCTURE_UPLOAD_BASE_PATH}, 归档路径={STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH}")
 
-    logger.info(f"开始模拟文件加载 - 脚本: {script_name}, 表: {table_name}")
+def get_pg_conn():
+    """获取PostgreSQL连接"""
+    return psycopg2.connect(**PG_CONFIG)
 
+def get_table_columns(table_name):
+    """
+    获取表的列信息,包括列名和注释
+    
+    返回:
+        dict: {列名: 列注释} 的字典
+    """
+    conn = get_pg_conn()
+    cursor = conn.cursor()
     try:
-        logger.info("模拟检查参数...")
-        if not storage_location:
-            logger.warning("警告: 未提供 storage_location (文件路径)")
-        else:
-            logger.info(f"模拟检查文件是否存在: {storage_location}")
-
-        logger.info(f"模拟执行模式: {execution_mode}")
-        if execution_mode == 'full_refresh':
-            logger.info(f"模拟: 如果是全量刷新,将清空表 {table_name}")
+        # 查询表列信息
+        cursor.execute("""
+            SELECT 
+                column_name, 
+                col_description((table_schema || '.' || table_name)::regclass::oid, ordinal_position) as column_comment
+            FROM 
+                information_schema.columns
+            WHERE 
+                table_schema = 'public' -- 明确指定 schema,如果需要
+                AND table_name = %s
+            ORDER BY 
+                ordinal_position
+        """, (table_name.lower(),))
         
-        logger.info("模拟读取和处理文件...")
-        # 模拟成功
-        logger.info(f"模拟: 表 {table_name} 文件加载成功")
-        return True
+        columns = {}
+        for row in cursor.fetchall():
+            col_name = row[0]
+            col_comment = row[1] if row[1] else col_name  # 如果注释为空,使用列名
+            columns[col_name] = col_comment
+            
+        if not columns:
+             logger.warning(f"未能获取到表 '{table_name}' 的列信息,请检查表是否存在、schema是否正确以及权限。")
+
+        return columns
     except Exception as e:
-        logger.error(f"模拟加载文件时出错: {str(e)}")
-        return False
+        logger.error(f"获取表 '{table_name}' 的列信息时出错: {str(e)}")
+        return {}
+    finally:
+        cursor.close()
+        conn.close()
 
-def run(table_name, execution_mode='append', exec_date=None, target_type=None, 
-        storage_location=None, frequency=None, script_name=None, **kwargs):
+def match_csv_columns(csv_headers, table_columns):
     """
-    统一入口函数,符合Airflow动态脚本调用规范 (模拟版本)
-
+    匹配CSV列名与表列名
+    
+    策略:
+    1. 尝试通过表字段注释匹配CSV列名 (忽略大小写和空格)
+    2. 尝试通过名称直接匹配 (忽略大小写和空格)
+    
     参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        exec_date: 执行日期
-        target_type: 目标类型
-        storage_location: 文件路径
-        frequency: 更新频率
-        script_name: 脚本名称
-        **kwargs: 其他可能的参数
+        csv_headers (list): CSV文件的列名列表
+        table_columns (dict): {数据库列名: 列注释} 的字典
+    
+    返回:
+        dict: {CSV列名: 数据库列名} 的映射字典
+    """
+    mapping = {}
+    matched_table_cols = set()
+
+    # 数据库列名通常不区分大小写(除非加引号),注释可能区分
+    # 为了匹配更健壮,我们将CSV和数据库列名/注释都转为小写处理
+    processed_table_columns_lower = {col.lower(): col for col in table_columns.keys()}
+    processed_comment_to_column_lower = {
+        str(comment).lower(): col
+        for col, comment in table_columns.items() if comment
+    }
+
+    # 预处理 CSV headers
+    processed_csv_headers_lower = {str(header).lower(): header for header in csv_headers}
+
+    # 1. 通过注释匹配 (忽略大小写)
+    for processed_header, original_header in processed_csv_headers_lower.items():
+        if processed_header in processed_comment_to_column_lower:
+            table_col_original_case = processed_comment_to_column_lower[processed_header]
+            if table_col_original_case not in matched_table_cols:
+                mapping[original_header] = table_col_original_case
+                matched_table_cols.add(table_col_original_case)
+                logger.info(f"通过注释匹配: CSV 列 '{original_header}' -> 表列 '{table_col_original_case}'")
+
+    # 2. 通过名称直接匹配 (忽略大小写),仅匹配尚未映射的列
+    for processed_header, original_header in processed_csv_headers_lower.items():
+         if original_header not in mapping: # 仅当此 CSV 列尚未映射时才进行名称匹配
+            if processed_header in processed_table_columns_lower:
+                table_col_original_case = processed_table_columns_lower[processed_header]
+                if table_col_original_case not in matched_table_cols:
+                    mapping[original_header] = table_col_original_case
+                    matched_table_cols.add(table_col_original_case)
+                    logger.info(f"通过名称匹配: CSV 列 '{original_header}' -> 表列 '{table_col_original_case}'")
+
+    unmapped_csv = [h for h in csv_headers if h not in mapping]
+    if unmapped_csv:
+         logger.warning(f"以下 CSV 列未能匹配到表列: {unmapped_csv}")
+
+    unmapped_table = [col for col in table_columns if col not in matched_table_cols]
+    if unmapped_table:
+        logger.warning(f"以下表列未能匹配到 CSV 列: {unmapped_table}")
+
+    return mapping
 
+def load_csv_to_table(csv_file, table_name, execution_mode='append'):
+    """
+    将单个CSV文件数据加载到目标表
+    
+    参数:
+        csv_file (str): CSV文件路径
+        table_name (str): 目标表名 (大小写可能敏感,取决于数据库)
+        execution_mode (str): 执行模式,'append'或'full_refresh'
+    
     返回:
-        bool: 执行成功返回True,否则返回False
+        bool: 成功返回True,失败返回False
     """
-    # 打印所有传入的参数 (在入口函数再次打印,确保信息完整)
-    logger.info(f"===== 传入参数信息 (入口函数 run 内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"exec_date: {exec_date} (类型: {type(exec_date)}) ")
-    logger.info(f"target_type: {target_type}")
-    logger.info(f"storage_location: {storage_location}")
-    logger.info(f"frequency: {frequency}")
-    logger.info(f"script_name: {script_name}")
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"=========================================")
+    conn = None
+    cursor = None # 初始化 cursor
+    logger.info(f"开始处理文件: {csv_file}")
+    try:
+        # 读取CSV文件,尝试自动检测编码
+        try:
+            # 使用 dtype=str 确保所有列按字符串读取,避免类型推断问题,特别是对于ID类字段
+            df = pd.read_csv(csv_file, encoding='utf-8', keep_default_na=False, na_values=[r'\N', '', 'NULL', 'null'], dtype=str)
+        except UnicodeDecodeError:
+            try:
+                logger.warning(f"UTF-8 读取失败,尝试 GBK: {csv_file}")
+                df = pd.read_csv(csv_file, encoding='gbk', keep_default_na=False, na_values=[r'\N', '', 'NULL', 'null'], dtype=str)
+            except UnicodeDecodeError:
+                logger.warning(f"GBK 读取也失败,尝试 latin1: {csv_file}")
+                df = pd.read_csv(csv_file, encoding='latin1', keep_default_na=False, na_values=[r'\N', '', 'NULL', 'null'], dtype=str)
+        except Exception as read_err:
+             logger.error(f"读取 CSV 文件 {csv_file} 时发生未知错误: {str(read_err)}")
+             return False
 
-    # 如果没有提供脚本名,使用当前脚本的文件名
+        logger.info(f"成功读取CSV文件: {os.path.basename(csv_file)}, 共 {len(df)} 行")
+        
+        # 清理列名中的潜在空白符
+        df.columns = df.columns.str.strip()
+
+        # 如果CSV为空,则直接认为成功并返回
+        if df.empty:
+             logger.info(f"CSV 文件 {csv_file} 为空,无需加载数据。")
+             return True
+
+        # 获取CSV列名 (清理后)
+        csv_headers = df.columns.tolist()
+        logger.info(f"清理后的 CSV 列名: {csv_headers}")
+        
+        # 获取表结构
+        table_columns = get_table_columns(table_name)
+        if not table_columns:
+            logger.error(f"无法获取表 '{table_name}' 的列信息,跳过文件 {csv_file}")
+            return False
+        
+        logger.info(f"表 '{table_name}' 的列信息 (列名: 注释): {table_columns}")
+        
+        # 匹配CSV列与表列
+        column_mapping = match_csv_columns(csv_headers, table_columns)
+        logger.info(f"列映射关系 (CSV列名: 表列名): {column_mapping}")
+        
+        # 检查是否有任何列成功映射
+        if not column_mapping:
+            logger.error(f"文件 {csv_file} 的列无法与表 '{table_name}' 的列建立任何映射关系,跳过此文件。")
+            return False # 如果一个都没匹配上,则认为失败
+        
+        # 仅选择成功映射的列进行加载
+        mapped_csv_headers = list(column_mapping.keys())
+        # 使用 .copy() 避免 SettingWithCopyWarning
+        df_mapped = df[mapped_csv_headers].copy()
+        df_mapped.rename(columns=column_mapping, inplace=True)
+        logger.info(f"将加载以下映射后的列: {df_mapped.columns.tolist()}")
+
+        # 将空字符串 '' 替换为 None,以便插入数据库时为 NULL
+        # 使用 map 替代已废弃的 applymap 方法
+        # 对每一列单独应用 map 函数
+        for col in df_mapped.columns:
+            df_mapped[col] = df_mapped[col].map(lambda x: None if isinstance(x, str) and x == '' else x)
+
+        # 连接数据库
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        
+        # 根据执行模式确定操作 - 注意:full_refresh 在 run 函数层面控制,这里仅处理单个文件追加
+        # if execution_mode == 'full_refresh':
+        #     logger.warning(f"在 load_csv_to_table 中收到 full_refresh,但清空操作应在 run 函数完成。此处按 append 处理文件:{csv_file}")
+            # # 如果是全量刷新,先清空表 - 这个逻辑移到 run 函数
+            # logger.info(f"执行全量刷新,清空表 {table_name}")
+            # cursor.execute(f"TRUNCATE TABLE {table_name}")
+        
+        # 构建INSERT语句
+        # 使用原始大小写的数据库列名(从 column_mapping 的 value 获取)并加引号
+        columns = ', '.join([f'"{col}"' for col in df_mapped.columns])
+        placeholders = ', '.join(['%s'] * len(df_mapped.columns))
+        # 假设表在 public schema,并为表名加引号以处理大小写或特殊字符
+        insert_sql = f'INSERT INTO public."{table_name}" ({columns}) VALUES ({placeholders})'
+        
+        # 批量插入数据
+        # df_mapped.values 会产生 numpy array,需要转换为 list of tuples
+        # 确保 None 值正确传递
+        rows = [tuple(row) for row in df_mapped.values]
+        
+        try:
+            cursor.executemany(insert_sql, rows)
+            conn.commit()
+            logger.info(f"成功将文件 {os.path.basename(csv_file)} 的 {len(rows)} 行数据插入到表 '{table_name}'")
+        except Exception as insert_err:
+             logger.error(f"向表 '{table_name}' 插入数据时出错: {str(insert_err)}")
+             logger.error(f"出错的 SQL 语句大致为: {insert_sql}")
+             # 可以考虑记录前几行出错的数据 (注意隐私和日志大小)
+             try:
+                 logger.error(f"出错的前3行数据 (部分): {rows[:3]}")
+             except: pass # 防御性编程
+             conn.rollback() # 回滚事务
+             return False # 插入失败则返回 False
+
+        
+        return True
+    except pd.errors.EmptyDataError:
+         logger.info(f"CSV 文件 {csv_file} 为空或只有表头,无需加载数据。")
+         return True # 空文件视为成功处理
+    except Exception as e:
+        # 使用 exc_info=True 获取更详细的堆栈跟踪信息
+        logger.error(f"处理文件 {csv_file} 加载到表 '{table_name}' 时发生意外错误", exc_info=True)
+        if conn:
+            conn.rollback()
+        return False
+    finally:
+        if cursor:
+             cursor.close()
+        if conn:
+             conn.close()
+
+def run(table_name, execution_mode='append', exec_date=None, target_type=None, 
+        storage_location=None, frequency=None, script_name=None, **kwargs):
+    """
+    统一入口函数,支持通配符路径,处理并归档文件
+    """
     if script_name is None:
         script_name = os.path.basename(__file__)
 
-    # 记录详细的执行信息
-    start_time = datetime.now()
-    logger.info(f"脚本 '{script_name}' (模拟) 开始执行: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
-
-    # 调用实际处理函数 (模拟版本)
-    result = mock_load_file(
-        table_name=table_name,
-        execution_mode=execution_mode,
-        exec_date=exec_date,
-        target_type=target_type,
-        storage_location=storage_location,
-        frequency=frequency,
-        script_name=script_name,
-        **kwargs  # 将额外参数传递给处理函数
-    )
-
-    end_time = datetime.now()
-    logger.info(f"脚本 '{script_name}' (模拟) 结束执行: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"总耗时: {end_time - start_time}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-
-    return result
+    # 修正之前的日志记录格式错误
+    exec_mode_str = '全量刷新' if execution_mode == 'full_refresh' else '增量追加'
+    logger.info(f"===== 开始执行 {script_name} ({exec_mode_str}) =====")
+    logger.info(f"表名: {table_name}")
+    logger.info(f"执行模式: {execution_mode}")
+    logger.info(f"执行日期: {exec_date}")
+    logger.info(f"目标类型: {target_type}")
+    logger.info(f"资源类型: {target_type}, 文件相对路径模式: {storage_location}")
+    logger.info(f"基准上传路径: {STRUCTURE_UPLOAD_BASE_PATH}")
+    logger.info(f"基准归档路径: {STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH}")
+    logger.info(f"更新频率: {frequency}")
+    
+    # 记录其他参数
+    for key, value in kwargs.items():
+        logger.info(f"其他参数 - {key}: {value}")
+    
+    # 检查必要参数
+    if not storage_location:
+        logger.error("未提供 storage_location (文件查找路径模式)")
+        return False
+    if not STRUCTURE_UPLOAD_BASE_PATH:
+         logger.error("配置错误: STRUCTURE_UPLOAD_BASE_PATH 未设置")
+         return False
+    if not STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH:
+         logger.error("配置错误: STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH 未设置")
+         return False
+
+    # 记录执行开始时间
+    overall_start_time = datetime.now()
+    
+    # 构建完整搜索路径
+    # 使用 os.path.normpath 确保路径分隔符正确
+    # 如果storage_location以斜杠开头,移除开头的斜杠以避免被当作绝对路径处理
+    if storage_location.startswith('/'):
+        storage_location = storage_location.lstrip('/')
+        logger.info(f"检测到storage_location以斜杠开头,已移除: {storage_location}")
+    
+    full_search_pattern = os.path.normpath(os.path.join(STRUCTURE_UPLOAD_BASE_PATH, storage_location))
+    logger.info(f"完整文件搜索模式: {full_search_pattern}")
+    
+    # 检查路径是否存在(至少目录部分)
+    search_dir = os.path.dirname(full_search_pattern)
+    if not os.path.exists(search_dir):
+        error_msg = f"错误: 搜索目录不存在: {search_dir}"
+        logger.error(error_msg)
+        raise FileNotFoundError(error_msg)  # 抛出异常而不是返回False
+    
+    # 查找匹配的文件
+    try:
+        # 增加 recursive=True 如果需要递归查找子目录中的文件 (例如 storage_location 是 a/b/**/*.csv)
+        # 当前假设模式只在指定目录下匹配,例如 /data/subdir/*.csv
+        found_files = glob.glob(full_search_pattern, recursive=False)
+    except Exception as glob_err:
+         logger.error(f"查找文件时发生错误 (模式: {full_search_pattern}): {str(glob_err)}")
+         raise  # 重新抛出异常
+
+    if not found_files:
+        logger.warning(f"在目录 {search_dir} 下未找到匹配模式 '{os.path.basename(full_search_pattern)}' 的文件")
+        return True  # 找不到文件视为正常情况,返回成功
+
+    logger.info(f"找到 {len(found_files)} 个匹配文件: {found_files}")
+
+    # 如果是全量刷新,在处理任何文件前清空表
+    if execution_mode == 'full_refresh':
+        conn = None
+        cursor = None
+        try:
+            conn = get_pg_conn()
+            cursor = conn.cursor()
+            # 假设表在 public schema,并为表名加引号
+            logger.info(f"执行全量刷新,清空表 public.\"{table_name}\"")
+            cursor.execute(f'TRUNCATE TABLE public.\"{table_name}\"')
+            conn.commit()
+            logger.info("表 public.\"" + table_name + "\" 已清空。")
+        except Exception as e:
+            logger.error("清空表 public.\"" + table_name + "\" 时出错: " + str(e))
+            if conn:
+                conn.rollback()
+            return False # 清空失败则直接失败退出
+        finally:
+            if cursor:
+                 cursor.close()
+            if conn:
+                 conn.close()
+
+    # 处理并归档每个找到的文件
+    processed_files_count = 0
+    failed_files = []
+
+    for file_path in found_files:
+        file_start_time = datetime.now()
+        # 使用 normpath 统一路径表示
+        normalized_file_path = os.path.normpath(file_path)
+        logger.info(f"--- 开始处理文件: {os.path.basename(normalized_file_path)} ---")
+        try:
+            # 加载CSV数据到表 (注意:full_refresh时也是append模式加载,因为表已清空)
+            load_success = load_csv_to_table(normalized_file_path, table_name, 'append')
+            
+            if load_success:
+                logger.info(f"文件 {os.path.basename(normalized_file_path)} 加载成功。")
+                processed_files_count += 1
+                # 归档文件
+                try:
+                    # 计算相对路径部分 (storage_location 可能包含子目录)
+                    # 使用 os.path.dirname 获取 storage_location 的目录部分
+                    relative_dir = os.path.dirname(storage_location)
+                    
+                    # 获取当前日期
+                    date_str = datetime.now().strftime('%Y-%m-%d')
+                    
+                    # 构建归档目录路径
+                    archive_dir = os.path.normpath(os.path.join(STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH, relative_dir, date_str))
+                    
+                    # 创建归档目录(如果不存在)
+                    os.makedirs(archive_dir, exist_ok=True)
+                    
+                    # 获取当前unix时间戳并转换为字符串
+                    unix_timestamp = int(datetime.now().timestamp())
+                    
+                    # 修改文件名,添加时间戳
+                    original_filename = os.path.basename(normalized_file_path)
+                    filename_parts = os.path.splitext(original_filename)
+                    new_filename = f"{filename_parts[0]}_{unix_timestamp}{filename_parts[1]}"
+                    
+                    # 构建文件在归档目录中的最终路径
+                    archive_dest_path = os.path.join(archive_dir, new_filename)
+                    
+                    # 移动文件
+                    shutil.move(normalized_file_path, archive_dest_path)
+                    logger.info(f"文件已成功移动到归档目录并重命名: {archive_dest_path}")
+                    
+                except Exception as move_err:
+                    # 记录错误,但由于数据已加载,不将整体任务标记为失败
+                    logger.error(f"加载文件 {os.path.basename(normalized_file_path)} 成功,但移动到归档目录时出错", exc_info=True)
+                    logger.error(f"原始文件路径: {normalized_file_path}")
+            else:
+                logger.error(f"文件 {os.path.basename(normalized_file_path)} 加载失败,中止处理。")
+                # 修改:任何一个文件加载失败就直接返回 False
+                # 记录最终统计
+                overall_end_time = datetime.now()
+                overall_duration = (overall_end_time - overall_start_time).total_seconds()
+                logger.info(f"===== {script_name} 执行完成 (失败) =====")
+                logger.info(f"总耗时: {overall_duration:.2f}秒")
+                logger.info(f"共找到文件: {len(found_files)}")
+                logger.info(f"成功处理文件数: {processed_files_count}")
+                logger.error(f"首个失败文件: {os.path.basename(normalized_file_path)}")
+                return False
+
+        except Exception as file_proc_err:
+            logger.error(f"处理文件 {os.path.basename(normalized_file_path)} 时发生意外错误", exc_info=True)
+            # 修改:任何一个文件处理异常就直接返回 False
+            # 记录最终统计
+            overall_end_time = datetime.now()
+            overall_duration = (overall_end_time - overall_start_time).total_seconds()
+            logger.info(f"===== {script_name} 执行完成 (异常) =====")
+            logger.info(f"总耗时: {overall_duration:.2f}秒")
+            logger.info(f"共找到文件: {len(found_files)}")
+            logger.info(f"成功处理文件数: {processed_files_count}")
+            logger.error(f"首个异常文件: {os.path.basename(normalized_file_path)}")
+            return False
+            
+        finally:
+            file_end_time = datetime.now()
+            file_duration = (file_end_time - file_start_time).total_seconds()
+            logger.info(f"--- 文件 {os.path.basename(normalized_file_path)} 处理结束,耗时: {file_duration:.2f}秒 ---")
+
+    # 记录总体执行结果(全部成功的情况)
+    overall_end_time = datetime.now()
+    overall_duration = (overall_end_time - overall_start_time).total_seconds()
+    
+    logger.info(f"===== {script_name} 执行完成 (成功) =====")
+    logger.info(f"总耗时: {overall_duration:.2f}秒")
+    logger.info(f"共找到文件: {len(found_files)}")
+    logger.info(f"成功处理文件数: {processed_files_count}")
+
+    return True  # 所有文件处理成功
 
 if __name__ == "__main__":
-    # 提供一些默认值以便直接运行脚本进行测试
-    test_params = {
-        "table_name": "sample_table",
-        "execution_mode": "full_refresh",
+    # 直接执行时的测试代码
+    import argparse
+    
+    parser = argparse.ArgumentParser(description='从CSV文件加载数据到表(支持通配符)')
+    parser.add_argument('--table', type=str, required=True, help='目标表名')
+    parser.add_argument('--pattern', type=str, required=True, help='CSV文件查找模式 (相对于基准上传路径的相对路径,例如: data/*.csv 或 *.csv)')
+    parser.add_argument('--mode', type=str, default='append', choices=['append', 'full_refresh'], help='执行模式: append 或 full_refresh')
+    
+    args = parser.parse_args()
+    
+    # 构造必要的 kwargs
+    run_kwargs = {
+        "table_name": args.table,
+        "execution_mode": args.mode,
+        "storage_location": args.pattern,
+        "target_type": 'structure',
         "exec_date": datetime.now().strftime('%Y-%m-%d'),
-        "target_type": "structure",
-        "storage_location": "/path/to/mock/file.csv",
-        "frequency": "daily",
-        "script_name": os.path.basename(__file__),
-        "custom_param": "abc",
-        "another_param": 456
+        "frequency": "manual",
+        "script_name": os.path.basename(__file__)
     }
-    logger.info(f"以主脚本方式运行 (模拟),使用测试参数: {test_params}")
-    run(**test_params) 
+
+    logger.info("命令行测试执行参数: " + str(run_kwargs))
+
+    success = run(**run_kwargs)
+    
+    if success:
+        print("CSV文件加载任务执行完毕,所有文件处理成功。")
+        sys.exit(0)
+    else:
+        print("CSV文件加载任务执行完毕,但有部分或全部文件处理失败。")
+        sys.exit(1)

+ 0 - 266
dataops/scripts/load_file_real.py

@@ -1,266 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-import logging
-import sys
-import os
-import pandas as pd
-import psycopg2
-from datetime import datetime
-import csv
-from dags.config import PG_CONFIG
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("load_file")
-
-def get_pg_conn():
-    """获取PostgreSQL连接"""
-    return psycopg2.connect(**PG_CONFIG)
-
-def get_table_columns(table_name):
-    """
-    获取表的列信息,包括列名和注释
-    
-    返回:
-        dict: {列名: 列注释} 的字典
-    """
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        # 查询表列信息
-        cursor.execute("""
-            SELECT 
-                column_name, 
-                col_description((table_schema || '.' || table_name)::regclass::oid, ordinal_position) as column_comment
-            FROM 
-                information_schema.columns
-            WHERE 
-                table_name = %s
-            ORDER BY 
-                ordinal_position
-        """, (table_name,))
-        
-        columns = {}
-        for row in cursor.fetchall():
-            col_name = row[0]
-            col_comment = row[1] if row[1] else col_name  # 如果注释为空,使用列名
-            columns[col_name] = col_comment
-            
-        return columns
-    except Exception as e:
-        logger.error(f"获取表 {table_name} 的列信息时出错: {str(e)}")
-        return {}
-    finally:
-        cursor.close()
-        conn.close()
-
-def match_csv_columns(csv_headers, table_columns):
-    """
-    匹配CSV列名与表列名
-    
-    策略:
-    1. 尝试通过表字段注释匹配CSV列名
-    2. 尝试通过名称直接匹配
-    
-    参数:
-        csv_headers (list): CSV文件的列名列表
-        table_columns (dict): {列名: 列注释} 的字典
-    
-    返回:
-        dict: {CSV列名: 表列名} 的映射字典
-    """
-    mapping = {}
-    
-    # 通过注释匹配
-    comment_to_column = {comment: col for col, comment in table_columns.items()}
-    for header in csv_headers:
-        if header in comment_to_column:
-            mapping[header] = comment_to_column[header]
-            continue
-        
-        # 尝试直接名称匹配
-        if header in table_columns:
-            mapping[header] = header
-    
-    return mapping
-
-def load_csv_to_table(csv_file, table_name, execution_mode='append'):
-    """
-    将CSV文件数据加载到目标表
-    
-    参数:
-        csv_file (str): CSV文件路径
-        table_name (str): 目标表名
-        execution_mode (str): 执行模式,'append'或'full_refresh'
-    
-    返回:
-        bool: 成功返回True,失败返回False
-    """
-    conn = None
-    try:
-        # 读取CSV文件,尝试自动检测编码
-        try:
-            df = pd.read_csv(csv_file, encoding='utf-8')
-        except UnicodeDecodeError:
-            try:
-                df = pd.read_csv(csv_file, encoding='gbk')
-            except UnicodeDecodeError:
-                df = pd.read_csv(csv_file, encoding='latin1')
-        
-        logger.info(f"成功读取CSV文件: {csv_file}, 共 {len(df)} 行")
-        
-        # 获取CSV列名
-        csv_headers = df.columns.tolist()
-        logger.info(f"CSV列名: {csv_headers}")
-        
-        # 获取表结构
-        table_columns = get_table_columns(table_name)
-        if not table_columns:
-            logger.error(f"无法获取表 {table_name} 的列信息")
-            return False
-        
-        logger.info(f"表 {table_name} 的列信息: {table_columns}")
-        
-        # 匹配CSV列与表列
-        column_mapping = match_csv_columns(csv_headers, table_columns)
-        logger.info(f"列映射关系: {column_mapping}")
-        
-        if not column_mapping:
-            logger.error(f"无法建立CSV列与表列的映射关系")
-            return False
-        
-        # 筛选和重命名列
-        df_mapped = df[list(column_mapping.keys())].rename(columns=column_mapping)
-        
-        # 连接数据库
-        conn = get_pg_conn()
-        cursor = conn.cursor()
-        
-        # 根据执行模式确定操作
-        if execution_mode == 'full_refresh':
-            # 如果是全量刷新,先清空表
-            logger.info(f"执行全量刷新,清空表 {table_name}")
-            cursor.execute(f"TRUNCATE TABLE {table_name}")
-        
-        # 构建INSERT语句
-        columns = ', '.join(df_mapped.columns)
-        placeholders = ', '.join(['%s'] * len(df_mapped.columns))
-        insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
-        
-        # 批量插入数据
-        rows = [tuple(row) for row in df_mapped.values]
-        cursor.executemany(insert_sql, rows)
-        
-        # 提交事务
-        conn.commit()
-        logger.info(f"成功插入 {len(rows)} 行数据到表 {table_name}")
-        
-        return True
-    except Exception as e:
-        logger.error(f"加载CSV数据到表时出错: {str(e)}")
-        if conn:
-            conn.rollback()
-        return False
-    finally:
-        if conn:
-            conn.close()
-
-def run(table_name, execution_mode='append', exec_date=None, target_type=None, 
-        storage_location=None, frequency=None, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        exec_date: 执行日期
-        target_type: 目标类型,对于CSV文件应为'structure'
-        storage_location: CSV文件路径
-        frequency: 更新频率
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"===== 开始执行CSV文件加载 =====")
-    logger.info(f"表名: {table_name}")
-    logger.info(f"执行模式: {execution_mode}")
-    logger.info(f"执行日期: {exec_date}")
-    logger.info(f"目标类型: {target_type}")
-    logger.info(f"文件路径: {storage_location}")
-    logger.info(f"更新频率: {frequency}")
-    
-    # 记录其他参数
-    for key, value in kwargs.items():
-        logger.info(f"其他参数 - {key}: {value}")
-    
-    # 检查必要参数
-    if not storage_location:
-        logger.error("未提供CSV文件路径")
-        return False
-    
-    # 检查文件是否存在
-    if not os.path.exists(storage_location):
-        logger.error(f"CSV文件不存在: {storage_location}")
-        return False
-    
-    # 记录执行开始时间
-    start_time = datetime.now()
-    
-    try:
-        # 加载CSV数据到表
-        result = load_csv_to_table(storage_location, table_name, execution_mode)
-        
-        # 记录执行结束时间
-        end_time = datetime.now()
-        duration = (end_time - start_time).total_seconds()
-        
-        if result:
-            logger.info(f"CSV文件加载成功,耗时: {duration:.2f}秒")
-        else:
-            logger.error(f"CSV文件加载失败,耗时: {duration:.2f}秒")
-        
-        return result
-    except Exception as e:
-        # 记录执行结束时间
-        end_time = datetime.now()
-        duration = (end_time - start_time).total_seconds()
-        
-        logger.error(f"CSV文件加载过程中出错: {str(e)}")
-        logger.error(f"CSV文件加载失败,耗时: {duration:.2f}秒")
-        
-        return False
-    finally:
-        logger.info(f"===== CSV文件加载执行完成 =====")
-
-if __name__ == "__main__":
-    # 直接执行时的测试代码
-    import argparse
-    
-    parser = argparse.ArgumentParser(description='从CSV文件加载数据到表')
-    parser.add_argument('--table', type=str, required=True, help='目标表名')
-    parser.add_argument('--file', type=str, required=True, help='CSV文件路径')
-    parser.add_argument('--mode', type=str, default='append', help='执行模式: append或full_refresh')
-    
-    args = parser.parse_args()
-    
-    success = run(
-        table_name=args.table,
-        execution_mode=args.mode,
-        storage_location=args.file,
-        target_type='structure'
-    )
-    
-    if success:
-        print("CSV文件加载成功")
-        sys.exit(0)
-    else:
-        print("CSV文件加载失败")
-        sys.exit(1)

+ 134 - 0
dataops/scripts/load_file_test.py

@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import sys
+import os
+from datetime import datetime
+
+# 配置日志记录器
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+logger = logging.getLogger("load_file_mock") # 使用 mock 后缀以区分
+
+def mock_load_file(table_name=None, execution_mode='append', exec_date=None, 
+                   target_type=None, storage_location=None, frequency=None, script_name=None, **kwargs):
+    """模拟加载文件数据,仅打印参数"""
+    # 获取当前脚本的文件名(如果没有传入)
+    if script_name is None:
+        script_name = os.path.basename(__file__)
+
+    # 打印所有传入的参数
+    logger.info(f"===== 传入参数信息 (模拟处理函数内) =====")
+    logger.info(f"table_name: {table_name}")
+    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"exec_date: {exec_date}")
+    logger.info(f"target_type: {target_type}")
+    logger.info(f"storage_location: {storage_location}")
+    logger.info(f"frequency: {frequency}")
+    logger.info(f"script_name: {script_name}")
+    # 打印所有可能的额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}")
+    logger.info(f"=========================================")
+
+    logger.info(f"开始模拟文件加载 - 脚本: {script_name}, 表: {table_name}")
+
+    try:
+        logger.info("模拟检查参数...")
+        if not storage_location:
+            logger.warning("警告: 未提供 storage_location (文件路径)")
+        else:
+            logger.info(f"模拟检查文件是否存在: {storage_location}")
+
+        logger.info(f"模拟执行模式: {execution_mode}")
+        if execution_mode == 'full_refresh':
+            logger.info(f"模拟: 如果是全量刷新,将清空表 {table_name}")
+        
+        logger.info("模拟读取和处理文件...")
+        # 模拟成功
+        logger.info(f"模拟: 表 {table_name} 文件加载成功")
+        return True
+    except Exception as e:
+        logger.error(f"模拟加载文件时出错: {str(e)}")
+        return False
+
+def run(table_name, execution_mode='append', exec_date=None, target_type=None, 
+        storage_location=None, frequency=None, script_name=None, **kwargs):
+    """
+    统一入口函数,符合Airflow动态脚本调用规范 (模拟版本)
+
+    参数:
+        table_name (str): 要处理的表名
+        execution_mode (str): 执行模式 (append/full_refresh)
+        exec_date: 执行日期
+        target_type: 目标类型
+        storage_location: 文件路径
+        frequency: 更新频率
+        script_name: 脚本名称
+        **kwargs: 其他可能的参数
+
+    返回:
+        bool: 执行成功返回True,否则返回False
+    """
+    # 打印所有传入的参数 (在入口函数再次打印,确保信息完整)
+    logger.info(f"===== 传入参数信息 (入口函数 run 内) =====")
+    logger.info(f"table_name: {table_name}")
+    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"exec_date: {exec_date} (类型: {type(exec_date)}) ")
+    logger.info(f"target_type: {target_type}")
+    logger.info(f"storage_location: {storage_location}")
+    logger.info(f"frequency: {frequency}")
+    logger.info(f"script_name: {script_name}")
+    # 打印所有可能的额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}")
+    logger.info(f"=========================================")
+
+    # 如果没有提供脚本名,使用当前脚本的文件名
+    if script_name is None:
+        script_name = os.path.basename(__file__)
+
+    # 记录详细的执行信息
+    start_time = datetime.now()
+    logger.info(f"脚本 '{script_name}' (模拟) 开始执行: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
+
+    # 调用实际处理函数 (模拟版本)
+    result = mock_load_file(
+        table_name=table_name,
+        execution_mode=execution_mode,
+        exec_date=exec_date,
+        target_type=target_type,
+        storage_location=storage_location,
+        frequency=frequency,
+        script_name=script_name,
+        **kwargs  # 将额外参数传递给处理函数
+    )
+
+    end_time = datetime.now()
+    logger.info(f"脚本 '{script_name}' (模拟) 结束执行: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
+    logger.info(f"总耗时: {end_time - start_time}")
+    logger.info(f"处理结果: {'成功' if result else '失败'}")
+
+    return result
+
+if __name__ == "__main__":
+    # 提供一些默认值以便直接运行脚本进行测试
+    test_params = {
+        "table_name": "sample_table",
+        "execution_mode": "full_refresh",
+        "exec_date": datetime.now().strftime('%Y-%m-%d'),
+        "target_type": "structure",
+        "storage_location": "/path/to/mock/file.csv",
+        "frequency": "daily",
+        "script_name": os.path.basename(__file__),
+        "custom_param": "abc",
+        "another_param": 456
+    }
+    logger.info(f"以主脚本方式运行 (模拟),使用测试参数: {test_params}")
+    run(**test_params)