# task_38_DF_DO202601160001.py
#!/usr/bin/env python
"""
任务ID: 38
任务名称: DF_DO202601160001
任务描述: 仓库库存汇总统计
1. 从标签为数据资源的产品库存表中提取字段:仓库编号、库存数量
2. 按照仓库进行分组,对库存数量进行求和计算
3. 无特殊过滤条件
4. 最终输出数据格式包含字段:仓库编号、总库存数量
更新模式: Full Refresh (全量更新)
源表: dags.test_product_inventory (数据资源-产品库存表)
目标表: dags.warehouse_inventory_summary (仓库库存汇总表)
创建时间: 2026-01-16
"""
  15. from __future__ import annotations
  16. import os
  17. import sys
  18. from datetime import datetime
  19. from typing import Any
  20. import pandas as pd
  21. import psycopg2
  22. from loguru import logger
  23. # 添加项目根目录到Python路径
  24. PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
  25. sys.path.insert(0, PROJECT_ROOT)
  26. from app.config.config import config, current_env
  27. # 获取配置
  28. app_config = config[current_env]
  29. def get_database_connection() -> psycopg2.extensions.connection:
  30. """
  31. 获取数据库连接
  32. 根据任务描述,数据库配置:
  33. - Host: 192.168.3.143
  34. - Port: 5432 (标准 PostgreSQL 端口,任务描述中的 5678 有误)
  35. - Database: dataops
  36. - Schema: dags (源表 test_product_inventory 和目标表 warehouse_inventory_summary 都在 dags schema)
  37. Returns:
  38. psycopg2 连接对象
  39. """
  40. conn = psycopg2.connect(
  41. host="192.168.3.143",
  42. port=5432,
  43. database="dataops",
  44. user="postgres",
  45. password="dataOps",
  46. options="-c search_path=dags,public", # 确保可以访问 dags 和 public schema
  47. )
  48. logger.info("数据库连接成功: 192.168.3.143:5432/dataops (schema: dags,public)")
  49. return conn
  50. def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
  51. """
  52. 确保目标表存在,如果不存在则创建
  53. Args:
  54. conn: 数据库连接
  55. """
  56. cursor = conn.cursor()
  57. target_table = "warehouse_inventory_summary"
  58. target_schema = "dags"
  59. try:
  60. # 检查表是否存在
  61. cursor.execute(
  62. """
  63. SELECT EXISTS(
  64. SELECT 1 FROM information_schema.tables
  65. WHERE table_schema = %s
  66. AND table_name = %s
  67. )
  68. """,
  69. (target_schema, target_table),
  70. )
  71. result = cursor.fetchone()
  72. exists = result[0] if result else False
  73. if not exists:
  74. logger.info(f"目标表不存在,正在创建 {target_schema}.{target_table}...")
  75. # PostgreSQL 不支持在列定义中使用 COMMENT,需要分开
  76. create_table_sql = f"""
  77. CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
  78. id BIGSERIAL PRIMARY KEY,
  79. warehouse VARCHAR(100) NOT NULL,
  80. total_stock INTEGER NOT NULL DEFAULT 0,
  81. create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  82. );
  83. """
  84. cursor.execute(create_table_sql)
  85. # 添加注释
  86. cursor.execute(
  87. f"COMMENT ON TABLE {target_schema}.{target_table} IS '仓库库存汇总表'"
  88. )
  89. cursor.execute(
  90. f"COMMENT ON COLUMN {target_schema}.{target_table}.warehouse IS '仓库编号'"
  91. )
  92. cursor.execute(
  93. f"COMMENT ON COLUMN {target_schema}.{target_table}.total_stock IS '总库存数量'"
  94. )
  95. cursor.execute(
  96. f"COMMENT ON COLUMN {target_schema}.{target_table}.create_time IS '数据创建时间'"
  97. )
  98. conn.commit()
  99. logger.info(f"目标表 {target_schema}.{target_table} 创建成功")
  100. else:
  101. logger.info(f"目标表 {target_schema}.{target_table} 已存在")
  102. except Exception as e:
  103. conn.rollback()
  104. logger.error(f"创建目标表失败: {e}")
  105. raise
  106. finally:
  107. cursor.close()
  108. def extract_and_transform(conn: psycopg2.extensions.connection) -> pd.DataFrame:
  109. """
  110. 从源表提取数据并进行转换
  111. 根据任务描述:
  112. 1. 从产品库存表中提取字段:仓库编号(warehouse)、库存数量(current_stock)
  113. 2. 按照仓库进行分组,对库存数量进行求和计算
  114. Args:
  115. conn: 数据库连接
  116. Returns:
  117. 转换后的DataFrame,包含 warehouse 和 total_stock 列
  118. """
  119. # 源表位于 dags schema(由任务 37 创建)
  120. query = """
  121. SELECT
  122. warehouse,
  123. SUM(current_stock) AS total_stock
  124. FROM dags.test_product_inventory
  125. GROUP BY warehouse
  126. ORDER BY warehouse
  127. """
  128. logger.info("正在从源表提取并汇总数据...")
  129. try:
  130. df = pd.read_sql(query, conn)
  131. logger.info(f"成功汇总 {len(df)} 个仓库的库存数据")
  132. return df
  133. except Exception as e:
  134. logger.error(f"数据提取转换失败: {e}")
  135. raise
  136. def load_to_target(
  137. df: pd.DataFrame,
  138. conn: psycopg2.extensions.connection,
  139. ) -> int:
  140. """
  141. 将数据加载到目标表(全量更新模式)
  142. Args:
  143. df: 要加载的DataFrame
  144. conn: 数据库连接
  145. Returns:
  146. 插入的记录数
  147. """
  148. if df.empty:
  149. logger.warning("没有数据需要加载")
  150. return 0
  151. cursor = conn.cursor()
  152. target_table = "dags.warehouse_inventory_summary"
  153. try:
  154. # 全量更新模式:先清空目标表
  155. logger.info("全量更新模式:清空目标表...")
  156. cursor.execute(f"TRUNCATE TABLE {target_table}")
  157. logger.info("目标表已清空")
  158. # 插入新数据
  159. insert_sql = f"""
  160. INSERT INTO {target_table} (warehouse, total_stock, create_time)
  161. VALUES (%s, %s, %s)
  162. """
  163. current_time = datetime.now()
  164. records = [
  165. (row["warehouse"], int(row["total_stock"]), current_time)
  166. for _, row in df.iterrows()
  167. ]
  168. cursor.executemany(insert_sql, records)
  169. conn.commit()
  170. inserted_count = len(records)
  171. logger.info(f"成功加载 {inserted_count} 条记录到 {target_table}")
  172. return inserted_count
  173. except Exception as e:
  174. conn.rollback()
  175. logger.error(f"数据加载失败: {e}")
  176. raise
  177. finally:
  178. cursor.close()
  179. def main() -> dict[str, Any]:
  180. """
  181. 主函数:执行ETL流程
  182. Returns:
  183. 执行结果字典
  184. """
  185. result = {
  186. "task_id": 38,
  187. "task_name": "DF_DO202601160001",
  188. "status": "failed",
  189. "warehouses_processed": 0,
  190. "records_loaded": 0,
  191. "error_message": None,
  192. "execution_time": None,
  193. }
  194. start_time = datetime.now()
  195. conn = None
  196. try:
  197. logger.info("=" * 60)
  198. logger.info("任务开始: DF_DO202601160001 - 仓库库存汇总")
  199. logger.info("=" * 60)
  200. # 步骤1: 建立数据库连接
  201. logger.info("[Step 1/4] 建立数据库连接...")
  202. conn = get_database_connection()
  203. # 步骤2: 确保目标表存在
  204. logger.info("[Step 2/4] 检查/创建目标表...")
  205. ensure_target_table_exists(conn)
  206. # 步骤3: 提取并转换数据
  207. logger.info("[Step 3/4] 提取并转换数据...")
  208. df = extract_and_transform(conn)
  209. result["warehouses_processed"] = len(df)
  210. # 输出汇总结果预览
  211. if not df.empty:
  212. logger.info("仓库库存汇总预览:")
  213. for _, row in df.iterrows():
  214. logger.info(f" {row['warehouse']}: {row['total_stock']:,} 件")
  215. # 步骤4: 加载到目标表(全量更新)
  216. logger.info("[Step 4/4] 加载数据到目标表...")
  217. records_loaded = load_to_target(df, conn)
  218. result["records_loaded"] = records_loaded
  219. result["status"] = "success"
  220. logger.info("=" * 60)
  221. logger.info(
  222. f"任务完成! 处理仓库数: {result['warehouses_processed']}, 加载记录数: {result['records_loaded']}"
  223. )
  224. logger.info("=" * 60)
  225. except Exception as e:
  226. result["status"] = "failed"
  227. result["error_message"] = str(e)
  228. logger.error(f"任务执行失败: {e}")
  229. raise
  230. finally:
  231. # 关闭数据库连接
  232. if conn:
  233. conn.close()
  234. logger.debug("数据库连接已关闭")
  235. result["execution_time"] = str(datetime.now() - start_time)
  236. return result
  237. if __name__ == "__main__":
  238. # 配置日志
  239. # 重要:日志输出到 stdout 而非 stderr,以便 n8n 工作流正确解析输出
  240. logger.remove()
  241. logger.add(
  242. sys.stdout,
  243. level="INFO",
  244. format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
  245. )
  246. logger.add(
  247. os.path.join(PROJECT_ROOT, "logs", "task_38_DF_DO202601160001.log"),
  248. level="DEBUG",
  249. rotation="10 MB",
  250. retention="7 days",
  251. encoding="utf-8",
  252. )
  253. try:
  254. result = main()
  255. if result["status"] == "success":
  256. sys.exit(0)
  257. else:
  258. sys.exit(1)
  259. except Exception as e:
  260. logger.exception(f"脚本执行异常: {e}")
  261. sys.exit(1)