|
@@ -0,0 +1,483 @@
|
|
|
|
|
+"""
|
|
|
|
|
+数据流程脚本:DF_DO202601130001
|
|
|
|
|
+仓库库存汇总表 数据流程
|
|
|
|
|
+
|
|
|
|
|
+功能:
|
|
|
|
|
+- 从产品库存表(test_product_inventory)中读取数据
|
|
|
|
|
+- 按仓库(warehouse)汇总库存数量(current_stock)
|
|
|
|
|
+- 输出仓库编号和总库存数量到目标表(warehouse_inventory_summary)
|
|
|
|
|
+- 更新模式:Full Refresh (全量更新)
|
|
|
|
|
+
|
|
|
|
|
+任务信息:
|
|
|
|
|
+- DataFlow ID: 2220
|
|
|
|
|
+- DataFlow Name: 仓库库存汇总表_数据流程
|
|
|
|
|
+- Order ID: 17
|
|
|
|
|
+- Order No: DO202601130001
|
|
|
|
|
+
|
|
|
|
|
+作者:cursor (自动生成)
|
|
|
|
|
+创建时间:2026-01-13
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+from __future__ import annotations
|
|
|
|
|
+
|
|
|
|
|
+import argparse
|
|
|
|
|
+import logging
|
|
|
|
|
+import os
|
|
|
|
|
+import sys
|
|
|
|
|
+from datetime import datetime
|
|
|
|
|
+from typing import Any
|
|
|
|
|
+
|
|
|
|
|
# Add the project root to the import path: the directory two levels above
# the one that contains this file.
sys.path.insert(
    0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)

# SQLAlchemy is a hard requirement; print a message and exit with status 1
# instead of surfacing a raw ImportError traceback.
try:
    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import sessionmaker
except ImportError:
    print("错误:请安装 sqlalchemy 库")
    sys.exit(1)

# Configure logging: INFO level, timestamped records, module-level logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
class WarehouseInventorySummaryFlow:
    """ETL processor for the warehouse inventory summary data flow.

    Reads the source table, sums ``current_stock`` per ``warehouse`` and
    writes one row per warehouse into the target table. With
    ``UPDATE_MODE == "full"`` the target table is emptied before the new
    rows are inserted.
    """

    # Flow configuration constants.
    SOURCE_TABLE = "test_product_inventory"
    TARGET_TABLE = "warehouse_inventory_summary"
    SOURCE_SCHEMA = "public"
    TARGET_SCHEMA = "public"
    UPDATE_MODE = "full"  # "full" = full refresh

    def __init__(self, db_uri: str | None = None):
        """Initialise the processor.

        Args:
            db_uri: Database connection URI. When omitted (or empty) the URI
                is resolved by ``_get_db_uri``.
        """
        self.db_uri = db_uri or self._get_db_uri()
        self.engine = None
        self.session = None
        self.processed_count = 0
        self.error_count = 0
        # Message of the most recent extraction failure, or None when the
        # last extraction succeeded. Lets run() tell "query failed" apart
        # from "query returned no rows" (both yield an empty list).
        self.last_error = None

    def _get_db_uri(self) -> str:
        """Resolve the database connection URI.

        Lookup order: the ``DATABASE_URL`` environment variable, then the
        application config module (if it can be imported), then a hard-coded
        local development URI.
        """
        # Environment variable takes precedence.
        db_uri = os.environ.get("DATABASE_URL")
        if db_uri:
            return db_uri

        # Try the application configuration next.
        try:
            from app.config.config import config, get_environment

            env = get_environment()
            cfg = config.get(env, config["default"])
            return cfg.SQLALCHEMY_DATABASE_URI
        except ImportError:
            # Config package not importable (e.g. script run standalone);
            # fall through to the development default below.
            pass

        # Development default.
        return "postgresql://postgres:postgres@localhost:5432/dataops"

    def connect(self) -> bool:
        """Create the engine and session and verify the connection.

        Returns:
            True when a test query (``SELECT 1``) succeeds, False otherwise.
        """
        try:
            self.engine = create_engine(self.db_uri)
            Session = sessionmaker(bind=self.engine)
            self.session = Session()

            # Verify the connection actually works.
            with self.engine.connect() as conn:
                conn.execute(text("SELECT 1"))

            # Log only the part after "@" so credentials are not written out.
            safe_uri = self.db_uri.split("@")[-1] if "@" in self.db_uri else self.db_uri
            logger.info(f"成功连接数据库: {safe_uri}")
            return True

        except Exception as e:
            logger.error(f"连接数据库失败: {str(e)}")
            return False

    def ensure_target_table(self) -> bool:
        """Make sure the target table exists, creating it if necessary.

        The table is created with PostgreSQL-compatible DDL
        (``_create_table_simple``). Inline ``COMMENT '...'`` column clauses
        are MySQL syntax and are rejected by PostgreSQL, so they are not
        attempted.

        Returns:
            True when the table exists or was created, False otherwise.
        """
        if not self.session:
            logger.error("数据库会话未初始化")
            return False

        try:
            # Check whether the target table already exists.
            check_sql = text("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = :schema
                    AND table_name = :table_name
                )
            """)
            exists = self.session.execute(
                check_sql,
                {"schema": self.TARGET_SCHEMA, "table_name": self.TARGET_TABLE},
            ).scalar()
        except Exception as e:
            self.session.rollback()
            logger.error(f"检查目标表失败: {str(e)}")
            # CREATE TABLE IF NOT EXISTS is idempotent, so it is still safe
            # to attempt creation when the existence check itself failed.
            return self._create_table_simple()

        if exists:
            logger.info(f"目标表 {self.TARGET_SCHEMA}.{self.TARGET_TABLE} 已存在")
            return True

        return self._create_table_simple()

    def _create_table_simple(self) -> bool:
        """Create the target table and its comments with PostgreSQL DDL.

        Returns:
            True on success; False when there is no session or any statement
            fails (the session is rolled back in that case).
        """
        try:
            if not self.session:
                return False

            qualified = f"{self.TARGET_SCHEMA}.{self.TARGET_TABLE}"

            create_sql = text(f"""
                CREATE TABLE IF NOT EXISTS {qualified} (
                    id SERIAL PRIMARY KEY,
                    warehouse VARCHAR(100) NOT NULL,
                    total_stock BIGINT NOT NULL DEFAULT 0,
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            self.session.execute(create_sql)

            # Table comment.
            comment_table_sql = text(f"""
                COMMENT ON TABLE {qualified}
                IS '仓库库存汇总表'
            """)
            self.session.execute(comment_table_sql)

            # Column comments.
            comments = [
                ("warehouse", "仓库编号"),
                ("total_stock", "总库存数量"),
                ("create_time", "数据创建时间"),
                ("update_time", "数据更新时间"),
            ]
            for col_name, col_comment in comments:
                comment_col_sql = text(f"""
                    COMMENT ON COLUMN {qualified}.{col_name}
                    IS '{col_comment}'
                """)
                self.session.execute(comment_col_sql)

            self.session.commit()
            logger.info(
                f"成功创建目标表(简化模式): {self.TARGET_SCHEMA}.{self.TARGET_TABLE}"
            )
            return True

        except Exception as e:
            if self.session:
                self.session.rollback()
            logger.error(f"创建目标表(简化模式)失败: {str(e)}")
            return False

    def extract_and_transform(self) -> list[dict[str, Any]]:
        """Query the source table and aggregate stock per warehouse.

        Returns:
            A list of ``{"warehouse": ..., "total_stock": int}`` dicts ordered
            by warehouse. An empty list is returned both when the source has
            no rows and when the query fails; in the failure case
            ``self.last_error`` holds the error message, otherwise it is None.
        """
        self.last_error = None

        if not self.session:
            self.last_error = "数据库会话未初始化"
            logger.error("数据库会话未初始化")
            return []

        try:
            # Sum current_stock per warehouse; no filter conditions.
            query_sql = text(f"""
                SELECT
                    warehouse,
                    SUM(current_stock) as total_stock
                FROM {self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}
                GROUP BY warehouse
                ORDER BY warehouse
            """)
            rows = self.session.execute(query_sql).fetchall()

            # A NULL (or zero) sum is normalised to 0.
            data_list = [
                {
                    "warehouse": row.warehouse,
                    "total_stock": int(row.total_stock) if row.total_stock else 0,
                }
                for row in rows
            ]
        except Exception as e:
            # Roll back so the session stays usable after the failed query.
            self.session.rollback()
            self.last_error = str(e)
            logger.error(f"提取和转换数据失败: {str(e)}")
            return []

        logger.info(f"从源表提取并汇总了 {len(data_list)} 条仓库库存记录")
        return data_list

    def load_to_target(self, data_list: list[dict[str, Any]]) -> bool:
        """Write the aggregated rows to the target table.

        In full-refresh mode the target table is emptied first. Each row is
        inserted inside its own SAVEPOINT: a failing row is counted in
        ``self.error_count`` and skipped, and the remaining rows are still
        committed.

        Args:
            data_list: Rows as produced by ``extract_and_transform``.

        Returns:
            True when the load was committed (or there was nothing to load),
            False when there is no session or the load as a whole failed and
            was rolled back.
        """
        if not data_list:
            logger.warning("没有数据需要加载")
            return True

        if not self.session:
            logger.error("数据库会话未初始化")
            return False

        try:
            # Full refresh: empty the target table first.
            if self.UPDATE_MODE == "full":
                delete_sql = text(
                    f"DELETE FROM {self.TARGET_SCHEMA}.{self.TARGET_TABLE}"
                )
                self.session.execute(delete_sql)
                logger.info(f"目标表 {self.TARGET_TABLE} 已清空(全量更新模式)")

            current_time = datetime.now()
            insert_sql = text(f"""
                INSERT INTO {self.TARGET_SCHEMA}.{self.TARGET_TABLE}
                (warehouse, total_stock, create_time, update_time)
                VALUES
                (:warehouse, :total_stock, :create_time, :update_time)
            """)

            for data in data_list:
                try:
                    # On PostgreSQL a failed statement aborts the enclosing
                    # transaction; the SAVEPOINT confines the failure to this
                    # row so later rows and the final commit still work.
                    with self.session.begin_nested():
                        self.session.execute(
                            insert_sql,
                            {
                                "warehouse": data["warehouse"],
                                "total_stock": data["total_stock"],
                                "create_time": current_time,
                                "update_time": current_time,
                            },
                        )
                    self.processed_count += 1
                except Exception as e:
                    self.error_count += 1
                    logger.error(f"插入数据失败: {str(e)}, 数据: {data}")

            self.session.commit()
            logger.info(
                f"数据加载完成: 成功 {self.processed_count} 条, 失败 {self.error_count} 条"
            )
            return True

        except Exception as e:
            self.session.rollback()
            logger.error(f"加载数据到目标表失败: {str(e)}")
            return False

    def close(self) -> None:
        """Close the session and dispose of the engine, logging any failure."""
        if self.session:
            try:
                self.session.close()
                logger.info("数据库会话已关闭")
            except Exception as e:
                logger.error(f"关闭数据库会话失败: {str(e)}")

        if self.engine:
            try:
                self.engine.dispose()
                logger.info("数据库引擎已释放")
            except Exception as e:
                logger.error(f"释放数据库引擎失败: {str(e)}")

    def run(self) -> dict[str, Any]:
        """Execute the whole flow: connect, ensure table, extract, load.

        Returns:
            A result dict with the keys ``success``, ``processed_count``,
            ``error_count``, ``update_mode``, ``source_table``,
            ``target_table`` and ``message``. An empty source counts as
            success; a failed extraction does not.
        """
        result = {
            "success": False,
            "processed_count": 0,
            "error_count": 0,
            "update_mode": self.UPDATE_MODE,
            "source_table": f"{self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}",
            "target_table": f"{self.TARGET_SCHEMA}.{self.TARGET_TABLE}",
            "message": "",
        }

        try:
            logger.info("=" * 60)
            logger.info("开始执行数据流程: DF_DO202601130001")
            logger.info(f"源表: {self.SOURCE_SCHEMA}.{self.SOURCE_TABLE}")
            logger.info(f"目标表: {self.TARGET_SCHEMA}.{self.TARGET_TABLE}")
            logger.info(f"更新模式: {self.UPDATE_MODE}")
            logger.info("=" * 60)

            # 1. Connect to the database.
            if not self.connect():
                result["message"] = "连接数据库失败"
                return result

            # 2. Make sure the target table exists.
            if not self.ensure_target_table():
                result["message"] = "创建目标表失败"
                return result

            # 3. Extract and aggregate.
            data_list = self.extract_and_transform()

            if not data_list:
                if self.last_error is not None:
                    # The query failed; do not report this as a success.
                    result["message"] = f"提取数据失败: {self.last_error}"
                    return result
                result["message"] = "未提取到数据"
                result["success"] = True  # no data is not a failure
                return result

            # 4. Load into the target table.
            if self.load_to_target(data_list):
                result["success"] = True
                result["processed_count"] = self.processed_count
                result["error_count"] = self.error_count
                result["message"] = (
                    f"数据流程执行成功: "
                    f"处理 {self.processed_count} 条, 失败 {self.error_count} 条"
                )
            else:
                result["message"] = "加载数据到目标表失败"

        except Exception as e:
            logger.error(f"数据流程执行异常: {str(e)}")
            result["message"] = f"数据流程执行异常: {str(e)}"

        finally:
            # Runs on every path, including the early returns above.
            self.close()

            logger.info("=" * 60)
            logger.info(f"执行结果: {result['message']}")
            logger.info("=" * 60)

        return result
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def main():
    """Command-line entry point for the DF_DO202601130001 data flow.

    Options:
        --db-uri: database connection URI; when omitted the flow resolves
            one itself.
        --dry-run: connect, run the aggregation query and log the rows
            without writing anything.

    Always terminates through ``sys.exit``. Dry-run exits 0 after the
    preview or 1 when the connection fails; a normal run exits 0 when the
    flow reports success and 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="DF_DO202601130001 - 仓库库存汇总表数据流程"
    )
    parser.add_argument(
        "--db-uri",
        type=str,
        default=None,
        help="数据库连接 URI (可选,默认从配置获取)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="仅测试连接和查询,不执行写入",
    )

    args = parser.parse_args()

    # Build the flow processor.
    flow = WarehouseInventorySummaryFlow(db_uri=args.db_uri)

    if args.dry_run:
        logger.info("Dry-run 模式: 仅测试连接和查询")
        try:
            if not flow.connect():
                print("\n连接失败")
                sys.exit(1)

            data_list = flow.extract_and_transform()
            logger.info(f"预览数据 ({len(data_list)} 条):")
            for data in data_list:
                logger.info(f"  {data}")
        finally:
            # connect() may have created the engine/session before failing,
            # so release them on every path (including sys.exit above).
            flow.close()

        print("\nDry-run 完成,未执行写入操作")
        sys.exit(0)

    result = flow.run()

    # Print the result summary.
    print("\n" + "=" * 60)
    print(f"数据流程执行结果: {'成功' if result['success'] else '失败'}")
    print(f"消息: {result['message']}")
    print(f"处理记录数: {result['processed_count']}")
    print(f"失败记录数: {result['error_count']}")
    print(f"更新模式: {result['update_mode']}")
    print(f"源表: {result['source_table']}")
    print(f"目标表: {result['target_table']}")
    print("=" * 60)

    # Exit code reflects the flow outcome.
    sys.exit(0 if result["success"] else 1)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
# Run the flow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|