#!/usr/bin/env python
"""
任务ID: 38
任务名称: DF_DO202601160001
任务描述: 仓库库存汇总统计
1. 从标签为数据资源的产品库存表中提取字段:仓库编号、库存数量
2. 按照仓库进行分组,对库存数量进行求和计算
3. 无特殊过滤条件
4. 最终输出数据格式包含字段:仓库编号、总库存数量
更新模式: Full Refresh (全量更新)
源表: dags.test_product_inventory (数据资源-产品库存表)
目标表: dags.warehouse_inventory_summary (仓库库存汇总表)
创建时间: 2026-01-16
"""
from __future__ import annotations
import os
import sys
from datetime import datetime
from typing import Any
import pandas as pd
import psycopg2
from loguru import logger
# Add the project root to the Python path so app.config can be imported
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)
from app.config.config import config, current_env
# Load the environment-specific configuration (not otherwise used in this
# script; the connection settings below are hardcoded per the task description)
app_config = config[current_env]
def get_database_connection() -> psycopg2.extensions.connection:
"""
获取数据库连接
根据任务描述,数据库配置:
- Host: 192.168.3.143
- Port: 5432 (标准 PostgreSQL 端口,任务描述中的 5678 有误)
- Database: dataops
- Schema: dags (源表 test_product_inventory 和目标表 warehouse_inventory_summary 都在 dags schema)
Returns:
psycopg2 连接对象
"""
conn = psycopg2.connect(
host="192.168.3.143",
port=5432,
database="dataops",
user="postgres",
password="dataOps",
options="-c search_path=dags,public", # 确保可以访问 dags 和 public schema
)
logger.info("数据库连接成功: 192.168.3.143:5432/dataops (schema: dags,public)")
return conn
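# Note: psycopg2 connections support the context-manager protocol, but
# connection.__exit__ only commits or rolls back the open transaction -- it
# does not close the connection. That is why main() closes the connection
# explicitly in a finally block instead of relying on a `with` statement.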
def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
"""
确保目标表存在,如果不存在则创建
Args:
conn: 数据库连接
"""
cursor = conn.cursor()
target_table = "warehouse_inventory_summary"
target_schema = "dags"
try:
        # Check whether the target table already exists
cursor.execute(
"""
SELECT EXISTS(
SELECT 1 FROM information_schema.tables
WHERE table_schema = %s
AND table_name = %s
)
""",
(target_schema, target_table),
)
result = cursor.fetchone()
exists = result[0] if result else False
if not exists:
logger.info(f"目标表不存在,正在创建 {target_schema}.{target_table}...")
# PostgreSQL 不支持在列定义中使用 COMMENT,需要分开
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
id BIGSERIAL PRIMARY KEY,
warehouse VARCHAR(100) NOT NULL,
total_stock INTEGER NOT NULL DEFAULT 0,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
cursor.execute(create_table_sql)
            # Add table and column comments
cursor.execute(
f"COMMENT ON TABLE {target_schema}.{target_table} IS '仓库库存汇总表'"
)
cursor.execute(
f"COMMENT ON COLUMN {target_schema}.{target_table}.warehouse IS '仓库编号'"
)
cursor.execute(
f"COMMENT ON COLUMN {target_schema}.{target_table}.total_stock IS '总库存数量'"
)
cursor.execute(
f"COMMENT ON COLUMN {target_schema}.{target_table}.create_time IS '数据创建时间'"
)
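            # PostgreSQL DDL is transactional, so the CREATE TABLE and its
            # COMMENT statements become visible atomically on this commit.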
            conn.commit()
            logger.info(f"Target table {target_schema}.{target_table} created successfully")
        else:
            logger.info(f"Target table {target_schema}.{target_table} already exists")
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to create target table: {e}")
        raise
finally:
cursor.close()
def extract_and_transform(conn: psycopg2.extensions.connection) -> pd.DataFrame:
"""
从源表提取数据并进行转换
根据任务描述:
1. 从产品库存表中提取字段:仓库编号(warehouse)、库存数量(current_stock)
2. 按照仓库进行分组,对库存数量进行求和计算
Args:
conn: 数据库连接
Returns:
转换后的DataFrame,包含 warehouse 和 total_stock 列
"""
    # The source table lives in the dags schema (created by task 37)
query = """
SELECT
warehouse,
SUM(current_stock) AS total_stock
FROM dags.test_product_inventory
GROUP BY warehouse
ORDER BY warehouse
"""
logger.info("正在从源表提取并汇总数据...")
try:
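        # pandas officially supports only SQLAlchemy connectables, so passing
        # a raw psycopg2 connection works but emits a UserWarning on recent
        # pandas versions. A sketch of the SQLAlchemy alternative (assuming
        # sqlalchemy is installed; the DSN mirrors get_database_connection):
        #   from sqlalchemy import create_engine
        #   engine = create_engine("postgresql+psycopg2://postgres:dataOps@192.168.3.143:5432/dataops")
        #   df = pd.read_sql(query, engine)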
df = pd.read_sql(query, conn)
logger.info(f"成功汇总 {len(df)} 个仓库的库存数据")
return df
except Exception as e:
logger.error(f"数据提取转换失败: {e}")
raise
def load_to_target(
df: pd.DataFrame,
conn: psycopg2.extensions.connection,
) -> int:
"""
将数据加载到目标表(全量更新模式)
Args:
df: 要加载的DataFrame
conn: 数据库连接
Returns:
插入的记录数
"""
if df.empty:
logger.warning("没有数据需要加载")
return 0
cursor = conn.cursor()
target_table = "dags.warehouse_inventory_summary"
try:
        # Full-refresh mode: empty the target table first
        logger.info("Full-refresh mode: truncating the target table...")
        cursor.execute(f"TRUNCATE TABLE {target_table}")
        logger.info("Target table truncated")
        # Insert the fresh data
insert_sql = f"""
INSERT INTO {target_table} (warehouse, total_stock, create_time)
VALUES (%s, %s, %s)
"""
current_time = datetime.now()
records = [
(row["warehouse"], int(row["total_stock"]), current_time)
for _, row in df.iterrows()
]
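        # executemany issues one INSERT per record; for large batches,
        # psycopg2.extras.execute_values is usually much faster, e.g.:
        #   from psycopg2.extras import execute_values
        #   execute_values(
        #       cursor,
        #       f"INSERT INTO {target_table} (warehouse, total_stock, create_time) VALUES %s",
        #       records,
        #   )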
cursor.executemany(insert_sql, records)
conn.commit()
inserted_count = len(records)
logger.info(f"成功加载 {inserted_count} 条记录到 {target_table}")
return inserted_count
except Exception as e:
conn.rollback()
logger.error(f"数据加载失败: {e}")
raise
finally:
cursor.close()
def main() -> dict[str, Any]:
"""
主函数:执行ETL流程
Returns:
执行结果字典
"""
result = {
"task_id": 38,
"task_name": "DF_DO202601160001",
"status": "failed",
"warehouses_processed": 0,
"records_loaded": 0,
"error_message": None,
"execution_time": None,
}
start_time = datetime.now()
conn = None
try:
logger.info("=" * 60)
logger.info("任务开始: DF_DO202601160001 - 仓库库存汇总")
logger.info("=" * 60)
        # Step 1: open the database connection
        logger.info("[Step 1/4] Connecting to the database...")
conn = get_database_connection()
        # Step 2: make sure the target table exists
        logger.info("[Step 2/4] Checking/creating the target table...")
ensure_target_table_exists(conn)
        # Step 3: extract and transform the data
        logger.info("[Step 3/4] Extracting and transforming data...")
df = extract_and_transform(conn)
result["warehouses_processed"] = len(df)
        # Log a preview of the aggregated results
        if not df.empty:
            logger.info("Warehouse inventory summary preview:")
            for _, row in df.iterrows():
                logger.info(f"  {row['warehouse']}: {row['total_stock']:,} units")
        # Step 4: load into the target table (full refresh)
        logger.info("[Step 4/4] Loading data into the target table...")
records_loaded = load_to_target(df, conn)
result["records_loaded"] = records_loaded
result["status"] = "success"
logger.info("=" * 60)
logger.info(
f"任务完成! 处理仓库数: {result['warehouses_processed']}, 加载记录数: {result['records_loaded']}"
)
logger.info("=" * 60)
except Exception as e:
result["status"] = "failed"
result["error_message"] = str(e)
logger.error(f"任务执行失败: {e}")
raise
finally:
        # Close the database connection
if conn:
conn.close()
logger.debug("数据库连接已关闭")
result["execution_time"] = str(datetime.now() - start_time)
return result
if __name__ == "__main__":
    # Configure logging
    # Important: log to stdout rather than stderr so the n8n workflow can
    # parse the output correctly
logger.remove()
logger.add(
sys.stdout,
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
)
logger.add(
os.path.join(PROJECT_ROOT, "logs", "task_38_DF_DO202601160001.log"),
level="DEBUG",
rotation="10 MB",
retention="7 days",
encoding="utf-8",
)
try:
result = main()
if result["status"] == "success":
sys.exit(0)
else:
sys.exit(1)
except Exception as e:
logger.exception(f"脚本执行异常: {e}")
sys.exit(1)