DF_DO202601130001.py 16 KB


  1. """
  2. 数据流程脚本:DF_DO202601130001
  3. 仓库库存汇总表 数据流程
  4. 功能:
  5. - 从产品库存表(test_product_inventory)中读取数据
  6. - 按仓库(warehouse)汇总库存数量(current_stock)
  7. - 输出仓库编号和总库存数量到目标表(warehouse_inventory_summary)
  8. - 更新模式:Full Refresh (全量更新)
  9. 任务信息:
  10. - DataFlow ID: 2220
  11. - DataFlow Name: 仓库库存汇总表_数据流程
  12. - Order ID: 17
  13. - Order No: DO202601130001
  14. 作者:cursor (自动生成)
  15. 创建时间:2026-01-13
  16. """
  17. from __future__ import annotations
  18. import argparse
  19. import logging
  20. import os
  21. import sys
  22. from datetime import datetime
  23. from typing import Any
  24. # 添加项目根目录到路径
  25. sys.path.insert(
  26. 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  27. )
  28. try:
  29. from sqlalchemy import create_engine, text
  30. from sqlalchemy.orm import sessionmaker
  31. except ImportError:
  32. print("错误:请安装 sqlalchemy 库")
  33. sys.exit(1)
  34. # 配置日志
  35. logging.basicConfig(
  36. level=logging.INFO,
  37. format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
  38. )
  39. logger = logging.getLogger(__name__)
  40. class WarehouseInventorySummaryFlow:
  41. """仓库库存汇总表数据流程处理器"""
  42. # 配置常量
  43. SOURCE_TABLE = "test_product_inventory"
  44. TARGET_TABLE = "warehouse_inventory_summary"
  45. SOURCE_SCHEMA = "public"
  46. TARGET_SCHEMA = "public"
  47. UPDATE_MODE = "full" # full = 全量更新
  48. def __init__(self, db_uri: str | None = None):
  49. """
  50. 初始化数据流程处理器
  51. Args:
  52. db_uri: 数据库连接 URI,如果不提供则从配置中获取
  53. """
  54. self.db_uri = db_uri or self._get_db_uri()
  55. self.engine = None
  56. self.session = None
  57. self.processed_count = 0
  58. self.error_count = 0
  59. def _get_db_uri(self) -> str:
  60. """获取数据库连接 URI"""
  61. # 优先从环境变量获取
  62. db_uri = os.environ.get("DATABASE_URL")
  63. if db_uri:
  64. return db_uri
  65. # 尝试从 Flask 配置获取
  66. try:
  67. from app.config.config import config, get_environment
  68. env = get_environment()
  69. cfg = config.get(env, config["default"])
  70. return cfg.SQLALCHEMY_DATABASE_URI
  71. except ImportError:
  72. pass
  73. # 默认使用开发环境配置
  74. return "postgresql://postgres:postgres@localhost:5432/dataops"
  75. def connect(self) -> bool:
  76. """
  77. 连接数据库
  78. Returns:
  79. 连接是否成功
  80. """
  81. try:
  82. self.engine = create_engine(self.db_uri)
  83. Session = sessionmaker(bind=self.engine)
  84. self.session = Session()
  85. # 测试连接
  86. with self.engine.connect() as conn:
  87. conn.execute(text("SELECT 1"))
  88. # 隐藏密码显示连接信息
  89. safe_uri = self.db_uri.split("@")[-1] if "@" in self.db_uri else self.db_uri
  90. logger.info(f"成功连接数据库: {safe_uri}")
  91. return True
  92. except Exception as e:
  93. logger.error(f"连接数据库失败: {str(e)}")
  94. return False
  95. def ensure_target_table(self) -> bool:
  96. """
  97. 确保目标表存在,如果不存在则创建
  98. Returns:
  99. 操作是否成功
  100. """
  101. try:
  102. if not self.session:
  103. logger.error("数据库会话未初始化")
  104. return False
  105. # 检查目标表是否存在
  106. check_sql = text("""
  107. SELECT EXISTS (
  108. SELECT FROM information_schema.tables
  109. WHERE table_schema = :schema
  110. AND table_name = :table_name
  111. )
  112. """)
  113. result = self.session.execute(
  114. check_sql,
  115. {"schema": self.TARGET_SCHEMA, "table_name": self.TARGET_TABLE},
  116. )
  117. exists = result.scalar()
  118. if exists:
  119. logger.info(f"目标表 {self.TARGET_SCHEMA}.{self.TARGET_TABLE} 已存在")
  120. return True
  121. # 创建目标表
  122. create_sql = text(f"""
  123. CREATE TABLE {self.TARGET_SCHEMA}.{self.TARGET_TABLE} (
  124. id SERIAL PRIMARY KEY,
  125. warehouse VARCHAR(100) NOT NULL COMMENT '仓库编号',
  126. total_stock BIGINT NOT NULL DEFAULT 0 COMMENT '总库存数量',
  127. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间',
  128. update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据更新时间'
  129. );
  130. COMMENT ON TABLE {self.TARGET_SCHEMA}.{self.TARGET_TABLE}
  131. IS '仓库库存汇总表';
  132. COMMENT ON COLUMN {self.TARGET_SCHEMA}.{self.TARGET_TABLE}.warehouse
  133. IS '仓库编号';
  134. COMMENT ON COLUMN {self.TARGET_SCHEMA}.{self.TARGET_TABLE}.total_stock
  135. IS '总库存数量';
  136. COMMENT ON COLUMN {self.TARGET_SCHEMA}.{self.TARGET_TABLE}.create_time
  137. IS '数据创建时间';
  138. COMMENT ON COLUMN {self.TARGET_SCHEMA}.{self.TARGET_TABLE}.update_time
  139. IS '数据更新时间';
  140. """)
  141. self.session.execute(create_sql)
  142. self.session.commit()
  143. logger.info(f"成功创建目标表: {self.TARGET_SCHEMA}.{self.TARGET_TABLE}")
  144. return True
  145. except Exception as e:
  146. if self.session:
  147. self.session.rollback()
  148. logger.error(f"创建目标表失败: {str(e)}")
  149. # 尝试使用简化的 DDL(PostgreSQL 不支持 COMMENT 在列定义中)
  150. return self._create_table_simple()
  151. def _create_table_simple(self) -> bool:
  152. """使用简化的 DDL 创建目标表(PostgreSQL 兼容)"""
  153. try:
  154. if not self.session:
  155. return False
  156. # PostgreSQL 简化的建表语句
  157. create_sql = text(f"""
  158. CREATE TABLE IF NOT EXISTS {self.TARGET_SCHEMA}.{self.TARGET_TABLE} (
  159. id SERIAL PRIMARY KEY,
  160. warehouse VARCHAR(100) NOT NULL,
  161. total_stock BIGINT NOT NULL DEFAULT 0,
  162. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  163. update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  164. )
  165. """)
  166. self.session.execute(create_sql)
  167. # 添加表注释
  168. comment_table_sql = text(f"""
  169. COMMENT ON TABLE {self.TARGET_SCHEMA}.{self.TARGET_TABLE}
  170. IS '仓库库存汇总表'
  171. """)
  172. self.session.execute(comment_table_sql)
  173. # 添加列注释
  174. comments = [
  175. ("warehouse", "仓库编号"),
  176. ("total_stock", "总库存数量"),
  177. ("create_time", "数据创建时间"),
  178. ("update_time", "数据更新时间"),
  179. ]
  180. for col_name, col_comment in comments:
  181. comment_col_sql = text(f"""
  182. COMMENT ON COLUMN {self.TARGET_SCHEMA}.{self.TARGET_TABLE}.{col_name}
  183. IS '{col_comment}'
  184. """)
  185. self.session.execute(comment_col_sql)
  186. self.session.commit()
  187. logger.info(
  188. f"成功创建目标表(简化模式): {self.TARGET_SCHEMA}.{self.TARGET_TABLE}"
  189. )
  190. return True
  191. except Exception as e:
  192. if self.session:
  193. self.session.rollback()
  194. logger.error(f"创建目标表(简化模式)失败: {str(e)}")
  195. return False
  196. def extract_and_transform(self) -> list[dict[str, Any]]:
  197. """
  198. 从源表提取数据并进行转换(按仓库汇总)
  199. Returns:
  200. 转换后的数据列表
  201. """
  202. try:
  203. if not self.session:
  204. logger.error("数据库会话未初始化")
  205. return []
  206. # 执行汇总查询
  207. # 1. 从产品库存表中提取字段:仓库编号、产品编号、库存数量
  208. # 2. 对库存数量进行按仓库编号进行求和计算
  209. # 3. 无特殊过滤条件
  210. # 4. 最终输出数据格式包含字段:仓库编号、总库存数量
  211. query_sql = text(f"""
  212. SELECT
  213. warehouse,
  214. SUM(current_stock) as total_stock
  215. FROM {self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}
  216. GROUP BY warehouse
  217. ORDER BY warehouse
  218. """)
  219. result = self.session.execute(query_sql)
  220. rows = result.fetchall()
  221. data_list = []
  222. for row in rows:
  223. data_list.append(
  224. {
  225. "warehouse": row.warehouse,
  226. "total_stock": int(row.total_stock) if row.total_stock else 0,
  227. }
  228. )
  229. logger.info(f"从源表提取并汇总了 {len(data_list)} 条仓库库存记录")
  230. return data_list
  231. except Exception as e:
  232. logger.error(f"提取和转换数据失败: {str(e)}")
  233. return []
  234. def load_to_target(self, data_list: list[dict[str, Any]]) -> bool:
  235. """
  236. 将转换后的数据加载到目标表
  237. Args:
  238. data_list: 转换后的数据列表
  239. Returns:
  240. 加载是否成功
  241. """
  242. try:
  243. if not data_list:
  244. logger.warning("没有数据需要加载")
  245. return True
  246. if not self.session:
  247. logger.error("数据库会话未初始化")
  248. return False
  249. # 全量更新模式:先清空目标表
  250. if self.UPDATE_MODE == "full":
  251. delete_sql = text(
  252. f"DELETE FROM {self.TARGET_SCHEMA}.{self.TARGET_TABLE}"
  253. )
  254. self.session.execute(delete_sql)
  255. logger.info(f"目标表 {self.TARGET_TABLE} 已清空(全量更新模式)")
  256. # 插入新数据
  257. current_time = datetime.now()
  258. insert_sql = text(f"""
  259. INSERT INTO {self.TARGET_SCHEMA}.{self.TARGET_TABLE}
  260. (warehouse, total_stock, create_time, update_time)
  261. VALUES
  262. (:warehouse, :total_stock, :create_time, :update_time)
  263. """)
  264. for data in data_list:
  265. try:
  266. self.session.execute(
  267. insert_sql,
  268. {
  269. "warehouse": data["warehouse"],
  270. "total_stock": data["total_stock"],
  271. "create_time": current_time,
  272. "update_time": current_time,
  273. },
  274. )
  275. self.processed_count += 1
  276. except Exception as e:
  277. self.error_count += 1
  278. logger.error(f"插入数据失败: {str(e)}, 数据: {data}")
  279. self.session.commit()
  280. logger.info(
  281. f"数据加载完成: 成功 {self.processed_count} 条, 失败 {self.error_count} 条"
  282. )
  283. return True
  284. except Exception as e:
  285. if self.session:
  286. self.session.rollback()
  287. logger.error(f"加载数据到目标表失败: {str(e)}")
  288. return False
  289. def close(self) -> None:
  290. """关闭数据库连接"""
  291. if self.session:
  292. try:
  293. self.session.close()
  294. logger.info("数据库会话已关闭")
  295. except Exception as e:
  296. logger.error(f"关闭数据库会话失败: {str(e)}")
  297. if self.engine:
  298. try:
  299. self.engine.dispose()
  300. logger.info("数据库引擎已释放")
  301. except Exception as e:
  302. logger.error(f"释放数据库引擎失败: {str(e)}")
  303. def run(self) -> dict[str, Any]:
  304. """
  305. 执行完整的 ETL 流程
  306. Returns:
  307. 执行结果字典
  308. """
  309. result = {
  310. "success": False,
  311. "processed_count": 0,
  312. "error_count": 0,
  313. "update_mode": self.UPDATE_MODE,
  314. "source_table": f"{self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}",
  315. "target_table": f"{self.TARGET_SCHEMA}.{self.TARGET_TABLE}",
  316. "message": "",
  317. }
  318. try:
  319. logger.info("=" * 60)
  320. logger.info("开始执行数据流程: DF_DO202601130001")
  321. logger.info(f"源表: {self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}")
  322. logger.info(f"目标表: {self.TARGET_SCHEMA}.{self.TARGET_TABLE}")
  323. logger.info(f"更新模式: {self.UPDATE_MODE}")
  324. logger.info("=" * 60)
  325. # 1. 连接数据库
  326. if not self.connect():
  327. result["message"] = "连接数据库失败"
  328. return result
  329. # 2. 确保目标表存在
  330. if not self.ensure_target_table():
  331. result["message"] = "创建目标表失败"
  332. return result
  333. # 3. 提取和转换数据
  334. data_list = self.extract_and_transform()
  335. if not data_list:
  336. result["message"] = "未提取到数据"
  337. result["success"] = True # 没有数据不算失败
  338. return result
  339. # 4. 加载到目标表
  340. if self.load_to_target(data_list):
  341. result["success"] = True
  342. result["processed_count"] = self.processed_count
  343. result["error_count"] = self.error_count
  344. result["message"] = (
  345. f"数据流程执行成功: "
  346. f"处理 {self.processed_count} 条, 失败 {self.error_count} 条"
  347. )
  348. else:
  349. result["message"] = "加载数据到目标表失败"
  350. except Exception as e:
  351. logger.error(f"数据流程执行异常: {str(e)}")
  352. result["message"] = f"数据流程执行异常: {str(e)}"
  353. finally:
  354. self.close()
  355. logger.info("=" * 60)
  356. logger.info(f"执行结果: {result['message']}")
  357. logger.info("=" * 60)
  358. return result
  359. def main():
  360. """主函数"""
  361. parser = argparse.ArgumentParser(
  362. description="DF_DO202601130001 - 仓库库存汇总表数据流程"
  363. )
  364. parser.add_argument(
  365. "--db-uri",
  366. type=str,
  367. default=None,
  368. help="数据库连接 URI (可选,默认从配置获取)",
  369. )
  370. parser.add_argument(
  371. "--dry-run",
  372. action="store_true",
  373. help="仅测试连接和查询,不执行写入",
  374. )
  375. args = parser.parse_args()
  376. # 创建并执行数据流程
  377. flow = WarehouseInventorySummaryFlow(db_uri=args.db_uri)
  378. if args.dry_run:
  379. logger.info("Dry-run 模式: 仅测试连接和查询")
  380. if flow.connect():
  381. data_list = flow.extract_and_transform()
  382. logger.info(f"预览数据 ({len(data_list)} 条):")
  383. for data in data_list:
  384. logger.info(f" {data}")
  385. flow.close()
  386. print("\nDry-run 完成,未执行写入操作")
  387. sys.exit(0)
  388. else:
  389. print("\n连接失败")
  390. sys.exit(1)
  391. result = flow.run()
  392. # 输出结果
  393. print("\n" + "=" * 60)
  394. print(f"数据流程执行结果: {'成功' if result['success'] else '失败'}")
  395. print(f"消息: {result['message']}")
  396. print(f"处理记录数: {result['processed_count']}")
  397. print(f"失败记录数: {result['error_count']}")
  398. print(f"更新模式: {result['update_mode']}")
  399. print(f"源表: {result['source_table']}")
  400. print(f"目标表: {result['target_table']}")
  401. print("=" * 60)
  402. # 设置退出代码
  403. sys.exit(0 if result["success"] else 1)
  404. if __name__ == "__main__":
  405. main()