execution_python.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import sys
  4. import os
  5. import logging
  6. from datetime import datetime, timedelta
  7. import psycopg2
  8. import textwrap
  9. from airflow.exceptions import AirflowException
  10. import importlib
  11. import time
  12. import traceback
  13. import re
  14. from script_utils import get_config_param
  15. SCHEDULE_TABLE_SCHEMA = get_config_param("SCHEDULE_TABLE_SCHEMA")
  16. # 配置日志
  17. logging.basicConfig(
  18. level=logging.INFO,
  19. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  20. handlers=[logging.StreamHandler(sys.stdout)]
  21. )
  22. logger = logging.getLogger("execution_python")
  23. # 明确地将当前目录添加到Python模块搜索路径的最前面,确保优先从当前目录导入
  24. current_dir = os.path.dirname(os.path.abspath(__file__))
  25. if current_dir not in sys.path:
  26. sys.path.insert(0, current_dir)
  27. logger.info(f"将当前目录添加到Python模块搜索路径: {current_dir}")
  28. # 直接尝试导入script_utils
  29. try:
  30. import script_utils
  31. from script_utils import get_pg_config
  32. logger.info(f"成功导入script_utils模块,模块路径: {script_utils.__file__}")
  33. except ImportError as e:
  34. logger.error(f"导入script_utils模块失败: {str(e)}")
  35. logger.error(f"当前Python模块搜索路径: {sys.path}")
  36. logger.error(f"当前目录内容: {os.listdir(current_dir)}")
  37. raise ImportError(f"无法导入script_utils模块,请确保script_utils.py在当前目录下")
  38. # 使用script_utils中的方法获取配置
  39. try:
  40. # 获取PostgreSQL配置
  41. PG_CONFIG = get_pg_config()
  42. logger.info(f"通过script_utils.get_pg_config()获取PostgreSQL配置成功")
  43. except Exception as e:
  44. logger.error(f"获取配置失败,使用默认值: {str(e)}")
  45. # 默认配置
  46. PG_CONFIG = {
  47. "host": "localhost",
  48. "port": 5432,
  49. "user": "postgres",
  50. "password": "postgres",
  51. "database": "dataops"
  52. }
  53. def get_pg_conn():
  54. """获取PostgreSQL连接"""
  55. return psycopg2.connect(**PG_CONFIG)
  56. def get_python_script(target_table, script_name):
  57. """
  58. 从data_transform_scripts表中获取Python脚本内容和目标日期列
  59. 参数:
  60. target_table (str): 目标表名
  61. script_name (str): 脚本名称
  62. 返回:
  63. tuple: (script_content, target_dt_column) 脚本内容和目标日期列
  64. """
  65. logger.info(f"加载Python脚本: target_table={target_table}, script_name={script_name}")
  66. conn = None
  67. cursor = None
  68. try:
  69. conn = get_pg_conn()
  70. cursor = conn.cursor()
  71. query = """
  72. SELECT script_content, target_dt_column
  73. FROM {}.data_transform_scripts
  74. WHERE target_table = %s AND script_name = %s LIMIT 1
  75. """.format(SCHEDULE_TABLE_SCHEMA)
  76. logger.info(f"执行SQL查询: {query}")
  77. logger.info(f"查询参数: target_table={target_table}, script_name={script_name}")
  78. cursor.execute(query, (target_table, script_name))
  79. result = cursor.fetchone()
  80. if result is None:
  81. logger.error(f"未找到目标表 '{target_table}' 和脚本名 '{script_name}' 对应的脚本")
  82. return None, None
  83. # 获取脚本内容和目标日期列
  84. script_content = result[0]
  85. target_dt_column = result[1] if len(result) > 1 else None
  86. # 记录结果
  87. logger.info(f"目标日期列: {target_dt_column if target_dt_column else '未设置'}")
  88. # 记录脚本内容,但可能很长,只记录前500个字符和后100个字符
  89. if len(script_content) > 600:
  90. logger.info(f"成功获取脚本内容,总长度: {len(script_content)}字符")
  91. logger.info(f"脚本内容前500字符: \n{script_content[:500]}")
  92. else:
  93. logger.info(f"成功获取脚本内容,内容如下: \n{script_content}")
  94. return script_content, target_dt_column
  95. except Exception as e:
  96. logger.error(f"查询脚本出错: {str(e)}", exc_info=True)
  97. return None, None
  98. finally:
  99. if cursor:
  100. cursor.close()
  101. if conn:
  102. conn.close()
  103. def execute_sql(sql, params=None):
  104. """
  105. 执行SQL语句
  106. 参数:
  107. sql (str): SQL语句
  108. params (dict, optional): SQL参数
  109. 返回:
  110. tuple: (成功标志, 影响的行数或结果)
  111. """
  112. conn = None
  113. cursor = None
  114. try:
  115. conn = get_pg_conn()
  116. cursor = conn.cursor()
  117. # 记录SQL(不包含敏感参数)
  118. # 由于SQL可能很长,只记录前200个字符
  119. if len(sql) > 200:
  120. logger.info(f"执行SQL (前200字符): {sql[:200]}...")
  121. else:
  122. logger.info(f"执行SQL: {sql}")
  123. if params:
  124. logger.info(f"SQL参数: {params}")
  125. # 执行SQL
  126. cursor.execute(sql, params)
  127. # 获取影响的行数
  128. if cursor.rowcount >= 0:
  129. affected_rows = cursor.rowcount
  130. logger.info(f"SQL执行成功,影响了 {affected_rows} 行")
  131. else:
  132. affected_rows = 0
  133. logger.info("SQL执行成功,但无法确定影响的行数")
  134. # 如果是SELECT语句,获取结果
  135. if sql.strip().upper().startswith("SELECT"):
  136. result = cursor.fetchall()
  137. logger.info(f"查询返回 {len(result)} 行结果")
  138. conn.commit()
  139. return True, {"affected_rows": affected_rows, "result": result}
  140. else:
  141. # 对于非SELECT语句,提交事务
  142. conn.commit()
  143. return True, {"affected_rows": affected_rows}
  144. except Exception as e:
  145. logger.error(f"执行SQL时出错: {str(e)}", exc_info=True)
  146. if conn:
  147. conn.rollback()
  148. return False, {"error": str(e)}
  149. finally:
  150. if cursor:
  151. cursor.close()
  152. if conn:
  153. conn.close()
  154. def run(script_type=None, target_table=None, script_name=None, exec_date=None, schedule_frequency=None, **kwargs):
  155. """
  156. 执行Python脚本主入口函数
  157. 参数:
  158. script_type (str): 脚本类型,必须为'python'
  159. target_table (str): 目标表名
  160. script_name (str): 脚本名称
  161. exec_date (str): 执行日期,格式为YYYY-MM-DD
  162. schedule_frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
  163. **kwargs: 其他参数
  164. 返回:
  165. bool: 是否执行成功
  166. """
  167. # 记录开始执行的时间
  168. start_time = datetime.now()
  169. logger.info("===== 开始执行 Python 脚本 =====")
  170. logger.info(f"脚本类型: {script_type}")
  171. logger.info(f"目标表: {target_table}")
  172. logger.info(f"脚本名称: {script_name}")
  173. logger.info(f"执行日期: {exec_date}")
  174. logger.info(f"频率: {schedule_frequency}")
  175. # 记录其他参数
  176. for key, value in kwargs.items():
  177. logger.info(f"其他参数 - {key}: {value}")
  178. # 验证必要参数
  179. if not script_type or script_type.lower() != 'python':
  180. logger.error(f"脚本类型必须为'python',当前为: {script_type}")
  181. return False
  182. if not target_table:
  183. logger.error("未提供目标表名")
  184. return False
  185. if not script_name:
  186. logger.error("未提供脚本名称")
  187. return False
  188. if not exec_date:
  189. logger.error("未提供执行日期")
  190. return False
  191. if not schedule_frequency:
  192. logger.error("未提供频率")
  193. return False
  194. try:
  195. # 获取Python脚本和目标日期列
  196. script_code, target_dt_column = get_python_script(target_table, script_name)
  197. if not script_code:
  198. logger.error("未获取到 Python 脚本内容")
  199. return False
  200. logger.info(f"成功获取脚本内容,长度: {len(script_code)} 字符")
  201. # 日期计算
  202. # try:
  203. # # 直接使用script_utils.get_date_range计算日期范围
  204. # logger.info(f"使用script_utils.get_date_range计算日期范围,参数: exec_date={exec_date}, frequency={schedule_frequency}")
  205. # start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
  206. # logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
  207. # except Exception as date_err:
  208. # logger.error(f"日期处理失败: {str(date_err)}", exc_info=True)
  209. # # 使用简单的默认日期范围计算
  210. # date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
  211. # if schedule_frequency.lower() == 'daily':
  212. # start_date = date_obj.strftime('%Y-%m-%d')
  213. # end_date = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
  214. # else:
  215. # # 对其他频率使用默认范围
  216. # start_date = exec_date
  217. # end_date = (date_obj + timedelta(days=30)).strftime('%Y-%m-%d')
  218. # logger.warning(f"使用默认日期范围计算: start_date={start_date}, end_date={end_date}")
  219. # 检查是否开启ETL幂等性
  220. target_table_label = kwargs.get('target_table_label', '')
  221. script_exec_mode = kwargs.get('update_mode', 'append') # 只使用update_mode
  222. is_manual_dag_trigger = kwargs.get('is_manual_dag_trigger', False)
  223. logger.info(f"脚本更新模式: {script_exec_mode}")
  224. logger.info(f"是否手工DAG触发: {is_manual_dag_trigger}")
  225. # 导入config模块获取幂等性开关
  226. try:
  227. config = __import__('config')
  228. enable_idempotency = getattr(config, 'ENABLE_ETL_IDEMPOTENCY', False)
  229. except ImportError:
  230. logger.warning("无法导入config模块获取幂等性开关,默认为False")
  231. enable_idempotency = False
  232. logger.info(f"ETL幂等性开关状态: {enable_idempotency}")
  233. logger.info(f"目标表标签: {target_table_label}")
  234. # 如果开启了ETL幂等性处理,并且是由手工dag触发的。
  235. if enable_idempotency:
  236. # 处理append模式
  237. if script_exec_mode.lower() == 'append':
  238. logger.info("当前为append模式,且ETL幂等性开关被打开")
  239. if is_manual_dag_trigger:
  240. logger.info("manual dag被手工触发")
  241. # 检查是否有目标日期列
  242. logger.info(f"使用script_utils.get_date_range计算日期范围,参数: exec_date={exec_date}, frequency={schedule_frequency}")
  243. start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
  244. logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
  245. if target_dt_column:
  246. logger.info(f"找到目标日期列 {target_dt_column},将进行数据清理")
  247. # 生成DELETE语句
  248. delete_sql = f"""DELETE FROM {target_table}
  249. WHERE {target_dt_column} >= '{start_date}'
  250. AND {target_dt_column} < '{end_date}';"""
  251. logger.info(f"生成的DELETE语句: {delete_sql}")
  252. # 执行DELETE SQL
  253. logger.info("执行清理SQL以实现幂等性")
  254. delete_success, delete_result = execute_sql(delete_sql)
  255. if delete_success:
  256. if isinstance(delete_result, dict) and "affected_rows" in delete_result:
  257. logger.info(f"清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
  258. else:
  259. logger.info("清理SQL执行成功")
  260. else:
  261. logger.error(f"清理SQL执行失败: {delete_result.get('error', '未知错误')}")
  262. # 继续执行原始Python脚本
  263. logger.warning("继续执行原始Python脚本")
  264. else:
  265. logger.warning(f"目标表 {target_table} 没有设置目标日期列(target_dt_column),无法生成DELETE语句实现幂等性")
  266. logger.warning("将直接执行原始Python脚本,可能导致数据重复")
  267. else:
  268. logger.info(f"不是manual dag手工触发,对目标表 {target_table} create_time 进行数据清理")
  269. start_date, end_date = script_utils.get_one_day_range(exec_date)
  270. # 生成DELETE语句
  271. delete_sql = f"""DELETE FROM {target_table}
  272. WHERE create_time >= '{start_date}'
  273. AND create_time < '{end_date}';"""
  274. logger.info(f"生成的DELETE语句: {delete_sql}")
  275. # 执行DELETE SQL
  276. logger.info("基于create_time执行清理SQL以实现幂等性")
  277. delete_success, delete_result = execute_sql(delete_sql)
  278. if delete_success:
  279. if isinstance(delete_result, dict) and "affected_rows" in delete_result:
  280. logger.info(f"基于create_time,清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
  281. else:
  282. logger.info("基于create_time,清理SQL执行成功")
  283. else:
  284. logger.error(f"基于create_time,清理SQL执行失败: {delete_result.get('error', '未知错误')}")
  285. # 继续执行原始Python脚本
  286. logger.warning("继续执行原始Python脚本")
  287. # 处理full_refresh模式
  288. elif script_exec_mode.lower() == 'full_refresh':
  289. logger.info("当前为full_refresh模式,将执行TRUNCATE操作")
  290. # 构建TRUNCATE语句
  291. truncate_sql = f"TRUNCATE TABLE {target_table};"
  292. logger.info(f"生成的TRUNCATE SQL: {truncate_sql}")
  293. # 执行TRUNCATE操作
  294. truncate_success, truncate_result = execute_sql(truncate_sql)
  295. if truncate_success:
  296. logger.info(f"TRUNCATE TABLE {target_table} 执行成功,表已清空")
  297. else:
  298. error_msg = truncate_result.get("error", "未知错误")
  299. logger.error(f"TRUNCATE TABLE执行失败: {error_msg}")
  300. # 继续执行原始Python脚本
  301. logger.warning("TRUNCATE失败,继续执行原始Python脚本")
  302. else:
  303. logger.info(f"当前更新模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
  304. else:
  305. logger.info("未开启ETL幂等性,直接执行Python脚本")
  306. # 准备执行上下文
  307. conn = get_pg_conn() # 在外层先拿到连接
  308. exec_globals = {
  309. "conn": conn, # 提供数据库连接对象
  310. "execute_sql": execute_sql, # 提供SQL执行方法
  311. "datetime": datetime, # 提供datetime模块
  312. "psycopg2": psycopg2, # 提供psycopg2模块
  313. "os": os, # 提供os模块
  314. "sys": sys # 提供sys模块
  315. }
  316. exec_locals = {
  317. "start_date": start_date,
  318. "end_date": end_date,
  319. "logger": logger,
  320. "target_table": target_table,
  321. **kwargs
  322. }
  323. # 安全执行Python片段
  324. try:
  325. # 开始执行 Python 片段...
  326. logger.info("开始执行 Python 片段...")
  327. # 执行脚本 - psycopg2自动开始事务
  328. exec(textwrap.dedent(script_code), exec_globals, exec_locals)
  329. # 检查事务是否仍然处于活动状态
  330. # psycopg2没有_in_transaction属性,使用status代替
  331. if conn.status == psycopg2.extensions.STATUS_IN_TRANSACTION:
  332. conn.commit()
  333. logger.info("在外层提交未完成的事务")
  334. # 记录执行时间
  335. end_time = datetime.now()
  336. duration = (end_time - start_time).total_seconds()
  337. logger.info("Python 脚本执行成功")
  338. logger.info(f"===== Python脚本执行完成 (成功) =====")
  339. logger.info(f"总耗时: {duration:.2f}秒")
  340. return True
  341. except Exception as e:
  342. # 检查事务状态进行回滚
  343. if conn.status == psycopg2.extensions.STATUS_IN_TRANSACTION:
  344. conn.rollback()
  345. logger.info("事务已回滚")
  346. # 记录错误信息
  347. end_time = datetime.now()
  348. duration = (end_time - start_time).total_seconds()
  349. logger.error(f"执行 Python 脚本失败: {str(e)}", exc_info=True)
  350. logger.info(f"===== Python脚本执行完成 (失败) =====")
  351. logger.info(f"总耗时: {duration:.2f}秒")
  352. # 将异常重新抛出,让Airflow知道任务失败
  353. raise AirflowException(f"Python脚本执行失败: {str(e)}")
  354. finally:
  355. # 安全关闭连接
  356. conn.close()
  357. logger.info("数据库连接已关闭")
  358. except Exception as e:
  359. # 捕获所有未处理的异常
  360. logger.error(f"执行Python脚本时发生未预期的错误: {str(e)}", exc_info=True)
  361. # 抛出异常,让Airflow知道任务失败
  362. raise AirflowException(f"执行Python脚本时发生未预期的错误: {str(e)}")
  363. if __name__ == "__main__":
  364. import argparse
  365. parser = argparse.ArgumentParser(description='执行Python脚本')
  366. parser.add_argument('--target-table', type=str, required=True, help='目标表名')
  367. parser.add_argument('--script-name', type=str, required=True, help='脚本名称')
  368. parser.add_argument('--exec-date', type=str, required=True, help='执行日期 (YYYY-MM-DD)')
  369. parser.add_argument('--schedule_frequency', type=str, required=True,
  370. choices=['daily', 'weekly', 'monthly', 'quarterly', 'yearly'],
  371. help='频率: daily, weekly, monthly, quarterly, yearly')
  372. parser.add_argument('--update-mode', type=str, default='append',
  373. choices=['append', 'full_refresh'],
  374. help='更新模式: append(追加), full_refresh(全量刷新)')
  375. args = parser.parse_args()
  376. run_kwargs = {
  377. "script_type": "python",
  378. "target_table": args.target_table,
  379. "script_name": args.script_name,
  380. "exec_date": args.exec_date,
  381. "schedule_frequency": args.schedule_frequency,
  382. "update_mode": args.update_mode
  383. }
  384. logger.info("命令行测试执行参数: " + str(run_kwargs))
  385. try:
  386. success = run(**run_kwargs)
  387. if success:
  388. logger.info("Python脚本执行成功")
  389. sys.exit(0)
  390. else:
  391. logger.error("Python脚本执行失败")
  392. sys.exit(1)
  393. except Exception as e:
  394. logger.error(f"执行失败: {str(e)}")
  395. sys.exit(1)