task_43_产品库存表的原始数据导入.py

#!/usr/bin/env python
"""
Dataflow task script - raw data import for the product inventory table
Task ID: 43
Task name: 产品库存表的原始数据导入 (raw data import for the product inventory table)
Created: 2026-01-21
Update mode: append
Description: import data from the source table public.product_inventory_table_raw_data
             into the target table dags.test_product_inventory
"""
from __future__ import annotations

import os
import sys
from datetime import datetime
from typing import Any

import pandas as pd
import psycopg2
from loguru import logger

# Add the project root directory to the Python path
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)

# Task configuration
TASK_ID = 43
TASK_NAME = "产品库存表的原始数据导入"
UPDATE_MODE = "append"

# Source database configuration
SOURCE_CONFIG = {
    "host": "192.168.3.143",
    "port": 5432,
    "database": "dataops",
    "user": "postgres",
    "password": "dataOps",
}

# Target database configuration
TARGET_CONFIG = {
    "host": "192.168.3.143",
    "port": 5432,  # PostgreSQL default port
    "database": "dataops",
    "user": "postgres",
    "password": "dataOps",
}

# Target table configuration
TARGET_SCHEMA = "dags"
TARGET_TABLE = "test_product_inventory"

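# NOTE: The credentials above are hardcoded for simplicity. A minimal, hedged
# sketch of overriding them from the environment (the variable names below are
# illustrative only, not an existing convention of this project):
#
#     SOURCE_CONFIG["password"] = os.getenv("SOURCE_DB_PASSWORD", SOURCE_CONFIG["password"])
#     TARGET_CONFIG["password"] = os.getenv("TARGET_DB_PASSWORD", TARGET_CONFIG["password"])
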
def get_source_connection() -> psycopg2.extensions.connection:
    """
    Open a connection to the source database.

    Returns:
        psycopg2 connection object
    """
    conn = psycopg2.connect(
        host=SOURCE_CONFIG["host"],
        port=SOURCE_CONFIG["port"],
        database=SOURCE_CONFIG["database"],
        user=SOURCE_CONFIG["user"],
        password=SOURCE_CONFIG["password"],
    )
    logger.info("Source database connection established")
    return conn


def get_target_connection() -> psycopg2.extensions.connection:
    """
    Open a connection to the target database.

    Returns:
        psycopg2 connection object
    """
    conn = psycopg2.connect(
        host=TARGET_CONFIG["host"],
        port=TARGET_CONFIG["port"],
        database=TARGET_CONFIG["database"],
        user=TARGET_CONFIG["user"],
        password=TARGET_CONFIG["password"],
        options=f"-c search_path={TARGET_SCHEMA},public",
    )
    logger.info("Target database connection established")
    return conn

def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
    """
    Ensure the target table exists; create it if it does not.

    Args:
        conn: target database connection
    """
    cursor = conn.cursor()
    try:
        # Check whether the table already exists
        cursor.execute(
            """
            SELECT EXISTS(
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = %s
                  AND table_name = %s
            )
            """,
            (TARGET_SCHEMA, TARGET_TABLE),
        )
        result = cursor.fetchone()
        exists = result[0] if result else False

        if not exists:
            logger.info(f"Target table does not exist; creating {TARGET_SCHEMA}.{TARGET_TABLE}...")
            # Create the table according to the DDL from the task description
            create_table_sql = f"""
                CREATE TABLE IF NOT EXISTS {TARGET_SCHEMA}.{TARGET_TABLE} (
                    id SERIAL,
                    sku VARCHAR(50),
                    product_name VARCHAR(200),
                    category VARCHAR(100),
                    brand VARCHAR(100),
                    supplier VARCHAR(200),
                    warehouse VARCHAR(100),
                    current_stock INTEGER,
                    safety_stock INTEGER,
                    max_stock INTEGER,
                    unit_cost NUMERIC(10, 2),
                    selling_price NUMERIC(10, 2),
                    stock_status VARCHAR(50),
                    last_inbound_date DATE,
                    last_outbound_date DATE,
                    inbound_quantity_30d INTEGER,
                    outbound_quantity_30d INTEGER,
                    turnover_rate NUMERIC(5, 2),
                    is_active BOOLEAN,
                    created_at TIMESTAMP,
                    updated_at TIMESTAMP,
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """
            cursor.execute(create_table_sql)

            # Add the table comment (Chinese label stored as database metadata)
            cursor.execute(
                f"COMMENT ON TABLE {TARGET_SCHEMA}.{TARGET_TABLE} IS '产品库存表'"
            )

            # Add column comments (Chinese labels stored as database metadata)
            column_comments = {
                "id": "ID",
                "sku": "SKU",
                "product_name": "产品名称",
                "category": "类别",
                "brand": "品牌",
                "supplier": "供应商",
                "warehouse": "仓库",
                "current_stock": "当前库存",
                "safety_stock": "安全库存",
                "max_stock": "最大库存",
                "unit_cost": "单位成本",
                "selling_price": "销售价格",
                "stock_status": "库存状态",
                "last_inbound_date": "最近入库日期",
                "last_outbound_date": "最近出库日期",
                "inbound_quantity_30d": "30天入库数量",
                "outbound_quantity_30d": "30天出库数量",
                "turnover_rate": "周转率",
                "is_active": "是否启用",
                "created_at": "创建时间",
                "updated_at": "更新时间",
                "create_time": "数据创建时间",
            }
            for col, comment in column_comments.items():
                try:
                    cursor.execute(
                        f"COMMENT ON COLUMN {TARGET_SCHEMA}.{TARGET_TABLE}.{col} IS %s",
                        (comment,),
                    )
                except Exception as e:
                    logger.warning(f"Failed to add comment for column {col}: {e}")

            conn.commit()
            logger.info(f"Target table {TARGET_SCHEMA}.{TARGET_TABLE} created")
        else:
            logger.info(f"Target table {TARGET_SCHEMA}.{TARGET_TABLE} already exists")
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to create target table: {e}")
        raise
    finally:
        cursor.close()

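# NOTE: The per-column try/except above logs a warning and keeps going, but in
# PostgreSQL the first failed statement aborts the enclosing transaction, so any
# later COMMENT statements (and the final commit) would be discarded until a
# rollback. In practice the comments target columns created a moment earlier and
# should not fail; a more defensive variant could commit the DDL first (sketch):
#
#     cursor.execute(create_table_sql)
#     conn.commit()  # make the CREATE TABLE durable before best-effort comments
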
def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
    """
    Extract data from the source table.

    Args:
        conn: source database connection

    Returns:
        DataFrame containing the source data
    """
    query = """
        SELECT
            id,
            sku,
            product_name,
            category,
            brand,
            supplier,
            warehouse,
            current_stock,
            safety_stock,
            max_stock,
            unit_cost,
            selling_price,
            stock_status,
            last_inbound_date,
            last_outbound_date,
            inbound_quantity_30d,
            outbound_quantity_30d,
            turnover_rate,
            is_active,
            created_at,
            updated_at
        FROM public.product_inventory_table_raw_data
    """
    logger.info("Extracting data from the source table...")
    try:
        df = pd.read_sql(query, conn)
        logger.info(f"Extracted {len(df)} records")
        return df
    except Exception as e:
        logger.error(f"Failed to extract source data: {e}")
        raise

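# NOTE: pandas may emit a UserWarning when pd.read_sql() is given a raw DBAPI
# connection (such as the psycopg2 connection used above) instead of a
# SQLAlchemy connectable. The warning is harmless here; a hedged sketch of the
# SQLAlchemy-based alternative, assuming SQLAlchemy is installed:
#
#     from sqlalchemy import create_engine
#     engine = create_engine(
#         "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(
#             **SOURCE_CONFIG
#         )
#     )
#     df = pd.read_sql(query, engine)
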
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply data transformations.

    Args:
        df: source DataFrame

    Returns:
        transformed DataFrame
    """
    logger.info("Running data transformation...")
    # This task is a plain import: the source and target columns match,
    # so records are passed through unchanged.
    logger.info(f"Data transformation finished, {len(df)} records")
    return df

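# NOTE: transform_data() is a pass-through for this task. If derived fields were
# ever needed, they could be added there; a purely hypothetical sketch (not part
# of task 43, shown only to illustrate where such logic would live):
#
#     df["is_active"] = df["is_active"].fillna(False).astype(bool)
#     df["stock_gap"] = df["max_stock"] - df["current_stock"]
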
def load_to_target(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
    update_mode: str = "append",
    batch_size: int = 1000,
) -> int:
    """
    Load data into the target table.

    Args:
        df: DataFrame to load
        conn: target database connection
        update_mode: update mode ("append" or "full")
        batch_size: number of rows per batch insert

    Returns:
        number of records inserted
    """
    if df.empty:
        logger.warning("No data to load")
        return 0

    logger.info(f"Loading {len(df)} records into the target table...")
    target_table = f"{TARGET_SCHEMA}.{TARGET_TABLE}"
    cursor = conn.cursor()
    inserted_count = 0
    try:
        # Full-refresh mode: truncate the target table first
        if update_mode.lower() == "full":
            logger.info("Full-refresh mode: truncating the target table...")
            cursor.execute(f"TRUNCATE TABLE {target_table}")
            logger.info("Target table truncated")

        # Columns to insert (id and create_time are excluded: id is SERIAL and
        # create_time is filled by the database via its DEFAULT)
        columns = [
            "sku",
            "product_name",
            "category",
            "brand",
            "supplier",
            "warehouse",
            "current_stock",
            "safety_stock",
            "max_stock",
            "unit_cost",
            "selling_price",
            "stock_status",
            "last_inbound_date",
            "last_outbound_date",
            "inbound_quantity_30d",
            "outbound_quantity_30d",
            "turnover_rate",
            "is_active",
            "created_at",
            "updated_at",
        ]

        # Build the INSERT statement
        placeholders = ", ".join(["%s"] * len(columns))
        column_names = ", ".join(columns)
        insert_sql = (
            f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
        )

        # Insert in batches
        for i in range(0, len(df), batch_size):
            batch_df = df.iloc[i : i + batch_size]
            records = []
            for _, row in batch_df.iterrows():
                record = tuple(
                    None if pd.isna(row.get(col)) else row.get(col) for col in columns
                )
                records.append(record)
            cursor.executemany(insert_sql, records)
            inserted_count += len(records)
            logger.debug(f"Inserted {inserted_count}/{len(df)} records")

        conn.commit()
        logger.info(f"Loaded {inserted_count} records into {target_table}")
        return inserted_count
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to load data: {e}")
        raise
    finally:
        cursor.close()

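# NOTE: A possible optimization, not used by this task: psycopg2's
# execute_values() folds rows into multi-row INSERT statements and is typically
# much faster than executemany(). The function below is a hedged sketch kept
# separate from load_to_target() so the original loading path stays unchanged;
# it assumes the same column list the caller would pass in.
def load_to_target_with_execute_values(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
    columns: list[str],
    page_size: int = 1000,
) -> int:
    """Bulk-insert a DataFrame with psycopg2.extras.execute_values (sketch)."""
    from psycopg2.extras import execute_values

    if df.empty:
        return 0
    insert_sql = (
        f"INSERT INTO {TARGET_SCHEMA}.{TARGET_TABLE} "
        f"({', '.join(columns)}) VALUES %s"
    )
    records = [
        tuple(None if pd.isna(row.get(col)) else row.get(col) for col in columns)
        for _, row in df.iterrows()
    ]
    cursor = conn.cursor()
    try:
        # execute_values expands the single %s placeholder into batches of rows
        execute_values(cursor, insert_sql, records, page_size=page_size)
        conn.commit()
        return len(records)
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
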
def main() -> dict[str, Any]:
    """
    Main entry point: run the ETL flow.

    Returns:
        execution result dictionary
    """
    result = {
        "task_id": TASK_ID,
        "task_name": TASK_NAME,
        "status": "failed",
        "records_extracted": 0,
        "records_loaded": 0,
        "error_message": None,
        "execution_time": None,
    }
    start_time = datetime.now()
    source_conn = None
    target_conn = None
    try:
        logger.info("=" * 60)
        logger.info(f"Task started: {TASK_NAME}")
        logger.info("=" * 60)

        # Step 1: open the database connections
        logger.info("[Step 1/5] Connecting to the databases...")
        source_conn = get_source_connection()
        target_conn = get_target_connection()

        # Step 2: ensure the target table exists (important: must run before loading)
        logger.info("[Step 2/5] Checking/creating the target table...")
        ensure_target_table_exists(target_conn)

        # Step 3: extract data from the source table
        logger.info("[Step 3/5] Extracting source data...")
        df = extract_source_data(source_conn)
        result["records_extracted"] = len(df)

        # Step 4: transform the data
        logger.info("[Step 4/5] Transforming data...")
        df_transformed = transform_data(df)

        # Step 5: load into the target table
        logger.info("[Step 5/5] Loading data into the target table...")
        records_loaded = load_to_target(
            df_transformed, target_conn, update_mode=UPDATE_MODE
        )
        result["records_loaded"] = records_loaded
        result["status"] = "success"

        logger.info("=" * 60)
        logger.info(
            f"Task finished! Extracted: {result['records_extracted']}, loaded: {result['records_loaded']}"
        )
        logger.info("=" * 60)
    except Exception as e:
        result["status"] = "failed"
        result["error_message"] = str(e)
        logger.error(f"Task execution failed: {e}")
        raise
    finally:
        # Close the database connections
        if source_conn:
            source_conn.close()
            logger.debug("Source database connection closed")
        if target_conn:
            target_conn.close()
            logger.debug("Target database connection closed")
        result["execution_time"] = str(datetime.now() - start_time)

    return result

if __name__ == "__main__":
    # Configure logging.
    # Important: log to stdout rather than stderr so the n8n workflow can parse
    # the output correctly.
    logger.remove()
    logger.add(
        sys.stdout,
        level="INFO",
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    )
    logger.add(
        os.path.join(PROJECT_ROOT, "logs", f"task_{TASK_ID}.log"),
        level="DEBUG",
        rotation="10 MB",
        retention="7 days",
        encoding="utf-8",
    )

    try:
        result = main()
        if result["status"] == "success":
            sys.exit(0)
        else:
            sys.exit(1)
    except Exception as e:
        logger.exception(f"Unhandled exception during script execution: {e}")
        sys.exit(1)