#!/usr/bin/env python
"""
数据流任务脚本 - 产品库存表的原始数据导入
任务ID: 43
任务名称: 产品库存表的原始数据导入
创建时间: 2026-01-21
更新模式: Append (追加模式)
描述: 从源表 public.product_inventory_table_raw_data 导入数据到目标表 dags.test_product_inventory
"""
from __future__ import annotations
import os
import sys
from datetime import datetime
from typing import Any
import pandas as pd
import psycopg2
from loguru import logger
# 添加项目根目录到Python路径
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)
# Task configuration
TASK_ID = 43
TASK_NAME = "产品库存表的原始数据导入"
UPDATE_MODE = "append"

# Source database configuration.
# NOTE(review): credentials were previously hard-coded only. They can now be
# overridden via environment variables; the original literals remain the
# defaults so existing deployments behave identically. Avoid committing real
# passwords to source control.
SOURCE_CONFIG = {
    "host": os.environ.get("SOURCE_DB_HOST", "192.168.3.143"),
    "port": int(os.environ.get("SOURCE_DB_PORT", "5432")),
    "database": os.environ.get("SOURCE_DB_NAME", "dataops"),
    "user": os.environ.get("SOURCE_DB_USER", "postgres"),
    "password": os.environ.get("SOURCE_DB_PASSWORD", "dataOps"),
}

# Target database configuration (defaults to the same PostgreSQL instance).
TARGET_CONFIG = {
    "host": os.environ.get("TARGET_DB_HOST", "192.168.3.143"),
    # PostgreSQL default port
    "port": int(os.environ.get("TARGET_DB_PORT", "5432")),
    "database": os.environ.get("TARGET_DB_NAME", "dataops"),
    "user": os.environ.get("TARGET_DB_USER", "postgres"),
    "password": os.environ.get("TARGET_DB_PASSWORD", "dataOps"),
}

# Target table location
TARGET_SCHEMA = "dags"
TARGET_TABLE = "test_product_inventory"
def get_source_connection() -> psycopg2.extensions.connection:
    """Open and return a connection to the source database.

    Connection parameters come entirely from ``SOURCE_CONFIG``.

    Returns:
        An open psycopg2 connection object.
    """
    connection = psycopg2.connect(
        host=SOURCE_CONFIG["host"],
        port=SOURCE_CONFIG["port"],
        database=SOURCE_CONFIG["database"],
        user=SOURCE_CONFIG["user"],
        password=SOURCE_CONFIG["password"],
    )
    logger.info("源数据库连接成功")
    return connection
def get_target_connection() -> psycopg2.extensions.connection:
    """Open and return a connection to the target database.

    The session search_path is pinned to the target schema (then ``public``)
    so unqualified names resolve against the target schema first.

    Returns:
        An open psycopg2 connection object.
    """
    params = dict(TARGET_CONFIG)
    params["options"] = f"-c search_path={TARGET_SCHEMA},public"
    connection = psycopg2.connect(**params)
    logger.info("目标数据库连接成功")
    return connection
