execution_sql.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. from datetime import datetime
  7. import psycopg2
  8. import jinja2
  9. # 修改导入方式,避免使用相对导入
  10. import sys
  11. import os
  12. import traceback
  13. import time
  14. from datetime import datetime, timedelta
  15. import pytz
  16. import re
  17. from script_utils import get_config_param
  18. SCHEDULE_TABLE_SCHEMA = get_config_param("SCHEDULE_TABLE_SCHEMA")
  19. # 配置日志记录器
  20. logging.basicConfig(
  21. level=logging.INFO,
  22. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  23. handlers=[
  24. logging.StreamHandler(sys.stdout)
  25. ]
  26. )
  27. logger = logging.getLogger("execution_sql")
  28. # 将同级目录加入到Python搜索路径
  29. current_dir = os.path.dirname(os.path.abspath(__file__))
  30. if current_dir not in sys.path:
  31. sys.path.append(current_dir)
  32. # 尝试导入script_utils
  33. try:
  34. import script_utils
  35. from script_utils import get_pg_config, logger as utils_logger
  36. logger.info("成功导入script_utils模块的get_pg_config方法")
  37. except ImportError as e:
  38. logger.error(f"无法直接导入script_utils方法: {str(e)}")
  39. # 尝试备用方法1:完整路径导入
  40. # try:
  41. # sys.path.append(os.path.dirname(current_dir)) # 添加父目录
  42. # from dataops_scripts.script_utils import get_pg_config
  43. # logger.info("使用完整路径成功导入script_utils模块的方法")
  44. # except ImportError as e2:
  45. # logger.error(f"使用完整路径导入失败: {str(e2)}")
  46. # # 尝试备用方法2:动态导入
  47. # try:
  48. # import importlib.util
  49. # script_utils_path = os.path.join(current_dir, "script_utils.py")
  50. # logger.info(f"尝试从路径动态导入: {script_utils_path}")
  51. # spec = importlib.util.spec_from_file_location("script_utils", script_utils_path)
  52. # script_utils = importlib.util.module_from_spec(spec)
  53. # spec.loader.exec_module(script_utils)
  54. # get_pg_config = script_utils.get_pg_config
  55. # logger.info("通过动态导入成功加载script_utils模块的方法")
  56. # except Exception as e3:
  57. # logger.error(f"动态导入也失败: {str(e3)}")
  58. # raise ImportError(f"无法导入script_utils模块的方法,所有方法都失败")
  59. # 使用script_utils中的方法获取配置
  60. try:
  61. # 获取PostgreSQL配置
  62. PG_CONFIG = get_pg_config()
  63. logger.info(f"通过script_utils.get_pg_config()获取PostgreSQL配置成功")
  64. except Exception as e:
  65. logger.error(f"获取配置失败,使用默认值: {str(e)}")
  66. # 默认配置
  67. PG_CONFIG = {
  68. "host": "localhost",
  69. "port": 5432,
  70. "user": "postgres",
  71. "password": "postgres",
  72. "database": "dataops",
  73. }
  74. def get_pg_conn():
  75. """获取PostgreSQL连接"""
  76. return psycopg2.connect(**PG_CONFIG)
  77. def get_script_content(target_table, script_name):
  78. """
  79. 从data_transform_scripts表中获取脚本内容和目标日期列
  80. 参数:
  81. target_table (str): 目标表名
  82. script_name (str): 脚本名称
  83. 返回:
  84. tuple: (script_content, target_dt_column) 脚本内容和目标日期列
  85. """
  86. logger.info(f"正在从data_transform_scripts表获取脚本内容,目标表: {target_table},脚本名称: {script_name}")
  87. conn = None
  88. cursor = None
  89. try:
  90. conn = get_pg_conn()
  91. cursor = conn.cursor()
  92. query = """
  93. SELECT script_content, target_dt_column
  94. FROM {}.data_transform_scripts
  95. WHERE target_table = %s AND script_name = %s
  96. LIMIT 1
  97. """.format(SCHEDULE_TABLE_SCHEMA)
  98. logger.info(f"执行SQL查询: {query}")
  99. logger.info(f"查询参数: target_table={target_table}, script_name={script_name}")
  100. cursor.execute(query, (target_table, script_name))
  101. result = cursor.fetchone()
  102. if result is None:
  103. logger.error(f"未找到目标表 '{target_table}' 和脚本名 '{script_name}' 对应的脚本")
  104. return None, None
  105. # 获取脚本内容和目标日期列
  106. script_content = result[0]
  107. target_dt_column = result[1] if len(result) > 1 else None
  108. # 记录结果
  109. logger.info(f"目标日期列: {target_dt_column if target_dt_column else '未设置'}")
  110. # 记录脚本内容,但可能很长,只记录前500个字符和后100个字符
  111. if len(script_content) > 600:
  112. logger.info(f"成功获取脚本内容,总长度: {len(script_content)}字符")
  113. logger.info(f"脚本内容前500字符: \n{script_content[:500]}")
  114. logger.info(f"脚本内容后100字符: \n{script_content[-100:]}")
  115. else:
  116. logger.info(f"成功获取脚本内容,内容如下: \n{script_content}")
  117. return script_content, target_dt_column
  118. except Exception as e:
  119. logger.error(f"获取脚本内容时出错: {str(e)}", exc_info=True)
  120. return None, None
  121. finally:
  122. if cursor:
  123. cursor.close()
  124. if conn:
  125. conn.close()
  126. def execute_sql(sql, params=None):
  127. """
  128. 执行SQL语句
  129. 参数:
  130. sql (str): SQL语句
  131. params (dict, optional): SQL参数
  132. 返回:
  133. tuple: (成功标志, 影响的行数或结果)
  134. """
  135. conn = None
  136. cursor = None
  137. try:
  138. conn = get_pg_conn()
  139. cursor = conn.cursor()
  140. # 记录SQL(不包含敏感参数)
  141. # 由于SQL可能很长,只记录前200个字符
  142. if len(sql) > 200:
  143. logger.info(f"执行SQL (前200字符): {sql[:200]}...")
  144. else:
  145. logger.info(f"执行SQL: {sql}")
  146. if params:
  147. logger.info(f"SQL参数: {params}")
  148. # 执行SQL
  149. cursor.execute(sql, params)
  150. # 获取影响的行数
  151. if cursor.rowcount >= 0:
  152. affected_rows = cursor.rowcount
  153. logger.info(f"SQL执行成功,影响了 {affected_rows} 行")
  154. else:
  155. affected_rows = 0
  156. logger.info("SQL执行成功,但无法确定影响的行数")
  157. # 如果是SELECT语句,获取结果
  158. if sql.strip().upper().startswith("SELECT"):
  159. result = cursor.fetchall()
  160. logger.info(f"查询返回 {len(result)} 行结果")
  161. conn.commit()
  162. return True, {"affected_rows": affected_rows, "result": result}
  163. else:
  164. # 对于非SELECT语句,提交事务
  165. conn.commit()
  166. return True, {"affected_rows": affected_rows}
  167. except Exception as e:
  168. logger.error(f"执行SQL时出错: {str(e)}", exc_info=True)
  169. if conn:
  170. conn.rollback()
  171. return False, {"error": str(e)}
  172. finally:
  173. if cursor:
  174. cursor.close()
  175. if conn:
  176. conn.close()
  177. def render_sql_template(sql_content, template_params):
  178. """
  179. 使用Jinja2渲染SQL模板
  180. 参数:
  181. sql_content (str): SQL模板内容
  182. template_params (dict): 模板参数
  183. 返回:
  184. str: 渲染后的SQL
  185. """
  186. try:
  187. # 创建Jinja2环境
  188. env = jinja2.Environment(
  189. autoescape=False,
  190. undefined=jinja2.StrictUndefined # 严格模式,未定义变量会抛出异常
  191. )
  192. # 创建模板并渲染
  193. template = env.from_string(sql_content)
  194. rendered_sql = template.render(**template_params)
  195. logger.info("SQL模板渲染成功")
  196. # 记录渲染后的SQL,只记录前200个字符和后100个字符
  197. if len(rendered_sql) > 300:
  198. logger.info(f"渲染后SQL前200字符: \n{rendered_sql[:200]}...")
  199. logger.info(f"渲染后SQL后100字符: \n...{rendered_sql[-100:]}")
  200. else:
  201. logger.info(f"渲染后SQL: \n{rendered_sql}")
  202. return rendered_sql
  203. except Exception as e:
  204. logger.error(f"渲染SQL模板时出错: {str(e)}", exc_info=True)
  205. raise
  206. def run(script_type=None, target_table=None, script_name=None, exec_date=None,
  207. schedule_frequency=None, **kwargs):
  208. """
  209. 执行SQL脚本主入口函数
  210. 参数:
  211. script_type (str): 脚本类型,必须为'sql'
  212. target_table (str): 目标表名
  213. script_name (str): 脚本名称
  214. exec_date (str): 执行日期,格式为YYYY-MM-DD
  215. schedule_frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
  216. **kwargs: 其他参数
  217. 返回:
  218. bool: 是否执行成功
  219. """
  220. # 记录开始执行的时间
  221. start_time = datetime.now()
  222. logger.info(f"===== 开始执行 SQL 脚本 =====")
  223. logger.info(f"脚本类型: {script_type}")
  224. logger.info(f"目标表: {target_table}")
  225. logger.info(f"脚本名称: {script_name}")
  226. logger.info(f"执行日期: {exec_date}")
  227. logger.info(f"频率: {schedule_frequency}")
  228. # 记录其他参数
  229. for key, value in kwargs.items():
  230. logger.info(f"其他参数 - {key}: {value}")
  231. # 验证必要参数
  232. if not script_type or script_type.lower() != 'sql':
  233. logger.error(f"脚本类型必须为'sql',当前为: {script_type}")
  234. return False
  235. if not target_table:
  236. logger.error("未提供目标表名")
  237. return False
  238. if not script_name:
  239. logger.error("未提供脚本名称")
  240. return False
  241. if not exec_date:
  242. logger.error("未提供执行日期")
  243. return False
  244. if not schedule_frequency:
  245. logger.error("未提供频率")
  246. return False
  247. try:
  248. # 获取脚本内容,直接从数据库获取,不检查文件是否存在
  249. sql_content, target_dt_column = get_script_content(target_table, script_name)
  250. if not sql_content:
  251. logger.error("无法获取脚本内容,执行失败")
  252. return False
  253. logger.info(f"成功获取脚本内容,长度: {len(sql_content)} 字符")
  254. # 检查是否开启ETL幂等性
  255. target_table_label = kwargs.get('target_table_label', '')
  256. script_exec_mode = kwargs.get('update_mode', 'append') # 只使用update_mode
  257. is_manual_dag_trigger = kwargs.get('is_manual_dag_trigger', False) # 新增:获取是否为手动触发的DAG
  258. logger.info(f"脚本更新模式: {script_exec_mode}")
  259. logger.info(f"是否手工DAG触发: {is_manual_dag_trigger}")
  260. # 导入config模块获取幂等性开关
  261. try:
  262. config = __import__('config')
  263. enable_idempotency = getattr(config, 'ENABLE_ETL_IDEMPOTENCY', False)
  264. except ImportError:
  265. logger.warning("无法导入config模块获取幂等性开关,默认为False")
  266. enable_idempotency = False
  267. logger.info(f"ETL幂等性开关状态: {enable_idempotency}")
  268. logger.info(f"目标表标签: {target_table_label}")
  269. # 如果开启了ETL幂等性且是SQL类型
  270. if script_type.lower() == 'sql' and enable_idempotency:
  271. # 处理append模式
  272. if script_exec_mode.lower() == 'append':
  273. logger.info("当前为append模式,开始考虑ETL幂等性处理")
  274. if is_manual_dag_trigger:
  275. logger.info("manual dag被手工触发")
  276. # 计算日期范围
  277. try:
  278. start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
  279. logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
  280. except Exception as date_err:
  281. logger.error(f"计算日期范围时出错: {str(date_err)}", exc_info=True)
  282. return False
  283. # 检查是否有目标日期列
  284. if target_dt_column:
  285. logger.info(f"找到目标日期列 {target_dt_column},将生成DELETE语句")
  286. # 生成DELETE语句
  287. delete_sql = f"""DELETE FROM {target_table}
  288. WHERE {target_dt_column} >= '{{{{ start_date }}}}'
  289. AND {target_dt_column} < '{{{{ end_date }}}}';"""
  290. logger.info(f"生成的DELETE语句: {delete_sql}")
  291. # 准备模板参数
  292. template_params = {
  293. 'start_date': start_date,
  294. 'end_date': end_date,
  295. # 可以添加更多默认参数
  296. }
  297. # 渲染DELETE SQL
  298. try:
  299. rendered_delete_sql = render_sql_template(delete_sql, template_params)
  300. logger.info("成功渲染清理SQL")
  301. except Exception as render_err:
  302. logger.error(f"渲染清理SQL时出错: {str(render_err)}", exc_info=True)
  303. # 即使清理SQL失败,仍然继续执行后续SQL
  304. logger.warning("继续执行原始SQL")
  305. else:
  306. # 执行DELETE SQL
  307. logger.info("执行清理SQL以实现幂等性")
  308. delete_success, delete_result = execute_sql(rendered_delete_sql)
  309. if delete_success:
  310. if isinstance(delete_result, dict) and "affected_rows" in delete_result:
  311. logger.info(f"清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
  312. else:
  313. logger.info("清理SQL执行成功")
  314. else:
  315. logger.error(f"清理SQL执行失败: {delete_result.get('error', '未知错误')}")
  316. # 继续执行原始SQL
  317. logger.warning("继续执行原始SQL")
  318. else:
  319. logger.warning(f"目标表 {target_table} 没有设置目标日期列(target_dt_column),无法生成DELETE语句实现幂等性")
  320. logger.warning("将直接执行原始SQL,可能导致数据重复")
  321. else:
  322. logger.info(f"不是manual dag手工触发,对目标表 {target_table} create_time 进行数据清理")
  323. # 获取一天范围的日期
  324. start_date, end_date = script_utils.get_one_day_range(exec_date)
  325. logger.info(f"使用一天范围的日期: start_date={start_date}, end_date={end_date}")
  326. # 生成基于create_time的DELETE语句
  327. delete_sql = f"""DELETE FROM {target_table}
  328. WHERE create_time >= '{start_date}'
  329. AND create_time < '{end_date}';"""
  330. logger.info(f"生成的基于create_time的DELETE语句: {delete_sql}")
  331. # 执行DELETE SQL
  332. logger.info("基于create_time执行清理SQL以实现幂等性")
  333. delete_success, delete_result = execute_sql(delete_sql)
  334. if delete_success:
  335. if isinstance(delete_result, dict) and "affected_rows" in delete_result:
  336. logger.info(f"基于create_time,清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
  337. else:
  338. logger.info("基于create_time,清理SQL执行成功")
  339. else:
  340. logger.error(f"基于create_time,清理SQL执行失败: {delete_result.get('error', '未知错误')}")
  341. # 继续执行原始SQL
  342. logger.warning("继续执行原始SQL")
  343. # 处理full_refresh模式
  344. elif script_exec_mode.lower() == 'full_refresh':
  345. logger.info("当前为full_refresh模式,将执行TRUNCATE操作")
  346. # 构建TRUNCATE语句
  347. truncate_sql = f"TRUNCATE TABLE {target_table};"
  348. logger.info(f"生成的TRUNCATE SQL: {truncate_sql}")
  349. # 执行TRUNCATE操作
  350. truncate_success, truncate_result = execute_sql(truncate_sql)
  351. if truncate_success:
  352. logger.info(f"TRUNCATE TABLE {target_table} 执行成功,表已清空")
  353. else:
  354. error_msg = truncate_result.get("error", "未知错误")
  355. logger.error(f"TRUNCATE TABLE执行失败: {error_msg}")
  356. # 继续执行原始SQL
  357. logger.warning("TRUNCATE失败,继续执行原始SQL")
  358. else:
  359. logger.info(f"当前更新模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
  360. else:
  361. logger.info("未满足ETL幂等性处理条件,直接执行原始SQL")
  362. # 计算日期范围(如果之前未计算)
  363. if 'start_date' not in locals() or 'end_date' not in locals():
  364. try:
  365. start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
  366. logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
  367. except Exception as date_err:
  368. logger.error(f"计算日期范围时出错: {str(date_err)}", exc_info=True)
  369. return False
  370. # 准备模板参数(如果之前未准备)
  371. if 'template_params' not in locals():
  372. template_params = {
  373. 'start_date': start_date,
  374. 'end_date': end_date,
  375. # 可以添加更多默认参数
  376. }
  377. # 渲染原始SQL模板
  378. try:
  379. rendered_sql = render_sql_template(sql_content, template_params)
  380. except Exception as render_err:
  381. logger.error(f"渲染SQL模板时出错: {str(render_err)}", exc_info=True)
  382. return False
  383. # 执行原始SQL
  384. success, result = execute_sql(rendered_sql)
  385. if success:
  386. # 记录执行结果
  387. if isinstance(result, dict):
  388. if "affected_rows" in result:
  389. logger.info(f"SQL执行成功,影响了 {result['affected_rows']} 行数据")
  390. if "result" in result:
  391. logger.info(f"SQL查询返回 {len(result['result'])} 行结果")
  392. else:
  393. logger.info("SQL执行成功")
  394. # 记录执行时间
  395. end_time = datetime.now()
  396. duration = (end_time - start_time).total_seconds()
  397. logger.info(f"===== SQL脚本执行完成 (成功) =====")
  398. logger.info(f"总耗时: {duration:.2f}秒")
  399. return True
  400. else:
  401. # 记录错误信息
  402. error_msg = result.get("error", "未知错误")
  403. logger.error(f"SQL执行失败: {error_msg}")
  404. # 记录执行时间
  405. end_time = datetime.now()
  406. duration = (end_time - start_time).total_seconds()
  407. logger.info(f"===== SQL脚本执行完成 (失败) =====")
  408. logger.info(f"总耗时: {duration:.2f}秒")
  409. return False
  410. except Exception as e:
  411. # 捕获所有未处理的异常
  412. logger.error(f"执行SQL脚本时发生未预期的错误: {str(e)}", exc_info=True)
  413. return False
  414. if __name__ == "__main__":
  415. # 直接执行时的测试代码
  416. import argparse
  417. parser = argparse.ArgumentParser(description='执行SQL脚本')
  418. parser.add_argument('--target-table', type=str, required=True, help='目标表名')
  419. parser.add_argument('--script-name', type=str, required=True, help='脚本名称')
  420. parser.add_argument('--exec-date', type=str, required=True, help='执行日期 (YYYY-MM-DD)')
  421. parser.add_argument('--schedule_frequency', type=str, required=True,
  422. choices=['daily', 'weekly', 'monthly', 'quarterly', 'yearly'],
  423. help='频率: daily, weekly, monthly, quarterly, yearly')
  424. args = parser.parse_args()
  425. # 构造必要的 kwargs
  426. run_kwargs = {
  427. "script_type": "sql",
  428. "target_table": args.target_table,
  429. "script_name": args.script_name,
  430. "exec_date": args.exec_date,
  431. "schedule_frequency": args.schedule_frequency,
  432. }
  433. logger.info("命令行测试执行参数: " + str(run_kwargs))
  434. success = run(**run_kwargs)
  435. if success:
  436. logger.info("SQL脚本执行成功")
  437. sys.exit(0)
  438. else:
  439. logger.error("SQL脚本执行失败")
  440. sys.exit(1)