- #!/usr/bin/env python
- """
- 数据流任务脚本 - 产品库存表的原始数据导入
- 任务ID: 43
- 任务名称: 产品库存表的原始数据导入
- 创建时间: 2026-01-21
- 更新模式: Append (追加模式)
- 描述: 从源表 public.product_inventory_table_raw_data 导入数据到目标表 dags.test_product_inventory
- """
- from __future__ import annotations
- import os
- import sys
- from datetime import datetime
- from typing import Any
- import pandas as pd
- import psycopg2
- from loguru import logger
- # 添加项目根目录到Python路径
- PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
- sys.path.insert(0, PROJECT_ROOT)
# ---------------------------------------------------------------------------
# Task configuration
# ---------------------------------------------------------------------------
TASK_ID = 43
TASK_NAME = "产品库存表的原始数据导入"
UPDATE_MODE = "append"  # "append" adds rows; "full" truncates the target first


def _pg_config(env_prefix: str) -> dict[str, Any]:
    """Return psycopg2 connection parameters for one database.

    Each value can be overridden through ``<env_prefix>_HOST`` / ``_PORT`` /
    ``_DATABASE`` / ``_USER`` / ``_PASSWORD`` environment variables; the
    historical hard-coded values remain as fallbacks so existing deployments
    keep working unchanged.

    NOTE(review): credentials were previously committed in plain text —
    prefer setting the environment variables in production.
    """
    return {
        "host": os.environ.get(f"{env_prefix}_HOST", "192.168.3.143"),
        "port": int(os.environ.get(f"{env_prefix}_PORT", "5432")),
        "database": os.environ.get(f"{env_prefix}_DATABASE", "dataops"),
        "user": os.environ.get(f"{env_prefix}_USER", "postgres"),
        "password": os.environ.get(f"{env_prefix}_PASSWORD", "dataOps"),
    }


# Source database configuration
SOURCE_CONFIG = _pg_config("SOURCE_DB")
# Target database configuration
TARGET_CONFIG = _pg_config("TARGET_DB")

# Target table location
TARGET_SCHEMA = "dags"
TARGET_TABLE = "test_product_inventory"
def get_source_connection() -> psycopg2.extensions.connection:
    """Open and return a connection to the source database.

    Connection parameters come straight from ``SOURCE_CONFIG``.
    """
    connection = psycopg2.connect(**SOURCE_CONFIG)
    logger.info("源数据库连接成功")
    return connection
def get_target_connection() -> psycopg2.extensions.connection:
    """Open and return a connection to the target database.

    The session ``search_path`` is pointed at the target schema first so
    unqualified table names resolve there before ``public``.
    """
    connection = psycopg2.connect(
        options=f"-c search_path={TARGET_SCHEMA},public",
        **TARGET_CONFIG,
    )
    logger.info("目标数据库连接成功")
    return connection
