script_template_with_table_check.py

#!/usr/bin/env python
"""
Data-flow task script template (with target-table detection and auto-creation).

Key requirements:
1. Every data-flow script must include a target-table existence check.
2. If the target table does not exist, the script must create it automatically.
3. Table creation must follow the DDL given in the task description.
4. Table creation must run before data loading.

Created: 2026-01-16
"""
from __future__ import annotations

import os
import sys
from datetime import datetime
from typing import Any

import pandas as pd
import psycopg2
from loguru import logger

# Add the project root to the Python path
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, PROJECT_ROOT)

from app.config.config import config, current_env

# Load the environment-specific configuration
app_config = config[current_env]

def get_source_connection() -> psycopg2.extensions.connection:
    """
    Open a connection to the source database.

    Returns:
        psycopg2 connection object.
    """
    # Configure the source connection per the task description
    # TODO: fill in from the actual task description
    conn = psycopg2.connect(
        host="192.168.3.143",
        port=5432,
        database="dataops",
        user="postgres",
        password="dataOps",
    )
    logger.info("Connected to source database")
    return conn

def get_target_connection() -> psycopg2.extensions.connection:
    """
    Open a connection to the target database.

    Returns:
        psycopg2 connection object.
    """
    # Configure the target connection per the task description
    # TODO: fill in from the actual task description
    conn = psycopg2.connect(
        host="192.168.3.143",
        port=5432,
        database="dataops",
        user="postgres",
        password="dataOps",
        options="-c search_path=dags,public",  # adjust to the actual schema
    )
    logger.info("Connected to target database")
    return conn
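
# A minimal sketch of reading the connection settings from app_config instead
# of hardcoding credentials above. The attribute names (DB_HOST, DB_PORT, and
# so on) are assumptions, not the actual interface of app/config/config.py:
#
#     conn = psycopg2.connect(
#         host=app_config.DB_HOST,          # assumed attribute name
#         port=app_config.DB_PORT,          # assumed attribute name
#         database=app_config.DB_NAME,      # assumed attribute name
#         user=app_config.DB_USER,          # assumed attribute name
#         password=app_config.DB_PASSWORD,  # assumed attribute name
#     )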

def ensure_target_table_exists(conn: psycopg2.extensions.connection) -> None:
    """
    Make sure the target table exists, creating it if it does not.

    Important: implement this function from the target-table DDL in the
    task description.

    Args:
        conn: Target database connection.
    """
    cursor = conn.cursor()
    target_table = "target_table_name"  # TODO: replace with the actual table name
    target_schema = "public"  # TODO: replace with the actual schema (e.g. dags, public)
    try:
        # Check whether the table exists
        cursor.execute(
            """
            SELECT EXISTS(
                SELECT 1 FROM information_schema.tables
                WHERE table_schema = %s
                  AND table_name = %s
            )
            """,
            (target_schema, target_table),
        )
        result = cursor.fetchone()
        exists = result[0] if result else False
        if not exists:
            logger.info(f"Target table missing, creating {target_schema}.{target_table}...")
            # TODO: create the table from the DDL in the task description
            # Example: use the CREATE TABLE statement given in the task description
            create_table_sql = f"""
                CREATE TABLE IF NOT EXISTS {target_schema}.{target_table} (
                    id SERIAL PRIMARY KEY,
                    -- TODO: add all columns from the DDL in the task description
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """
            cursor.execute(create_table_sql)
            # Add the table comment
            cursor.execute(
                f"COMMENT ON TABLE {target_schema}.{target_table} IS 'table comment'"
            )
            # Add column comments (from the COMMENTs in the task description)
            # TODO: add column comments per the task description
            conn.commit()
            logger.info(f"Target table {target_schema}.{target_table} created")
        else:
            logger.info(f"Target table {target_schema}.{target_table} already exists")
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to create target table: {e}")
        raise
    finally:
        cursor.close()
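
# A filled-in example of the creation step above for a hypothetical table
# dags.ods_orders. The table name, columns, and comments are invented for
# illustration; the real DDL must come from the task description:
#
#     create_table_sql = """
#         CREATE TABLE IF NOT EXISTS dags.ods_orders (
#             id SERIAL PRIMARY KEY,
#             order_no VARCHAR(64) NOT NULL,
#             amount NUMERIC(18, 2),
#             order_date DATE,
#             create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
#         );
#     """
#     cursor.execute(create_table_sql)
#     cursor.execute("COMMENT ON TABLE dags.ods_orders IS 'ODS orders table'")
#     cursor.execute("COMMENT ON COLUMN dags.ods_orders.order_no IS 'Order number'")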

def extract_source_data(conn: psycopg2.extensions.connection) -> pd.DataFrame:
    """
    Extract data from the source table.

    Args:
        conn: Source database connection.

    Returns:
        DataFrame holding the source data.
    """
    # TODO: write the extraction SQL per the task description
    query = """
        SELECT *
        FROM source_table
    """
    logger.info("Extracting data from the source table...")
    try:
        df = pd.read_sql(query, conn)
        logger.info(f"Extracted {len(df)} records")
        return df
    except Exception as e:
        logger.error(f"Failed to extract source data: {e}")
        raise
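
# An incremental-extraction variant of the query above (a sketch: the
# update_time column and last_run_time value are placeholders). pd.read_sql
# forwards `params` to the driver, so psycopg2-style %s placeholders keep the
# filter value safely parameterized:
#
#     query = "SELECT * FROM source_table WHERE update_time >= %s"
#     df = pd.read_sql(query, conn, params=(last_run_time,))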

def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply data transformations.

    Args:
        df: Source DataFrame.

    Returns:
        Transformed DataFrame.
    """
    logger.info("Running data transformations...")
    # TODO: implement the transformation logic from the task description's rule
    logger.info(f"Transformation finished, {len(df)} records")
    return df
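
# Typical transformations this hook might apply (column names invented for
# illustration; the actual rules come from the task description):
#
#     df = df.rename(columns={"src_no": "order_no"})  # align column names
#     df["amount"] = df["amount"].fillna(0)           # default missing values
#     df["etl_time"] = datetime.now()                 # stamp the load time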

def load_to_target(
    df: pd.DataFrame,
    conn: psycopg2.extensions.connection,
    update_mode: str = "append",
    batch_size: int = 1000,
) -> int:
    """
    Load data into the target table.

    Args:
        df: DataFrame to load.
        conn: Target database connection.
        update_mode: Update mode ("append" or "full").
        batch_size: Batch size for bulk inserts.

    Returns:
        Number of records inserted.
    """
    if df.empty:
        logger.warning("No data to load")
        return 0
    logger.info(f"Loading {len(df)} records into the target table...")
    target_table = "schema.target_table_name"  # TODO: replace with the actual schema-qualified table name
    cursor = conn.cursor()
    inserted_count = 0
    try:
        # Full-refresh mode: truncate the target table first
        if update_mode.lower() == "full":
            logger.info("Full-refresh mode: truncating target table...")
            cursor.execute(f"TRUNCATE TABLE {target_table}")
            logger.info("Target table truncated")
        # TODO: list the columns to insert, matching the target table structure
        columns = ["col1", "col2", "col3"]  # TODO: replace with the actual column names
        # Build the INSERT statement
        placeholders = ", ".join(["%s"] * len(columns))
        column_names = ", ".join(columns)
        insert_sql = (
            f"INSERT INTO {target_table} ({column_names}) VALUES ({placeholders})"
        )
        # Insert in batches
        for i in range(0, len(df), batch_size):
            batch_df = df.iloc[i : i + batch_size]
            records = []
            for _, row in batch_df.iterrows():
                record = tuple(
                    row[col] if col in row.index else None for col in columns
                )
                records.append(record)
            cursor.executemany(insert_sql, records)
            inserted_count += len(records)
            logger.debug(f"Inserted {inserted_count}/{len(df)} records")
        conn.commit()
        logger.info(f"Loaded {inserted_count} records into {target_table}")
        return inserted_count
    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to load data: {e}")
        raise
    finally:
        cursor.close()
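
# cursor.executemany issues one INSERT per row; psycopg2.extras.execute_values
# batches rows into multi-row VALUES statements and is usually much faster for
# bulk loads. A minimal sketch using the same columns/records as above (note
# the single %s placeholder that execute_values expands):
#
#     from psycopg2.extras import execute_values
#
#     insert_sql = f"INSERT INTO {target_table} ({column_names}) VALUES %s"
#     execute_values(cursor, insert_sql, records, page_size=batch_size)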

def main() -> dict[str, Any]:
    """
    Main entry point: run the ETL flow.

    Returns:
        Execution result dict.
    """
    result = {
        "task_id": 0,  # TODO: replace with the actual task ID
        "task_name": "task name",  # TODO: replace with the actual task name
        "status": "failed",
        "records_extracted": 0,
        "records_loaded": 0,
        "error_message": None,
        "execution_time": None,
    }
    start_time = datetime.now()
    source_conn = None
    target_conn = None
    try:
        logger.info("=" * 60)
        logger.info("Task started: task name")  # TODO: replace with the actual task name
        logger.info("=" * 60)
        # Step 1: open database connections
        logger.info("[Step 1/5] Opening database connections...")
        source_conn = get_source_connection()
        target_conn = get_target_connection()
        # Step 2: make sure the target table exists (important: must run before loading)
        logger.info("[Step 2/5] Checking/creating target table...")
        ensure_target_table_exists(target_conn)
        # Step 3: extract data from the source table
        logger.info("[Step 3/5] Extracting source data...")
        df = extract_source_data(source_conn)
        result["records_extracted"] = len(df)
        # Step 4: transform the data
        logger.info("[Step 4/5] Transforming data...")
        df_transformed = transform_data(df)
        # Step 5: load into the target table
        logger.info("[Step 5/5] Loading data into target table...")
        records_loaded = load_to_target(
            df_transformed, target_conn, update_mode="append"
        )
        result["records_loaded"] = records_loaded
        result["status"] = "success"
        logger.info("=" * 60)
        logger.info(
            f"Task finished! Extracted: {result['records_extracted']}, loaded: {result['records_loaded']}"
        )
        logger.info("=" * 60)
    except Exception as e:
        result["status"] = "failed"
        result["error_message"] = str(e)
        logger.error(f"Task failed: {e}")
        raise
    finally:
        # Close database connections
        if source_conn:
            source_conn.close()
            logger.debug("Source database connection closed")
        if target_conn:
            target_conn.close()
            logger.debug("Target database connection closed")
        result["execution_time"] = str(datetime.now() - start_time)
    return result

if __name__ == "__main__":
    # Configure logging.
    # Important: log to stdout rather than stderr so the n8n workflow can parse
    # the output correctly. The n8n SSH node reads $json.stdout on success and
    # $json.stderr on failure.
    logger.remove()
    logger.add(
        sys.stdout,
        level="INFO",
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    )
    logger.add(
        os.path.join(
            PROJECT_ROOT, "logs", "task_xxx.log"
        ),  # TODO: replace with the actual log file name
        level="DEBUG",
        rotation="10 MB",
        retention="7 days",
        encoding="utf-8",
    )
    try:
        result = main()
        if result["status"] == "success":
            sys.exit(0)
        else:
            sys.exit(1)
    except Exception as e:
        logger.exception(f"Script execution failed: {e}")
        sys.exit(1)
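
# If the n8n workflow should receive a structured result rather than parsing
# log lines, one option (an assumption about the workflow, not a requirement
# of this template) is to print the result dict as a final JSON line on
# stdout, e.g. right after main() returns in the try block above:
#
#     import json
#     print(json.dumps(result, ensure_ascii=False, default=str))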