task_44_DF_DO202601210001.py

#!/usr/bin/env python
"""
Data flow task script - Warehouse Inventory Summary data flow
Task ID: 44
Task name: DF_DO202601210001
Created: 2026-01-21
Update mode: Append
Related information:
- Order ID: 26
- Order No: DO202601210001
- DataFlow ID: 2291
- DataFlow Name: 仓库库存汇总表_数据流程 (Warehouse Inventory Summary data flow)
- Product ID: 23
Description: extract the warehouse name and stock quantity from the source table
test_product_inventory, group the rows by warehouse name and sum the quantities,
and write the result to the warehouse_inventory_summary table.
"""
from __future__ import annotations

import os
import sys
from datetime import datetime
from typing import Any

import pandas as pd
import psycopg2
from loguru import logger

# Add the project root to the Python path
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)

# Task configuration
TASK_ID = 44
TASK_NAME = "DF_DO202601210001"
UPDATE_MODE = "append"
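# Note on update modes (handled in load_to_target below): in "append" mode each run
# adds a fresh set of summary rows, with create_time marking the run; in "full" mode
# the target table is truncated first, so it only ever holds the latest snapshot.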

# Source database configuration (same instance as the target)
SOURCE_CONFIG = {
    "host": "192.168.3.143",
    "port": 5432,  # PostgreSQL default port
    "database": "dataops",
    "user": "postgres",
    "password": "dataOps",
}

# Target database configuration
TARGET_CONFIG = {
    "host": "192.168.3.143",
    "port": 5432,  # PostgreSQL default port
    "database": "dataops",
    "user": "postgres",
    "password": "dataOps",
}
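
# The credentials above are hard-coded as generated. A minimal sketch of reading
# them from the environment instead (the variable names are illustrative, not part
# of the original task):
#
#     SOURCE_CONFIG = {
#         "host": os.getenv("DATAOPS_DB_HOST", "192.168.3.143"),
#         "port": int(os.getenv("DATAOPS_DB_PORT", "5432")),
#         "database": os.getenv("DATAOPS_DB_NAME", "dataops"),
#         "user": os.getenv("DATAOPS_DB_USER", "postgres"),
#         "password": os.getenv("DATAOPS_DB_PASSWORD", ""),
#     }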

# Source table configuration
SOURCE_SCHEMA = "dags"
SOURCE_TABLE = "test_product_inventory"

# Target table configuration
TARGET_SCHEMA = "dags"
TARGET_TABLE = "warehouse_inventory_summary"


def get_source_connection() -> psycopg2.extensions.connection:
    """
    Open a connection to the source database.

    Returns:
        A psycopg2 connection object.
    """
    conn = psycopg2.connect(
        host=SOURCE_CONFIG["host"],
        port=SOURCE_CONFIG["port"],
        database=SOURCE_CONFIG["database"],
        user=SOURCE_CONFIG["user"],
        password=SOURCE_CONFIG["password"],
        options=f"-c search_path={SOURCE_SCHEMA},public",
    )
    logger.info("Source database connection established")
    return conn


def get_target_connection() -> psycopg2.extensions.connection:
    """
    Open a connection to the target database.

    Returns:
        A psycopg2 connection object.
    """
    conn = psycopg2.connect(
        host=TARGET_CONFIG["host"],
        port=TARGET_CONFIG["port"],
        database=TARGET_CONFIG["database"],
        user=TARGET_CONFIG["user"],
        password=TARGET_CONFIG["password"],
        options=f"-c search_path={TARGET_SCHEMA},public",
    )
    logger.info("Target database connection established")
    return conn
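
# Design note: the two connection helpers are identical apart from the config dict
# and search_path they use; they are kept as separate functions so that the source
# and target can later point at different databases without touching the call sites.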


def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
    """
    Make sure the target table exists, creating it if it does not.

    Args:
        conn: Target database connection.
    """
    cursor = conn.cursor()
    try:
        # Check whether the table already exists
        cursor.execute(
            """
            SELECT EXISTS(
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = %s
                  AND table_name = %s
            )
            """,
            (TARGET_SCHEMA, TARGET_TABLE),
        )
        result = cursor.fetchone()
        exists = result[0] if result else False
        if not exists:
            logger.info(f"Target table missing, creating {TARGET_SCHEMA}.{TARGET_TABLE}...")
            # Create the table according to the DDL in the task description
            create_table_sql = f"""
                CREATE TABLE IF NOT EXISTS {TARGET_SCHEMA}.{TARGET_TABLE} (
                    warehouse_name VARCHAR(255),
                    total_inventory INTEGER,
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """
            cursor.execute(create_table_sql)
            # Add a table comment
            cursor.execute(
                f"COMMENT ON TABLE {TARGET_SCHEMA}.{TARGET_TABLE} IS 'Warehouse inventory summary'"
            )
            # Add column comments. Each COMMENT runs under a savepoint: in
            # PostgreSQL any failed statement aborts the whole transaction, so
            # merely catching the exception and continuing (as the original code
            # did) would make every following statement fail as well.
            column_comments = {
                "warehouse_name": "Warehouse name",
                "total_inventory": "Total stock quantity",
                "create_time": "Row creation time",
            }
            for col, comment in column_comments.items():
                cursor.execute("SAVEPOINT col_comment")
                try:
                    cursor.execute(
                        f"COMMENT ON COLUMN {TARGET_SCHEMA}.{TARGET_TABLE}.{col} IS %s",
                        (comment,),
                    )
                    cursor.execute("RELEASE SAVEPOINT col_comment")
                except Exception as e:
                    cursor.execute("ROLLBACK TO SAVEPOINT col_comment")
                    logger.warning(f"Failed to add comment on column {col}: {e}")
            conn.commit()
            logger.info(f"Target table {TARGET_SCHEMA}.{TARGET_TABLE} created")
        else:
            logger.info(f"Target table {TARGET_SCHEMA}.{TARGET_TABLE} already exists")
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to create target table: {e}")
        raise
    finally:
        cursor.close()
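
# Note: schema and table names are interpolated into the SQL with f-strings, which
# is safe here because they are module-level constants rather than user input. A
# hardened variant would compose these statements with psycopg2.sql.SQL and
# psycopg2.sql.Identifier instead.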


def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
    """
    Extract data from the source table.

    Per the task description:
    1. Take the warehouse name field from the source product inventory table.
    2. Sum the stock quantity field.
    3. Group by warehouse name.
    4. Output rows with two fields: warehouse name and the summed stock quantity.

    Args:
        conn: Source database connection.

    Returns:
        A DataFrame with the aggregated rows.
    """
    query = f"""
        SELECT
            warehouse AS warehouse_name,
            SUM(current_stock) AS total_inventory
        FROM {SOURCE_SCHEMA}.{SOURCE_TABLE}
        WHERE warehouse IS NOT NULL
        GROUP BY warehouse
        ORDER BY warehouse
    """
    logger.info("Extracting and aggregating data from the source table...")
    try:
        df = pd.read_sql(query, conn)
        logger.info(f"Extracted {len(df)} aggregated rows")
        return df
    except Exception as e:
        logger.error(f"Failed to extract source data: {e}")
        raise
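
# Note: pd.read_sql officially supports SQLAlchemy connectables and emits a
# UserWarning when handed a raw DBAPI connection like the psycopg2 one used here,
# although plain SELECTs still work. A sketch of the warning-free variant, assuming
# SQLAlchemy is installed (the URL is built from the same SOURCE_CONFIG):
#
#     from sqlalchemy import create_engine
#     engine = create_engine(
#         "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(
#             **SOURCE_CONFIG
#         )
#     )
#     df = pd.read_sql(query, engine)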


def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply data transformations.

    Args:
        df: Source DataFrame.

    Returns:
        The transformed DataFrame.
    """
    logger.info("Running data transformation...")
    # The aggregation already happened in SQL, and the query aliases its output
    # columns to match the target table (warehouse_name, total_inventory), so
    # only light cleanup is needed here.
    # Handle missing values: treat a None/NaN inventory sum as 0
    df["total_inventory"] = df["total_inventory"].fillna(0).astype(int)
    logger.info(f"Transformation complete, {len(df)} rows")
    return df


def load_to_target(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
    update_mode: str = "append",
    batch_size: int = 1000,
) -> int:
    """
    Load the data into the target table.

    Args:
        df: DataFrame to load.
        conn: Target database connection.
        update_mode: Update mode ("append" or "full").
        batch_size: Batch size for bulk inserts.

    Returns:
        Number of rows inserted.
    """
    if df.empty:
        logger.warning("No data to load")
        return 0
    logger.info(f"Loading {len(df)} rows into the target table...")
    target_table = f"{TARGET_SCHEMA}.{TARGET_TABLE}"
    cursor = conn.cursor()
    inserted_count = 0
    try:
        # Full-refresh mode: truncate the target table first
        if update_mode.lower() == "full":
            logger.info("Full-refresh mode: truncating target table...")
            cursor.execute(f"TRUNCATE TABLE {target_table}")
            logger.info("Target table truncated")
        # Columns to insert (create_time is omitted; the database fills it in)
        columns = ["warehouse_name", "total_inventory"]
        # Build the INSERT statement
        placeholders = ", ".join(["%s"] * len(columns))
        column_names = ", ".join(columns)
        insert_sql = (
            f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
        )
        # Insert in batches
        for i in range(0, len(df), batch_size):
            batch_df = df.iloc[i : i + batch_size]
            records = []
            for _, row in batch_df.iterrows():
                record = tuple(
                    None if pd.isna(row.get(col)) else row.get(col) for col in columns
                )
                records.append(record)
            cursor.executemany(insert_sql, records)
            inserted_count += len(records)
            logger.debug(f"Inserted {inserted_count}/{len(df)} rows")
        conn.commit()
        logger.info(f"Loaded {inserted_count} rows into {target_table}")
        return inserted_count
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to load data: {e}")
        raise
    finally:
        cursor.close()
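
# Note: cursor.executemany issues one INSERT per row under the hood. For larger
# volumes, psycopg2.extras.execute_values collapses a batch into a single
# multi-row INSERT; a minimal sketch using the same columns and records:
#
#     from psycopg2.extras import execute_values
#     execute_values(
#         cursor,
#         f"INSERT INTO {target_table} ({column_names}) VALUES %s",
#         records,
#     )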


def main() -> dict[str, Any]:
    """
    Main entry point: run the ETL flow.

    Returns:
        A dictionary describing the execution result.
    """
    result = {
        "task_id": TASK_ID,
        "task_name": TASK_NAME,
        "status": "failed",
        "records_extracted": 0,
        "records_loaded": 0,
        "error_message": None,
        "execution_time": None,
    }
    start_time = datetime.now()
    source_conn = None
    target_conn = None
    try:
        logger.info("=" * 60)
        logger.info(f"Task started: {TASK_NAME}")
        logger.info("=" * 60)
        # Step 1: open database connections
        logger.info("[Step 1/5] Connecting to databases...")
        source_conn = get_source_connection()
        target_conn = get_target_connection()
        # Step 2: make sure the target table exists (must happen before loading)
        logger.info("[Step 2/5] Checking/creating target table...")
        ensure_target_table_exists(target_conn)
        # Step 3: extract from the source table
        logger.info("[Step 3/5] Extracting and aggregating source data...")
        df = extract_source_data(source_conn)
        result["records_extracted"] = len(df)
        # Step 4: transform
        logger.info("[Step 4/5] Transforming data...")
        df_transformed = transform_data(df)
        # Step 5: load into the target table
        logger.info("[Step 5/5] Loading data into the target table...")
        records_loaded = load_to_target(
            df_transformed, target_conn, update_mode=UPDATE_MODE
        )
        result["records_loaded"] = records_loaded
        result["status"] = "success"
        logger.info("=" * 60)
        logger.info(
            f"Task finished! Extracted: {result['records_extracted']}, loaded: {result['records_loaded']}"
        )
        logger.info("=" * 60)
    except Exception as e:
        result["status"] = "failed"
        result["error_message"] = str(e)
        logger.error(f"Task failed: {e}")
        raise
    finally:
        # Close database connections
        if source_conn:
            source_conn.close()
            logger.debug("Source database connection closed")
        if target_conn:
            target_conn.close()
            logger.debug("Target database connection closed")
        result["execution_time"] = str(datetime.now() - start_time)
    return result


if __name__ == "__main__":
    # Configure logging.
    # Important: log to stdout rather than stderr so the n8n workflow can parse the output.
    logger.remove()
    logger.add(
        sys.stdout,
        level="INFO",
        format=(
            "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
            "<level>{level: <8}</level> | "
            "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
            "<level>{message}</level>"
        ),
    )
    logger.add(
        os.path.join(PROJECT_ROOT, "logs", f"task_{TASK_ID}.log"),
        level="DEBUG",
        rotation="10 MB",
        retention="7 days",
        encoding="utf-8",
    )
    try:
        result = main()
        if result["status"] == "success":
            sys.exit(0)
        else:
            sys.exit(1)
    except Exception as e:
        logger.exception(f"Script execution failed: {e}")
        sys.exit(1)
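
# Usage: run directly, e.g. `python task_44_DF_DO202601210001.py`. The process
# exits with status 0 on success and 1 on failure, so the orchestrator (the n8n
# workflow mentioned above) can branch on the exit code.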