def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
    """Create the target table (with table/column comments) if missing.

    Args:
        conn: Open connection to the target database.

    Raises:
        Exception: Re-raised after rolling back if table creation fails.
    """
    cursor = conn.cursor()
    try:
        # Check whether the target table already exists.
        cursor.execute(
            """
            SELECT EXISTS(
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = %s
                AND table_name = %s
            )
            """,
            (TARGET_SCHEMA, TARGET_TABLE),
        )
        result = cursor.fetchone()
        exists = result[0] if result else False
        if not exists:
            logger.info(f"目标表不存在,正在创建 {TARGET_SCHEMA}.{TARGET_TABLE}...")
            # DDL taken from the task description.
            create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS {TARGET_SCHEMA}.{TARGET_TABLE} (
                id SERIAL,
                sku VARCHAR(50),
                product_name VARCHAR(200),
                category VARCHAR(100),
                brand VARCHAR(100),
                supplier VARCHAR(200),
                warehouse VARCHAR(100),
                current_stock INTEGER,
                safety_stock INTEGER,
                max_stock INTEGER,
                unit_cost NUMERIC(10, 2),
                selling_price NUMERIC(10, 2),
                stock_status VARCHAR(50),
                last_inbound_date DATE,
                last_outbound_date DATE,
                inbound_quantity_30d INTEGER,
                outbound_quantity_30d INTEGER,
                turnover_rate NUMERIC(5, 2),
                is_active BOOLEAN,
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            """
            cursor.execute(create_table_sql)
            # Table comment.
            cursor.execute(
                f"COMMENT ON TABLE {TARGET_SCHEMA}.{TARGET_TABLE} IS '产品库存表'"
            )
            # Column comments (best-effort; see savepoint note below).
            column_comments = {
                "id": "ID",
                "sku": "SKU",
                "product_name": "产品名称",
                "category": "类别",
                "brand": "品牌",
                "supplier": "供应商",
                "warehouse": "仓库",
                "current_stock": "当前库存",
                "safety_stock": "安全库存",
                "max_stock": "最大库存",
                "unit_cost": "单位成本",
                "selling_price": "销售价格",
                "stock_status": "库存状态",
                "last_inbound_date": "最近入库日期",
                "last_outbound_date": "最近出库日期",
                "inbound_quantity_30d": "30天入库数量",
                "outbound_quantity_30d": "30天出库数量",
                "turnover_rate": "周转率",
                "is_active": "是否启用",
                "created_at": "创建时间",
                "updated_at": "更新时间",
                "create_time": "数据创建时间",
            }
            for col, comment in column_comments.items():
                # BUGFIX: a plain try/except is not enough here. In
                # PostgreSQL, once any statement fails, the transaction is
                # aborted and every subsequent statement (including the
                # remaining comments and the final COMMIT) fails with
                # InFailedSqlTransaction. Wrapping each COMMENT in a
                # savepoint lets us roll back just the failed statement and
                # keep the CREATE TABLE work intact.
                cursor.execute("SAVEPOINT col_comment")
                try:
                    cursor.execute(
                        f"COMMENT ON COLUMN {TARGET_SCHEMA}.{TARGET_TABLE}.{col} IS %s",
                        (comment,),
                    )
                    cursor.execute("RELEASE SAVEPOINT col_comment")
                except Exception as e:
                    cursor.execute("ROLLBACK TO SAVEPOINT col_comment")
                    logger.warning(f"添加列注释失败 {col}: {e}")
            conn.commit()
            logger.info(f"目标表 {TARGET_SCHEMA}.{TARGET_TABLE} 创建成功")
        else:
            logger.info(f"目标表 {TARGET_SCHEMA}.{TARGET_TABLE} 已存在")
    except Exception as e:
        conn.rollback()
        logger.error(f"创建目标表失败: {e}")
        raise
    finally:
        cursor.close()
def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
    """Read every row of the raw product-inventory source table.

    Args:
        conn: Open connection to the source database.

    Returns:
        DataFrame with one row per source record.

    Raises:
        Exception: Propagated unchanged if the query fails.
    """
    query = """
    SELECT
        id,
        sku,
        product_name,
        category,
        brand,
        supplier,
        warehouse,
        current_stock,
        safety_stock,
        max_stock,
        unit_cost,
        selling_price,
        stock_status,
        last_inbound_date,
        last_outbound_date,
        inbound_quantity_30d,
        outbound_quantity_30d,
        turnover_rate,
        is_active,
        created_at,
        updated_at
    FROM public.product_inventory_table_raw_data
    """
    logger.info("正在从源表提取数据...")
    try:
        frame = pd.read_sql(query, conn)
    except Exception as e:
        logger.error(f"提取源数据失败: {e}")
        raise
    else:
        logger.info(f"成功提取 {len(frame)} 条记录")
        return frame
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the transformation stage of the pipeline.

    The source and target tables share the same column layout, so this task
    needs no reshaping: the stage is an explicit identity pass-through kept
    for pipeline symmetry.

    Args:
        df: Rows extracted from the source table.

    Returns:
        The same DataFrame, unchanged.
    """
    logger.info("正在执行数据转换...")
    logger.info(f"数据转换完成,共 {len(df)} 条记录")
    return df
def load_to_target(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
    update_mode: str = "append",
    batch_size: int = 1000,
) -> int:
    """Insert the DataFrame into the target table in batches.

    Args:
        df: Rows to load; columns absent from the frame are inserted as NULL.
        conn: Open connection to the target database.
        update_mode: ``"append"`` keeps existing rows; ``"full"`` truncates
            the target table first.
        batch_size: Number of rows per ``executemany`` round-trip.

    Returns:
        Number of rows inserted.

    Raises:
        Exception: Re-raised after rollback if any statement fails.
    """
    if df.empty:
        logger.warning("没有数据需要加载")
        return 0
    logger.info(f"正在将 {len(df)} 条记录加载到目标表...")
    target_table = f"{TARGET_SCHEMA}.{TARGET_TABLE}"
    cursor = conn.cursor()
    inserted_count = 0
    try:
        # Full-refresh mode: empty the target table before loading.
        if update_mode.lower() == "full":
            logger.info("全量更新模式:清空目标表...")
            cursor.execute(f"TRUNCATE TABLE {target_table}")
            logger.info("目标表已清空")
        # Columns to insert (create_time is filled by its DB-side default).
        columns = [
            "sku",
            "product_name",
            "category",
            "brand",
            "supplier",
            "warehouse",
            "current_stock",
            "safety_stock",
            "max_stock",
            "unit_cost",
            "selling_price",
            "stock_status",
            "last_inbound_date",
            "last_outbound_date",
            "inbound_quantity_30d",
            "outbound_quantity_30d",
            "turnover_rate",
            "is_active",
            "created_at",
            "updated_at",
        ]
        placeholders = ", ".join(["%s"] * len(columns))
        column_names = ", ".join(columns)
        insert_sql = (
            f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
        )
        # Batched inserts to bound memory and round-trip overhead.
        for i in range(0, len(df), batch_size):
            batch_df = df.iloc[i : i + batch_size]
            records = [
                tuple(_to_db_value(row.get(col)) for col in columns)
                for _, row in batch_df.iterrows()
            ]
            cursor.executemany(insert_sql, records)
            inserted_count += len(records)
            logger.debug(f"已插入 {inserted_count}/{len(df)} 条记录")
        conn.commit()
        logger.info(f"成功加载 {inserted_count} 条记录到 {target_table}")
        return inserted_count
    except Exception as e:
        conn.rollback()
        logger.error(f"数据加载失败: {e}")
        raise
    finally:
        cursor.close()


def _to_db_value(value: Any) -> Any:
    """Convert one pandas cell into a psycopg2-adaptable Python value.

    BUGFIX: rows produced by ``DataFrame.iterrows`` hold numpy scalar types
    (``numpy.int64``, ``numpy.float64``, ``numpy.bool_``) for numeric
    columns, and psycopg2 rejects those with "can't adapt type
    'numpy.int64'". Numpy scalars expose ``.item()``, which unwraps them to
    native Python values. NaN/NaT become NULL.
    """
    if pd.isna(value):
        return None
    # str/bytes/datetime (incl. pd.Timestamp, a datetime subclass) are
    # already adaptable — pass them through untouched.
    if isinstance(value, (str, bytes, datetime)):
        return value
    if hasattr(value, "item"):
        return value.item()
    return value
def main() -> dict[str, Any]:
    """Run the full extract/transform/load pipeline for this task.

    Returns:
        Summary dict with task identity, final status, record counts,
        error message (if any) and total execution time.
    """
    summary: dict[str, Any] = {
        "task_id": TASK_ID,
        "task_name": TASK_NAME,
        "status": "failed",
        "records_extracted": 0,
        "records_loaded": 0,
        "error_message": None,
        "execution_time": None,
    }
    started_at = datetime.now()
    src_conn = None
    tgt_conn = None
    banner = "=" * 60
    try:
        logger.info(banner)
        logger.info(f"任务开始: {TASK_NAME}")
        logger.info(banner)
        logger.info("[Step 1/5] 建立数据库连接...")
        src_conn = get_source_connection()
        tgt_conn = get_target_connection()
        # The target table must exist before any load is attempted.
        logger.info("[Step 2/5] 检查/创建目标表...")
        ensure_target_table_exists(tgt_conn)
        logger.info("[Step 3/5] 提取源数据...")
        extracted = extract_source_data(src_conn)
        summary["records_extracted"] = len(extracted)
        logger.info("[Step 4/5] 数据转换...")
        transformed = transform_data(extracted)
        logger.info("[Step 5/5] 加载数据到目标表...")
        summary["records_loaded"] = load_to_target(
            transformed, tgt_conn, update_mode=UPDATE_MODE
        )
        summary["status"] = "success"
        logger.info(banner)
        logger.info(
            f"任务完成! 提取: {summary['records_extracted']}, 加载: {summary['records_loaded']}"
        )
        logger.info(banner)
    except Exception as e:
        summary["status"] = "failed"
        summary["error_message"] = str(e)
        logger.error(f"任务执行失败: {e}")
        raise
    finally:
        # Always close whatever connections were opened.
        for label, connection in (("源", src_conn), ("目标", tgt_conn)):
            if connection:
                connection.close()
                logger.debug(f"{label}数据库连接已关闭")
        summary["execution_time"] = str(datetime.now() - started_at)
    return summary
if __name__ == "__main__":
    # Loguru setup. Console output deliberately goes to stdout (not stderr)
    # so the n8n workflow wrapping this script can parse the task output.
    logger.remove()
    logger.add(
        sys.stdout,
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
    )
    # A full DEBUG trail is also kept in a rotating per-task log file.
    logger.add(
        os.path.join(PROJECT_ROOT, "logs", f"task_{TASK_ID}.log"),
        level="DEBUG",
        rotation="10 MB",
        retention="7 days",
        encoding="utf-8",
    )
    try:
        outcome = main()
    except Exception as e:
        logger.exception(f"脚本执行异常: {e}")
        sys.exit(1)
    # SystemExit is not an Exception subclass, so exiting here matches the
    # original behavior of exiting inside the try block.
    sys.exit(0 if outcome["status"] == "success" else 1)