#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
任务ID: 37
任务名称: 产品库存表原始数据导入
任务描述: 把产品库存表的原始数据导入到数据资源的产品库存表中
更新模式: Append (追加模式)
源表: product_inventory_table_raw_data (原始数据表)
目标表: dags.test_product_inventory (数据资源表)
创建时间: 2026-01-16
"""
from __future__ import annotations
import os
import sys
from datetime import datetime
from typing import Any
import pandas as pd
import psycopg2
from loguru import logger
# Add the project root (two levels above this file's directory) to the front
# of the Python path. The `app` import below is placed after this step on
# purpose -- presumably it is only importable once the root is on sys.path.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)
from app.config.config import config, current_env
# Configuration entry looked up by `current_env`.
app_config = config[current_env]
def get_source_connection() -> psycopg2.extensions.connection:
    """
    Open a connection to the source (raw data) database.

    Connection parameters are parsed from
    ``app_config.SQLALCHEMY_DATABASE_URI``, which is expected to look like
    ``postgresql://user:password@host:port/database``. Driver-qualified
    schemes (``postgresql+psycopg2://``), percent-encoded credentials and a
    trailing ``?query`` part are accepted as well.

    Missing parts fall back to port ``5432`` and database ``dataops``.

    Returns:
        An open psycopg2 connection.

    Raises:
        ValueError: If the URI scheme is not a PostgreSQL scheme, or the
            port component is not a valid number.
    """
    # Function-scope import so the block does not rely on a new file-level
    # import being present.
    from urllib.parse import unquote, urlsplit

    # The application database doubles as the source database here; adjust
    # if the task's data-source configuration points somewhere else.
    db_uri = app_config.SQLALCHEMY_DATABASE_URI

    # urlsplit splits credentials from the host at the LAST "@" and keeps
    # the query string separate, which naive str.split() parsing gets wrong
    # for passwords containing "@" / ":" and for URIs with "?sslmode=...".
    parts = urlsplit(db_uri)
    dialect = parts.scheme.split("+", 1)[0]
    if dialect not in ("postgresql", "postgres"):
        raise ValueError(f"Unsupported database URI scheme: {parts.scheme!r}")

    host = parts.hostname
    port = parts.port or 5432
    database = unquote(parts.path.lstrip("/")) or "dataops"
    # Credentials with special characters are percent-encoded inside a URI
    # and must be decoded before they are handed to the driver.
    username = unquote(parts.username) if parts.username is not None else None
    password = unquote(parts.password) if parts.password is not None else None

    conn = psycopg2.connect(
        host=host,
        port=port,
        database=database,
        user=username,
        password=password,
    )
    logger.info(f"源数据库连接成功: {host}:{port}/{database}")
    return conn
def get_target_connection(
    host: str = "192.168.3.143",
    port: int = 5432,
    database: str = "dataops",
    user: str = "postgres",
    password: str = "dataOps",
    schema: str = "dags",
) -> psycopg2.extensions.connection:
    """
    Open a connection to the target (data resource) database.

    Called without arguments this connects exactly as before:

    - Host: 192.168.3.143
    - Port: 5432 (standard PostgreSQL port; the 5678 given in the task
      description is wrong)
    - Database: dataops
    - Schema: dags

    Args:
        host: Target database host.
        port: Target database port.
        database: Target database name.
        user: Login role.
        password: Login password.
        schema: Schema placed first on the session ``search_path``
            (followed by ``public``), so unqualified table names resolve
            there.

    Returns:
        An open psycopg2 connection.
    """
    # NOTE(review): the default credentials are still embedded in source for
    # backward compatibility -- consider moving them to configuration or a
    # secret store and passing them in explicitly.
    conn = psycopg2.connect(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        options=f"-c search_path={schema},public",
    )
    # Built from the actual parameters so the log can never disagree with
    # what was really connected to.
    logger.info(f"目标数据库连接成功: {host}:{port}/{database} (schema: {schema})")
    return conn
def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
    """
    Read every row of the raw inventory table into a DataFrame.

    Args:
        conn: Open connection to the source database.

    Returns:
        DataFrame holding the selected columns of
        ``product_inventory_table_raw_data``.

    Raises:
        Exception: Whatever ``pd.read_sql`` raises is logged and re-raised
            unchanged.
    """
    query = """
        SELECT
            sku,
            product_name,
            category,
            brand,
            supplier,
            warehouse,
            current_stock,
            safety_stock,
            max_stock,
            unit_cost,
            selling_price,
            stock_status,
            last_inbound_date,
            last_outbound_date,
            inbound_quantity_30d,
            outbound_quantity_30d,
            turnover_rate,
            is_active,
            created_at,
            updated_at
        FROM product_inventory_table_raw_data
    """
    logger.info("正在从源表提取数据...")
    try:
        df = pd.read_sql(query, conn)
    except Exception as e:
        # Log here for context, then let the caller decide how to react.
        logger.error(f"提取源数据失败: {e}")
        raise
    logger.info(f"成功提取 {len(df)} 条记录")
    return df
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply the (minimal) transformation step.

    Per the task description the data is imported as-is, so this only
    normalises column names to lower case and adds a ``create_time``
    column when one is not already present.

    The frame is modified in place and the same object is returned.

    Args:
        df: Source DataFrame.

    Returns:
        The same DataFrame, after the changes above.
    """
    logger.info("正在执行数据转换...")

    # Lower-case string labels only; non-string labels are left untouched
    # rather than being turned into NaN by the ``.str`` accessor.
    df.columns = [
        label.lower() if isinstance(label, str) else label
        for label in df.columns
    ]

    # The target table may need a create_time value; stamp one if missing.
    if "create_time" not in df.columns:
        df["create_time"] = datetime.now()

    logger.info(f"数据转换完成,共 {len(df)} 条记录")
    return df
def load_to_target(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
    batch_size: int = 1000,
) -> int:
    """
    Append the rows of ``df`` to the target table.

    All batches are sent inside one transaction: it is committed after the
    last batch, and rolled back if any batch fails.

    Args:
        df: Rows to load. Columns missing from the frame are inserted
            as NULL.
        conn: Open connection to the target database.
        batch_size: Number of rows sent per ``executemany`` call; must be
            at least 1.

    Returns:
        Number of rows inserted (0 when ``df`` is empty).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        Exception: Any error raised while inserting is logged and re-raised
            after the transaction has been rolled back.
    """
    if df.empty:
        logger.warning("没有数据需要加载")
        return 0
    if batch_size < 1:
        # A zero step makes range() raise and a negative one would silently
        # insert nothing, so reject both up front.
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    logger.info(f"正在将 {len(df)} 条记录加载到目标表...")
    target_table = "test_product_inventory"

    # Columns written to the target table, in INSERT order.
    # NOTE(review): the create_time column added by transform_data is not
    # listed here, so it is never inserted -- confirm this is intended.
    columns = [
        "sku", "product_name", "category", "brand", "supplier", "warehouse",
        "current_stock", "safety_stock", "max_stock", "unit_cost", "selling_price",
        "stock_status", "last_inbound_date", "last_outbound_date",
        "inbound_quantity_30d", "outbound_quantity_30d", "turnover_rate",
        "is_active", "created_at", "updated_at",
    ]

    placeholders = ", ".join(["%s"] * len(columns))
    column_names = ", ".join(columns)
    insert_sql = f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"

    # Build driver-friendly rows once, instead of row-by-row via iterrows():
    #  * reindex -> exactly the target columns, absent ones become missing
    #  * astype(object) -> plain Python values, so None can be stored
    #  * where(...) -> NaN / NaT (pandas' stand-ins for SQL NULL) become None
    prepared = df.reindex(columns=columns).astype(object)
    prepared = prepared.where(prepared.notna(), None)
    records = list(prepared.itertuples(index=False, name=None))

    cursor = conn.cursor()
    inserted_count = 0
    try:
        for start in range(0, len(records), batch_size):
            batch = records[start:start + batch_size]
            cursor.executemany(insert_sql, batch)
            inserted_count += len(batch)
            logger.debug(f"已插入 {inserted_count}/{len(df)} 条记录")
        conn.commit()
        logger.info(f"成功加载 {inserted_count} 条记录到 {target_table}")
        return inserted_count
    except Exception as e:
        conn.rollback()
        logger.error(f"数据加载失败: {e}")
        raise
    finally:
        cursor.close()
def main() -> dict[str, Any]:
    """
    Run the ETL flow: connect, extract, transform, load.

    Returns:
        Result dictionary with ``task_id``, ``task_name``, ``status``,
        ``records_extracted``, ``records_loaded``, ``error_message`` and
        ``execution_time``. It is only returned when the run succeeds.

    Raises:
        Exception: Any failure in a step is logged and re-raised after the
            connections have been closed.
    """
    result: dict[str, Any] = {
        "task_id": 37,
        "task_name": "产品库存表原始数据导入",
        "status": "failed",
        "records_extracted": 0,
        "records_loaded": 0,
        "error_message": None,
        "execution_time": None,
    }
    start_time = datetime.now()
    source_conn = None
    target_conn = None
    try:
        logger.info("=" * 60)
        logger.info("任务开始: 产品库存表原始数据导入")
        logger.info("=" * 60)

        # Step 1: open both database connections.
        logger.info("[Step 1/4] 建立数据库连接...")
        source_conn = get_source_connection()
        target_conn = get_target_connection()

        # Step 2: pull the raw rows from the source table.
        logger.info("[Step 2/4] 提取源数据...")
        df = extract_source_data(source_conn)
        result["records_extracted"] = len(df)

        # Step 3: transform.
        logger.info("[Step 3/4] 数据转换...")
        df_transformed = transform_data(df)

        # Step 4: append to the target table.
        logger.info("[Step 4/4] 加载数据到目标表...")
        records_loaded = load_to_target(df_transformed, target_conn)
        result["records_loaded"] = records_loaded
        result["status"] = "success"

        logger.info("=" * 60)
        logger.info(f"任务完成! 提取: {result['records_extracted']}, 加载: {result['records_loaded']}")
        logger.info("=" * 60)
    except Exception as e:
        result["status"] = "failed"
        result["error_message"] = str(e)
        logger.error(f"任务执行失败: {e}")
        raise
    finally:
        # Close each connection independently: a failure while closing one
        # must not leave the other open or hide the outcome of the run.
        for open_conn, closed_msg, failed_msg in (
            (source_conn, "源数据库连接已关闭", "关闭源数据库连接失败"),
            (target_conn, "目标数据库连接已关闭", "关闭目标数据库连接失败"),
        ):
            if open_conn is None:
                continue
            try:
                open_conn.close()
                logger.debug(closed_msg)
            except Exception as close_error:
                logger.warning(f"{failed_msg}: {close_error}")
        result["execution_time"] = str(datetime.now() - start_time)
    return result
if __name__ == "__main__":
    # Logging setup: drop the default sink, then log INFO and above to
    # stderr and DEBUG and above to a size-rotated file under
    # <PROJECT_ROOT>/logs.
    logger.remove()
    logger.add(
        sys.stderr,
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
    )
    logger.add(
        os.path.join(PROJECT_ROOT, "logs", "task_37_产品库存表原始数据导入.log"),
        level="DEBUG",
        rotation="10 MB",
        retention="7 days",
        encoding="utf-8",
    )

    # Exit code 0 on success, 1 on any failure. sys.exit() raises
    # SystemExit, which is not an Exception subclass, so the handler below
    # does not intercept it.
    try:
        result = main()
        sys.exit(0 if result["status"] == "success" else 1)
    except Exception as e:
        logger.exception(f"脚本执行异常: {e}")
        sys.exit(1)