| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483 |
- """
- 数据流程脚本:DF_DO202601130001
- 仓库库存汇总表 数据流程
- 功能:
- - 从产品库存表(test_product_inventory)中读取数据
- - 按仓库(warehouse)汇总库存数量(current_stock)
- - 输出仓库编号和总库存数量到目标表(warehouse_inventory_summary)
- - 更新模式:Full Refresh (全量更新)
- 任务信息:
- - DataFlow ID: 2220
- - DataFlow Name: 仓库库存汇总表_数据流程
- - Order ID: 17
- - Order No: DO202601130001
- 作者:cursor (自动生成)
- 创建时间:2026-01-13
- """
- from __future__ import annotations
- import argparse
- import logging
- import os
- import sys
- from datetime import datetime
- from typing import Any
# Make the project root importable: it sits two directories above the
# directory that contains this script.
sys.path.insert(
    0,
    os.path.abspath(
        os.path.join(
            os.path.dirname(os.path.abspath(__file__)), os.pardir, os.pardir
        )
    ),
)

# SQLAlchemy is a hard requirement; stop with a readable message instead of
# a traceback when it is not installed.
try:
    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import sessionmaker
except ImportError:
    print("错误:请安装 sqlalchemy 库")
    raise SystemExit(1)

# Logging setup: INFO level, timestamped records, module-level logger.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class WarehouseInventorySummaryFlow:
    """ETL processor that rebuilds the per-warehouse stock summary table.

    The flow sums ``current_stock`` per ``warehouse`` from the source table
    and writes one row per warehouse into the target table.  With
    ``UPDATE_MODE == "full"`` the target table is emptied before the new rows
    are inserted.

    The DDL issued here (``SERIAL``, ``COMMENT ON ...``) is PostgreSQL syntax.
    """

    # Flow configuration.
    SOURCE_TABLE = "test_product_inventory"
    TARGET_TABLE = "warehouse_inventory_summary"
    SOURCE_SCHEMA = "public"
    TARGET_SCHEMA = "public"
    UPDATE_MODE = "full"  # "full" = full refresh

    def __init__(self, db_uri: str | None = None) -> None:
        """Initialise the processor without opening any connection.

        Args:
            db_uri: Database connection URI. When omitted (or empty) the URI
                is resolved by :meth:`_get_db_uri`.
        """
        self.db_uri = db_uri or self._get_db_uri()
        self.engine = None
        self.session = None
        self.processed_count = 0
        self.error_count = 0
        # Set by extract_and_transform(): lets run() tell "query failed"
        # apart from "query succeeded but returned no rows" (both yield []).
        self.extract_failed = False

    def _get_db_uri(self) -> str:
        """Resolve the database URI.

        Lookup order: the ``DATABASE_URL`` environment variable, then the
        project's ``app.config.config`` module, then a local development
        default.

        Returns:
            The database connection URI.
        """
        env_uri = os.environ.get("DATABASE_URL")
        if env_uri:
            return env_uri

        try:
            from app.config.config import config, get_environment

            env = get_environment()
            cfg = config.get(env, config["default"])
            return cfg.SQLALCHEMY_DATABASE_URI
        except ImportError:
            # The project config is optional; fall through to the default.
            pass

        return "postgresql://postgres:postgres@localhost:5432/dataops"

    @staticmethod
    def _safe_uri(db_uri: str) -> str:
        """Return the part of ``db_uri`` after the last ``@`` (no credentials).

        A URI without ``@`` is returned unchanged.
        """
        return db_uri.rsplit("@", 1)[-1]

    def connect(self) -> bool:
        """Create the engine and session and verify connectivity.

        Returns:
            True when ``SELECT 1`` succeeds, False otherwise (the error is
            logged, not raised).
        """
        try:
            self.engine = create_engine(self.db_uri)
            self.session = sessionmaker(bind=self.engine)()

            # Probe the connection so a bad URI fails here, not mid-flow.
            with self.engine.connect() as conn:
                conn.execute(text("SELECT 1"))

            logger.info("成功连接数据库: %s", self._safe_uri(self.db_uri))
            return True
        except Exception as e:
            logger.error("连接数据库失败: %s", e)
            return False

    def ensure_target_table(self) -> bool:
        """Make sure the target table exists, creating it when missing.

        Returns:
            True when the table exists or was created, False otherwise.
        """
        if not self.session:
            logger.error("数据库会话未初始化")
            return False

        check_sql = text("""
            SELECT EXISTS (
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = :schema
                AND table_name = :table_name
            )
        """)
        try:
            exists = self.session.execute(
                check_sql,
                {"schema": self.TARGET_SCHEMA, "table_name": self.TARGET_TABLE},
            ).scalar()
        except Exception as e:
            # The existence check itself failed; clear the failed transaction
            # and fall back to CREATE TABLE IF NOT EXISTS as a best effort.
            self.session.rollback()
            logger.error("检查目标表是否存在失败: %s", e)
            return self._create_table_simple()

        if exists:
            logger.info(
                "目标表 %s.%s 已存在", self.TARGET_SCHEMA, self.TARGET_TABLE
            )
            return True

        # PostgreSQL has no inline column COMMENT clause, so the table is
        # created with plain DDL followed by COMMENT ON statements.
        return self._create_table_simple()

    def _create_table_simple(self) -> bool:
        """Create the target table with PostgreSQL-compatible DDL.

        Issues ``CREATE TABLE IF NOT EXISTS`` followed by ``COMMENT ON``
        statements for the table and its columns, then commits.

        Returns:
            True on success, False on failure (the transaction is rolled
            back and the error is logged).
        """
        if not self.session:
            return False

        qualified = f"{self.TARGET_SCHEMA}.{self.TARGET_TABLE}"
        try:
            self.session.execute(text(f"""
                CREATE TABLE IF NOT EXISTS {qualified} (
                    id SERIAL PRIMARY KEY,
                    warehouse VARCHAR(100) NOT NULL,
                    total_stock BIGINT NOT NULL DEFAULT 0,
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """))

            self.session.execute(text(f"""
                COMMENT ON TABLE {qualified}
                IS '仓库库存汇总表'
            """))

            # Column names and comments are fixed literals, so interpolating
            # them into the statement is safe.
            column_comments = [
                ("warehouse", "仓库编号"),
                ("total_stock", "总库存数量"),
                ("create_time", "数据创建时间"),
                ("update_time", "数据更新时间"),
            ]
            for col_name, col_comment in column_comments:
                self.session.execute(text(f"""
                    COMMENT ON COLUMN {qualified}.{col_name}
                    IS '{col_comment}'
                """))

            self.session.commit()
            logger.info("成功创建目标表(简化模式): %s", qualified)
            return True
        except Exception as e:
            self.session.rollback()
            logger.error("创建目标表(简化模式)失败: %s", e)
            return False

    def extract_and_transform(self) -> list[dict[str, Any]]:
        """Read the source table and sum ``current_stock`` per warehouse.

        Returns:
            One ``{"warehouse": ..., "total_stock": int}`` dict per
            warehouse, ordered by warehouse.  An empty list is returned both
            when the source has no rows and when the query fails; in the
            failure case ``self.extract_failed`` is set to True.
        """
        self.extract_failed = False

        if not self.session:
            logger.error("数据库会话未初始化")
            self.extract_failed = True
            return []

        query_sql = text(f"""
            SELECT
                warehouse,
                SUM(current_stock) as total_stock
            FROM {self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}
            GROUP BY warehouse
            ORDER BY warehouse
        """)
        try:
            rows = self.session.execute(query_sql).fetchall()
            # SUM() yields NULL for a group whose values are all NULL;
            # report that as 0.
            data_list = [
                {
                    "warehouse": row.warehouse,
                    "total_stock": int(row.total_stock or 0),
                }
                for row in rows
            ]
        except Exception as e:
            # Leave the session usable for whatever the caller does next.
            self.session.rollback()
            self.extract_failed = True
            logger.error("提取和转换数据失败: %s", e)
            return []

        logger.info("从源表提取并汇总了 %s 条仓库库存记录", len(data_list))
        return data_list

    def load_to_target(self, data_list: list[dict[str, Any]]) -> bool:
        """Write the aggregated rows into the target table.

        In full-refresh mode the target table is emptied first.  Each row is
        inserted inside its own SAVEPOINT: on PostgreSQL a failed statement
        aborts the enclosing transaction, so without a savepoint one bad row
        would make every later insert fail and the final commit would
        silently discard the whole load.

        Args:
            data_list: Rows produced by :meth:`extract_and_transform`.

        Returns:
            True when the transaction was committed (individual row failures
            are counted in ``error_count`` and logged), False when the load
            as a whole failed and was rolled back.
        """
        if not data_list:
            logger.warning("没有数据需要加载")
            return True

        if not self.session:
            logger.error("数据库会话未初始化")
            return False

        qualified = f"{self.TARGET_SCHEMA}.{self.TARGET_TABLE}"
        try:
            if self.UPDATE_MODE == "full":
                self.session.execute(text(f"DELETE FROM {qualified}"))
                logger.info("目标表 %s 已清空(全量更新模式)", self.TARGET_TABLE)

            # One timestamp for the whole batch so all rows agree.
            current_time = datetime.now()
            insert_sql = text(f"""
                INSERT INTO {qualified}
                (warehouse, total_stock, create_time, update_time)
                VALUES
                (:warehouse, :total_stock, :create_time, :update_time)
            """)

            for data in data_list:
                try:
                    # SAVEPOINT: a failing row is undone on its own.
                    with self.session.begin_nested():
                        self.session.execute(
                            insert_sql,
                            {
                                "warehouse": data["warehouse"],
                                "total_stock": data["total_stock"],
                                "create_time": current_time,
                                "update_time": current_time,
                            },
                        )
                except Exception as e:
                    self.error_count += 1
                    logger.error("插入数据失败: %s, 数据: %s", e, data)
                else:
                    self.processed_count += 1

            self.session.commit()
            logger.info(
                "数据加载完成: 成功 %s 条, 失败 %s 条",
                self.processed_count,
                self.error_count,
            )
            return True
        except Exception as e:
            self.session.rollback()
            logger.error("加载数据到目标表失败: %s", e)
            return False

    def close(self) -> None:
        """Close the session and dispose of the engine, logging any failure."""
        if self.session:
            try:
                self.session.close()
                logger.info("数据库会话已关闭")
            except Exception as e:
                logger.error("关闭数据库会话失败: %s", e)

        if self.engine:
            try:
                self.engine.dispose()
                logger.info("数据库引擎已释放")
            except Exception as e:
                logger.error("释放数据库引擎失败: %s", e)

    def run(self) -> dict[str, Any]:
        """Run the whole flow: connect, ensure table, extract, load, close.

        Returns:
            A result dict with the keys ``success``, ``processed_count``,
            ``error_count``, ``update_mode``, ``source_table``,
            ``target_table`` and ``message``.
        """
        result: dict[str, Any] = {
            "success": False,
            "processed_count": 0,
            "error_count": 0,
            "update_mode": self.UPDATE_MODE,
            "source_table": f"{self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}",
            "target_table": f"{self.TARGET_SCHEMA}.{self.TARGET_TABLE}",
            "message": "",
        }
        banner = "=" * 60

        try:
            logger.info(banner)
            logger.info("开始执行数据流程: DF_DO202601130001")
            logger.info("源表: %s.%s", self.SOURCE_SCHEMA, self.SOURCE_TABLE)
            logger.info("目标表: %s.%s", self.TARGET_SCHEMA, self.TARGET_TABLE)
            logger.info("更新模式: %s", self.UPDATE_MODE)
            logger.info(banner)

            # 1. Connect.
            if not self.connect():
                result["message"] = "连接数据库失败"
                return result

            # 2. Make sure the target table exists.
            if not self.ensure_target_table():
                result["message"] = "创建目标表失败"
                return result

            # 3. Extract and aggregate.
            data_list = self.extract_and_transform()
            if self.extract_failed:
                # A failed query must not be reported as "no data".
                result["message"] = "提取数据失败"
                return result
            if not data_list:
                # NOTE(review): the target table is left untouched here, so a
                # full refresh from an empty source keeps the previous rows —
                # confirm this is intended.
                result["message"] = "未提取到数据"
                result["success"] = True  # an empty source is not a failure
                return result

            # 4. Load; report the counters whether or not the load succeeded.
            loaded = self.load_to_target(data_list)
            result["processed_count"] = self.processed_count
            result["error_count"] = self.error_count
            if loaded:
                result["success"] = True
                result["message"] = (
                    f"数据流程执行成功: "
                    f"处理 {self.processed_count} 条, 失败 {self.error_count} 条"
                )
            else:
                result["message"] = "加载数据到目标表失败"

        except Exception as e:
            logger.error("数据流程执行异常: %s", e)
            result["message"] = f"数据流程执行异常: {str(e)}"

        finally:
            # Runs on every exit path, including the early returns above.
            self.close()

            logger.info(banner)
            logger.info("执行结果: %s", result["message"])
            logger.info(banner)

        return result
