@@ -3,11 +3,7 @@
 import logging
 import sys
 import os
-import pandas as pd
-import psycopg2
 from datetime import datetime
-import csv
-from dags.config import PG_CONFIG
 
 # Configure the logger
 logging.basicConfig(
@@ -18,249 +14,121 @@ logging.basicConfig(
     ]
 )
 
-logger = logging.getLogger("load_file")
+logger = logging.getLogger("load_file_mock")  # "mock" suffix to distinguish this from the real loader
 
-def get_pg_conn():
-    """Get a PostgreSQL connection"""
-    return psycopg2.connect(**PG_CONFIG)
+def mock_load_file(table_name=None, execution_mode='append', exec_date=None,
+                   target_type=None, storage_location=None, frequency=None, script_name=None, **kwargs):
+    """Simulate loading file data; only print the parameters."""
+    # Fall back to this script's file name if none was passed in
+    if script_name is None:
+        script_name = os.path.basename(__file__)
 
-def get_table_columns(table_name):
-    """
-    Get the table's column information, including column names and comments
-
-    Returns:
-        dict: a {column name: column comment} dictionary
-    """
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        # Query the table's column information
-        cursor.execute("""
-            SELECT
-                column_name,
-                col_description((table_schema || '.' || table_name)::regclass::oid, ordinal_position) as column_comment
-            FROM
-                information_schema.columns
-            WHERE
-                table_name = %s
-            ORDER BY
-                ordinal_position
-        """, (table_name,))
-
-        columns = {}
-        for row in cursor.fetchall():
-            col_name = row[0]
-            col_comment = row[1] if row[1] else col_name  # Fall back to the column name when the comment is empty
-            columns[col_name] = col_comment
-
-        return columns
-    except Exception as e:
-        logger.error(f"Error getting column information for table {table_name}: {str(e)}")
-        return {}
-    finally:
-        cursor.close()
-        conn.close()
+    # Print all incoming parameters
+    logger.info(f"===== Incoming parameters (inside the mock handler) =====")
+    logger.info(f"table_name: {table_name}")
+    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"exec_date: {exec_date}")
+    logger.info(f"target_type: {target_type}")
+    logger.info(f"storage_location: {storage_location}")
+    logger.info(f"frequency: {frequency}")
+    logger.info(f"script_name: {script_name}")
+    # Print any extra parameters
+    for key, value in kwargs.items():
+        logger.info(f"Extra parameter - {key}: {value}")
+    logger.info(f"=========================================")
 
-def match_csv_columns(csv_headers, table_columns):
-    """
-    Match CSV column names against table column names
-
-    Strategy:
-    1. Try to match CSV column names via the table columns' comments
-    2. Try a direct match by name
-
-    Parameters:
-        csv_headers (list): list of column names from the CSV file
-        table_columns (dict): a {column name: column comment} dictionary
-
-    Returns:
-        dict: a {CSV column name: table column name} mapping
-    """
-    mapping = {}
-
-    # Match via comments
-    comment_to_column = {comment: col for col, comment in table_columns.items()}
-    for header in csv_headers:
-        if header in comment_to_column:
-            mapping[header] = comment_to_column[header]
-            continue
-
-        # Try a direct name match
-        if header in table_columns:
-            mapping[header] = header
-
-    return mapping
+    logger.info(f"Starting mock file load - script: {script_name}, table: {table_name}")
 
-def load_csv_to_table(csv_file, table_name, execution_mode='append'):
-    """
-    Load data from a CSV file into the target table
-
-    Parameters:
-        csv_file (str): path to the CSV file
-        table_name (str): target table name
-        execution_mode (str): execution mode, 'append' or 'full_refresh'
-
-    Returns:
-        bool: True on success, False on failure
-    """
-    conn = None
     try:
-        # Read the CSV file, trying to auto-detect the encoding
-        try:
-            df = pd.read_csv(csv_file, encoding='utf-8')
-        except UnicodeDecodeError:
-            try:
-                df = pd.read_csv(csv_file, encoding='gbk')
-            except UnicodeDecodeError:
-                df = pd.read_csv(csv_file, encoding='latin1')
-
-        logger.info(f"Successfully read CSV file: {csv_file}, {len(df)} rows in total")
-
-        # Get the CSV column names
-        csv_headers = df.columns.tolist()
-        logger.info(f"CSV columns: {csv_headers}")
-
-        # Get the table structure
-        table_columns = get_table_columns(table_name)
-        if not table_columns:
-            logger.error(f"Could not get column information for table {table_name}")
-            return False
-
-        logger.info(f"Columns of table {table_name}: {table_columns}")
-
-        # Match CSV columns to table columns
-        column_mapping = match_csv_columns(csv_headers, table_columns)
-        logger.info(f"Column mapping: {column_mapping}")
-
-        if not column_mapping:
-            logger.error(f"Could not build a mapping between the CSV columns and the table columns")
-            return False
-
-        # Select and rename the mapped columns
-        df_mapped = df[list(column_mapping.keys())].rename(columns=column_mapping)
-
-        # Connect to the database
-        conn = get_pg_conn()
-        cursor = conn.cursor()
-
-        # Decide what to do based on the execution mode
+        logger.info("Simulating parameter checks...")
+        if not storage_location:
+            logger.warning("Warning: no storage_location (file path) provided")
+        else:
+            logger.info(f"Simulating a check that the file exists: {storage_location}")
+
+        logger.info(f"Simulated execution mode: {execution_mode}")
         if execution_mode == 'full_refresh':
-            # For a full refresh, truncate the table first
-            logger.info(f"Full refresh: truncating table {table_name}")
-            cursor.execute(f"TRUNCATE TABLE {table_name}")
-
-        # Build the INSERT statement
-        columns = ', '.join(df_mapped.columns)
-        placeholders = ', '.join(['%s'] * len(df_mapped.columns))
-        insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
-
-        # Bulk-insert the data
-        rows = [tuple(row) for row in df_mapped.values]
-        cursor.executemany(insert_sql, rows)
-
-        # Commit the transaction
-        conn.commit()
-        logger.info(f"Successfully inserted {len(rows)} rows into table {table_name}")
+            logger.info(f"Simulation: a full refresh would truncate table {table_name}")
+
+        logger.info("Simulating reading and processing the file...")
+        # Simulate success
+        logger.info(f"Simulation: file load for table {table_name} succeeded")
         return True
     except Exception as e:
-        logger.error(f"Error while loading CSV data into the table: {str(e)}")
-        if conn:
-            conn.rollback()
+        logger.error(f"Error during mock file load: {str(e)}")
         return False
-    finally:
-        if conn:
-            conn.close()
 
 def run(table_name, execution_mode='append', exec_date=None, target_type=None,
-        storage_location=None, frequency=None, **kwargs):
+        storage_location=None, frequency=None, script_name=None, **kwargs):
     """
-    Unified entry point that follows the Airflow dynamic-script invocation convention
-
+    Unified entry point that follows the Airflow dynamic-script invocation convention (mock version)
+
     Parameters:
         table_name (str): name of the table to process
         execution_mode (str): execution mode (append/full_refresh)
         exec_date: execution date
-        target_type: target type; should be 'structure' for CSV files
-        storage_location: path to the CSV file
+        target_type: target type
+        storage_location: file path
         frequency: update frequency
+        script_name: script name
         **kwargs: other possible parameters
-
+
     Returns:
         bool: True if execution succeeded, otherwise False
     """
-    logger.info(f"===== Starting CSV file load =====")
-    logger.info(f"Table name: {table_name}")
-    logger.info(f"Execution mode: {execution_mode}")
-    logger.info(f"Execution date: {exec_date}")
-    logger.info(f"Target type: {target_type}")
-    logger.info(f"File path: {storage_location}")
-    logger.info(f"Update frequency: {frequency}")
-
-    # Log the other parameters
+    # Print all incoming parameters (printed again in the entry point to make sure nothing is missed)
+    logger.info(f"===== Incoming parameters (inside the run entry point) =====")
+    logger.info(f"table_name: {table_name}")
+    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"exec_date: {exec_date} (type: {type(exec_date)})")
+    logger.info(f"target_type: {target_type}")
+    logger.info(f"storage_location: {storage_location}")
+    logger.info(f"frequency: {frequency}")
+    logger.info(f"script_name: {script_name}")
+    # Print any extra parameters
     for key, value in kwargs.items():
-        logger.info(f"Other parameter - {key}: {value}")
-
-    # Check required parameters
-    if not storage_location:
-        logger.error("No CSV file path provided")
-        return False
-
-    # Check that the file exists
-    if not os.path.exists(storage_location):
-        logger.error(f"CSV file does not exist: {storage_location}")
-        return False
-
-    # Record the execution start time
+        logger.info(f"Extra parameter - {key}: {value}")
+    logger.info(f"=========================================")
+
+    # Fall back to this script's file name if none was provided
+    if script_name is None:
+        script_name = os.path.basename(__file__)
+
+    # Log detailed execution information
     start_time = datetime.now()
-
-    try:
-        # Load the CSV data into the table
-        result = load_csv_to_table(storage_location, table_name, execution_mode)
-
-        # Record the execution end time
-        end_time = datetime.now()
-        duration = (end_time - start_time).total_seconds()
-
-        if result:
-            logger.info(f"CSV file load succeeded in {duration:.2f} seconds")
-        else:
-            logger.error(f"CSV file load failed after {duration:.2f} seconds")
-
-        return result
-    except Exception as e:
-        # Record the execution end time
-        end_time = datetime.now()
-        duration = (end_time - start_time).total_seconds()
-
-        logger.error(f"Error during CSV file load: {str(e)}")
-        logger.error(f"CSV file load failed after {duration:.2f} seconds")
-
-        return False
-    finally:
-        logger.info(f"===== CSV file load finished =====")
+    logger.info(f"Script '{script_name}' (mock) started at: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
 
-if __name__ == "__main__":
-    # Test code for direct execution
-    import argparse
-
-    parser = argparse.ArgumentParser(description='Load data from a CSV file into a table')
-    parser.add_argument('--table', type=str, required=True, help='Target table name')
-    parser.add_argument('--file', type=str, required=True, help='Path to the CSV file')
-    parser.add_argument('--mode', type=str, default='append', help='Execution mode: append or full_refresh')
-
-    args = parser.parse_args()
-
-    success = run(
-        table_name=args.table,
-        execution_mode=args.mode,
-        storage_location=args.file,
-        target_type='structure'
+    # Call the actual handler (mock version)
+    result = mock_load_file(
+        table_name=table_name,
+        execution_mode=execution_mode,
+        exec_date=exec_date,
+        target_type=target_type,
+        storage_location=storage_location,
+        frequency=frequency,
+        script_name=script_name,
+        **kwargs  # Pass any extra parameters through to the handler
     )
-
-    if success:
-        print("CSV file load succeeded")
-        sys.exit(0)
-    else:
-        print("CSV file load failed")
-        sys.exit(1)
+
+    end_time = datetime.now()
+    logger.info(f"Script '{script_name}' (mock) finished at: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
+    logger.info(f"Total time: {end_time - start_time}")
+    logger.info(f"Result: {'success' if result else 'failure'}")
+
+    return result
+
+if __name__ == "__main__":
+    # Provide some defaults so the script can be run directly for testing
+    test_params = {
+        "table_name": "sample_table",
+        "execution_mode": "full_refresh",
+        "exec_date": datetime.now().strftime('%Y-%m-%d'),
+        "target_type": "structure",
+        "storage_location": "/path/to/mock/file.csv",
+        "frequency": "daily",
+        "script_name": os.path.basename(__file__),
+        "custom_param": "abc",
+        "another_param": 456
+    }
+    logger.info(f"Running as a standalone script (mock), with test parameters: {test_params}")
+    run(**test_params)
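
# --- Illustrative usage (not part of the diff above) ---
# A minimal sketch of how a caller might dynamically import this mock script and
# invoke its unified run() entry point, e.g. from an Airflow PythonOperator callable.
# The SCRIPTS_DIR path, the file name, and the parameter values below are assumptions
# for illustration only; the run(**params) signature is the one defined in the diff.
import importlib.util
import os

SCRIPTS_DIR = "/opt/airflow/dags/scripts"   # assumed location of the script
SCRIPT_FILE = "load_file_mock.py"           # assumed file name of the script above


def call_load_script(table_name, execution_mode="append", **params):
    """Load the script as a module and delegate to its run() entry point."""
    script_path = os.path.join(SCRIPTS_DIR, SCRIPT_FILE)
    spec = importlib.util.spec_from_file_location("load_file_mock", script_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)         # runs the script's top level (logging setup)
    return module.run(
        table_name=table_name,
        execution_mode=execution_mode,
        script_name=SCRIPT_FILE,
        **params,
    )


if __name__ == "__main__":
    ok = call_load_script(
        "sample_table",
        execution_mode="full_refresh",
        exec_date="2024-01-01",
        target_type="structure",
        storage_location="/path/to/mock/file.csv",
        frequency="daily",
    )
    print("success" if ok else "failure")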