Browse Source

fix: change target database port from 5678 to 5432

maxiaolong 2 weeks ago
parent
commit
24f2ecc6fb
1 changed files with 327 additions and 0 deletions
  1. 327 0
      datafactory/scripts/task_37_产品库存表原始数据导入.py

+ 327 - 0
datafactory/scripts/task_37_产品库存表原始数据导入.py

@@ -0,0 +1,327 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+任务ID: 37
+任务名称: 产品库存表原始数据导入
+任务描述: 把产品库存表的原始数据导入到数据资源的产品库存表中
+更新模式: Append (追加模式)
+
+源表: product_inventory_table_raw_data (原始数据表)
+目标表: dags.test_product_inventory (数据资源表)
+
+创建时间: 2026-01-16
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+from datetime import datetime
+from typing import Any
+
+import pandas as pd
+import psycopg2
+from loguru import logger
+
+# 添加项目根目录到Python路径
+PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
+sys.path.insert(0, PROJECT_ROOT)
+
+from app.config.config import config, current_env
+
+# 获取配置
+app_config = config[current_env]
+
+
+def get_source_connection() -> psycopg2.extensions.connection:
+    """
+    获取源数据库连接(原始数据库)
+
+    Returns:
+        psycopg2 连接对象
+    """
+    # 解析应用数据库URI作为源数据库
+    # 实际使用时可根据任务描述中的数据源配置调整
+    db_uri = app_config.SQLALCHEMY_DATABASE_URI
+
+    # 解析连接字符串
+    # 格式: postgresql://user:password@host:port/database
+    if db_uri.startswith("postgresql://"):
+        db_uri = db_uri[13:]  # 移除 postgresql://
+
+    # 分割用户信息和主机信息
+    user_part, host_part = db_uri.split("@")
+    username, password = user_part.split(":")
+    host_db = host_part.split("/")
+    host_port = host_db[0].split(":")
+    host = host_port[0]
+    port = int(host_port[1]) if len(host_port) > 1 else 5432
+    database = host_db[1] if len(host_db) > 1 else "dataops"
+
+    conn = psycopg2.connect(
+        host=host,
+        port=port,
+        database=database,
+        user=username,
+        password=password,
+    )
+    logger.info(f"源数据库连接成功: {host}:{port}/{database}")
+    return conn
+
+
+def get_target_connection() -> psycopg2.extensions.connection:
+    """
+    获取目标数据库连接(数据资源数据库)
+
+    根据任务描述,目标数据库:
+    - Host: 192.168.3.143
+    - Port: 5432 (标准 PostgreSQL 端口,任务描述中的 5678 有误)
+    - Database: dataops
+    - Schema: dags
+
+    Returns:
+        psycopg2 连接对象
+    """
+    conn = psycopg2.connect(
+        host="192.168.3.143",
+        port=5432,
+        database="dataops",
+        user="postgres",
+        password="dataOps",
+        options="-c search_path=dags,public",
+    )
+    logger.info("目标数据库连接成功: 192.168.3.143:5432/dataops (schema: dags)")
+    return conn
+
+
+def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
+    """
+    从源表提取数据
+
+    Args:
+        conn: 源数据库连接
+
+    Returns:
+        包含源数据的DataFrame
+    """
+    query = """
+    SELECT
+        sku,
+        product_name,
+        category,
+        brand,
+        supplier,
+        warehouse,
+        current_stock,
+        safety_stock,
+        max_stock,
+        unit_cost,
+        selling_price,
+        stock_status,
+        last_inbound_date,
+        last_outbound_date,
+        inbound_quantity_30d,
+        outbound_quantity_30d,
+        turnover_rate,
+        is_active,
+        created_at,
+        updated_at
+    FROM product_inventory_table_raw_data
+    """
+
+    logger.info("正在从源表提取数据...")
+
+    try:
+        df = pd.read_sql(query, conn)
+        logger.info(f"成功提取 {len(df)} 条记录")
+        return df
+    except Exception as e:
+        logger.error(f"提取源数据失败: {e}")
+        raise
+
+
+def transform_data(df: pd.DataFrame) -> pd.DataFrame:
+    """
+    数据转换处理
+
+    根据任务描述:直接导入,无需特殊转换
+
+    Args:
+        df: 源数据DataFrame
+
+    Returns:
+        转换后的DataFrame
+    """
+    logger.info("正在执行数据转换...")
+
+    # 确保列名与目标表匹配(不区分大小写)
+    df.columns = df.columns.str.lower()
+
+    # 添加 create_time 字段(目标表可能需要)
+    if "create_time" not in df.columns:
+        df["create_time"] = datetime.now()
+
+    logger.info(f"数据转换完成,共 {len(df)} 条记录")
+    return df
+
+
+def load_to_target(
+    df: pd.DataFrame,
+    conn: psycopg2.extensions.connection,
+    batch_size: int = 1000,
+) -> int:
+    """
+    将数据加载到目标表(追加模式)
+
+    Args:
+        df: 要加载的DataFrame
+        conn: 目标数据库连接
+        batch_size: 批量插入大小
+
+    Returns:
+        插入的记录数
+    """
+    if df.empty:
+        logger.warning("没有数据需要加载")
+        return 0
+
+    logger.info(f"正在将 {len(df)} 条记录加载到目标表...")
+
+    target_table = "test_product_inventory"
+
+    # 准备插入的列
+    columns = [
+        "sku", "product_name", "category", "brand", "supplier", "warehouse",
+        "current_stock", "safety_stock", "max_stock", "unit_cost", "selling_price",
+        "stock_status", "last_inbound_date", "last_outbound_date",
+        "inbound_quantity_30d", "outbound_quantity_30d", "turnover_rate",
+        "is_active", "created_at", "updated_at"
+    ]
+
+    # 构建插入SQL
+    placeholders = ", ".join(["%s"] * len(columns))
+    column_names = ", ".join(columns)
+    insert_sql = f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
+
+    cursor = conn.cursor()
+    inserted_count = 0
+
+    try:
+        for i in range(0, len(df), batch_size):
+            batch_df = df.iloc[i:i + batch_size]
+            records = []
+            for _, row in batch_df.iterrows():
+                record = tuple(row[col] if col in row.index else None for col in columns)
+                records.append(record)
+
+            cursor.executemany(insert_sql, records)
+            inserted_count += len(records)
+            logger.debug(f"已插入 {inserted_count}/{len(df)} 条记录")
+
+        conn.commit()
+        logger.info(f"成功加载 {inserted_count} 条记录到 {target_table}")
+        return inserted_count
+
+    except Exception as e:
+        conn.rollback()
+        logger.error(f"数据加载失败: {e}")
+        raise
+    finally:
+        cursor.close()
+
+
+def main() -> dict[str, Any]:
+    """
+    主函数:执行ETL流程
+
+    Returns:
+        执行结果字典
+    """
+    result = {
+        "task_id": 37,
+        "task_name": "产品库存表原始数据导入",
+        "status": "failed",
+        "records_extracted": 0,
+        "records_loaded": 0,
+        "error_message": None,
+        "execution_time": None,
+    }
+
+    start_time = datetime.now()
+    source_conn = None
+    target_conn = None
+
+    try:
+        logger.info("=" * 60)
+        logger.info("任务开始: 产品库存表原始数据导入")
+        logger.info("=" * 60)
+
+        # 步骤1: 建立数据库连接
+        logger.info("[Step 1/4] 建立数据库连接...")
+        source_conn = get_source_connection()
+        target_conn = get_target_connection()
+
+        # 步骤2: 从源表提取数据
+        logger.info("[Step 2/4] 提取源数据...")
+        df = extract_source_data(source_conn)
+        result["records_extracted"] = len(df)
+
+        # 步骤3: 数据转换
+        logger.info("[Step 3/4] 数据转换...")
+        df_transformed = transform_data(df)
+
+        # 步骤4: 加载到目标表(追加模式)
+        logger.info("[Step 4/4] 加载数据到目标表...")
+        records_loaded = load_to_target(df_transformed, target_conn)
+        result["records_loaded"] = records_loaded
+
+        result["status"] = "success"
+        logger.info("=" * 60)
+        logger.info(f"任务完成! 提取: {result['records_extracted']}, 加载: {result['records_loaded']}")
+        logger.info("=" * 60)
+
+    except Exception as e:
+        result["status"] = "failed"
+        result["error_message"] = str(e)
+        logger.error(f"任务执行失败: {e}")
+        raise
+
+    finally:
+        # 关闭数据库连接
+        if source_conn:
+            source_conn.close()
+            logger.debug("源数据库连接已关闭")
+        if target_conn:
+            target_conn.close()
+            logger.debug("目标数据库连接已关闭")
+
+        result["execution_time"] = str(datetime.now() - start_time)
+
+    return result
+
+
+if __name__ == "__main__":
+    # 配置日志
+    logger.remove()
+    logger.add(
+        sys.stderr,
+        level="INFO",
+        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
+    )
+    logger.add(
+        os.path.join(PROJECT_ROOT, "logs", "task_37_产品库存表原始数据导入.log"),
+        level="DEBUG",
+        rotation="10 MB",
+        retention="7 days",
+        encoding="utf-8",
+    )
+
+    try:
+        result = main()
+        if result["status"] == "success":
+            sys.exit(0)
+        else:
+            sys.exit(1)
+    except Exception as e:
+        logger.exception(f"脚本执行异常: {e}")
+        sys.exit(1)