load_file.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. import pandas as pd
  7. import psycopg2
  8. from datetime import datetime
  9. import csv
  10. import glob
  11. import shutil
  12. # 配置日志记录器
  13. logging.basicConfig(
  14. level=logging.INFO,
  15. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  16. handlers=[
  17. logging.StreamHandler(sys.stdout)
  18. ]
  19. )
  20. logger = logging.getLogger("load_file")
  21. # 添加健壮的导入机制
  22. def get_config():
  23. """
  24. 从config模块导入配置
  25. 返回:
  26. tuple: (PG_CONFIG, STRUCTURE_UPLOAD_BASE_PATH, STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH)
  27. """
  28. # 默认配置
  29. default_pg_config = {
  30. "host": "localhost",
  31. "port": 5432,
  32. "user": "postgres",
  33. "password": "postgres",
  34. "database": "dataops",
  35. }
  36. default_upload_path = '/tmp/uploads'
  37. default_archive_path = '/tmp/uploads/archive'
  38. try:
  39. # 动态导入,避免IDE警告
  40. config = __import__('config')
  41. logger.info("从config模块直接导入配置")
  42. pg_config = getattr(config, 'PG_CONFIG', default_pg_config)
  43. upload_path = getattr(config, 'STRUCTURE_UPLOAD_BASE_PATH', default_upload_path)
  44. archive_path = getattr(config, 'STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH', default_archive_path)
  45. return pg_config, upload_path, archive_path
  46. except ImportError:
  47. # 使用默认配置
  48. logger.warning("无法导入config模块,使用默认值")
  49. return default_pg_config, default_upload_path, default_archive_path
  50. # 导入配置
  51. PG_CONFIG, STRUCTURE_UPLOAD_BASE_PATH, STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH = get_config()
  52. logger.info(f"配置加载完成: 上传路径={STRUCTURE_UPLOAD_BASE_PATH}, 归档路径={STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH}")
  53. def get_pg_conn():
  54. """获取PostgreSQL连接"""
  55. return psycopg2.connect(**PG_CONFIG)
  56. def get_table_columns(table_name):
  57. """
  58. 获取表的列信息,包括列名和注释
  59. 返回:
  60. dict: {列名: 列注释} 的字典
  61. """
  62. conn = get_pg_conn()
  63. cursor = conn.cursor()
  64. try:
  65. # 查询表列信息
  66. cursor.execute("""
  67. SELECT
  68. column_name,
  69. col_description((table_schema || '.' || table_name)::regclass::oid, ordinal_position) as column_comment
  70. FROM
  71. information_schema.columns
  72. WHERE
  73. table_schema = 'public' -- 明确指定 schema,如果需要
  74. AND table_name = %s
  75. ORDER BY
  76. ordinal_position
  77. """, (table_name.lower(),))
  78. columns = {}
  79. for row in cursor.fetchall():
  80. col_name = row[0]
  81. col_comment = row[1] if row[1] else col_name # 如果注释为空,使用列名
  82. columns[col_name] = col_comment
  83. if not columns:
  84. logger.warning(f"未能获取到表 '{table_name}' 的列信息,请检查表是否存在、schema是否正确以及权限。")
  85. return columns
  86. except Exception as e:
  87. logger.error(f"获取表 '{table_name}' 的列信息时出错: {str(e)}")
  88. return {}
  89. finally:
  90. cursor.close()
  91. conn.close()
  92. def match_csv_columns(csv_headers, table_columns):
  93. """
  94. 匹配CSV列名与表列名
  95. 策略:
  96. 1. 尝试通过表字段注释匹配CSV列名 (忽略大小写和空格)
  97. 2. 尝试通过名称直接匹配 (忽略大小写和空格)
  98. 参数:
  99. csv_headers (list): CSV文件的列名列表
  100. table_columns (dict): {数据库列名: 列注释} 的字典
  101. 返回:
  102. dict: {CSV列名: 数据库列名} 的映射字典
  103. """
  104. mapping = {}
  105. matched_table_cols = set()
  106. # 数据库列名通常不区分大小写(除非加引号),注释可能区分
  107. # 为了匹配更健壮,我们将CSV和数据库列名/注释都转为小写处理
  108. processed_table_columns_lower = {col.lower(): col for col in table_columns.keys()}
  109. processed_comment_to_column_lower = {
  110. str(comment).lower(): col
  111. for col, comment in table_columns.items() if comment
  112. }
  113. # 预处理 CSV headers
  114. processed_csv_headers_lower = {str(header).lower(): header for header in csv_headers}
  115. # 1. 通过注释匹配 (忽略大小写)
  116. for processed_header, original_header in processed_csv_headers_lower.items():
  117. if processed_header in processed_comment_to_column_lower:
  118. table_col_original_case = processed_comment_to_column_lower[processed_header]
  119. if table_col_original_case not in matched_table_cols:
  120. mapping[original_header] = table_col_original_case
  121. matched_table_cols.add(table_col_original_case)
  122. logger.info(f"通过注释匹配: CSV 列 '{original_header}' -> 表列 '{table_col_original_case}'")
  123. # 2. 通过名称直接匹配 (忽略大小写),仅匹配尚未映射的列
  124. for processed_header, original_header in processed_csv_headers_lower.items():
  125. if original_header not in mapping: # 仅当此 CSV 列尚未映射时才进行名称匹配
  126. if processed_header in processed_table_columns_lower:
  127. table_col_original_case = processed_table_columns_lower[processed_header]
  128. if table_col_original_case not in matched_table_cols:
  129. mapping[original_header] = table_col_original_case
  130. matched_table_cols.add(table_col_original_case)
  131. logger.info(f"通过名称匹配: CSV 列 '{original_header}' -> 表列 '{table_col_original_case}'")
  132. unmapped_csv = [h for h in csv_headers if h not in mapping]
  133. if unmapped_csv:
  134. logger.warning(f"以下 CSV 列未能匹配到表列: {unmapped_csv}")
  135. unmapped_table = [col for col in table_columns if col not in matched_table_cols]
  136. if unmapped_table:
  137. logger.warning(f"以下表列未能匹配到 CSV 列: {unmapped_table}")
  138. return mapping
  139. def load_csv_to_table(csv_file, table_name, execution_mode='append'):
  140. """
  141. 将单个CSV文件数据加载到目标表
  142. 参数:
  143. csv_file (str): CSV文件路径
  144. table_name (str): 目标表名 (大小写可能敏感,取决于数据库)
  145. execution_mode (str): 执行模式,'append'或'full_refresh'
  146. 返回:
  147. bool: 成功返回True,失败返回False
  148. """
  149. conn = None
  150. cursor = None # 初始化 cursor
  151. logger.info(f"开始处理文件: {csv_file}")
  152. try:
  153. # 读取CSV文件,尝试自动检测编码
  154. try:
  155. # 使用 dtype=str 确保所有列按字符串读取,避免类型推断问题,特别是对于ID类字段
  156. df = pd.read_csv(csv_file, encoding='utf-8', keep_default_na=False, na_values=[r'\N', '', 'NULL', 'null'], dtype=str)
  157. except UnicodeDecodeError:
  158. try:
  159. logger.warning(f"UTF-8 读取失败,尝试 GBK: {csv_file}")
  160. df = pd.read_csv(csv_file, encoding='gbk', keep_default_na=False, na_values=[r'\N', '', 'NULL', 'null'], dtype=str)
  161. except UnicodeDecodeError:
  162. logger.warning(f"GBK 读取也失败,尝试 latin1: {csv_file}")
  163. df = pd.read_csv(csv_file, encoding='latin1', keep_default_na=False, na_values=[r'\N', '', 'NULL', 'null'], dtype=str)
  164. except Exception as read_err:
  165. logger.error(f"读取 CSV 文件 {csv_file} 时发生未知错误: {str(read_err)}")
  166. return False
  167. logger.info(f"成功读取CSV文件: {os.path.basename(csv_file)}, 共 {len(df)} 行")
  168. # 清理列名中的潜在空白符
  169. df.columns = df.columns.str.strip()
  170. # 如果CSV为空,则直接认为成功并返回
  171. if df.empty:
  172. logger.info(f"CSV 文件 {csv_file} 为空,无需加载数据。")
  173. return True
  174. # 获取CSV列名 (清理后)
  175. csv_headers = df.columns.tolist()
  176. logger.info(f"清理后的 CSV 列名: {csv_headers}")
  177. # 获取表结构
  178. table_columns = get_table_columns(table_name)
  179. if not table_columns:
  180. logger.error(f"无法获取表 '{table_name}' 的列信息,跳过文件 {csv_file}")
  181. return False
  182. logger.info(f"表 '{table_name}' 的列信息 (列名: 注释): {table_columns}")
  183. # 匹配CSV列与表列
  184. column_mapping = match_csv_columns(csv_headers, table_columns)
  185. logger.info(f"列映射关系 (CSV列名: 表列名): {column_mapping}")
  186. # 检查是否有任何列成功映射
  187. if not column_mapping:
  188. logger.error(f"文件 {csv_file} 的列无法与表 '{table_name}' 的列建立任何映射关系,跳过此文件。")
  189. return False # 如果一个都没匹配上,则认为失败
  190. # 仅选择成功映射的列进行加载
  191. mapped_csv_headers = list(column_mapping.keys())
  192. # 使用 .copy() 避免 SettingWithCopyWarning
  193. df_mapped = df[mapped_csv_headers].copy()
  194. df_mapped.rename(columns=column_mapping, inplace=True)
  195. logger.info(f"将加载以下映射后的列: {df_mapped.columns.tolist()}")
  196. # 将空字符串 '' 替换为 None,以便插入数据库时为 NULL
  197. # 使用 map 替代已废弃的 applymap 方法
  198. # 对每一列单独应用 map 函数
  199. for col in df_mapped.columns:
  200. df_mapped[col] = df_mapped[col].map(lambda x: None if isinstance(x, str) and x == '' else x)
  201. # 连接数据库
  202. conn = get_pg_conn()
  203. cursor = conn.cursor()
  204. # 根据执行模式确定操作 - 注意:full_refresh 在 run 函数层面控制,这里仅处理单个文件追加
  205. # if execution_mode == 'full_refresh':
  206. # logger.warning(f"在 load_csv_to_table 中收到 full_refresh,但清空操作应在 run 函数完成。此处按 append 处理文件:{csv_file}")
  207. # # 如果是全量刷新,先清空表 - 这个逻辑移到 run 函数
  208. # logger.info(f"执行全量刷新,清空表 {table_name}")
  209. # cursor.execute(f"TRUNCATE TABLE {table_name}")
  210. # 构建INSERT语句
  211. # 使用原始大小写的数据库列名(从 column_mapping 的 value 获取)并加引号
  212. columns = ', '.join([f'"{col}"' for col in df_mapped.columns])
  213. placeholders = ', '.join(['%s'] * len(df_mapped.columns))
  214. # 假设表在 public schema,并为表名加引号以处理大小写或特殊字符
  215. insert_sql = f'INSERT INTO public."{table_name}" ({columns}) VALUES ({placeholders})'
  216. # 批量插入数据
  217. # df_mapped.values 会产生 numpy array,需要转换为 list of tuples
  218. # 确保 None 值正确传递
  219. rows = [tuple(row) for row in df_mapped.values]
  220. try:
  221. cursor.executemany(insert_sql, rows)
  222. conn.commit()
  223. logger.info(f"成功将文件 {os.path.basename(csv_file)} 的 {len(rows)} 行数据插入到表 '{table_name}'")
  224. except Exception as insert_err:
  225. logger.error(f"向表 '{table_name}' 插入数据时出错: {str(insert_err)}")
  226. logger.error(f"出错的 SQL 语句大致为: {insert_sql}")
  227. # 可以考虑记录前几行出错的数据 (注意隐私和日志大小)
  228. try:
  229. logger.error(f"出错的前3行数据 (部分): {rows[:3]}")
  230. except: pass # 防御性编程
  231. conn.rollback() # 回滚事务
  232. return False # 插入失败则返回 False
  233. return True
  234. except pd.errors.EmptyDataError:
  235. logger.info(f"CSV 文件 {csv_file} 为空或只有表头,无需加载数据。")
  236. return True # 空文件视为成功处理
  237. except Exception as e:
  238. # 使用 exc_info=True 获取更详细的堆栈跟踪信息
  239. logger.error(f"处理文件 {csv_file} 加载到表 '{table_name}' 时发生意外错误", exc_info=True)
  240. if conn:
  241. conn.rollback()
  242. return False
  243. finally:
  244. if cursor:
  245. cursor.close()
  246. if conn:
  247. conn.close()
  248. def run(table_name, execution_mode='append', exec_date=None, target_type=None,
  249. storage_location=None, frequency=None, script_name=None, **kwargs):
  250. """
  251. 统一入口函数,支持通配符路径,处理并归档文件
  252. """
  253. if script_name is None:
  254. script_name = os.path.basename(__file__)
  255. # 修正之前的日志记录格式错误
  256. exec_mode_str = '全量刷新' if execution_mode == 'full_refresh' else '增量追加'
  257. logger.info(f"===== 开始执行 {script_name} ({exec_mode_str}) =====")
  258. logger.info(f"表名: {table_name}")
  259. logger.info(f"执行模式: {execution_mode}")
  260. logger.info(f"执行日期: {exec_date}")
  261. logger.info(f"目标类型: {target_type}")
  262. logger.info(f"资源类型: {target_type}, 文件相对路径模式: {storage_location}")
  263. logger.info(f"基准上传路径: {STRUCTURE_UPLOAD_BASE_PATH}")
  264. logger.info(f"基准归档路径: {STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH}")
  265. logger.info(f"更新频率: {frequency}")
  266. # 记录其他参数
  267. for key, value in kwargs.items():
  268. logger.info(f"其他参数 - {key}: {value}")
  269. # 检查必要参数
  270. if not storage_location:
  271. logger.error("未提供 storage_location (文件查找路径模式)")
  272. return False
  273. if not STRUCTURE_UPLOAD_BASE_PATH:
  274. logger.error("配置错误: STRUCTURE_UPLOAD_BASE_PATH 未设置")
  275. return False
  276. if not STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH:
  277. logger.error("配置错误: STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH 未设置")
  278. return False
  279. # 记录执行开始时间
  280. overall_start_time = datetime.now()
  281. # 构建完整搜索路径
  282. # 使用 os.path.normpath 确保路径分隔符正确
  283. # 如果storage_location以斜杠开头,移除开头的斜杠以避免被当作绝对路径处理
  284. if storage_location.startswith('/'):
  285. storage_location = storage_location.lstrip('/')
  286. logger.info(f"检测到storage_location以斜杠开头,已移除: {storage_location}")
  287. full_search_pattern = os.path.normpath(os.path.join(STRUCTURE_UPLOAD_BASE_PATH, storage_location))
  288. logger.info(f"完整文件搜索模式: {full_search_pattern}")
  289. # 检查路径是否存在(至少目录部分)
  290. search_dir = os.path.dirname(full_search_pattern)
  291. if not os.path.exists(search_dir):
  292. error_msg = f"错误: 搜索目录不存在: {search_dir}"
  293. logger.error(error_msg)
  294. raise FileNotFoundError(error_msg) # 抛出异常而不是返回False
  295. # 查找匹配的文件
  296. try:
  297. # 增加 recursive=True 如果需要递归查找子目录中的文件 (例如 storage_location 是 a/b/**/*.csv)
  298. # 当前假设模式只在指定目录下匹配,例如 /data/subdir/*.csv
  299. found_files = glob.glob(full_search_pattern, recursive=False)
  300. except Exception as glob_err:
  301. logger.error(f"查找文件时发生错误 (模式: {full_search_pattern}): {str(glob_err)}")
  302. raise # 重新抛出异常
  303. if not found_files:
  304. logger.warning(f"在目录 {search_dir} 下未找到匹配模式 '{os.path.basename(full_search_pattern)}' 的文件")
  305. return True # 找不到文件视为正常情况,返回成功
  306. logger.info(f"找到 {len(found_files)} 个匹配文件: {found_files}")
  307. # 如果是全量刷新,在处理任何文件前清空表
  308. if execution_mode == 'full_refresh':
  309. conn = None
  310. cursor = None
  311. try:
  312. conn = get_pg_conn()
  313. cursor = conn.cursor()
  314. # 假设表在 public schema,并为表名加引号
  315. logger.info(f"执行全量刷新,清空表 public.\"{table_name}\"")
  316. cursor.execute(f'TRUNCATE TABLE public.\"{table_name}\"')
  317. conn.commit()
  318. logger.info("表 public.\"" + table_name + "\" 已清空。")
  319. except Exception as e:
  320. logger.error("清空表 public.\"" + table_name + "\" 时出错: " + str(e))
  321. if conn:
  322. conn.rollback()
  323. return False # 清空失败则直接失败退出
  324. finally:
  325. if cursor:
  326. cursor.close()
  327. if conn:
  328. conn.close()
  329. # 处理并归档每个找到的文件
  330. processed_files_count = 0
  331. failed_files = []
  332. for file_path in found_files:
  333. file_start_time = datetime.now()
  334. # 使用 normpath 统一路径表示
  335. normalized_file_path = os.path.normpath(file_path)
  336. logger.info(f"--- 开始处理文件: {os.path.basename(normalized_file_path)} ---")
  337. try:
  338. # 加载CSV数据到表 (注意:full_refresh时也是append模式加载,因为表已清空)
  339. load_success = load_csv_to_table(normalized_file_path, table_name, 'append')
  340. if load_success:
  341. logger.info(f"文件 {os.path.basename(normalized_file_path)} 加载成功。")
  342. processed_files_count += 1
  343. # 归档文件
  344. try:
  345. # 计算相对路径部分 (storage_location 可能包含子目录)
  346. # 使用 os.path.dirname 获取 storage_location 的目录部分
  347. relative_dir = os.path.dirname(storage_location)
  348. # 获取当前日期
  349. date_str = datetime.now().strftime('%Y-%m-%d')
  350. # 构建归档目录路径
  351. archive_dir = os.path.normpath(os.path.join(STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH, relative_dir, date_str))
  352. # 创建归档目录(如果不存在)
  353. os.makedirs(archive_dir, exist_ok=True)
  354. # 获取当前unix时间戳并转换为字符串
  355. unix_timestamp = int(datetime.now().timestamp())
  356. # 修改文件名,添加时间戳
  357. original_filename = os.path.basename(normalized_file_path)
  358. filename_parts = os.path.splitext(original_filename)
  359. new_filename = f"{filename_parts[0]}_{unix_timestamp}{filename_parts[1]}"
  360. # 构建文件在归档目录中的最终路径
  361. archive_dest_path = os.path.join(archive_dir, new_filename)
  362. # 移动文件
  363. shutil.move(normalized_file_path, archive_dest_path)
  364. logger.info(f"文件已成功移动到归档目录并重命名: {archive_dest_path}")
  365. except Exception as move_err:
  366. # 记录错误,但由于数据已加载,不将整体任务标记为失败
  367. logger.error(f"加载文件 {os.path.basename(normalized_file_path)} 成功,但移动到归档目录时出错", exc_info=True)
  368. logger.error(f"原始文件路径: {normalized_file_path}")
  369. else:
  370. logger.error(f"文件 {os.path.basename(normalized_file_path)} 加载失败,中止处理。")
  371. # 修改:任何一个文件加载失败就直接返回 False
  372. # 记录最终统计
  373. overall_end_time = datetime.now()
  374. overall_duration = (overall_end_time - overall_start_time).total_seconds()
  375. logger.info(f"===== {script_name} 执行完成 (失败) =====")
  376. logger.info(f"总耗时: {overall_duration:.2f}秒")
  377. logger.info(f"共找到文件: {len(found_files)}")
  378. logger.info(f"成功处理文件数: {processed_files_count}")
  379. logger.error(f"首个失败文件: {os.path.basename(normalized_file_path)}")
  380. return False
  381. except Exception as file_proc_err:
  382. logger.error(f"处理文件 {os.path.basename(normalized_file_path)} 时发生意外错误", exc_info=True)
  383. # 修改:任何一个文件处理异常就直接返回 False
  384. # 记录最终统计
  385. overall_end_time = datetime.now()
  386. overall_duration = (overall_end_time - overall_start_time).total_seconds()
  387. logger.info(f"===== {script_name} 执行完成 (异常) =====")
  388. logger.info(f"总耗时: {overall_duration:.2f}秒")
  389. logger.info(f"共找到文件: {len(found_files)}")
  390. logger.info(f"成功处理文件数: {processed_files_count}")
  391. logger.error(f"首个异常文件: {os.path.basename(normalized_file_path)}")
  392. return False
  393. finally:
  394. file_end_time = datetime.now()
  395. file_duration = (file_end_time - file_start_time).total_seconds()
  396. logger.info(f"--- 文件 {os.path.basename(normalized_file_path)} 处理结束,耗时: {file_duration:.2f}秒 ---")
  397. # 记录总体执行结果(全部成功的情况)
  398. overall_end_time = datetime.now()
  399. overall_duration = (overall_end_time - overall_start_time).total_seconds()
  400. logger.info(f"===== {script_name} 执行完成 (成功) =====")
  401. logger.info(f"总耗时: {overall_duration:.2f}秒")
  402. logger.info(f"共找到文件: {len(found_files)}")
  403. logger.info(f"成功处理文件数: {processed_files_count}")
  404. return True # 所有文件处理成功
  405. if __name__ == "__main__":
  406. # 直接执行时的测试代码
  407. import argparse
  408. parser = argparse.ArgumentParser(description='从CSV文件加载数据到表(支持通配符)')
  409. parser.add_argument('--table', type=str, required=True, help='目标表名')
  410. parser.add_argument('--pattern', type=str, required=True, help='CSV文件查找模式 (相对于基准上传路径的相对路径,例如: data/*.csv 或 *.csv)')
  411. parser.add_argument('--mode', type=str, default='append', choices=['append', 'full_refresh'], help='执行模式: append 或 full_refresh')
  412. args = parser.parse_args()
  413. # 构造必要的 kwargs
  414. run_kwargs = {
  415. "table_name": args.table,
  416. "execution_mode": args.mode,
  417. "storage_location": args.pattern,
  418. "target_type": 'structure',
  419. "exec_date": datetime.now().strftime('%Y-%m-%d'),
  420. "frequency": "manual",
  421. "script_name": os.path.basename(__file__)
  422. }
  423. logger.info("命令行测试执行参数: " + str(run_kwargs))
  424. success = run(**run_kwargs)
  425. if success:
  426. print("CSV文件加载任务执行完毕,所有文件处理成功。")
  427. sys.exit(0)
  428. else:
  429. print("CSV文件加载任务执行完毕,但有部分或全部文件处理失败。")
  430. sys.exit(1)