# task_37_产品库存表原始数据导入.py
#!/usr/bin/env python
"""
Task ID: 37
Task name: 产品库存表原始数据导入 (product inventory raw-data import)
Task description: Import the raw product-inventory rows into the
    data-resource product inventory table.
Update mode: Append
Source table: product_inventory_table_raw_data (raw data table)
Target table: dags.test_product_inventory (data-resource table)
Created: 2026-01-16
"""
from __future__ import annotations

import os
import sys
from datetime import datetime
from typing import Any

import pandas as pd
import psycopg2
from loguru import logger

# Make the project root importable so `app.config` resolves when this
# script is executed directly (it lives two directory levels below root).
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)

from app.config.config import config, current_env  # noqa: E402

# Environment-specific application configuration (selected by current_env).
app_config = config[current_env]
  25. def get_source_connection() -> psycopg2.extensions.connection:
  26. """
  27. 获取源数据库连接(原始数据库)
  28. Returns:
  29. psycopg2 连接对象
  30. """
  31. # 解析应用数据库URI作为源数据库
  32. # 实际使用时可根据任务描述中的数据源配置调整
  33. db_uri = app_config.SQLALCHEMY_DATABASE_URI
  34. # 解析连接字符串
  35. # 格式: postgresql://user:password@host:port/database
  36. if db_uri.startswith("postgresql://"):
  37. db_uri = db_uri[13:] # 移除 postgresql://
  38. # 分割用户信息和主机信息
  39. user_part, host_part = db_uri.split("@")
  40. username, password = user_part.split(":")
  41. host_db = host_part.split("/")
  42. host_port = host_db[0].split(":")
  43. host = host_port[0]
  44. port = int(host_port[1]) if len(host_port) > 1 else 5432
  45. database = host_db[1] if len(host_db) > 1 else "dataops"
  46. conn = psycopg2.connect(
  47. host=host,
  48. port=port,
  49. database=database,
  50. user=username,
  51. password=password,
  52. )
  53. logger.info(f"源数据库连接成功: {host}:{port}/{database}")
  54. return conn
  55. def get_target_connection() -> psycopg2.extensions.connection:
  56. """
  57. 获取目标数据库连接(数据资源数据库)
  58. 根据任务描述,目标数据库:
  59. - Host: 192.168.3.143
  60. - Port: 5432 (标准 PostgreSQL 端口,任务描述中的 5678 有误)
  61. - Database: dataops
  62. - Schema: dags
  63. Returns:
  64. psycopg2 连接对象
  65. """
  66. conn = psycopg2.connect(
  67. host="192.168.3.143",
  68. port=5432,
  69. database="dataops",
  70. user="postgres",
  71. password="dataOps",
  72. options="-c search_path=dags,public",
  73. )
  74. logger.info("目标数据库连接成功: 192.168.3.143:5432/dataops (schema: dags)")
  75. return conn
  76. def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
  77. """
  78. 从源表提取数据
  79. Args:
  80. conn: 源数据库连接
  81. Returns:
  82. 包含源数据的DataFrame
  83. """
  84. query = """
  85. SELECT
  86. sku,
  87. product_name,
  88. category,
  89. brand,
  90. supplier,
  91. warehouse,
  92. current_stock,
  93. safety_stock,
  94. max_stock,
  95. unit_cost,
  96. selling_price,
  97. stock_status,
  98. last_inbound_date,
  99. last_outbound_date,
  100. inbound_quantity_30d,
  101. outbound_quantity_30d,
  102. turnover_rate,
  103. is_active,
  104. created_at,
  105. updated_at
  106. FROM product_inventory_table_raw_data
  107. """
  108. logger.info("正在从源表提取数据...")
  109. try:
  110. df = pd.read_sql(query, conn)
  111. logger.info(f"成功提取 {len(df)} 条记录")
  112. return df
  113. except Exception as e:
  114. logger.error(f"提取源数据失败: {e}")
  115. raise
  116. def transform_data(df: pd.DataFrame) -> pd.DataFrame:
  117. """
  118. 数据转换处理
  119. 根据任务描述:直接导入,无需特殊转换
  120. Args:
  121. df: 源数据DataFrame
  122. Returns:
  123. 转换后的DataFrame
  124. """
  125. logger.info("正在执行数据转换...")
  126. # 确保列名与目标表匹配(不区分大小写)
  127. df.columns = df.columns.str.lower()
  128. # 添加 create_time 字段(目标表可能需要)
  129. if "create_time" not in df.columns:
  130. df["create_time"] = datetime.now()
  131. logger.info(f"数据转换完成,共 {len(df)} 条记录")
  132. return df
