#!/usr/bin/env python
"""
Task ID: 38
Task name: DF_DO202601160001
Task description: Warehouse inventory summary statistics
    1. Extract the warehouse code and stock quantity fields from the product
       inventory table (tagged as a data resource).
    2. Group by warehouse and sum the stock quantities.
    3. No special filter conditions.
    4. Output fields: warehouse code, total stock quantity.

Update mode: Full Refresh
Source table: dags.test_product_inventory (data resource - product inventory table)
Target table: dags.warehouse_inventory_summary (warehouse inventory summary table)
Created: 2026-01-16
"""

from __future__ import annotations

import os
import sys
from datetime import datetime
from typing import Any

import pandas as pd
import psycopg2
from loguru import logger

# Add the project root to the Python path
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)

from app.config.config import config, current_env

# Load the environment-specific configuration
app_config = config[current_env]


def get_database_connection() -> psycopg2.extensions.connection:
    """
    Open a database connection.

    Database configuration, per the task description:
    - Host: 192.168.3.143
    - Port: 5432 (the standard PostgreSQL port; the 5678 given in the task
      description is incorrect)
    - Database: dataops
    - Schema: dags (both the source table test_product_inventory and the
      target table warehouse_inventory_summary live in the dags schema)

    Returns:
        A psycopg2 connection object.
    """
    conn = psycopg2.connect(
        host="192.168.3.143",
        port=5432,
        database="dataops",
        user="postgres",
        password="dataOps",
        options="-c search_path=dags,public",  # make both dags and public schemas visible
    )
    logger.info("Database connection established: 192.168.3.143:5432/dataops (schema: dags,public)")
    return conn
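
# Note: pandas 2.x emits a UserWarning when read_sql() is handed a raw DBAPI
# connection such as the psycopg2 connection above; it officially supports
# SQLAlchemy connectables. The task runs fine as-is, but the sketch below is
# the warning-free alternative, assuming SQLAlchemy is installed. The helper
# name get_sqlalchemy_engine and the lazy import are illustrative choices,
# not part of the task specification.
def get_sqlalchemy_engine():
    """Build a SQLAlchemy engine usable by pandas.read_sql (optional alternative)."""
    from sqlalchemy import create_engine  # local import keeps SQLAlchemy optional

    return create_engine(
        "postgresql+psycopg2://postgres:dataOps@192.168.3.143:5432/dataops"
    )
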
def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
    """
    Ensure the target table exists, creating it if necessary.

    Args:
        conn: Database connection.
    """
    cursor = conn.cursor()
    target_table = "warehouse_inventory_summary"
    target_schema = "dags"

    try:
        # Check whether the table already exists
        cursor.execute(
            """
            SELECT EXISTS(
                SELECT 1
                FROM information_schema.tables
                WHERE table_schema = %s AND table_name = %s
            )
            """,
            (target_schema, target_table),
        )
        result = cursor.fetchone()
        exists = result[0] if result else False

        if not exists:
            logger.info(f"Target table not found; creating {target_schema}.{target_table}...")
            # PostgreSQL does not support COMMENT inside a column definition,
            # so the comments are added in separate statements below.
            create_table_sql = f"""
                CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
                    id BIGSERIAL PRIMARY KEY,
                    warehouse VARCHAR(100) NOT NULL,
                    total_stock INTEGER NOT NULL DEFAULT 0,
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """
            cursor.execute(create_table_sql)

            # Add table and column comments
            cursor.execute(
                f"COMMENT ON TABLE {target_schema}.{target_table} IS 'Warehouse inventory summary table'"
            )
            cursor.execute(
                f"COMMENT ON COLUMN {target_schema}.{target_table}.warehouse IS 'Warehouse code'"
            )
            cursor.execute(
                f"COMMENT ON COLUMN {target_schema}.{target_table}.total_stock IS 'Total stock quantity'"
            )
            cursor.execute(
                f"COMMENT ON COLUMN {target_schema}.{target_table}.create_time IS 'Record creation time'"
            )

            conn.commit()
            logger.info(f"Target table {target_schema}.{target_table} created")
        else:
            logger.info(f"Target table {target_schema}.{target_table} already exists")

    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to create target table: {e}")
        raise
    finally:
        cursor.close()


def extract_and_transform(conn: psycopg2.extensions.connection) -> pd.DataFrame:
    """
    Extract data from the source table and transform it.

    Per the task description:
    1. Extract the warehouse code (warehouse) and stock quantity
       (current_stock) fields from the product inventory table.
    2. Group by warehouse and sum the stock quantities.

    Args:
        conn: Database connection.

    Returns:
        Transformed DataFrame with warehouse and total_stock columns.
    """
    # The source table lives in the dags schema (created by task 37)
    query = """
        SELECT
            warehouse,
            SUM(current_stock) AS total_stock
        FROM dags.test_product_inventory
        GROUP BY warehouse
        ORDER BY warehouse
    """

    logger.info("Extracting and aggregating data from the source table...")
    try:
        df = pd.read_sql(query, conn)
        logger.info(f"Aggregated inventory for {len(df)} warehouses")
        return df
    except Exception as e:
        logger.error(f"Extract/transform failed: {e}")
        raise


def load_to_target(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
) -> int:
    """
    Load the data into the target table (full-refresh mode).

    Args:
        df: DataFrame to load.
        conn: Database connection.

    Returns:
        Number of records inserted.
    """
    if df.empty:
        logger.warning("No data to load")
        return 0

    cursor = conn.cursor()
    target_table = "dags.warehouse_inventory_summary"

    try:
        # Full-refresh mode: truncate the target table first
        logger.info("Full-refresh mode: truncating target table...")
        cursor.execute(f"TRUNCATE TABLE {target_table}")
        logger.info("Target table truncated")

        # Insert the new data
        insert_sql = f"""
            INSERT INTO {target_table} (warehouse, total_stock, create_time)
            VALUES (%s, %s, %s)
        """
        current_time = datetime.now()
        records = [
            (row["warehouse"], int(row["total_stock"]), current_time)
            for _, row in df.iterrows()
        ]
        cursor.executemany(insert_sql, records)
        conn.commit()

        inserted_count = len(records)
        logger.info(f"Loaded {inserted_count} records into {target_table}")
        return inserted_count

    except Exception as e:
        conn.rollback()
        logger.error(f"Data load failed: {e}")
        raise
    finally:
        cursor.close()


def main() -> dict[str, Any]:
    """
    Main entry point: run the ETL pipeline.

    Returns:
        Execution result dictionary.
    """
    result = {
        "task_id": 38,
        "task_name": "DF_DO202601160001",
        "status": "failed",
        "warehouses_processed": 0,
        "records_loaded": 0,
        "error_message": None,
        "execution_time": None,
    }
    start_time = datetime.now()
    conn = None

    try:
        logger.info("=" * 60)
        logger.info("Task started: DF_DO202601160001 - warehouse inventory summary")
        logger.info("=" * 60)

        # Step 1: open a database connection
        logger.info("[Step 1/4] Opening database connection...")
        conn = get_database_connection()

        # Step 2: ensure the target table exists
        logger.info("[Step 2/4] Checking/creating target table...")
        ensure_target_table_exists(conn)

        # Step 3: extract and transform the data
        logger.info("[Step 3/4] Extracting and transforming data...")
        df = extract_and_transform(conn)
        result["warehouses_processed"] = len(df)

        # Preview the aggregated results
        if not df.empty:
            logger.info("Warehouse inventory summary preview:")
            for _, row in df.iterrows():
                logger.info(f"  {row['warehouse']}: {row['total_stock']:,} units")

        # Step 4: load into the target table (full refresh)
        logger.info("[Step 4/4] Loading data into target table...")
        records_loaded = load_to_target(df, conn)
        result["records_loaded"] = records_loaded

        result["status"] = "success"
        logger.info("=" * 60)
        logger.info(
            f"Task finished! Warehouses processed: {result['warehouses_processed']}, "
            f"records loaded: {result['records_loaded']}"
        )
        logger.info("=" * 60)

    except Exception as e:
        result["status"] = "failed"
        result["error_message"] = str(e)
        logger.error(f"Task execution failed: {e}")
        raise
    finally:
        # Close the database connection
        if conn:
            conn.close()
            logger.debug("Database connection closed")
        result["execution_time"] = str(datetime.now() - start_time)

    return result


if __name__ == "__main__":
    # Configure logging.
    # Important: log to stdout rather than stderr so the n8n workflow can
    # parse the output correctly.
    logger.remove()
    logger.add(
        sys.stdout,
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
    )
    logger.add(
        os.path.join(PROJECT_ROOT, "logs", "task_38_DF_DO202601160001.log"),
        level="DEBUG",
        rotation="10 MB",
        retention="7 days",
        encoding="utf-8",
    )

    try:
        result = main()
        sys.exit(0 if result["status"] == "success" else 1)
    except Exception as e:
        logger.exception(f"Script execution failed: {e}")
        sys.exit(1)
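
# Appendix (illustrative sketch, not executed): the DDL and TRUNCATE
# statements above interpolate schema/table names with f-strings. That is
# safe here only because the names are hard-coded constants in this file.
# If they ever come from configuration or user input, compose identifiers
# with psycopg2.sql so they are quoted and escaped properly, e.g.:
#
#     from psycopg2 import sql
#
#     stmt = sql.SQL("TRUNCATE TABLE {}.{}").format(
#         sql.Identifier("dags"),
#         sql.Identifier("warehouse_inventory_summary"),
#     )
#     cursor.execute(stmt)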