import_resource_data.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612
  1. """
  2. 数据资源导入工具
  3. 功能:从远程数据源读取数据,按照指定的更新模式写入到目标数据资源表中
  4. 支持:
  5. - 灵活的数据源配置(PostgreSQL/MySQL等)
  6. - 灵活的目标表配置
  7. - 两种更新模式:append(追加)/ full(全量更新)
  8. 作者:cursor
  9. 创建时间:2025-11-28
  10. 更新时间:2025-11-28
  11. """
  12. import argparse
  13. import json
  14. import logging
  15. import os
  16. import sys
  17. from typing import Any, Dict, List, Optional
  18. import psycopg2
  19. from sqlalchemy import create_engine, inspect, text
  20. from sqlalchemy.engine import Engine
  21. from sqlalchemy.orm import Session, sessionmaker
  22. # 添加项目根目录到路径
  23. sys.path.insert(
  24. 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
  25. )
  26. try:
  27. from app.config.config import config, get_environment # type: ignore
  28. # 获取当前环境的配置类
  29. _current_env = get_environment()
  30. Config = config.get(_current_env, config["default"])
  31. except ImportError:
  32. # 如果无法导入,使用环境变量
  33. class Config: # type: ignore
  34. SQLALCHEMY_DATABASE_URI = os.environ.get(
  35. "DATABASE_URI", "postgresql://user:password@localhost:5432/database"
  36. )
  37. try:
  38. import pymysql # type: ignore
  39. MYSQL_AVAILABLE = True
  40. except ImportError:
  41. MYSQL_AVAILABLE = False
  42. pymysql = None # type: ignore
  43. # 配置日志
  44. logging.basicConfig(
  45. level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  46. )
  47. logger = logging.getLogger(__name__)
  48. class ResourceDataImporter:
  49. """数据资源导入器"""
  50. # 目标表所在的 schema
  51. TARGET_SCHEMA = "dags"
  52. def __init__(
  53. self,
  54. source_config: Dict[str, Any],
  55. target_table_name: str,
  56. update_mode: str = "append",
  57. ):
  58. """
  59. 初始化导入器
  60. Args:
  61. source_config: 源数据库配置
  62. {
  63. 'type': 'postgresql', # 或 'mysql'
  64. 'host': '10.52.31.104',
  65. 'port': 5432,
  66. 'database': 'source_db',
  67. 'username': 'user',
  68. 'password': 'password',
  69. 'table_name': 'TB_JC_KSDZB' # 源表名
  70. }
  71. target_table_name: 目标表名(数据资源的英文名)
  72. update_mode: 更新模式,'append'(追加)或 'full'(全量更新)
  73. """
  74. self.source_config = source_config
  75. self.target_table_name = target_table_name
  76. self.update_mode = update_mode.lower()
  77. self.source_connection: Optional[Any] = None
  78. self.target_engine: Optional[Engine] = None
  79. self.target_session: Optional[Session] = None
  80. self.imported_count = 0
  81. self.updated_count = 0
  82. self.error_count = 0
  83. # 验证更新模式
  84. if self.update_mode not in ["append", "full"]:
  85. raise ValueError(
  86. f"不支持的更新模式: {update_mode},仅支持 'append' 或 'full'"
  87. )
  88. logger.info(
  89. f"初始化数据导入器: 目标表={self.TARGET_SCHEMA}.{target_table_name}, 更新模式={update_mode}"
  90. )
  91. def connect_target_database(self) -> bool:
  92. """
  93. 连接目标数据库(从 config.py 获取配置)
  94. Returns:
  95. 连接是否成功
  96. """
  97. try:
  98. # 从 Config 获取 PostgreSQL 配置
  99. db_uri = Config.SQLALCHEMY_DATABASE_URI
  100. if not db_uri:
  101. logger.error("未找到目标数据库配置(SQLALCHEMY_DATABASE_URI)")
  102. return False
  103. # 创建目标数据库引擎
  104. self.target_engine = create_engine(db_uri)
  105. Session = sessionmaker(bind=self.target_engine)
  106. self.target_session = Session()
  107. # 测试连接
  108. self.target_engine.connect()
  109. logger.info(f"成功连接目标数据库: {db_uri.split('@')[-1]}") # 隐藏密码
  110. return True
  111. except Exception as e:
  112. logger.error(f"连接目标数据库失败: {str(e)}")
  113. return False
  114. def connect_source_database(self) -> bool:
  115. """
  116. 连接源数据库
  117. Returns:
  118. 连接是否成功
  119. """
  120. try:
  121. db_type = self.source_config["type"].lower()
  122. if db_type == "postgresql":
  123. self.source_connection = psycopg2.connect(
  124. host=self.source_config["host"],
  125. port=self.source_config["port"],
  126. database=self.source_config["database"],
  127. user=self.source_config["username"],
  128. password=self.source_config["password"],
  129. )
  130. logger.info(
  131. f"成功连接源数据库(PostgreSQL): {self.source_config['host']}:{self.source_config['port']}/{self.source_config['database']}"
  132. )
  133. return True
  134. elif db_type == "mysql":
  135. if not MYSQL_AVAILABLE or pymysql is None:
  136. logger.error("pymysql未安装,无法连接MySQL数据库")
  137. return False
  138. self.source_connection = pymysql.connect(
  139. host=self.source_config["host"],
  140. port=self.source_config["port"],
  141. database=self.source_config["database"],
  142. user=self.source_config["username"],
  143. password=self.source_config["password"],
  144. )
  145. logger.info(
  146. f"成功连接源数据库(MySQL): {self.source_config['host']}:{self.source_config['port']}/{self.source_config['database']}"
  147. )
  148. return True
  149. else:
  150. logger.error(f"不支持的数据库类型: {db_type}")
  151. return False
  152. except Exception as e:
  153. logger.error(f"连接源数据库失败: {str(e)}")
  154. return False
  155. def get_full_table_name(self) -> str:
  156. """
  157. 获取带 schema 的完整表名
  158. Returns:
  159. 完整表名 (schema.table_name)
  160. """
  161. return f"{self.TARGET_SCHEMA}.{self.target_table_name}"
  162. def get_target_table_columns(self) -> List[str]:
  163. """
  164. 获取目标表的列名
  165. Returns:
  166. 列名列表
  167. """
  168. try:
  169. if not self.target_engine:
  170. logger.error("目标数据库引擎未初始化")
  171. return []
  172. inspector = inspect(self.target_engine)
  173. # 指定 schema 来获取表的列名
  174. columns = inspector.get_columns(
  175. self.target_table_name, schema=self.TARGET_SCHEMA
  176. )
  177. column_names = [
  178. col["name"] for col in columns if col["name"] != "create_time"
  179. ]
  180. logger.info(f"目标表 {self.get_full_table_name()} 的列: {column_names}")
  181. return column_names
  182. except Exception as e:
  183. logger.error(f"获取目标表列名失败: {str(e)}")
  184. return []
  185. def extract_source_data(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
  186. """
  187. 从源数据库提取数据
  188. Args:
  189. limit: 限制提取的数据行数(None 表示不限制)
  190. Returns:
  191. 数据行列表
  192. """
  193. try:
  194. if not self.source_connection:
  195. logger.error("源数据库连接未建立")
  196. return []
  197. cursor = self.source_connection.cursor()
  198. source_table = self.source_config.get("table_name")
  199. if not source_table:
  200. logger.error("源表名未指定")
  201. return []
  202. # 构建查询语句
  203. query = f"SELECT * FROM {source_table}"
  204. # 添加过滤条件(如果有)
  205. where_clause = self.source_config.get("where_clause", "")
  206. if where_clause:
  207. query += f" WHERE {where_clause}"
  208. # 添加排序(如果有)
  209. order_by = self.source_config.get("order_by", "")
  210. if order_by:
  211. query += f" ORDER BY {order_by}"
  212. # 添加限制
  213. if limit:
  214. query += f" LIMIT {limit}"
  215. logger.info(f"执行查询: {query}")
  216. cursor.execute(query)
  217. # 获取列名
  218. columns = [desc[0] for desc in cursor.description]
  219. # 提取数据
  220. rows = []
  221. for row in cursor.fetchall():
  222. row_dict = dict(zip(columns, row))
  223. rows.append(row_dict)
  224. cursor.close()
  225. logger.info(f"从源表 {source_table} 提取了 {len(rows)} 条数据")
  226. return rows
  227. except Exception as e:
  228. logger.error(f"提取源数据失败: {str(e)}")
  229. return []
  230. def clear_target_table(self) -> bool:
  231. """
  232. 清空目标表(用于全量更新模式)
  233. Returns:
  234. 清空是否成功
  235. """
  236. try:
  237. if not self.target_session:
  238. logger.error("目标数据库会话未初始化")
  239. return False
  240. full_table_name = self.get_full_table_name()
  241. delete_sql = text(f"DELETE FROM {full_table_name}")
  242. self.target_session.execute(delete_sql)
  243. self.target_session.commit()
  244. logger.info(f"目标表 {full_table_name} 已清空")
  245. return True
  246. except Exception as e:
  247. if self.target_session:
  248. self.target_session.rollback()
  249. logger.error(f"清空目标表失败: {str(e)}")
  250. return False
  251. def map_source_to_target_columns(
  252. self, source_row: Dict[str, Any], target_columns: List[str]
  253. ) -> Dict[str, Any]:
  254. """
  255. 将源数据列映射到目标表列
  256. Args:
  257. source_row: 源数据行
  258. target_columns: 目标表列名列表
  259. Returns:
  260. 映射后的数据行
  261. """
  262. mapped_row = {}
  263. for col in target_columns:
  264. # 优先使用精确匹配(不区分大小写)
  265. col_lower = col.lower()
  266. for source_col, value in source_row.items():
  267. if source_col.lower() == col_lower:
  268. mapped_row[col] = value
  269. break
  270. else:
  271. # 如果没有匹配到,设置为 None
  272. mapped_row[col] = None
  273. return mapped_row
  274. def insert_data_to_target(self, data_rows: List[Dict[str, Any]]) -> bool:
  275. """
  276. 将数据插入目标表
  277. Args:
  278. data_rows: 数据行列表
  279. Returns:
  280. 插入是否成功
  281. """
  282. try:
  283. if not data_rows:
  284. logger.warning("没有数据需要插入")
  285. return True
  286. if not self.target_session:
  287. logger.error("目标数据库会话未初始化")
  288. return False
  289. # 获取目标表列名
  290. target_columns = self.get_target_table_columns()
  291. if not target_columns:
  292. logger.error("无法获取目标表列名")
  293. return False
  294. # 全量更新模式:先清空目标表
  295. if self.update_mode == "full" and not self.clear_target_table():
  296. return False
  297. # 构建插入 SQL(使用带 schema 的完整表名)
  298. full_table_name = self.get_full_table_name()
  299. columns_str = ", ".join(target_columns + ["create_time"])
  300. placeholders = ", ".join(
  301. [f":{col}" for col in target_columns] + ["CURRENT_TIMESTAMP"]
  302. )
  303. insert_sql = text(f"""
  304. INSERT INTO {full_table_name} ({columns_str})
  305. VALUES ({placeholders})
  306. """)
  307. # 批量插入
  308. success_count = 0
  309. for source_row in data_rows:
  310. try:
  311. # 映射列名
  312. mapped_row = self.map_source_to_target_columns(
  313. source_row, target_columns
  314. )
  315. # 执行插入
  316. self.target_session.execute(insert_sql, mapped_row)
  317. success_count += 1
  318. # 每 100 条提交一次
  319. if success_count % 100 == 0:
  320. self.target_session.commit()
  321. logger.info(f"已插入 {success_count} 条数据...")
  322. except Exception as e:
  323. self.error_count += 1
  324. logger.error(f"插入数据失败: {str(e)}, 数据: {source_row}")
  325. # 最终提交
  326. self.target_session.commit()
  327. self.imported_count = success_count
  328. logger.info(
  329. f"数据插入完成: 成功 {self.imported_count} 条, 失败 {self.error_count} 条"
  330. )
  331. return True
  332. except Exception as e:
  333. if self.target_session:
  334. self.target_session.rollback()
  335. logger.error(f"批量插入数据失败: {str(e)}")
  336. return False
  337. def close_connections(self):
  338. """关闭所有数据库连接"""
  339. # 关闭源数据库连接
  340. if self.source_connection:
  341. try:
  342. self.source_connection.close()
  343. logger.info("源数据库连接已关闭")
  344. except Exception as e:
  345. logger.error(f"关闭源数据库连接失败: {str(e)}")
  346. # 关闭目标数据库连接
  347. if self.target_session:
  348. try:
  349. self.target_session.close()
  350. logger.info("目标数据库会话已关闭")
  351. except Exception as e:
  352. logger.error(f"关闭目标数据库会话失败: {str(e)}")
  353. if self.target_engine:
  354. try:
  355. self.target_engine.dispose()
  356. logger.info("目标数据库引擎已释放")
  357. except Exception as e:
  358. logger.error(f"释放目标数据库引擎失败: {str(e)}")
  359. def run(self, limit: Optional[int] = None) -> Dict[str, Any]:
  360. """
  361. 执行导入流程
  362. Args:
  363. limit: 限制导入的数据行数(None 表示不限制)
  364. Returns:
  365. 执行结果
  366. """
  367. result = {
  368. "success": False,
  369. "imported_count": 0,
  370. "error_count": 0,
  371. "update_mode": self.update_mode,
  372. "message": "",
  373. }
  374. try:
  375. logger.info("=" * 60)
  376. logger.info("开始数据导入")
  377. logger.info(f"源表: {self.source_config.get('table_name')}")
  378. logger.info(f"目标表: {self.get_full_table_name()}")
  379. logger.info(f"更新模式: {self.update_mode}")
  380. logger.info("=" * 60)
  381. # 1. 连接源数据库
  382. if not self.connect_source_database():
  383. result["message"] = "连接源数据库失败"
  384. return result
  385. # 2. 连接目标数据库
  386. if not self.connect_target_database():
  387. result["message"] = "连接目标数据库失败"
  388. return result
  389. # 3. 提取源数据
  390. data_rows = self.extract_source_data(limit=limit)
  391. if not data_rows:
  392. result["message"] = "未提取到数据"
  393. result["success"] = True # 没有数据不算失败
  394. return result
  395. # 4. 插入数据到目标表
  396. if self.insert_data_to_target(data_rows):
  397. result["success"] = True
  398. result["imported_count"] = self.imported_count
  399. result["error_count"] = self.error_count
  400. result["message"] = (
  401. f"导入完成: 成功 {self.imported_count} 条, 失败 {self.error_count} 条"
  402. )
  403. else:
  404. result["message"] = "插入数据到目标表失败"
  405. except Exception as e:
  406. logger.error(f"导入过程发生异常: {str(e)}")
  407. result["message"] = f"导入失败: {str(e)}"
  408. finally:
  409. # 5. 关闭连接
  410. self.close_connections()
  411. logger.info("=" * 60)
  412. logger.info(f"导入结果: {result['message']}")
  413. logger.info("=" * 60)
  414. return result
  415. def import_resource_data(
  416. source_config: Dict[str, Any],
  417. target_table_name: str,
  418. update_mode: str = "append",
  419. limit: Optional[int] = None,
  420. ) -> Dict[str, Any]:
  421. """
  422. 导入数据资源(入口函数)
  423. Args:
  424. source_config: 源数据库配置
  425. {
  426. 'type': 'postgresql', # 或 'mysql'
  427. 'host': '10.52.31.104',
  428. 'port': 5432,
  429. 'database': 'source_db',
  430. 'username': 'user',
  431. 'password': 'password',
  432. 'table_name': 'TB_JC_KSDZB', # 源表名
  433. 'where_clause': "TBRQ >= '2025-01-01'", # 可选:WHERE条件
  434. 'order_by': 'TBRQ DESC' # 可选:排序
  435. }
  436. target_table_name: 目标表名(数据资源的英文名)
  437. update_mode: 更新模式,'append'(追加)或 'full'(全量更新)
  438. limit: 限制导入的数据行数(None 表示不限制)
  439. Returns:
  440. 导入结果
  441. """
  442. importer = ResourceDataImporter(
  443. source_config=source_config,
  444. target_table_name=target_table_name,
  445. update_mode=update_mode,
  446. )
  447. return importer.run(limit=limit)
  448. def parse_args():
  449. """解析命令行参数"""
  450. parser = argparse.ArgumentParser(description="数据资源导入工具")
  451. parser.add_argument(
  452. "--source-config",
  453. type=str,
  454. required=True,
  455. help="源数据库配置(JSON格式字符串或文件路径)",
  456. )
  457. parser.add_argument(
  458. "--target-table", type=str, required=True, help="目标表名(数据资源的英文名)"
  459. )
  460. parser.add_argument(
  461. "--update-mode",
  462. type=str,
  463. choices=["append", "full"],
  464. default="append",
  465. help="更新模式:append(追加)或 full(全量更新)",
  466. )
  467. parser.add_argument("--limit", type=int, default=None, help="限制导入的数据行数")
  468. return parser.parse_args()
  469. if __name__ == "__main__":
  470. # 解析命令行参数
  471. args = parse_args()
  472. # 解析源数据库配置
  473. try:
  474. # 尝试作为JSON字符串解析
  475. source_config = json.loads(args.source_config)
  476. except json.JSONDecodeError:
  477. # 尝试作为文件路径读取
  478. try:
  479. with open(args.source_config, encoding="utf-8") as f:
  480. source_config = json.load(f)
  481. except Exception as e:
  482. logger.error(f"解析源数据库配置失败: {str(e)}")
  483. exit(1)
  484. # 执行导入
  485. result = import_resource_data(
  486. source_config=source_config,
  487. target_table_name=args.target_table,
  488. update_mode=args.update_mode,
  489. limit=args.limit,
  490. )
  491. # 输出结果
  492. print("\n" + "=" * 60)
  493. print(f"导入结果: {'成功' if result['success'] else '失败'}")
  494. print(f"消息: {result['message']}")
  495. print(f"成功: {result['imported_count']} 条")
  496. print(f"失败: {result['error_count']} 条")
  497. print(f"更新模式: {result['update_mode']}")
  498. print("=" * 60)
  499. # 设置退出代码
  500. exit(0 if result["success"] else 1)