def main():
    """Command-line entry point.

    Parses ``--db-uri`` and ``--dry-run``.  In dry-run mode only the
    connection and the aggregation query are exercised and the rows are
    logged; otherwise the full flow runs and a summary is printed.  The
    process always terminates through ``sys.exit``: 0 on success, 1 on
    failure.
    """
    cli = argparse.ArgumentParser(
        description="DF_DO202601130001 - 仓库库存汇总表数据流程"
    )
    cli.add_argument(
        "--db-uri",
        type=str,
        default=None,
        help="数据库连接 URI (可选,默认从配置获取)",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="仅测试连接和查询,不执行写入",
    )
    options = cli.parse_args()

    flow = WarehouseInventorySummaryFlow(db_uri=options.db_uri)

    if options.dry_run:
        logger.info("Dry-run 模式: 仅测试连接和查询")
        if not flow.connect():
            print("\n连接失败")
            sys.exit(1)

        preview = flow.extract_and_transform()
        logger.info(f"预览数据 ({len(preview)} 条):")
        for record in preview:
            logger.info(f" {record}")
        flow.close()
        print("\nDry-run 完成,未执行写入操作")
        sys.exit(0)

    outcome = flow.run()
    succeeded = outcome["success"]
    separator = "=" * 60

    # Human-readable summary on stdout.
    print("\n" + separator)
    print(f"数据流程执行结果: {'成功' if succeeded else '失败'}")
    print(f"消息: {outcome['message']}")
    print(f"处理记录数: {outcome['processed_count']}")
    print(f"失败记录数: {outcome['error_count']}")
    print(f"更新模式: {outcome['update_mode']}")
    print(f"源表: {outcome['source_table']}")
    print(f"目标表: {outcome['target_table']}")
    print(separator)

    # The exit status mirrors the flow result.
    if succeeded:
        sys.exit(0)
    sys.exit(1)


if __name__ == "__main__":
    main()
|