"""
Data flow script: DF_DO202601130001 -- warehouse inventory summary table.

Purpose:
- Read rows from the product inventory table (test_product_inventory).
- Sum the stock quantity (current_stock) per warehouse (warehouse).
- Write the warehouse code and total stock to the target table
  (warehouse_inventory_summary).
- Update mode: full refresh -- the target table is emptied and reloaded on
  every run, so it mirrors the current source data.

Task info:
- DataFlow ID: 2220
- DataFlow Name: 仓库库存汇总表_数据流程 (warehouse inventory summary data flow)
- Order ID: 17
- Order No: DO202601130001

Author: cursor (auto-generated)
Created: 2026-01-13
"""

from __future__ import annotations

import argparse
import logging
import os
import sys
from datetime import datetime
from typing import Any

# Put the directory three dirname() levels above this file on sys.path so the
# optional ``app.config.config`` import in _get_db_uri can resolve.
sys.path.insert(
    0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)

try:
    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import sessionmaker
except ImportError:
    print("错误:请安装 sqlalchemy 库")
    sys.exit(1)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


class WarehouseInventorySummaryFlow:
    """ETL processor for the warehouse inventory summary table.

    Aggregates ``current_stock`` per ``warehouse`` from
    ``SOURCE_SCHEMA.SOURCE_TABLE`` and writes the totals to
    ``TARGET_SCHEMA.TARGET_TABLE``.
    """

    # Table locations and update strategy. These are interpolated directly
    # into SQL text, so they must stay trusted constants.
    SOURCE_TABLE = "test_product_inventory"
    TARGET_TABLE = "warehouse_inventory_summary"
    SOURCE_SCHEMA = "public"
    TARGET_SCHEMA = "public"
    UPDATE_MODE = "full"  # "full" = delete all target rows, then reload

    def __init__(self, db_uri: str | None = None):
        """Create the processor.

        Args:
            db_uri: Database connection URI. When omitted it is resolved by
                ``_get_db_uri`` (environment, then app config, then default).
        """
        self.db_uri = db_uri or self._get_db_uri()
        self.engine = None
        self.session = None
        # Row counters for the most recent load_to_target() call.
        self.processed_count = 0
        self.error_count = 0

    def _get_db_uri(self) -> str:
        """Resolve the database URI.

        Order: ``DATABASE_URL`` environment variable, then the Flask app's
        ``SQLALCHEMY_DATABASE_URI`` (if ``app.config.config`` is importable),
        then a hard-coded local development URI.
        """
        db_uri = os.environ.get("DATABASE_URL")
        if db_uri:
            return db_uri

        try:
            from app.config.config import config, get_environment

            env = get_environment()
            cfg = config.get(env, config["default"])
            return cfg.SQLALCHEMY_DATABASE_URI
        except ImportError:
            pass

        # NOTE(review): hard-coded fallback credentials; intended for local
        # development only.
        return "postgresql://postgres:postgres@localhost:5432/dataops"

    def connect(self) -> bool:
        """Create the engine and session, then verify connectivity.

        Returns:
            True if a ``SELECT 1`` round-trip succeeded, False otherwise
            (the error is logged).
        """
        try:
            self.engine = create_engine(self.db_uri)
            Session = sessionmaker(bind=self.engine)
            self.session = Session()

            # Round-trip a trivial query so bad hosts/credentials fail here.
            with self.engine.connect() as conn:
                conn.execute(text("SELECT 1"))

            # Log only the part after '@' to keep credentials out of the log.
            safe_uri = self.db_uri.split("@")[-1] if "@" in self.db_uri else self.db_uri
            logger.info(f"成功连接数据库: {safe_uri}")
            return True
        except Exception as e:
            logger.error(f"连接数据库失败: {str(e)}")
            return False

    def ensure_target_table(self) -> bool:
        """Make sure the target table exists, creating it when missing.

        Returns:
            True if the table already exists or was created, False otherwise.
        """
        if not self.session:
            logger.error("数据库会话未初始化")
            return False

        check_sql = text(
            """
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = :schema AND table_name = :table_name
            )
            """
        )
        try:
            exists = self.session.execute(
                check_sql,
                {"schema": self.TARGET_SCHEMA, "table_name": self.TARGET_TABLE},
            ).scalar()
        except Exception as e:
            # Reset the session, then fall through to CREATE TABLE IF NOT
            # EXISTS, which is safe whether or not the table is present.
            self.session.rollback()
            logger.error(f"检查目标表是否存在失败: {str(e)}")
            exists = False

        if exists:
            logger.info(f"目标表 {self.TARGET_SCHEMA}.{self.TARGET_TABLE} 已存在")
            return True

        return self._create_table_simple()

    def _create_table_simple(self) -> bool:
        """Create the target table with PostgreSQL DDL and attach comments.

        PostgreSQL has no inline ``COMMENT '...'`` column clause (that is
        MySQL syntax), so the table is created first and comments are added
        with ``COMMENT ON``. All statements are committed together and rolled
        back together on failure.

        Returns:
            True if the DDL was committed, False otherwise.
        """
        if not self.session:
            return False

        target = f"{self.TARGET_SCHEMA}.{self.TARGET_TABLE}"
        column_comments = (
            ("warehouse", "仓库编号"),
            ("total_stock", "总库存数量"),
            ("create_time", "数据创建时间"),
            ("update_time", "数据更新时间"),
        )
        try:
            self.session.execute(
                text(
                    f"""
                    CREATE TABLE IF NOT EXISTS {target} (
                        id SERIAL PRIMARY KEY,
                        warehouse VARCHAR(100) NOT NULL,
                        total_stock BIGINT NOT NULL DEFAULT 0,
                        create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                    """
                )
            )
            self.session.execute(
                text(f"COMMENT ON TABLE {target} IS '仓库库存汇总表'")
            )
            for col_name, col_comment in column_comments:
                self.session.execute(
                    text(f"COMMENT ON COLUMN {target}.{col_name} IS '{col_comment}'")
                )
            self.session.commit()
            logger.info(f"成功创建目标表(简化模式): {target}")
            return True
        except Exception as e:
            self.session.rollback()
            logger.error(f"创建目标表(简化模式)失败: {str(e)}")
            return False

    def _query_summary(self) -> list[dict[str, Any]]:
        """Run the per-warehouse aggregation and return the rows.

        Unlike ``extract_and_transform`` this raises on errors, so callers
        can tell a failed query apart from an empty source table.

        Returns:
            One dict per warehouse with ``warehouse`` and ``total_stock``
            keys, ordered by warehouse.

        Raises:
            RuntimeError: If no session has been opened.
            Exception: Any database error raised by the query.
        """
        if not self.session:
            raise RuntimeError("数据库会话未初始化")

        # Sum stock per warehouse over the whole source table (no filter).
        query_sql = text(
            f"""
            SELECT warehouse, SUM(current_stock) AS total_stock
            FROM {self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}
            GROUP BY warehouse
            ORDER BY warehouse
            """
        )
        rows = self.session.execute(query_sql).fetchall()
        data_list = [
            {
                "warehouse": row.warehouse,
                # SUM() is NULL when every current_stock in the group is NULL.
                # NOTE(review): int() truncates if current_stock is a
                # fractional numeric type -- confirm the source column type.
                "total_stock": int(row.total_stock) if row.total_stock else 0,
            }
            for row in rows
        ]
        logger.info(f"从源表提取并汇总了 {len(data_list)} 条仓库库存记录")
        return data_list

    def extract_and_transform(self) -> list[dict[str, Any]]:
        """Extract source rows and aggregate them per warehouse.

        Returns:
            The aggregated rows, or an empty list if there is no session or
            the query fails (the error is logged and the session is rolled
            back so it stays usable).
        """
        if not self.session:
            logger.error("数据库会话未初始化")
            return []
        try:
            return self._query_summary()
        except Exception as e:
            # A failed statement aborts the PostgreSQL transaction; reset it.
            self.session.rollback()
            logger.error(f"提取和转换数据失败: {str(e)}")
            return []

    def load_to_target(self, data_list: list[dict[str, Any]]) -> bool:
        """Write aggregated rows to the target table.

        In full-refresh mode the target is emptied first -- also when
        ``data_list`` is empty, so the target mirrors an empty source instead
        of keeping stale totals. Each row is inserted inside its own
        SAVEPOINT: in PostgreSQL a failed statement aborts the enclosing
        transaction, so without it one bad row (e.g. a NULL warehouse group
        hitting the NOT NULL constraint) would make every later statement
        fail and the reload could not be committed. Failed rows are counted
        in ``error_count`` and skipped; the DELETE and all successful inserts
        are committed together.

        Args:
            data_list: Rows with ``warehouse`` and ``total_stock`` keys.

        Returns:
            True if the transaction was committed, False if it was rolled back
            or no session exists.
        """
        if not self.session:
            logger.error("数据库会话未初始化")
            return False

        # Counters describe this call only.
        self.processed_count = 0
        self.error_count = 0
        target = f"{self.TARGET_SCHEMA}.{self.TARGET_TABLE}"

        try:
            if self.UPDATE_MODE == "full":
                self.session.execute(text(f"DELETE FROM {target}"))
                logger.info(f"目标表 {self.TARGET_TABLE} 已清空(全量更新模式)")

            if not data_list:
                logger.warning("没有数据需要加载")
            else:
                self._insert_rows(target, data_list)

            self.session.commit()
            logger.info(
                f"数据加载完成: 成功 {self.processed_count} 条, 失败 {self.error_count} 条"
            )
            return True
        except Exception as e:
            self.session.rollback()
            logger.error(f"加载数据到目标表失败: {str(e)}")
            return False

    def _insert_rows(self, target: str, data_list: list[dict[str, Any]]) -> None:
        """Insert rows one at a time, each isolated in a SAVEPOINT.

        Updates ``processed_count`` / ``error_count``; does not commit.
        """
        current_time = datetime.now()
        insert_sql = text(
            f"""
            INSERT INTO {target}
            (warehouse, total_stock, create_time, update_time)
            VALUES (:warehouse, :total_stock, :create_time, :update_time)
            """
        )
        for data in data_list:
            try:
                params = {
                    "warehouse": data["warehouse"],
                    "total_stock": data["total_stock"],
                    "create_time": current_time,
                    "update_time": current_time,
                }
                # begin_nested() emits SAVEPOINT; on an exception it rolls
                # back to the savepoint, keeping the outer transaction usable.
                with self.session.begin_nested():
                    self.session.execute(insert_sql, params)
            except Exception as e:
                self.error_count += 1
                logger.error(f"插入数据失败: {str(e)}, 数据: {data}")
            else:
                self.processed_count += 1

    def close(self) -> None:
        """Close the session and dispose of the engine; safe to call twice."""
        if self.session:
            try:
                self.session.close()
                logger.info("数据库会话已关闭")
            except Exception as e:
                logger.error(f"关闭数据库会话失败: {str(e)}")
            self.session = None
        if self.engine:
            try:
                self.engine.dispose()
                logger.info("数据库引擎已释放")
            except Exception as e:
                logger.error(f"释放数据库引擎失败: {str(e)}")
            self.engine = None

    def run(self) -> dict[str, Any]:
        """Execute the full ETL pipeline: connect, ensure table, extract, load.

        The connection is always closed before returning.

        Returns:
            Dict with keys ``success``, ``processed_count``, ``error_count``,
            ``update_mode``, ``source_table``, ``target_table`` and
            ``message``.
        """
        result: dict[str, Any] = {
            "success": False,
            "processed_count": 0,
            "error_count": 0,
            "update_mode": self.UPDATE_MODE,
            "source_table": f"{self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}",
            "target_table": f"{self.TARGET_SCHEMA}.{self.TARGET_TABLE}",
            "message": "",
        }

        try:
            logger.info("=" * 60)
            logger.info("开始执行数据流程: DF_DO202601130001")
            logger.info(f"源表: {self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}")
            logger.info(f"目标表: {self.TARGET_SCHEMA}.{self.TARGET_TABLE}")
            logger.info(f"更新模式: {self.UPDATE_MODE}")
            logger.info("=" * 60)

            # 1. Connect.
            if not self.connect():
                result["message"] = "连接数据库失败"
                return result

            # 2. Make sure the target table exists.
            if not self.ensure_target_table():
                result["message"] = "创建目标表失败"
                return result

            # 3. Extract and aggregate. A query error is a failure, not an
            #    empty result.
            try:
                data_list = self._query_summary()
            except Exception as e:
                if self.session:
                    self.session.rollback()
                logger.error(f"提取和转换数据失败: {str(e)}")
                result["message"] = "提取数据失败"
                return result

            # 4. Load. Called even for an empty result so a full refresh
            #    clears stale rows from the target.
            if not self.load_to_target(data_list):
                result["message"] = "加载数据到目标表失败"
                return result

            result["success"] = True
            result["processed_count"] = self.processed_count
            result["error_count"] = self.error_count
            if data_list:
                result["message"] = (
                    f"数据流程执行成功: "
                    f"处理 {self.processed_count} 条, 失败 {self.error_count} 条"
                )
            else:
                # An empty source is not treated as a failure.
                result["message"] = "未提取到数据"

        except Exception as e:
            logger.error(f"数据流程执行异常: {str(e)}")
            result["message"] = f"数据流程执行异常: {str(e)}"

        finally:
            self.close()
            logger.info("=" * 60)
            logger.info(f"执行结果: {result['message']}")
            logger.info("=" * 60)

        return result