def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
    """
    Create the target table ``dags.test_product_inventory`` if missing.

    Looks the table up in information_schema; when absent, runs the DDL
    from the task description plus table/column comments and commits the
    whole thing as one transaction. Rolls back and re-raises on failure.

    Args:
        conn: open connection to the target database.

    Raises:
        Exception: re-raised after rollback if any DDL statement fails.
    """
    cursor = conn.cursor()
    target_table = "test_product_inventory"
    target_schema = "dags"
    try:
        # Does the table already exist in the target schema?
        cursor.execute(
            """
            SELECT EXISTS(
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = %s
                AND table_name = %s
            )
            """,
            (target_schema, target_table),
        )
        result = cursor.fetchone()
        exists = result[0] if result else False
        if not exists:
            logger.info(f"目标表不存在,正在创建 {target_schema}.{target_table}...")
            # Table DDL taken from the task description. Schema/table names
            # are module constants, not user input, so the f-string is safe.
            create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
                id SERIAL PRIMARY KEY,
                sku VARCHAR(50),
                product_name VARCHAR(200),
                category VARCHAR(100),
                brand VARCHAR(100),
                supplier VARCHAR(200),
                warehouse VARCHAR(100),
                current_stock INTEGER,
                safety_stock INTEGER,
                max_stock INTEGER,
                unit_cost NUMERIC(10, 2),
                selling_price NUMERIC(10, 2),
                stock_status VARCHAR(50),
                last_inbound_date DATE,
                last_outbound_date DATE,
                inbound_quantity_30d INTEGER,
                outbound_quantity_30d INTEGER,
                turnover_rate NUMERIC(5, 2),
                is_active BOOLEAN,
                created_at TIMESTAMP,
                updated_at TIMESTAMP,
                create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            """
            cursor.execute(create_table_sql)
            # Attach a table comment.
            cursor.execute(
                f"COMMENT ON TABLE {target_schema}.{target_table} IS '产品库存表'"
            )
            # Column comments (Chinese labels are part of the stored DDL).
            column_comments = {
                "sku": "SKU",
                "product_name": "产品名称",
                "category": "类别",
                "brand": "品牌",
                "supplier": "供应商",
                "warehouse": "仓库",
                "current_stock": "当前库存",
                "safety_stock": "安全库存",
                "max_stock": "最大库存",
                "unit_cost": "单位成本",
                "selling_price": "销售价格",
                "stock_status": "库存状态",
                "last_inbound_date": "最近入库日期",
                "last_outbound_date": "最近出库日期",
                "inbound_quantity_30d": "30天入库数量",
                "outbound_quantity_30d": "30天出库数量",
                "turnover_rate": "周转率",
                "is_active": "是否启用",
                "created_at": "创建时间",
                "updated_at": "更新时间",
                "create_time": "数据创建时间",
            }
            for col_name, col_comment in column_comments.items():
                # Comment text goes through a bind parameter; only the
                # constant identifiers are interpolated.
                cursor.execute(
                    f"COMMENT ON COLUMN {target_schema}.{target_table}.{col_name} IS %s",
                    (col_comment,),
                )
            conn.commit()
            logger.info(f"目标表 {target_schema}.{target_table} 创建成功")
        else:
            logger.info(f"目标表 {target_schema}.{target_table} 已存在")
    except Exception as e:
        conn.rollback()
        logger.error(f"创建目标表失败: {e}")
        raise
    finally:
        cursor.close()
  229. def load_to_target(
  230. df: pd.DataFrame,
  231. conn: psycopg2.extensions.connection,
  232. batch_size: int = 1000,
  233. ) -> int:
  234. """
  235. 将数据加载到目标表(追加模式)
  236. Args:
  237. df: 要加载的DataFrame
  238. conn: 目标数据库连接
  239. batch_size: 批量插入大小
  240. Returns:
  241. 插入的记录数
  242. """
  243. if df.empty:
  244. logger.warning("没有数据需要加载")
  245. return 0
  246. logger.info(f"正在将 {len(df)} 条记录加载到目标表...")
  247. target_table = "dags.test_product_inventory"
  248. # 准备插入的列
  249. columns = [
  250. "sku",
  251. "product_name",
  252. "category",
  253. "brand",
  254. "supplier",
  255. "warehouse",
  256. "current_stock",
  257. "safety_stock",
  258. "max_stock",
  259. "unit_cost",
  260. "selling_price",
  261. "stock_status",
  262. "last_inbound_date",
  263. "last_outbound_date",
  264. "inbound_quantity_30d",
  265. "outbound_quantity_30d",
  266. "turnover_rate",
  267. "is_active",
  268. "created_at",
  269. "updated_at",
  270. ]
  271. # 构建插入SQL(使用完整的 schema.table 格式)
  272. placeholders = ", ".join(["%s"] * len(columns))
  273. column_names = ", ".join(columns)
  274. insert_sql = f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
  275. cursor = conn.cursor()
  276. inserted_count = 0
  277. try:
  278. for i in range(0, len(df), batch_size):
  279. batch_df = df.iloc[i : i + batch_size]
  280. records = []
  281. for _, row in batch_df.iterrows():
  282. record = tuple(
  283. row[col] if col in row.index else None for col in columns
  284. )
  285. records.append(record)
  286. cursor.executemany(insert_sql, records)
  287. inserted_count += len(records)
  288. logger.debug(f"已插入 {inserted_count}/{len(df)} 条记录")
  289. conn.commit()
  290. logger.info(f"成功加载 {inserted_count} 条记录到 {target_table}")
  291. return inserted_count
  292. except Exception as e:
  293. conn.rollback()
  294. logger.error(f"数据加载失败: {e}")
  295. raise
  296. finally:
  297. cursor.close()
def main() -> dict[str, Any]:
    """
    Run the ETL pipeline: connect, ensure target table, extract, transform, load.

    Returns:
        Result dict with task metadata, record counts, status,
        error message and execution time.

    Raises:
        Exception: any step failure is logged, recorded in the result
        dict, and re-raised after the connections are closed.
    """
    result = {
        "task_id": 37,
        "task_name": "产品库存表原始数据导入",
        "status": "failed",
        "records_extracted": 0,
        "records_loaded": 0,
        "error_message": None,
        "execution_time": None,
    }
    start_time = datetime.now()
    source_conn = None
    target_conn = None
    try:
        logger.info("=" * 60)
        logger.info("任务开始: 产品库存表原始数据导入")
        logger.info("=" * 60)
        # Step 1: open both database connections.
        logger.info("[Step 1/5] 建立数据库连接...")
        source_conn = get_source_connection()
        target_conn = get_target_connection()
        # Step 2: create the target table if it does not exist yet.
        logger.info("[Step 2/5] 检查/创建目标表...")
        ensure_target_table_exists(target_conn)
        # Step 3: extract the raw rows from the source table.
        logger.info("[Step 3/5] 提取源数据...")
        df = extract_source_data(source_conn)
        result["records_extracted"] = len(df)
        # Step 4: transform (column normalization + create_time stamp).
        logger.info("[Step 4/5] 数据转换...")
        df_transformed = transform_data(df)
        # Step 5: append into the target table.
        logger.info("[Step 5/5] 加载数据到目标表...")
        records_loaded = load_to_target(df_transformed, target_conn)
        result["records_loaded"] = records_loaded
        result["status"] = "success"
        logger.info("=" * 60)
        logger.info(
            f"任务完成! 提取: {result['records_extracted']}, 加载: {result['records_loaded']}"
        )
        logger.info("=" * 60)
    except Exception as e:
        result["status"] = "failed"
        result["error_message"] = str(e)
        logger.error(f"任务执行失败: {e}")
        raise
    finally:
        # Always close connections and record elapsed time, even on failure.
        if source_conn:
            source_conn.close()
            logger.debug("源数据库连接已关闭")
        if target_conn:
            target_conn.close()
            logger.debug("目标数据库连接已关闭")
        result["execution_time"] = str(datetime.now() - start_time)
    # NOTE(review): reached only on success — the except branch re-raises,
    # so failures surface to the __main__ handler instead of returning.
    return result
if __name__ == "__main__":
    # Configure logging.
    # Important: INFO logs go to stdout (not stderr) so the n8n workflow
    # can parse the script's output correctly.
    logger.remove()
    logger.add(
        sys.stdout,
        level="INFO",
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    )
    # Full DEBUG log is kept on disk with rotation for troubleshooting.
    logger.add(
        os.path.join(PROJECT_ROOT, "logs", "task_37_产品库存表原始数据导入.log"),
        level="DEBUG",
        rotation="10 MB",
        retention="7 days",
        encoding="utf-8",
    )
    try:
        result = main()
        # main() re-raises on failure, so a returned dict is normally
        # "success"; the status check is a defensive fallback.
        if result["status"] == "success":
            sys.exit(0)
        else:
            sys.exit(1)
    except Exception as e:
        logger.exception(f"脚本执行异常: {e}")
        sys.exit(1)