| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- #!/usr/bin/env python
- """
- 任务ID: 38
- 任务名称: DF_DO202601160001
- 任务描述: 仓库库存汇总统计
- 1. 从标签为数据资源的产品库存表中提取字段:仓库编号、库存数量
- 2. 按照仓库进行分组,对库存数量进行求和计算
- 3. 无特殊过滤条件
- 4. 最终输出数据格式包含字段:仓库编号、总库存数量
- 更新模式: Full Refresh (全量更新)
- 源表: dags.test_product_inventory (数据资源-产品库存表)
- 目标表: dags.warehouse_inventory_summary (仓库库存汇总表)
- 创建时间: 2026-01-16
- """
- from __future__ import annotations
- import os
- import sys
- from datetime import datetime
- from typing import Any
- import pandas as pd
- import psycopg2
- from loguru import logger
- # 添加项目根目录到Python路径
- PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
- sys.path.insert(0, PROJECT_ROOT)
- from app.config.config import config, current_env
- # 获取配置
- app_config = config[current_env]
- def get_database_connection() -> psycopg2.extensions.connection:
- """
- 获取数据库连接
- 根据任务描述,数据库配置:
- - Host: 192.168.3.143
- - Port: 5432 (标准 PostgreSQL 端口,任务描述中的 5678 有误)
- - Database: dataops
- - Schema: dags (源表 test_product_inventory 和目标表 warehouse_inventory_summary 都在 dags schema)
- Returns:
- psycopg2 连接对象
- """
- conn = psycopg2.connect(
- host="192.168.3.143",
- port=5432,
- database="dataops",
- user="postgres",
- password="dataOps",
- options="-c search_path=dags,public", # 确保可以访问 dags 和 public schema
- )
- logger.info("数据库连接成功: 192.168.3.143:5432/dataops (schema: dags,public)")
- return conn
- def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
- """
- 确保目标表存在,如果不存在则创建
- Args:
- conn: 数据库连接
- """
- cursor = conn.cursor()
- target_table = "warehouse_inventory_summary"
- target_schema = "dags"
- try:
- # 检查表是否存在
- cursor.execute(
- """
- SELECT EXISTS(
- SELECT 1 FROM information_schema.tables
- WHERE table_schema = %s
- AND table_name = %s
- )
- """,
- (target_schema, target_table),
- )
- result = cursor.fetchone()
- exists = result[0] if result else False
- if not exists:
- logger.info(f"目标表不存在,正在创建 {target_schema}.{target_table}...")
- # PostgreSQL 不支持在列定义中使用 COMMENT,需要分开
- create_table_sql = f"""
- CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
- id BIGSERIAL PRIMARY KEY,
- warehouse VARCHAR(100) NOT NULL,
- total_stock INTEGER NOT NULL DEFAULT 0,
- create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
- );
- """
- cursor.execute(create_table_sql)
- # 添加注释
- cursor.execute(
- f"COMMENT ON TABLE {target_schema}.{target_table} IS '仓库库存汇总表'"
- )
- cursor.execute(
- f"COMMENT ON COLUMN {target_schema}.{target_table}.warehouse IS '仓库编号'"
- )
- cursor.execute(
- f"COMMENT ON COLUMN {target_schema}.{target_table}.total_stock IS '总库存数量'"
- )
- cursor.execute(
- f"COMMENT ON COLUMN {target_schema}.{target_table}.create_time IS '数据创建时间'"
- )
- conn.commit()
- logger.info(f"目标表 {target_schema}.{target_table} 创建成功")
- else:
- logger.info(f"目标表 {target_schema}.{target_table} 已存在")
- except Exception as e:
- conn.rollback()
- logger.error(f"创建目标表失败: {e}")
- raise
- finally:
- cursor.close()
- def extract_and_transform(conn: psycopg2.extensions.connection) -> pd.DataFrame:
- """
- 从源表提取数据并进行转换
- 根据任务描述:
- 1. 从产品库存表中提取字段:仓库编号(warehouse)、库存数量(current_stock)
- 2. 按照仓库进行分组,对库存数量进行求和计算
- Args:
- conn: 数据库连接
- Returns:
- 转换后的DataFrame,包含 warehouse 和 total_stock 列
- """
- # 源表位于 dags schema(由任务 37 创建)
- query = """
- SELECT
- warehouse,
- SUM(current_stock) AS total_stock
- FROM dags.test_product_inventory
- GROUP BY warehouse
- ORDER BY warehouse
- """
- logger.info("正在从源表提取并汇总数据...")
- try:
- df = pd.read_sql(query, conn)
- logger.info(f"成功汇总 {len(df)} 个仓库的库存数据")
- return df
- except Exception as e:
- logger.error(f"数据提取转换失败: {e}")
- raise
- def load_to_target(
- df: pd.DataFrame,
- conn: psycopg2.extensions.connection,
- ) -> int:
- """
- 将数据加载到目标表(全量更新模式)
- Args:
- df: 要加载的DataFrame
- conn: 数据库连接
- Returns:
- 插入的记录数
- """
- if df.empty:
- logger.warning("没有数据需要加载")
- return 0
- cursor = conn.cursor()
- target_table = "dags.warehouse_inventory_summary"
- try:
- # 全量更新模式:先清空目标表
- logger.info("全量更新模式:清空目标表...")
- cursor.execute(f"TRUNCATE TABLE {target_table}")
- logger.info("目标表已清空")
- # 插入新数据
- insert_sql = f"""
- INSERT INTO {target_table} (warehouse, total_stock, create_time)
- VALUES (%s, %s, %s)
- """
- current_time = datetime.now()
- records = [
- (row["warehouse"], int(row["total_stock"]), current_time)
- for _, row in df.iterrows()
- ]
- cursor.executemany(insert_sql, records)
- conn.commit()
- inserted_count = len(records)
- logger.info(f"成功加载 {inserted_count} 条记录到 {target_table}")
- return inserted_count
- except Exception as e:
- conn.rollback()
- logger.error(f"数据加载失败: {e}")
- raise
- finally:
- cursor.close()
- def main() -> dict[str, Any]:
- """
- 主函数:执行ETL流程
- Returns:
- 执行结果字典
- """
- result = {
- "task_id": 38,
- "task_name": "DF_DO202601160001",
- "status": "failed",
- "warehouses_processed": 0,
- "records_loaded": 0,
- "error_message": None,
- "execution_time": None,
- }
- start_time = datetime.now()
- conn = None
- try:
- logger.info("=" * 60)
- logger.info("任务开始: DF_DO202601160001 - 仓库库存汇总")
- logger.info("=" * 60)
- # 步骤1: 建立数据库连接
- logger.info("[Step 1/4] 建立数据库连接...")
- conn = get_database_connection()
- # 步骤2: 确保目标表存在
- logger.info("[Step 2/4] 检查/创建目标表...")
- ensure_target_table_exists(conn)
- # 步骤3: 提取并转换数据
- logger.info("[Step 3/4] 提取并转换数据...")
- df = extract_and_transform(conn)
- result["warehouses_processed"] = len(df)
- # 输出汇总结果预览
- if not df.empty:
- logger.info("仓库库存汇总预览:")
- for _, row in df.iterrows():
- logger.info(f" {row['warehouse']}: {row['total_stock']:,} 件")
- # 步骤4: 加载到目标表(全量更新)
- logger.info("[Step 4/4] 加载数据到目标表...")
- records_loaded = load_to_target(df, conn)
- result["records_loaded"] = records_loaded
- result["status"] = "success"
- logger.info("=" * 60)
- logger.info(
- f"任务完成! 处理仓库数: {result['warehouses_processed']}, 加载记录数: {result['records_loaded']}"
- )
- logger.info("=" * 60)
- except Exception as e:
- result["status"] = "failed"
- result["error_message"] = str(e)
- logger.error(f"任务执行失败: {e}")
- raise
- finally:
- # 关闭数据库连接
- if conn:
- conn.close()
- logger.debug("数据库连接已关闭")
- result["execution_time"] = str(datetime.now() - start_time)
- return result
- if __name__ == "__main__":
- # 配置日志
- # 重要:日志输出到 stdout 而非 stderr,以便 n8n 工作流正确解析输出
- logger.remove()
- logger.add(
- sys.stdout,
- level="INFO",
- format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
- )
- logger.add(
- os.path.join(PROJECT_ROOT, "logs", "task_38_DF_DO202601160001.log"),
- level="DEBUG",
- rotation="10 MB",
- retention="7 days",
- encoding="utf-8",
- )
- try:
- result = main()
- if result["status"] == "success":
- sys.exit(0)
- else:
- sys.exit(1)
- except Exception as e:
- logger.exception(f"脚本执行异常: {e}")
- sys.exit(1)
|