load_file.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import sys
import os
import pandas as pd
import psycopg2
from datetime import datetime
import csv
import glob
import shutil
import re

# Configure the logger
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger("load_file")

# Robust configuration import with fallbacks
def get_config():
    """
    Import configuration from the config module.

    Returns:
        tuple: (PG_CONFIG, STRUCTURE_UPLOAD_BASE_PATH, STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH)
    """
    # Default configuration
    default_pg_config = {
        "host": "localhost",
        "port": 5432,
        "user": "postgres",
        "password": "postgres",
        "database": "dataops",
    }
    default_upload_path = '/tmp/uploads'
    default_archive_path = '/tmp/uploads/archive'
    try:
        # Import dynamically to avoid IDE warnings
        config = __import__('config')
        logger.info("Configuration imported directly from the config module")
        pg_config = getattr(config, 'PG_CONFIG', default_pg_config)
        upload_path = getattr(config, 'STRUCTURE_UPLOAD_BASE_PATH', default_upload_path)
        archive_path = getattr(config, 'STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH', default_archive_path)
        return pg_config, upload_path, archive_path
    except ImportError:
        # Fall back to the default configuration
        logger.warning("Could not import the config module; using default values")
        return default_pg_config, default_upload_path, default_archive_path

# Load configuration
PG_CONFIG, STRUCTURE_UPLOAD_BASE_PATH, STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH = get_config()
logger.info(f"Configuration loaded: upload path={STRUCTURE_UPLOAD_BASE_PATH}, archive path={STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH}")
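
# The attributes read by get_config() can come from a local config.py on the import
# path. A minimal, illustrative sketch of such a module (all values below are
# placeholders, not the project's real settings):
#
#     # config.py
#     PG_CONFIG = {
#         "host": "db.example.internal",
#         "port": 5432,
#         "user": "etl_user",
#         "password": "change-me",
#         "database": "dataops",
#     }
#     STRUCTURE_UPLOAD_BASE_PATH = "/data/uploads"
#     STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH = "/data/uploads/archive"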

def get_pg_conn():
    """Get a PostgreSQL connection."""
    return psycopg2.connect(**PG_CONFIG)

def get_table_columns(table_name):
    """
    Get the table's column information, including column names and comments.

    Returns:
        dict: mapping of {column name: column comment}
    """
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Query column metadata for the table
        cursor.execute("""
            SELECT
                column_name,
                col_description((table_schema || '.' || table_name)::regclass::oid, ordinal_position) as column_comment
            FROM
                information_schema.columns
            WHERE
                table_schema = 'public' -- adjust the schema here if needed
                AND table_name = %s
            ORDER BY
                ordinal_position
        """, (table_name.lower(),))
        columns = {}
        for row in cursor.fetchall():
            col_name = row[0]
            col_comment = row[1] if row[1] else col_name  # fall back to the column name if the comment is empty
            columns[col_name] = col_comment
        if not columns:
            logger.warning(f"Could not retrieve column information for table '{table_name}'; check that the table exists, the schema is correct, and you have the required permissions.")
        return columns
    except Exception as e:
        logger.error(f"Error retrieving column information for table '{table_name}': {str(e)}")
        return {}
    finally:
        cursor.close()
        conn.close()
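
# For reference, get_table_columns() returns a dict keyed by column name whose values
# are the column comments, or the column name itself when no comment is set. A
# hypothetical example (the table and comments below are illustrative only):
#
#     get_table_columns('ods_user')
#     # -> {'user_id': 'User ID', 'user_name': 'Name', 'created_at': 'created_at'}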

def match_file_columns(file_headers, table_columns):
    """
    Match file column names against table column names.

    Strategy:
        1. Try to match file columns via the table column comments (case- and whitespace-insensitive).
        2. Try to match by name directly (case- and whitespace-insensitive).

    Args:
        file_headers (list): column names from the file
        table_columns (dict): mapping of {database column name: column comment}

    Returns:
        dict: mapping of {file column name: database column name}
    """
    mapping = {}
    matched_table_cols = set()
    # Database column names are usually case-insensitive (unless quoted); comments may not be.
    # For robust matching, lower-case the file headers as well as the column names and comments.
    processed_table_columns_lower = {col.lower(): col for col in table_columns.keys()}
    processed_comment_to_column_lower = {
        str(comment).lower(): col
        for col, comment in table_columns.items() if comment
    }
    # Pre-process the file headers
    processed_file_headers_lower = {str(header).lower(): header for header in file_headers}
    # 1. Match by comment (case-insensitive)
    for processed_header, original_header in processed_file_headers_lower.items():
        if processed_header in processed_comment_to_column_lower:
            table_col_original_case = processed_comment_to_column_lower[processed_header]
            if table_col_original_case not in matched_table_cols:
                mapping[original_header] = table_col_original_case
                matched_table_cols.add(table_col_original_case)
                logger.info(f"Matched by comment: file column '{original_header}' -> table column '{table_col_original_case}'")
    # 2. Match by name (case-insensitive), only for columns that are still unmapped
    for processed_header, original_header in processed_file_headers_lower.items():
        if original_header not in mapping:  # only attempt name matching if this file column is not yet mapped
            if processed_header in processed_table_columns_lower:
                table_col_original_case = processed_table_columns_lower[processed_header]
                if table_col_original_case not in matched_table_cols:
                    mapping[original_header] = table_col_original_case
                    matched_table_cols.add(table_col_original_case)
                    logger.info(f"Matched by name: file column '{original_header}' -> table column '{table_col_original_case}'")
    unmapped_file = [h for h in file_headers if h not in mapping]
    if unmapped_file:
        logger.warning(f"The following file columns could not be matched to table columns: {unmapped_file}")
    unmapped_table = [col for col in table_columns if col not in matched_table_cols]
    if unmapped_table:
        logger.warning(f"The following table columns could not be matched to file columns: {unmapped_table}")
    return mapping
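
# A small illustration of the two-pass matching above (hypothetical headers and columns):
#
#     table_columns = {'user_id': 'User ID', 'user_name': 'Name'}
#     file_headers = ['User ID', 'USER_NAME', 'extra_col']
#     match_file_columns(file_headers, table_columns)
#     # pass 1 (comments): 'User ID'   -> 'user_id'
#     # pass 2 (names):    'USER_NAME' -> 'user_name'
#     # -> {'User ID': 'user_id', 'USER_NAME': 'user_name'}; 'extra_col' is logged as unmapped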

def read_excel_file(excel_file, sheet_name=0):
    """
    Read the contents of an Excel file.

    Args:
        excel_file (str): path to the Excel file
        sheet_name: worksheet name or index; defaults to the first worksheet

    Returns:
        pandas.DataFrame: the data read from the file
    """
    try:
        # Read the Excel file with dtype=str so every column is read as a string
        df = pd.read_excel(excel_file, sheet_name=sheet_name, keep_default_na=False,
                           na_values=[r'\N', '', 'NULL', 'null'], dtype=str)
        logger.info(f"Read Excel file {os.path.basename(excel_file)} successfully, {len(df)} rows")
        return df
    except Exception as e:
        logger.error(f"Error reading Excel file {excel_file}: {str(e)}")
        raise

def read_csv_file(csv_file):
    """
    Read the contents of a CSV file, trying to detect the encoding automatically.

    Args:
        csv_file (str): path to the CSV file

    Returns:
        pandas.DataFrame: the data read from the file
    """
    try:
        # Use dtype=str so every column is read as a string and type inference is avoided
        df = pd.read_csv(csv_file, encoding='utf-8', keep_default_na=False,
                         na_values=[r'\N', '', 'NULL', 'null'], dtype=str)
        return df
    except UnicodeDecodeError:
        try:
            logger.warning(f"UTF-8 decoding failed, trying GBK: {csv_file}")
            df = pd.read_csv(csv_file, encoding='gbk', keep_default_na=False,
                             na_values=[r'\N', '', 'NULL', 'null'], dtype=str)
            return df
        except UnicodeDecodeError:
            logger.warning(f"GBK decoding also failed, trying latin1: {csv_file}")
            df = pd.read_csv(csv_file, encoding='latin1', keep_default_na=False,
                             na_values=[r'\N', '', 'NULL', 'null'], dtype=str)
            return df
    except Exception as e:
        logger.error(f"Error reading CSV file {csv_file}: {str(e)}")
        raise

def load_dataframe_to_table(df, file_path, table_name):
    """
    Load a DataFrame into the target table.

    Args:
        df (pandas.DataFrame): data to load
        file_path (str): source file path (only used for logging)
        table_name (str): target table name

    Returns:
        bool: True on success, False on failure
    """
    conn = None
    cursor = None
    try:
        # Strip potential whitespace from column names
        df.columns = df.columns.str.strip()
        # If the DataFrame is empty, treat it as a success and return
        if df.empty:
            logger.info(f"File {file_path} is empty; nothing to load.")
            return True
        # Get the (cleaned) file column names
        file_headers = df.columns.tolist()
        logger.info(f"Cleaned file column names: {file_headers}")
        # Get the table structure
        table_columns = get_table_columns(table_name)
        if not table_columns:
            logger.error(f"Could not retrieve column information for table '{table_name}'; skipping file {file_path}")
            return False
        # Match file columns against table columns
        column_mapping = match_file_columns(file_headers, table_columns)
        # Check whether any columns were mapped successfully
        if not column_mapping:
            logger.error(f"No columns of file {file_path} could be mapped to columns of table '{table_name}'; skipping this file.")
            return False
        # Only load the columns that were mapped successfully
        mapped_file_headers = list(column_mapping.keys())
        # Copy to avoid SettingWithCopyWarning
        df_mapped = df[mapped_file_headers].copy()
        df_mapped.rename(columns=column_mapping, inplace=True)
        logger.info(f"The following mapped columns will be loaded: {df_mapped.columns.tolist()}")
        # Replace empty strings and NaN with None so psycopg2 inserts SQL NULL
        for col in df_mapped.columns:
            df_mapped[col] = df_mapped[col].map(
                lambda x: None if (isinstance(x, str) and x == '') or pd.isna(x) else x
            )
        # Connect to the database
        conn = get_pg_conn()
        cursor = conn.cursor()
        # Build the INSERT statement
        columns = ', '.join([f'"{col}"' for col in df_mapped.columns])
        placeholders = ', '.join(['%s'] * len(df_mapped.columns))
        insert_sql = f'INSERT INTO public."{table_name}" ({columns}) VALUES ({placeholders})'
        # Insert the data in bulk
        rows = [tuple(row) for row in df_mapped.values]
        try:
            cursor.executemany(insert_sql, rows)
            conn.commit()
            logger.info(f"Inserted {len(rows)} rows from file {os.path.basename(file_path)} into table '{table_name}'")
            return True
        except Exception as insert_err:
            logger.error(f"Error inserting data into table '{table_name}': {str(insert_err)}")
            logger.error(f"The failing SQL statement was roughly: {insert_sql}")
            try:
                logger.error(f"First 3 rows of the failing data (partial): {rows[:3]}")
            except Exception:
                pass
            conn.rollback()
            return False
    except Exception as e:
        logger.error(f"Unexpected error while loading file {file_path} into table '{table_name}'", exc_info=True)
        if conn:
            conn.rollback()
        return False
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()
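
# For a hypothetical target table with two mapped columns, the statement built above
# would look like the following (values are bound by psycopg2, not string-interpolated):
#
#     INSERT INTO public."ods_user" ("user_id", "user_name") VALUES (%s, %s)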

def load_file_to_table(file_path, table_name, execution_mode='append'):
    """
    Load a file into the target table based on its file type.

    Args:
        file_path (str): path to the file
        table_name (str): target table name
        execution_mode (str): execution mode, 'append' or 'full_refresh'

    Returns:
        bool: True on success, False on failure
    """
    logger.info(f"Processing file: {file_path}")
    try:
        file_extension = os.path.splitext(file_path)[1].lower()
        # Choose the loading method based on the file extension
        if file_extension == '.csv':
            # CSV file
            df = read_csv_file(file_path)
            return load_dataframe_to_table(df, file_path, table_name)
        elif file_extension in ['.xlsx', '.xls']:
            # Excel file
            df = read_excel_file(file_path)
            return load_dataframe_to_table(df, file_path, table_name)
        else:
            logger.error(f"Unsupported file type: {file_extension}, file: {file_path}")
            return False
    except pd.errors.EmptyDataError:
        logger.info(f"File {file_path} is empty or only has a header row; nothing to load.")
        return True
    except Exception as e:
        logger.error(f"Unexpected error while processing file {file_path}", exc_info=True)
        return False

def run(table_name, execution_mode='append', exec_date=None, target_type=None,
        storage_location=None, frequency=None, script_name=None, **kwargs):
    """
    Unified entry point: supports wildcard paths, loads the matching files and archives them.
    """
    if script_name is None:
        script_name = os.path.basename(__file__)
    exec_mode_str = 'full refresh' if execution_mode == 'full_refresh' else 'incremental append'
    logger.info(f"===== Starting {script_name} ({exec_mode_str}) =====")
    logger.info(f"Table name: {table_name}")
    logger.info(f"Execution mode: {execution_mode}")
    logger.info(f"Execution date: {exec_date}")
    logger.info(f"Target type: {target_type}")
    logger.info(f"File relative path pattern: {storage_location}")
    logger.info(f"Base upload path: {STRUCTURE_UPLOAD_BASE_PATH}")
    logger.info(f"Base archive path: {STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH}")
    logger.info(f"Update frequency: {frequency}")
    # Log any additional parameters
    for key, value in kwargs.items():
        logger.info(f"Additional parameter - {key}: {value}")
    # Check required parameters
    if not storage_location:
        logger.error("storage_location (file search path pattern) was not provided")
        return False
    if not STRUCTURE_UPLOAD_BASE_PATH:
        logger.error("Configuration error: STRUCTURE_UPLOAD_BASE_PATH is not set")
        return False
    if not STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH:
        logger.error("Configuration error: STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH is not set")
        return False
    # Record the overall start time
    overall_start_time = datetime.now()
    # Build the full search paths
    # Use os.path.normpath to keep path separators consistent
    # If storage_location starts with a slash, strip it so it is not treated as an absolute path
    if storage_location.startswith('/'):
        storage_location = storage_location.lstrip('/')
        logger.info(f"storage_location started with a slash; removed it: {storage_location}")
    # Check whether storage_location already includes a file extension
    has_extension = bool(re.search(r'\.[a-zA-Z0-9]+$', storage_location))
    full_search_patterns = []
    if has_extension:
        # An extension was specified, so use the pattern as-is
        full_search_patterns.append(os.path.normpath(os.path.join(STRUCTURE_UPLOAD_BASE_PATH, storage_location)))
    else:
        # No extension was specified, so add every supported extension
        base_pattern = storage_location.rstrip('/')
        if base_pattern.endswith('*'):
            # Already ends with *, just append the extensions
            for ext in ['.csv', '.xlsx', '.xls']:
                pattern = os.path.normpath(os.path.join(STRUCTURE_UPLOAD_BASE_PATH, f"{base_pattern}{ext}"))
                full_search_patterns.append(pattern)
        else:
            # Does not end with *, so append /*.<extension>
            if not base_pattern.endswith('/*'):
                base_pattern = f"{base_pattern}/*"
            for ext in ['.csv', '.xlsx', '.xls']:
                pattern = os.path.normpath(os.path.join(STRUCTURE_UPLOAD_BASE_PATH, f"{base_pattern}{ext}"))
                full_search_patterns.append(pattern)
    logger.info(f"Full file search patterns: {full_search_patterns}")
    # Find matching files
    found_files = []
    for search_pattern in full_search_patterns:
        # Check whether the search directory exists
        search_dir = os.path.dirname(search_pattern)
        if not os.path.exists(search_dir):
            logger.warning(f"Search directory does not exist: {search_dir}")
            continue
        try:
            # Look for files matching the pattern
            matching_files = glob.glob(search_pattern, recursive=False)
            if matching_files:
                found_files.extend(matching_files)
                logger.info(f"Found {len(matching_files)} file(s) for pattern {search_pattern}")
        except Exception as glob_err:
            logger.error(f"Error while searching for files (pattern: {search_pattern}): {str(glob_err)}")
            # Continue with the remaining patterns
    if not found_files:
        logger.warning(f"No matching files found for search patterns {full_search_patterns}")
        return True  # no matching files is treated as a normal case; return success
    found_files = list(set(found_files))  # de-duplicate
    logger.info(f"Found {len(found_files)} matching file(s) in total: {found_files}")
    # For a full refresh, truncate the table before processing any files
    if execution_mode == 'full_refresh':
        conn = None
        cursor = None
        try:
            conn = get_pg_conn()
            cursor = conn.cursor()
            # Assume the table lives in the public schema and quote the table name
            logger.info(f'Full refresh: truncating table public."{table_name}"')
            cursor.execute(f'TRUNCATE TABLE public."{table_name}"')
            conn.commit()
            logger.info(f'Table public."{table_name}" has been truncated.')
        except Exception as e:
            logger.error(f'Error truncating table public."{table_name}": {str(e)}')
            if conn:
                conn.rollback()
            return False  # abort immediately if truncation fails
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
    # Process and archive each file that was found
    processed_files_count = 0
    for file_path in found_files:
        file_start_time = datetime.now()
        # Normalize the path representation
        normalized_file_path = os.path.normpath(file_path)
        logger.info(f"--- Processing file: {os.path.basename(normalized_file_path)} ---")
        try:
            # Load the file into the table according to its type
            load_success = load_file_to_table(normalized_file_path, table_name, 'append')
            if load_success:
                logger.info(f"File {os.path.basename(normalized_file_path)} loaded successfully.")
                processed_files_count += 1
                # Archive the file
                try:
                    # Compute the relative directory part
                    relative_dir = os.path.dirname(storage_location)
                    # Current date
                    date_str = datetime.now().strftime('%Y-%m-%d')
                    # Build the archive directory path
                    archive_dir = os.path.normpath(os.path.join(STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH, relative_dir, date_str))
                    # Create the archive directory if it does not exist
                    os.makedirs(archive_dir, exist_ok=True)
                    # Current Unix timestamp as an integer
                    unix_timestamp = int(datetime.now().timestamp())
                    # Rename the file by appending the timestamp
                    original_filename = os.path.basename(normalized_file_path)
                    filename_parts = os.path.splitext(original_filename)
                    new_filename = f"{filename_parts[0]}_{unix_timestamp}{filename_parts[1]}"
                    # Final path of the file inside the archive directory
                    archive_dest_path = os.path.join(archive_dir, new_filename)
                    # Move the file
                    shutil.move(normalized_file_path, archive_dest_path)
                    logger.info(f"File moved to the archive directory and renamed: {archive_dest_path}")
                except Exception as move_err:
                    # Log the error, but since the data is already loaded, do not mark the whole task as failed
                    logger.error(f"File {os.path.basename(normalized_file_path)} was loaded, but moving it to the archive directory failed", exc_info=True)
                    logger.error(f"Original file path: {normalized_file_path}")
            else:
                logger.error(f"Loading file {os.path.basename(normalized_file_path)} failed; aborting.")
                # Any single load failure causes the whole run to return False
                overall_end_time = datetime.now()
                overall_duration = (overall_end_time - overall_start_time).total_seconds()
                logger.info(f"===== {script_name} finished (failed) =====")
                logger.info(f"Total duration: {overall_duration:.2f}s")
                logger.info(f"Files found: {len(found_files)}")
                logger.info(f"Files processed successfully: {processed_files_count}")
                logger.error(f"First failing file: {os.path.basename(normalized_file_path)}")
                return False
        except Exception as file_proc_err:
            logger.error(f"Unexpected error while processing file {os.path.basename(normalized_file_path)}", exc_info=True)
            # Any single processing exception causes the whole run to return False
            overall_end_time = datetime.now()
            overall_duration = (overall_end_time - overall_start_time).total_seconds()
            logger.info(f"===== {script_name} finished (exception) =====")
            logger.info(f"Total duration: {overall_duration:.2f}s")
            logger.info(f"Files found: {len(found_files)}")
            logger.info(f"Files processed successfully: {processed_files_count}")
            logger.error(f"First file to raise an exception: {os.path.basename(normalized_file_path)}")
            return False
        finally:
            file_end_time = datetime.now()
            file_duration = (file_end_time - file_start_time).total_seconds()
            logger.info(f"--- Finished file {os.path.basename(normalized_file_path)}, took {file_duration:.2f}s ---")
    # Log the overall result (everything succeeded)
    overall_end_time = datetime.now()
    overall_duration = (overall_end_time - overall_start_time).total_seconds()
    logger.info(f"===== {script_name} finished (success) =====")
    logger.info(f"Total duration: {overall_duration:.2f}s")
    logger.info(f"Files found: {len(found_files)}")
    logger.info(f"Files processed successfully: {processed_files_count}")
    return True  # all files processed successfully

if __name__ == "__main__":
    # Test harness for direct execution
    import argparse
    parser = argparse.ArgumentParser(description='Load data from CSV or Excel files into a table (wildcards supported)')
    parser.add_argument('--table', type=str, required=True, help='target table name')
    parser.add_argument('--pattern', type=str, required=True,
                        help='file search pattern, relative to the base upload path (e.g. data/*.csv, data/*.xlsx or data/*)')
    parser.add_argument('--mode', type=str, default='append', choices=['append', 'full_refresh'],
                        help='execution mode: append or full_refresh')
    args = parser.parse_args()
    # Build the required kwargs
    run_kwargs = {
        "table_name": args.table,
        "execution_mode": args.mode,
        "storage_location": args.pattern,
        "target_type": 'structure',
        "exec_date": datetime.now().strftime('%Y-%m-%d'),
        "frequency": "manual",
        "script_name": os.path.basename(__file__)
    }
    logger.info("Command-line test run parameters: " + str(run_kwargs))
    success = run(**run_kwargs)
    if success:
        print("File loading task finished; all files were processed successfully.")
        sys.exit(0)
    else:
        print("File loading task finished, but some or all files failed to process.")
        sys.exit(1)
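
# Example invocation (illustrative; the table name and pattern below are placeholders):
#
#     python load_file.py --table ods_user --pattern "user/*" --mode append
#
# With --pattern "user/*" the script searches <STRUCTURE_UPLOAD_BASE_PATH>/user/ for
# *.csv, *.xlsx and *.xls files, loads each one into public."ods_user", and then moves
# it to <STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH>/user/<YYYY-MM-DD>/<name>_<timestamp><ext>.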