execution_python.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import os
  5. import logging
  6. from datetime import datetime
  7. import psycopg2
  8. import textwrap
  9. from airflow.exceptions import AirflowException
  10. # 配置日志
  11. logging.basicConfig(
  12. level=logging.INFO,
  13. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  14. handlers=[logging.StreamHandler(sys.stdout)]
  15. )
  16. logger = logging.getLogger("execution_python")
  17. # 将同级目录加入到Python搜索路径
  18. current_dir = os.path.dirname(os.path.abspath(__file__))
  19. if current_dir not in sys.path:
  20. sys.path.append(current_dir)
  21. # 尝试导入script_utils,使用多级导入策略
  22. try:
  23. import script_utils
  24. logger.info("成功导入script_utils模块")
  25. except ImportError as e:
  26. logger.error(f"无法直接导入script_utils: {str(e)}")
  27. # 尝试备用方法1:完整路径导入
  28. try:
  29. sys.path.append(os.path.dirname(current_dir)) # 添加父目录
  30. import dataops.scripts.script_utils as script_utils
  31. logger.info("使用完整路径成功导入script_utils模块")
  32. except ImportError as e2:
  33. logger.error(f"使用完整路径导入失败: {str(e2)}")
  34. # 尝试备用方法2:动态导入
  35. try:
  36. import importlib.util
  37. script_utils_path = os.path.join(current_dir, "script_utils.py")
  38. logger.info(f"尝试从路径动态导入: {script_utils_path}")
  39. spec = importlib.util.spec_from_file_location("script_utils", script_utils_path)
  40. script_utils = importlib.util.module_from_spec(spec)
  41. spec.loader.exec_module(script_utils)
  42. logger.info("通过动态导入成功加载script_utils模块")
  43. except Exception as e3:
  44. logger.error(f"动态导入也失败: {str(e3)}")
  45. raise ImportError(f"无法导入script_utils模块,所有方法都失败")
  46. # 动态导入 config
  47. def get_config():
  48. """
  49. 从config模块导入配置
  50. 返回:
  51. dict: PG_CONFIG 数据库连接配置
  52. """
  53. # 默认配置
  54. default_pg_config = {
  55. "host": "localhost",
  56. "port": 5432,
  57. "user": "postgres",
  58. "password": "postgres",
  59. "database": "dataops"
  60. }
  61. try:
  62. config = __import__('config')
  63. logger.info("从config模块直接导入配置")
  64. pg_config = getattr(config, 'PG_CONFIG', default_pg_config)
  65. return pg_config
  66. except ImportError:
  67. logger.warning("未找到 config.py,使用默认数据库配置")
  68. return default_pg_config
  69. # 导入配置
  70. PG_CONFIG = get_config()
  71. logger.info(f"配置加载完成: 数据库连接={PG_CONFIG['host']}:{PG_CONFIG['port']}/{PG_CONFIG['database']}")
  72. def get_pg_conn():
  73. """获取PostgreSQL连接"""
  74. return psycopg2.connect(**PG_CONFIG)
  75. def get_python_script(target_table, script_name):
  76. """
  77. 从data_transform_scripts表中获取Python脚本内容和目标日期列
  78. 参数:
  79. target_table (str): 目标表名
  80. script_name (str): 脚本名称
  81. 返回:
  82. tuple: (script_content, target_dt_column) 脚本内容和目标日期列
  83. """
  84. logger.info(f"加载Python脚本: target_table={target_table}, script_name={script_name}")
  85. conn = None
  86. cursor = None
  87. try:
  88. conn = get_pg_conn()
  89. cursor = conn.cursor()
  90. query = """
  91. SELECT script_content, target_dt_column
  92. FROM data_transform_scripts
  93. WHERE target_table = %s AND script_name = %s LIMIT 1
  94. """
  95. logger.info(f"执行SQL查询: {query}")
  96. logger.info(f"查询参数: target_table={target_table}, script_name={script_name}")
  97. cursor.execute(query, (target_table, script_name))
  98. result = cursor.fetchone()
  99. if result is None:
  100. logger.error(f"未找到目标表 '{target_table}' 和脚本名 '{script_name}' 对应的脚本")
  101. return None, None
  102. # 获取脚本内容和目标日期列
  103. script_content = result[0]
  104. target_dt_column = result[1] if len(result) > 1 else None
  105. # 记录结果
  106. logger.info(f"目标日期列: {target_dt_column if target_dt_column else '未设置'}")
  107. # 记录脚本内容,但可能很长,只记录前500个字符和后100个字符
  108. if len(script_content) > 600:
  109. logger.info(f"成功获取脚本内容,总长度: {len(script_content)}字符")
  110. logger.info(f"脚本内容前500字符: \n{script_content[:500]}")
  111. else:
  112. logger.info(f"成功获取脚本内容,内容如下: \n{script_content}")
  113. return script_content, target_dt_column
  114. except Exception as e:
  115. logger.error(f"查询脚本出错: {str(e)}", exc_info=True)
  116. return None, None
  117. finally:
  118. if cursor:
  119. cursor.close()
  120. if conn:
  121. conn.close()
  122. def execute_sql(sql, params=None):
  123. """
  124. 执行SQL语句
  125. 参数:
  126. sql (str): SQL语句
  127. params (dict, optional): SQL参数
  128. 返回:
  129. tuple: (成功标志, 影响的行数或结果)
  130. """
  131. conn = None
  132. cursor = None
  133. try:
  134. conn = get_pg_conn()
  135. cursor = conn.cursor()
  136. # 记录SQL(不包含敏感参数)
  137. # 由于SQL可能很长,只记录前200个字符
  138. if len(sql) > 200:
  139. logger.info(f"执行SQL (前200字符): {sql[:200]}...")
  140. else:
  141. logger.info(f"执行SQL: {sql}")
  142. if params:
  143. logger.info(f"SQL参数: {params}")
  144. # 执行SQL
  145. cursor.execute(sql, params)
  146. # 获取影响的行数
  147. if cursor.rowcount >= 0:
  148. affected_rows = cursor.rowcount
  149. logger.info(f"SQL执行成功,影响了 {affected_rows} 行")
  150. else:
  151. affected_rows = 0
  152. logger.info("SQL执行成功,但无法确定影响的行数")
  153. # 如果是SELECT语句,获取结果
  154. if sql.strip().upper().startswith("SELECT"):
  155. result = cursor.fetchall()
  156. logger.info(f"查询返回 {len(result)} 行结果")
  157. conn.commit()
  158. return True, {"affected_rows": affected_rows, "result": result}
  159. else:
  160. # 对于非SELECT语句,提交事务
  161. conn.commit()
  162. return True, {"affected_rows": affected_rows}
  163. except Exception as e:
  164. logger.error(f"执行SQL时出错: {str(e)}", exc_info=True)
  165. if conn:
  166. conn.rollback()
  167. return False, {"error": str(e)}
  168. finally:
  169. if cursor:
  170. cursor.close()
  171. if conn:
  172. conn.close()
  173. def run(script_type=None, target_table=None, script_name=None, exec_date=None, frequency=None, **kwargs):
  174. """
  175. 执行Python脚本主入口函数
  176. 参数:
  177. script_type (str): 脚本类型,必须为'python'
  178. target_table (str): 目标表名
  179. script_name (str): 脚本名称
  180. exec_date (str): 执行日期,格式为YYYY-MM-DD
  181. frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
  182. **kwargs: 其他参数
  183. 返回:
  184. bool: 是否执行成功
  185. """
  186. # 记录开始执行的时间
  187. start_time = datetime.now()
  188. logger.info("===== 开始执行 Python 脚本 =====")
  189. logger.info(f"脚本类型: {script_type}")
  190. logger.info(f"目标表: {target_table}")
  191. logger.info(f"脚本名称: {script_name}")
  192. logger.info(f"执行日期: {exec_date}")
  193. logger.info(f"频率: {frequency}")
  194. # 记录其他参数
  195. for key, value in kwargs.items():
  196. logger.info(f"其他参数 - {key}: {value}")
  197. # 验证必要参数
  198. if not script_type or script_type.lower() != 'python':
  199. logger.error(f"脚本类型必须为'python',当前为: {script_type}")
  200. return False
  201. if not target_table:
  202. logger.error("未提供目标表名")
  203. return False
  204. if not script_name:
  205. logger.error("未提供脚本名称")
  206. return False
  207. if not exec_date:
  208. logger.error("未提供执行日期")
  209. return False
  210. if not frequency:
  211. logger.error("未提供频率")
  212. return False
  213. try:
  214. # 获取Python脚本和目标日期列
  215. script_code, target_dt_column = get_python_script(target_table, script_name)
  216. if not script_code:
  217. logger.error("未获取到 Python 脚本内容")
  218. return False
  219. logger.info(f"成功获取脚本内容,长度: {len(script_code)} 字符")
  220. # 日期计算
  221. try:
  222. start_date, end_date = script_utils.get_date_range(exec_date, frequency)
  223. logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
  224. except Exception as date_err:
  225. logger.error(f"日期处理失败: {str(date_err)}", exc_info=True)
  226. return False
  227. # 检查是否开启ETL幂等性
  228. target_table_label = kwargs.get('target_table_label', '')
  229. script_exec_mode = kwargs.get('execution_mode', 'append') # 默认为append
  230. logger.info(f"脚本执行模式: {script_exec_mode}")
  231. # 导入config模块获取幂等性开关
  232. try:
  233. config = __import__('config')
  234. enable_idempotency = getattr(config, 'ENABLE_ETL_IDEMPOTENCY', False)
  235. except ImportError:
  236. logger.warning("无法导入config模块获取幂等性开关,默认为False")
  237. enable_idempotency = False
  238. logger.info(f"ETL幂等性开关状态: {enable_idempotency}")
  239. logger.info(f"目标表标签: {target_table_label}")
  240. # 如果开启了ETL幂等性处理
  241. if enable_idempotency:
  242. # 处理append模式
  243. if script_exec_mode.lower() == 'append':
  244. logger.info("当前为append模式,开始考虑ETL幂等性处理")
  245. # 检查是否有目标日期列
  246. if target_dt_column:
  247. logger.info(f"找到目标日期列 {target_dt_column},将进行数据清理")
  248. # 生成DELETE语句
  249. delete_sql = f"""DELETE FROM {target_table}
  250. WHERE {target_dt_column} >= '{start_date}'
  251. AND {target_dt_column} < '{end_date}';"""
  252. logger.info(f"生成的DELETE语句: {delete_sql}")
  253. # 执行DELETE SQL
  254. logger.info("执行清理SQL以实现幂等性")
  255. delete_success, delete_result = execute_sql(delete_sql)
  256. if delete_success:
  257. if isinstance(delete_result, dict) and "affected_rows" in delete_result:
  258. logger.info(f"清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
  259. else:
  260. logger.info("清理SQL执行成功")
  261. else:
  262. logger.error(f"清理SQL执行失败: {delete_result.get('error', '未知错误')}")
  263. # 继续执行原始Python脚本
  264. logger.warning("继续执行原始Python脚本")
  265. else:
  266. logger.warning(f"目标表 {target_table} 没有设置目标日期列(target_dt_column),无法生成DELETE语句实现幂等性")
  267. logger.warning("将直接执行原始Python脚本,可能导致数据重复")
  268. # 处理full_refresh模式
  269. elif script_exec_mode.lower() == 'full_refresh':
  270. logger.info("当前为full_refresh模式,将执行TRUNCATE操作")
  271. # 构建TRUNCATE语句
  272. truncate_sql = f"TRUNCATE TABLE {target_table};"
  273. logger.info(f"生成的TRUNCATE SQL: {truncate_sql}")
  274. # 执行TRUNCATE操作
  275. truncate_success, truncate_result = execute_sql(truncate_sql)
  276. if truncate_success:
  277. logger.info(f"TRUNCATE TABLE {target_table} 执行成功,表已清空")
  278. else:
  279. error_msg = truncate_result.get("error", "未知错误")
  280. logger.error(f"TRUNCATE TABLE执行失败: {error_msg}")
  281. # 继续执行原始Python脚本
  282. logger.warning("TRUNCATE失败,继续执行原始Python脚本")
  283. else:
  284. logger.info(f"当前执行模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
  285. else:
  286. logger.info("未开启ETL幂等性,直接执行Python脚本")
  287. # 准备执行上下文
  288. conn = get_pg_conn() # 在外层先拿到连接
  289. exec_globals = {
  290. "conn": conn, # 提供数据库连接对象
  291. "execute_sql": execute_sql, # 提供SQL执行方法
  292. "datetime": datetime, # 提供datetime模块
  293. "psycopg2": psycopg2, # 提供psycopg2模块
  294. "os": os, # 提供os模块
  295. "sys": sys # 提供sys模块
  296. }
  297. exec_locals = {
  298. "start_date": start_date,
  299. "end_date": end_date,
  300. "logger": logger,
  301. "target_table": target_table,
  302. **kwargs
  303. }
  304. # 安全执行Python片段
  305. try:
  306. # 开始执行 Python 片段...
  307. logger.info("开始执行 Python 片段...")
  308. # 执行脚本 - psycopg2自动开始事务
  309. exec(textwrap.dedent(script_code), exec_globals, exec_locals)
  310. # 检查事务是否仍然处于活动状态
  311. # psycopg2没有_in_transaction属性,使用status代替
  312. if conn.status == psycopg2.extensions.STATUS_IN_TRANSACTION:
  313. conn.commit()
  314. logger.info("在外层提交未完成的事务")
  315. # 记录执行时间
  316. end_time = datetime.now()
  317. duration = (end_time - start_time).total_seconds()
  318. logger.info("Python 脚本执行成功")
  319. logger.info(f"===== Python脚本执行完成 (成功) =====")
  320. logger.info(f"总耗时: {duration:.2f}秒")
  321. return True
  322. except Exception as e:
  323. # 检查事务状态进行回滚
  324. if conn.status == psycopg2.extensions.STATUS_IN_TRANSACTION:
  325. conn.rollback()
  326. logger.info("事务已回滚")
  327. # 记录错误信息
  328. end_time = datetime.now()
  329. duration = (end_time - start_time).total_seconds()
  330. logger.error(f"执行 Python 脚本失败: {str(e)}", exc_info=True)
  331. logger.info(f"===== Python脚本执行完成 (失败) =====")
  332. logger.info(f"总耗时: {duration:.2f}秒")
  333. # 将异常重新抛出,让Airflow知道任务失败
  334. raise AirflowException(f"Python脚本执行失败: {str(e)}")
  335. finally:
  336. # 安全关闭连接
  337. conn.close()
  338. logger.info("数据库连接已关闭")
  339. except Exception as e:
  340. # 捕获所有未处理的异常
  341. logger.error(f"执行Python脚本时发生未预期的错误: {str(e)}", exc_info=True)
  342. # 抛出异常,让Airflow知道任务失败
  343. raise AirflowException(f"执行Python脚本时发生未预期的错误: {str(e)}")
  344. if __name__ == "__main__":
  345. import argparse
  346. parser = argparse.ArgumentParser(description='执行Python脚本片段')
  347. parser.add_argument('--target-table', type=str, required=True, help='目标表名')
  348. parser.add_argument('--script-name', type=str, required=True, help='脚本名称')
  349. parser.add_argument('--exec-date', type=str, required=True, help='执行日期 (YYYY-MM-DD)')
  350. parser.add_argument('--frequency', type=str, required=True,
  351. choices=['daily', 'weekly', 'monthly', 'quarterly', 'yearly'],
  352. help='频率: daily, weekly, monthly, quarterly, yearly')
  353. parser.add_argument('--execution-mode', type=str, default='append',
  354. choices=['append', 'full_refresh'],
  355. help='执行模式: append(追加), full_refresh(全量刷新)')
  356. args = parser.parse_args()
  357. run_kwargs = {
  358. "script_type": "python",
  359. "target_table": args.target_table,
  360. "script_name": args.script_name,
  361. "exec_date": args.exec_date,
  362. "frequency": args.frequency,
  363. "execution_mode": args.execution_mode
  364. }
  365. logger.info("命令行测试执行参数: " + str(run_kwargs))
  366. try:
  367. success = run(**run_kwargs)
  368. if success:
  369. logger.info("Python脚本执行成功")
  370. sys.exit(0)
  371. else:
  372. logger.error("Python脚本执行失败")
  373. sys.exit(1)
  374. except Exception as e:
  375. logger.error(f"执行失败: {str(e)}")
  376. sys.exit(1)