def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
    """Create the target table (with table/column comments) if it is missing.

    Args:
        conn: open connection to the target database.

    Raises:
        Exception: re-raised after rollback if the existence check or the
            table creation fails.
    """
    cursor = conn.cursor()
    try:
        # Probe information_schema explicitly rather than relying on
        # search_path resolution.
        cursor.execute(
            """
            SELECT EXISTS(
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = %s
                AND table_name = %s
            )
            """,
            (TARGET_SCHEMA, TARGET_TABLE),
        )
        result = cursor.fetchone()
        exists = result[0] if result else False
        if exists:
            logger.info(f"目标表 {TARGET_SCHEMA}.{TARGET_TABLE} 已存在")
            return
        logger.info(f"目标表不存在,正在创建 {TARGET_SCHEMA}.{TARGET_TABLE}...")
        # DDL mirrors the task description for this dataflow job.
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {TARGET_SCHEMA}.{TARGET_TABLE} (
            id SERIAL,
            sku VARCHAR(50),
            product_name VARCHAR(200),
            category VARCHAR(100),
            brand VARCHAR(100),
            supplier VARCHAR(200),
            warehouse VARCHAR(100),
            current_stock INTEGER,
            safety_stock INTEGER,
            max_stock INTEGER,
            unit_cost NUMERIC(10, 2),
            selling_price NUMERIC(10, 2),
            stock_status VARCHAR(50),
            last_inbound_date DATE,
            last_outbound_date DATE,
            inbound_quantity_30d INTEGER,
            outbound_quantity_30d INTEGER,
            turnover_rate NUMERIC(5, 2),
            is_active BOOLEAN,
            created_at TIMESTAMP,
            updated_at TIMESTAMP,
            create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
        cursor.execute(create_table_sql)
        # Table comment
        cursor.execute(
            f"COMMENT ON TABLE {TARGET_SCHEMA}.{TARGET_TABLE} IS '产品库存表'"
        )
        # Column comments
        column_comments = {
            "id": "ID",
            "sku": "SKU",
            "product_name": "产品名称",
            "category": "类别",
            "brand": "品牌",
            "supplier": "供应商",
            "warehouse": "仓库",
            "current_stock": "当前库存",
            "safety_stock": "安全库存",
            "max_stock": "最大库存",
            "unit_cost": "单位成本",
            "selling_price": "销售价格",
            "stock_status": "库存状态",
            "last_inbound_date": "最近入库日期",
            "last_outbound_date": "最近出库日期",
            "inbound_quantity_30d": "30天入库数量",
            "outbound_quantity_30d": "30天出库数量",
            "turnover_rate": "周转率",
            "is_active": "是否启用",
            "created_at": "创建时间",
            "updated_at": "更新时间",
            "create_time": "数据创建时间",
        }
        for col, comment in column_comments.items():
            # BUGFIX: in PostgreSQL any failed statement aborts the whole
            # transaction, so a bare try/except here could not actually
            # "skip" a bad comment — every following statement would fail
            # with "current transaction is aborted".  Wrap each COMMENT in
            # a savepoint so a single failure is rolled back in isolation.
            cursor.execute("SAVEPOINT add_col_comment")
            try:
                cursor.execute(
                    f"COMMENT ON COLUMN {TARGET_SCHEMA}.{TARGET_TABLE}.{col} IS %s",
                    (comment,),
                )
                cursor.execute("RELEASE SAVEPOINT add_col_comment")
            except Exception as e:
                cursor.execute("ROLLBACK TO SAVEPOINT add_col_comment")
                logger.warning(f"添加列注释失败 {col}: {e}")
        conn.commit()
        logger.info(f"目标表 {TARGET_SCHEMA}.{TARGET_TABLE} 创建成功")
    except Exception as e:
        conn.rollback()
        logger.error(f"创建目标表失败: {e}")
        raise
    finally:
        cursor.close()
def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
    """Extract all rows from the raw source table.

    Args:
        conn: open connection to the source database.

    Returns:
        DataFrame with one column per selected source field.

    Raises:
        Exception: re-raised after logging if the query fails.
    """
    query = """
    SELECT
        id,
        sku,
        product_name,
        category,
        brand,
        supplier,
        warehouse,
        current_stock,
        safety_stock,
        max_stock,
        unit_cost,
        selling_price,
        stock_status,
        last_inbound_date,
        last_outbound_date,
        inbound_quantity_30d,
        outbound_quantity_30d,
        turnover_rate,
        is_active,
        created_at,
        updated_at
    FROM public.product_inventory_table_raw_data
    """
    logger.info("正在从源表提取数据...")
    try:
        # FIX: pd.read_sql on a raw DBAPI connection emits a UserWarning in
        # pandas >= 2.x (only SQLAlchemy connectables are supported); fetch
        # through a cursor and build the DataFrame directly instead.
        with conn.cursor() as cursor:
            cursor.execute(query)
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
        df = pd.DataFrame(rows, columns=columns)
        logger.info(f"成功提取 {len(df)} 条记录")
        return df
    except Exception as e:
        logger.error(f"提取源数据失败: {e}")
        raise
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """Pass-through transformation step.

    Source and target columns map one-to-one for this task, so no reshaping
    is performed; the hook is kept so future cleansing logic has a home.

    Args:
        df: raw source DataFrame.

    Returns:
        The same DataFrame, unchanged.
    """
    logger.info("正在执行数据转换...")
    record_count = len(df)
    logger.info(f"数据转换完成,共 {record_count} 条记录")
    return df
def _to_db_value(value: Any) -> Any:
    """Normalize one DataFrame cell for psycopg2: NaN/NaT -> None, numpy scalars -> Python scalars."""
    if pd.isna(value):
        return None
    # BUGFIX: pandas numeric/bool columns yield numpy scalar types
    # (numpy.int64, numpy.float64, numpy.bool_) which psycopg2 cannot adapt
    # by default — executemany would raise "can't adapt type".  .item()
    # unwraps them into native Python scalars.
    if hasattr(value, "item"):
        return value.item()
    return value


def load_to_target(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
    update_mode: str = "append",
    batch_size: int = 1000,
) -> int:
    """Insert the DataFrame into the target table in batches.

    Args:
        df: rows to load (column names must match the target columns).
        conn: open connection to the target database.
        update_mode: "append" adds rows; "full" truncates the table first.
        batch_size: rows per executemany batch.

    Returns:
        Number of rows inserted.

    Raises:
        Exception: re-raised after rollback if the load fails.
    """
    if df.empty:
        logger.warning("没有数据需要加载")
        return 0
    logger.info(f"正在将 {len(df)} 条记录加载到目标表...")
    target_table = f"{TARGET_SCHEMA}.{TARGET_TABLE}"
    cursor = conn.cursor()
    inserted_count = 0
    try:
        # Full-refresh mode: clear the target table first.
        if update_mode.lower() == "full":
            logger.info("全量更新模式:清空目标表...")
            cursor.execute(f"TRUNCATE TABLE {target_table}")
            logger.info("目标表已清空")
        # Columns to insert (create_time is omitted: the database fills it
        # via DEFAULT CURRENT_TIMESTAMP; id is a SERIAL).
        columns = [
            "sku",
            "product_name",
            "category",
            "brand",
            "supplier",
            "warehouse",
            "current_stock",
            "safety_stock",
            "max_stock",
            "unit_cost",
            "selling_price",
            "stock_status",
            "last_inbound_date",
            "last_outbound_date",
            "inbound_quantity_30d",
            "outbound_quantity_30d",
            "turnover_rate",
            "is_active",
            "created_at",
            "updated_at",
        ]
        placeholders = ", ".join(["%s"] * len(columns))
        column_names = ", ".join(columns)
        insert_sql = (
            f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
        )
        # Batched insert to bound memory and round-trips.
        for start in range(0, len(df), batch_size):
            batch_df = df.iloc[start : start + batch_size]
            records = [
                tuple(_to_db_value(row.get(col)) for col in columns)
                for _, row in batch_df.iterrows()
            ]
            cursor.executemany(insert_sql, records)
            inserted_count += len(records)
            logger.debug(f"已插入 {inserted_count}/{len(df)} 条记录")
        conn.commit()
        logger.info(f"成功加载 {inserted_count} 条记录到 {target_table}")
        return inserted_count
    except Exception as e:
        conn.rollback()
        logger.error(f"数据加载失败: {e}")
        raise
    finally:
        cursor.close()
def main() -> dict[str, Any]:
    """Run the extract-transform-load pipeline for this task.

    Returns:
        Summary dict with task metadata, record counts, status, error
        message (if any), and total execution time.
    """
    summary: dict[str, Any] = {
        "task_id": TASK_ID,
        "task_name": TASK_NAME,
        "status": "failed",
        "records_extracted": 0,
        "records_loaded": 0,
        "error_message": None,
        "execution_time": None,
    }
    started_at = datetime.now()
    source_conn = None
    target_conn = None
    try:
        banner = "=" * 60
        logger.info(banner)
        logger.info(f"任务开始: {TASK_NAME}")
        logger.info(banner)
        logger.info("[Step 1/5] 建立数据库连接...")
        source_conn = get_source_connection()
        target_conn = get_target_connection()
        # The target table must exist before any load is attempted.
        logger.info("[Step 2/5] 检查/创建目标表...")
        ensure_target_table_exists(target_conn)
        logger.info("[Step 3/5] 提取源数据...")
        raw_df = extract_source_data(source_conn)
        summary["records_extracted"] = len(raw_df)
        logger.info("[Step 4/5] 数据转换...")
        clean_df = transform_data(raw_df)
        logger.info("[Step 5/5] 加载数据到目标表...")
        summary["records_loaded"] = load_to_target(
            clean_df, target_conn, update_mode=UPDATE_MODE
        )
        summary["status"] = "success"
        logger.info(banner)
        logger.info(
            f"任务完成! 提取: {summary['records_extracted']}, 加载: {summary['records_loaded']}"
        )
        logger.info(banner)
    except Exception as e:
        summary["status"] = "failed"
        summary["error_message"] = str(e)
        logger.error(f"任务执行失败: {e}")
        raise
    finally:
        # Always close connections and record elapsed time, success or not.
        if source_conn:
            source_conn.close()
            logger.debug("源数据库连接已关闭")
        if target_conn:
            target_conn.close()
            logger.debug("目标数据库连接已关闭")
        summary["execution_time"] = str(datetime.now() - started_at)
    return summary
if __name__ == "__main__":
    # Route logs to stdout (not stderr) so the n8n workflow can parse the
    # script's output, and keep a rotating per-task DEBUG log file.
    logger.remove()
    logger.add(
        sys.stdout,
        level="INFO",
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    )
    logger.add(
        os.path.join(PROJECT_ROOT, "logs", f"task_{TASK_ID}.log"),
        level="DEBUG",
        rotation="10 MB",
        retention="7 days",
        encoding="utf-8",
    )
    exit_code = 1
    try:
        outcome = main()
        if outcome["status"] == "success":
            exit_code = 0
    except Exception as e:
        logger.exception(f"脚本执行异常: {e}")
    sys.exit(exit_code)
|