def main():
    """Command-line entry point: parse arguments, then dry-run or run the flow.

    Exits with status 0 on success and 1 on failure.
    """
    parser = argparse.ArgumentParser(
        description="DF_DO202601130001 - 仓库库存汇总表数据流程"
    )
    parser.add_argument(
        "--db-uri",
        type=str,
        default=None,
        help="数据库连接 URI (可选,默认从配置获取)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="仅测试连接和查询,不执行写入",
    )
    args = parser.parse_args()

    flow = WarehouseInventorySummaryFlow(db_uri=args.db_uri)

    if args.dry_run:
        logger.info("Dry-run 模式: 仅测试连接和查询")
        try:
            if not flow.connect():
                print("\n连接失败")
                sys.exit(1)
            try:
                data_list = flow._query_summary()
            except Exception as e:
                logger.error(f"提取和转换数据失败: {str(e)}")
                print("\nDry-run 查询失败")
                sys.exit(1)
            logger.info(f"预览数据 ({len(data_list)} 条):")
            for data in data_list:
                logger.info(f" {data}")
            print("\nDry-run 完成,未执行写入操作")
            sys.exit(0)
        finally:
            # Runs on every sys.exit() above, so the engine is always released.
            flow.close()

    result = flow.run()

    print("\n" + "=" * 60)
    print(f"数据流程执行结果: {'成功' if result['success'] else '失败'}")
    print(f"消息: {result['message']}")
    print(f"处理记录数: {result['processed_count']}")
    print(f"失败记录数: {result['error_count']}")
    print(f"更新模式: {result['update_mode']}")
    print(f"源表: {result['source_table']}")
    print(f"目标表: {result['target_table']}")
    print("=" * 60)

    sys.exit(0 if result["success"] else 1)


if __name__ == "__main__":
    main()