task_37_产品库存表原始数据导入.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Task ID: 37
Task name: Product inventory table raw data import (产品库存表原始数据导入)
Task description: Import the raw product inventory data into the product inventory table of the data resource layer
Update mode: Append
Source table: product_inventory_table_raw_data (raw data table)
Target table: dags.test_product_inventory (data resource table)
Created: 2026-01-16
"""
from __future__ import annotations

import os
import sys
from datetime import datetime
from typing import Any

import pandas as pd
import psycopg2
from loguru import logger

# Add the project root to the Python path so that app.config can be imported
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)

from app.config.config import config, current_env

# Load the configuration for the current environment
app_config = config[current_env]


def get_source_connection() -> psycopg2.extensions.connection:
    """
    Get a connection to the source database (raw data database).

    Returns:
        psycopg2 connection object
    """
    # Use the application database URI as the source database.
    # Adjust here if the task's data-source configuration points elsewhere.
    db_uri = app_config.SQLALCHEMY_DATABASE_URI

    # Parse the connection string.
    # Expected format: postgresql://user:password@host:port/database
    if db_uri.startswith("postgresql://"):
        db_uri = db_uri[13:]  # strip the "postgresql://" prefix

    # Split user info and host info
    user_part, host_part = db_uri.split("@")
    username, password = user_part.split(":")
    host_db = host_part.split("/")
    host_port = host_db[0].split(":")
    host = host_port[0]
    port = int(host_port[1]) if len(host_port) > 1 else 5432
    database = host_db[1] if len(host_db) > 1 else "dataops"

    conn = psycopg2.connect(
        host=host,
        port=port,
        database=database,
        user=username,
        password=password,
    )
    logger.info(f"Source database connected: {host}:{port}/{database}")
    return conn
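
# Note: the manual parsing above assumes the password contains no ":" or "@".
# A more robust alternative would be the standard library URL parser
# (sketch only, not wired in):
#
#     from urllib.parse import urlsplit
#     parts = urlsplit(app_config.SQLALCHEMY_DATABASE_URI)
#     # parts.username, parts.password, parts.hostname, parts.port, parts.path.lstrip("/")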


def get_target_connection() -> psycopg2.extensions.connection:
    """
    Get a connection to the target database (data resource database).

    Per the task description, the target database is:
    - Host: 192.168.3.143
    - Port: 5432 (standard PostgreSQL port; the 5678 given in the task description is incorrect)
    - Database: dataops
    - Schema: dags

    Returns:
        psycopg2 connection object
    """
    conn = psycopg2.connect(
        host="192.168.3.143",
        port=5432,
        database="dataops",
        user="postgres",
        password="dataOps",
        # Resolve unqualified table names against the dags schema first
        options="-c search_path=dags,public",
    )
    logger.info("Target database connected: 192.168.3.143:5432/dataops (schema: dags)")
    return conn


def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
    """
    Extract data from the source table.

    Args:
        conn: source database connection

    Returns:
        DataFrame containing the source data
    """
    query = """
        SELECT
            sku,
            product_name,
            category,
            brand,
            supplier,
            warehouse,
            current_stock,
            safety_stock,
            max_stock,
            unit_cost,
            selling_price,
            stock_status,
            last_inbound_date,
            last_outbound_date,
            inbound_quantity_30d,
            outbound_quantity_30d,
            turnover_rate,
            is_active,
            created_at,
            updated_at
        FROM product_inventory_table_raw_data
    """
    logger.info("Extracting data from the source table...")
    try:
        df = pd.read_sql(query, conn)
        logger.info(f"Extracted {len(df)} records")
        return df
    except Exception as e:
        logger.error(f"Failed to extract source data: {e}")
        raise
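
# Note: pandas.read_sql accepts a raw psycopg2 connection, but recent pandas versions emit a
# UserWarning because only SQLAlchemy connectables are officially supported. If that matters,
# an engine could be used instead (sketch, assuming SQLAlchemy is installed in this project):
#
#     from sqlalchemy import create_engine
#     engine = create_engine(app_config.SQLALCHEMY_DATABASE_URI)
#     df = pd.read_sql(query, engine)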


def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Transform the data.

    Per the task description the data is imported as-is; no special transformation is required.

    Args:
        df: source DataFrame

    Returns:
        transformed DataFrame
    """
    logger.info("Transforming data...")

    # Normalize column names so they match the target table (case-insensitive)
    df.columns = df.columns.str.lower()

    # Add a create_time column (the target table may require it)
    if "create_time" not in df.columns:
        df["create_time"] = datetime.now()

    logger.info(f"Transformation finished: {len(df)} records")
    return df
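
# Note: if the raw table contains NULLs, pandas represents them as NaN/NaT, which psycopg2 does
# not turn into SQL NULL (NaT in particular cannot be adapted). An optional normalization before
# loading could be (sketch only):
#
#     df = df.astype(object).where(pd.notnull(df), None)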


def load_to_target(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
    batch_size: int = 1000,
) -> int:
    """
    Load the data into the target table (append mode).

    Args:
        df: DataFrame to load
        conn: target database connection
        batch_size: number of rows per batch insert

    Returns:
        number of inserted records
    """
    if df.empty:
        logger.warning("No data to load")
        return 0

    logger.info(f"Loading {len(df)} records into the target table...")
    target_table = "test_product_inventory"

    # Columns to insert
    columns = [
        "sku", "product_name", "category", "brand", "supplier", "warehouse",
        "current_stock", "safety_stock", "max_stock", "unit_cost", "selling_price",
        "stock_status", "last_inbound_date", "last_outbound_date",
        "inbound_quantity_30d", "outbound_quantity_30d", "turnover_rate",
        "is_active", "created_at", "updated_at",
    ]

    # Build the INSERT statement
    placeholders = ", ".join(["%s"] * len(columns))
    column_names = ", ".join(columns)
    insert_sql = f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"

    cursor = conn.cursor()
    inserted_count = 0
    try:
        for i in range(0, len(df), batch_size):
            batch_df = df.iloc[i:i + batch_size]
            records = []
            for _, row in batch_df.iterrows():
                # Fill missing columns with None so the tuple always matches the column list
                record = tuple(row[col] if col in row.index else None for col in columns)
                records.append(record)
            cursor.executemany(insert_sql, records)
            inserted_count += len(records)
            logger.debug(f"Inserted {inserted_count}/{len(df)} records")
        conn.commit()
        logger.info(f"Loaded {inserted_count} records into {target_table}")
        return inserted_count
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to load data: {e}")
        raise
    finally:
        cursor.close()
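
# Note: cursor.executemany issues one INSERT per row under the hood. For large batches,
# psycopg2.extras.execute_values is usually much faster and could replace the executemany call
# inside the loop above (sketch, reusing the names defined in load_to_target):
#
#     from psycopg2.extras import execute_values
#     execute_values(cursor, f"INSERT INTO {target_table} ({column_names}) VALUES %s", records)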


def main() -> dict[str, Any]:
    """
    Main entry point: run the ETL flow.

    Returns:
        result dictionary describing the run
    """
    result = {
        "task_id": 37,
        "task_name": "产品库存表原始数据导入",
        "status": "failed",
        "records_extracted": 0,
        "records_loaded": 0,
        "error_message": None,
        "execution_time": None,
    }
    start_time = datetime.now()
    source_conn = None
    target_conn = None
    try:
        logger.info("=" * 60)
        logger.info("Task started: product inventory table raw data import")
        logger.info("=" * 60)

        # Step 1: open the database connections
        logger.info("[Step 1/4] Connecting to databases...")
        source_conn = get_source_connection()
        target_conn = get_target_connection()

        # Step 2: extract data from the source table
        logger.info("[Step 2/4] Extracting source data...")
        df = extract_source_data(source_conn)
        result["records_extracted"] = len(df)

        # Step 3: transform the data
        logger.info("[Step 3/4] Transforming data...")
        df_transformed = transform_data(df)

        # Step 4: load into the target table (append mode)
        logger.info("[Step 4/4] Loading data into the target table...")
        records_loaded = load_to_target(df_transformed, target_conn)
        result["records_loaded"] = records_loaded
        result["status"] = "success"

        logger.info("=" * 60)
        logger.info(f"Task finished! Extracted: {result['records_extracted']}, loaded: {result['records_loaded']}")
        logger.info("=" * 60)
    except Exception as e:
        result["status"] = "failed"
        result["error_message"] = str(e)
        logger.error(f"Task execution failed: {e}")
        raise
    finally:
        # Close the database connections
        if source_conn:
            source_conn.close()
            logger.debug("Source database connection closed")
        if target_conn:
            target_conn.close()
            logger.debug("Target database connection closed")
        result["execution_time"] = str(datetime.now() - start_time)
    return result
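
# For reference, the dictionary returned by main() looks like this (values are illustrative):
#
#     {"task_id": 37, "task_name": "产品库存表原始数据导入", "status": "success",
#      "records_extracted": 1200, "records_loaded": 1200,
#      "error_message": None, "execution_time": "0:00:03.512345"}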


if __name__ == "__main__":
    # Configure logging: console at INFO level, file at DEBUG level
    logger.remove()
    logger.add(
        sys.stderr,
        level="INFO",
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    )
    logger.add(
        os.path.join(PROJECT_ROOT, "logs", "task_37_产品库存表原始数据导入.log"),
        level="DEBUG",
        rotation="10 MB",
        retention="7 days",
        encoding="utf-8",
    )
    try:
        result = main()
        if result["status"] == "success":
            sys.exit(0)
        else:
            sys.exit(1)
    except Exception as e:
        logger.exception(f"Script execution failed: {e}")
        sys.exit(1)