#!/usr/bin/env python
"""
Dataflow task script - raw data import for the product inventory table

Task ID: 43
Task name: Raw data import for the product inventory table
Created: 2026-01-21
Update mode: Append
Description: Import data from the source table public.product_inventory_table_raw_data
             into the target table dags.test_product_inventory
"""

from __future__ import annotations

import os
import sys
from datetime import datetime
from typing import Any

import numpy as np
import pandas as pd
import psycopg2
from loguru import logger

# Add the project root to the Python path
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)

# Task configuration
TASK_ID = 43
TASK_NAME = "Raw data import for the product inventory table"
UPDATE_MODE = "append"

# Source database configuration
SOURCE_CONFIG = {
    "host": "192.168.3.143",
    "port": 5432,
    "database": "dataops",
    "user": "postgres",
    "password": "dataOps",
}

# Target database configuration
TARGET_CONFIG = {
    "host": "192.168.3.143",
    "port": 5432,  # PostgreSQL default port
    "database": "dataops",
    "user": "postgres",
    "password": "dataOps",
}

# Target table configuration
TARGET_SCHEMA = "dags"
TARGET_TABLE = "test_product_inventory"


def get_source_connection() -> psycopg2.extensions.connection:
    """
    Open a connection to the source database.

    Returns:
        A psycopg2 connection object.
    """
    conn = psycopg2.connect(
        host=SOURCE_CONFIG["host"],
        port=SOURCE_CONFIG["port"],
        database=SOURCE_CONFIG["database"],
        user=SOURCE_CONFIG["user"],
        password=SOURCE_CONFIG["password"],
    )
    logger.info("Source database connection established")
    return conn


def get_target_connection() -> psycopg2.extensions.connection:
    """
    Open a connection to the target database.

    Returns:
        A psycopg2 connection object.
    """
    conn = psycopg2.connect(
        host=TARGET_CONFIG["host"],
        port=TARGET_CONFIG["port"],
        database=TARGET_CONFIG["database"],
        user=TARGET_CONFIG["user"],
        password=TARGET_CONFIG["password"],
        # Put the target schema first on the search path so unqualified
        # table names resolve to it
        options=f"-c search_path={TARGET_SCHEMA},public",
    )
    logger.info("Target database connection established")
    return conn
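
# The connection settings above embed credentials directly, as given in the
# task definition. Below is a minimal optional sketch of overriding them from
# environment variables instead; the DATAOPS_DB_HOST / DATAOPS_DB_PASSWORD
# variable names are assumptions for illustration, not an existing project
# convention, and this helper is not called anywhere in the script.
def _config_with_env_overrides(defaults: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *defaults* with any DATAOPS_DB_* env vars applied."""
    overrides = {
        "host": os.environ.get("DATAOPS_DB_HOST"),
        "password": os.environ.get("DATAOPS_DB_PASSWORD"),
    }
    # Only non-empty environment values override the hardcoded defaults
    return {**defaults, **{k: v for k, v in overrides.items() if v is not None}}
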
def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
    """
    Ensure the target table exists, creating it if necessary.

    Args:
        conn: Target database connection.
    """
    cursor = conn.cursor()
    try:
        # Check whether the table already exists
        cursor.execute(
            """
            SELECT EXISTS(
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = %s AND table_name = %s
            )
            """,
            (TARGET_SCHEMA, TARGET_TABLE),
        )
        result = cursor.fetchone()
        exists = result[0] if result else False

        if not exists:
            logger.info(f"Target table missing, creating {TARGET_SCHEMA}.{TARGET_TABLE}...")
            # Create the table according to the DDL in the task description
            create_table_sql = f"""
                CREATE TABLE IF NOT EXISTS {TARGET_SCHEMA}.{TARGET_TABLE} (
                    id SERIAL,
                    sku VARCHAR(50),
                    product_name VARCHAR(200),
                    category VARCHAR(100),
                    brand VARCHAR(100),
                    supplier VARCHAR(200),
                    warehouse VARCHAR(100),
                    current_stock INTEGER,
                    safety_stock INTEGER,
                    max_stock INTEGER,
                    unit_cost NUMERIC(10, 2),
                    selling_price NUMERIC(10, 2),
                    stock_status VARCHAR(50),
                    last_inbound_date DATE,
                    last_outbound_date DATE,
                    inbound_quantity_30d INTEGER,
                    outbound_quantity_30d INTEGER,
                    turnover_rate NUMERIC(5, 2),
                    is_active BOOLEAN,
                    created_at TIMESTAMP,
                    updated_at TIMESTAMP,
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """
            cursor.execute(create_table_sql)

            # Add a table comment
            cursor.execute(
                f"COMMENT ON TABLE {TARGET_SCHEMA}.{TARGET_TABLE} IS 'Product inventory table'"
            )

            # Add column comments
            column_comments = {
                "id": "ID",
                "sku": "SKU",
                "product_name": "Product name",
                "category": "Category",
                "brand": "Brand",
                "supplier": "Supplier",
                "warehouse": "Warehouse",
                "current_stock": "Current stock",
                "safety_stock": "Safety stock",
                "max_stock": "Maximum stock",
                "unit_cost": "Unit cost",
                "selling_price": "Selling price",
                "stock_status": "Stock status",
                "last_inbound_date": "Last inbound date",
                "last_outbound_date": "Last outbound date",
                "inbound_quantity_30d": "Inbound quantity, last 30 days",
                "outbound_quantity_30d": "Outbound quantity, last 30 days",
                "turnover_rate": "Turnover rate",
                "is_active": "Active flag",
                "created_at": "Created at",
                "updated_at": "Updated at",
                "create_time": "Row creation time",
            }
            # Comments are best-effort: a failure on one column should not
            # abort table creation
            for col, comment in column_comments.items():
                try:
                    cursor.execute(
                        f"COMMENT ON COLUMN {TARGET_SCHEMA}.{TARGET_TABLE}.{col} IS %s",
                        (comment,),
                    )
                except Exception as e:
                    logger.warning(f"Failed to add comment on column {col}: {e}")

            conn.commit()
            logger.info(f"Target table {TARGET_SCHEMA}.{TARGET_TABLE} created")
        else:
            logger.info(f"Target table {TARGET_SCHEMA}.{TARGET_TABLE} already exists")
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to create target table: {e}")
        raise
    finally:
        cursor.close()


def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
    """
    Extract data from the source table.

    Args:
        conn: Source database connection.

    Returns:
        A DataFrame with the source rows.
    """
    query = """
        SELECT
            id, sku, product_name, category, brand, supplier, warehouse,
            current_stock, safety_stock, max_stock, unit_cost, selling_price,
            stock_status, last_inbound_date, last_outbound_date,
            inbound_quantity_30d, outbound_quantity_30d, turnover_rate,
            is_active, created_at, updated_at
        FROM public.product_inventory_table_raw_data
    """
    logger.info("Extracting data from the source table...")
    try:
        # pandas >= 2.2 warns about raw DBAPI connections here; the query
        # still executes correctly
        df = pd.read_sql(query, conn)
        logger.info(f"Extracted {len(df)} records")
        return df
    except Exception as e:
        logger.error(f"Failed to extract source data: {e}")
        raise


def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply data transformations.

    Args:
        df: Source DataFrame.

    Returns:
        The transformed DataFrame.
    """
    logger.info("Transforming data...")
    # This task is a plain import: source and target columns match,
    # so rows are mapped through unchanged.
    logger.info(f"Transformation finished, {len(df)} records")
    return df


def load_to_target(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
    update_mode: str = "append",
    batch_size: int = 1000,
) -> int:
    """
    Load the data into the target table.

    Args:
        df: DataFrame to load.
        conn: Target database connection.
        update_mode: Update mode ("append" or "full").
        batch_size: Number of rows per batch insert.

    Returns:
        Number of rows inserted.
    """
    if df.empty:
        logger.warning("No data to load")
        return 0

    logger.info(f"Loading {len(df)} records into the target table...")
    target_table = f"{TARGET_SCHEMA}.{TARGET_TABLE}"
    cursor = conn.cursor()
    inserted_count = 0
    try:
        # Full-refresh mode: truncate the target table first
        if update_mode.lower() == "full":
            logger.info("Full-refresh mode: truncating target table...")
            cursor.execute(f"TRUNCATE TABLE {target_table}")
            logger.info("Target table truncated")

        # Columns to insert (create_time is omitted; the database fills it in)
        columns = [
            "sku", "product_name", "category", "brand", "supplier",
            "warehouse", "current_stock", "safety_stock", "max_stock",
            "unit_cost", "selling_price", "stock_status",
            "last_inbound_date", "last_outbound_date",
            "inbound_quantity_30d", "outbound_quantity_30d",
            "turnover_rate", "is_active", "created_at", "updated_at",
        ]

        # Build the INSERT statement
        placeholders = ", ".join(["%s"] * len(columns))
        column_names = ", ".join(columns)
        insert_sql = (
            f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
        )

        # Insert in batches
        for i in range(0, len(df), batch_size):
            batch_df = df.iloc[i : i + batch_size]
            records = []
            for _, row in batch_df.iterrows():
                record = []
                for col in columns:
                    value = row.get(col)
                    if pd.isna(value):
                        # Map NaN/NaT to SQL NULL
                        record.append(None)
                    elif isinstance(value, np.generic):
                        # psycopg2 cannot adapt numpy scalars such as
                        # numpy.int64; unwrap them to native Python values
                        record.append(value.item())
                    else:
                        record.append(value)
                records.append(tuple(record))

            cursor.executemany(insert_sql, records)
            inserted_count += len(records)
            logger.debug(f"Inserted {inserted_count}/{len(df)} records")

        conn.commit()
        logger.info(f"Loaded {inserted_count} records into {target_table}")
        return inserted_count
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to load data: {e}")
        raise
    finally:
        cursor.close()
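
# A minimal optional sketch of a faster bulk load using
# psycopg2.extras.execute_values, which folds a whole batch into a single
# multi-row INSERT instead of one statement per row as executemany() does.
# Shown for reference only and not wired into main(); the function name and
# parameters are illustrative, not part of the original task.
def _load_batch_with_execute_values(
    cursor: psycopg2.extensions.cursor,
    target_table: str,
    column_names: str,
    records: list[tuple],
) -> None:
    from psycopg2.extras import execute_values

    # execute_values expands the single "%s" after VALUES into one
    # multi-row VALUES list per page of rows
    execute_values(
        cursor,
        f"INSERT INTO {target_table} ({column_names}) VALUES %s",
        records,
        page_size=1000,
    )
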
def main() -> dict[str, Any]:
    """
    Main entry point: run the ETL flow.

    Returns:
        A dict describing the run result.
    """
    result = {
        "task_id": TASK_ID,
        "task_name": TASK_NAME,
        "status": "failed",
        "records_extracted": 0,
        "records_loaded": 0,
        "error_message": None,
        "execution_time": None,
    }
    start_time = datetime.now()
    source_conn = None
    target_conn = None

    try:
        logger.info("=" * 60)
        logger.info(f"Task started: {TASK_NAME}")
        logger.info("=" * 60)

        # Step 1: open the database connections
        logger.info("[Step 1/5] Connecting to databases...")
        source_conn = get_source_connection()
        target_conn = get_target_connection()

        # Step 2: ensure the target table exists (must happen before loading)
        logger.info("[Step 2/5] Checking/creating target table...")
        ensure_target_table_exists(target_conn)

        # Step 3: extract data from the source table
        logger.info("[Step 3/5] Extracting source data...")
        df = extract_source_data(source_conn)
        result["records_extracted"] = len(df)

        # Step 4: transform
        logger.info("[Step 4/5] Transforming data...")
        df_transformed = transform_data(df)

        # Step 5: load into the target table
        logger.info("[Step 5/5] Loading data into target table...")
        records_loaded = load_to_target(
            df_transformed, target_conn, update_mode=UPDATE_MODE
        )
        result["records_loaded"] = records_loaded
        result["status"] = "success"

        logger.info("=" * 60)
        logger.info(
            f"Task finished! Extracted: {result['records_extracted']}, "
            f"loaded: {result['records_loaded']}"
        )
        logger.info("=" * 60)
    except Exception as e:
        result["status"] = "failed"
        result["error_message"] = str(e)
        logger.error(f"Task failed: {e}")
        raise
    finally:
        # Close the database connections
        if source_conn:
            source_conn.close()
            logger.debug("Source database connection closed")
        if target_conn:
            target_conn.close()
            logger.debug("Target database connection closed")
        result["execution_time"] = str(datetime.now() - start_time)

    return result


if __name__ == "__main__":
    # Configure logging.
    # Important: send log output to stdout rather than stderr so the n8n
    # workflow can parse the script's output correctly.
    logger.remove()
    logger.add(
        sys.stdout,
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
    )
    logger.add(
        os.path.join(PROJECT_ROOT, "logs", f"task_{TASK_ID}.log"),
        level="DEBUG",
        rotation="10 MB",
        retention="7 days",
        encoding="utf-8",
    )

    try:
        result = main()
        sys.exit(0 if result["status"] == "success" else 1)
    except Exception as e:
        logger.exception(f"Script execution failed: {e}")
        sys.exit(1)
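
# Usage sketch (the invocation path is illustrative; it depends on where this
# script lives in the project):
#
#   python path/to/this_script.py
#
# Exit code 0 means success, 1 means failure. INFO logs go to stdout for the
# n8n workflow to consume; DEBUG logs go to logs/task_43.log under the project
# root. main() can also be imported and called directly; it returns the result
# dict with status, record counts, and execution time.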