script_utils.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. from datetime import datetime, timedelta
  6. import pytz
  7. import re # 添加re模块以支持正则表达式
  8. # 配置日志记录器
  9. logging.basicConfig(
  10. level=logging.INFO,
  11. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  12. handlers=[
  13. logging.StreamHandler(sys.stdout)
  14. ]
  15. )
  16. logger = logging.getLogger("script_utils")
  17. def get_date_range(exec_date, frequency):
  18. """
  19. 根据执行日期和频率,计算开始日期和结束日期
  20. 参数:
  21. exec_date (str): 执行日期,格式为 YYYY-MM-DD
  22. frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
  23. 返回:
  24. tuple: (start_date, end_date) 格式为 YYYY-MM-DD 的字符串
  25. """
  26. logger.info(f"计算日期范围 - 执行日期: {exec_date}, 频率: {frequency}")
  27. # 将输入的日期转换为上海时区的datetime对象
  28. shanghai_tz = pytz.timezone('Asia/Shanghai')
  29. try:
  30. # 解析输入的exec_date
  31. if isinstance(exec_date, str):
  32. date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
  33. elif isinstance(exec_date, datetime):
  34. date_obj = exec_date
  35. else:
  36. raise ValueError(f"不支持的exec_date类型: {type(exec_date)}")
  37. # 转换为上海时区
  38. date_obj = shanghai_tz.localize(date_obj)
  39. logger.info(f"上海时区的执行日期: {date_obj}")
  40. # 根据不同频率计算日期范围
  41. if frequency.lower() == 'daily':
  42. # 每日: start_date = exec_date, end_date = exec_date + 1 day
  43. start_date = date_obj.strftime('%Y-%m-%d')
  44. end_date = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
  45. elif frequency.lower() == 'weekly':
  46. # 每周: start_date = 本周一, end_date = 下周一
  47. days_since_monday = date_obj.weekday() # 0=周一, 6=周日
  48. monday = date_obj - timedelta(days=days_since_monday)
  49. next_monday = monday + timedelta(days=7)
  50. start_date = monday.strftime('%Y-%m-%d')
  51. end_date = next_monday.strftime('%Y-%m-%d')
  52. elif frequency.lower() == 'monthly':
  53. # 每月: start_date = 本月第一天, end_date = 下月第一天
  54. first_day = date_obj.replace(day=1)
  55. # 计算下个月的第一天
  56. if first_day.month == 12:
  57. next_month_first_day = first_day.replace(year=first_day.year + 1, month=1)
  58. else:
  59. next_month_first_day = first_day.replace(month=first_day.month + 1)
  60. start_date = first_day.strftime('%Y-%m-%d')
  61. end_date = next_month_first_day.strftime('%Y-%m-%d')
  62. elif frequency.lower() == 'quarterly':
  63. # 每季度: start_date = 本季度第一天, end_date = 下季度第一天
  64. quarter = (date_obj.month - 1) // 3 + 1 # 1-4季度
  65. first_month_of_quarter = (quarter - 1) * 3 + 1 # 季度的第一个月
  66. quarter_first_day = date_obj.replace(month=first_month_of_quarter, day=1)
  67. # 计算下个季度的第一天
  68. if quarter == 4:
  69. next_quarter_first_day = quarter_first_day.replace(year=quarter_first_day.year + 1, month=1)
  70. else:
  71. next_quarter_first_day = quarter_first_day.replace(month=first_month_of_quarter + 3)
  72. start_date = quarter_first_day.strftime('%Y-%m-%d')
  73. end_date = next_quarter_first_day.strftime('%Y-%m-%d')
  74. elif frequency.lower() == 'yearly':
  75. # 每年: start_date = 本年第一天, end_date = 下年第一天
  76. year_first_day = date_obj.replace(month=1, day=1)
  77. next_year_first_day = date_obj.replace(year=date_obj.year + 1, month=1, day=1)
  78. start_date = year_first_day.strftime('%Y-%m-%d')
  79. end_date = next_year_first_day.strftime('%Y-%m-%d')
  80. else:
  81. logger.error(f"不支持的频率: {frequency}")
  82. raise ValueError(f"不支持的频率: {frequency}")
  83. logger.info(f"计算结果 - 开始日期: {start_date}, 结束日期: {end_date}")
  84. return start_date, end_date
  85. except Exception as e:
  86. logger.error(f"计算日期范围时出错: {str(e)}", exc_info=True)
  87. raise
  88. import re
  89. from typing import Dict, List, Optional, Set
  90. def extract_source_fields_linked_to_template(sql: str, jinja_vars: List[str]) -> Set[str]:
  91. """
  92. 从 SQL 中提取和 jinja 模板变量绑定的源字段(支持各种形式)
  93. """
  94. fields = set()
  95. sql = re.sub(r"\s+", " ", sql)
  96. for var in jinja_vars:
  97. # 普通比较、函数包裹
  98. pattern = re.compile(
  99. r"""
  100. (?P<field>
  101. (?:\w+\s*\(\s*)? # 可选函数开始(如 DATE(
  102. [\w\.]+ # 字段名
  103. (?:\s+AS\s+\w+)? # 可选 CAST 形式
  104. \)? # 可选右括号
  105. )
  106. \s*(=|<|>|<=|>=)\s*['"]?\{\{\s*""" + var + r"""\s*\}\}['"]?
  107. """, re.IGNORECASE | re.VERBOSE
  108. )
  109. fields.update(match.group("field").strip() for match in pattern.finditer(sql))
  110. # BETWEEN '{{ start_date }}' AND '{{ end_date }}'
  111. if var == "start_date":
  112. pattern_between = re.compile(
  113. r"""(?P<field>
  114. (?:\w+\s*\(\s*)?[\w\.]+(?:\s+AS\s+\w+)?\)? # 字段(函数包裹可选)
  115. )
  116. \s+BETWEEN\s+['"]?\{\{\s*start_date\s*\}\}['"]?\s+AND\s+['"]?\{\{\s*end_date\s*\}\}
  117. """, re.IGNORECASE | re.VERBOSE
  118. )
  119. fields.update(match.group("field").strip() for match in pattern_between.finditer(sql))
  120. return {extract_core_field(f) for f in fields}
  121. def extract_core_field(expr: str) -> str:
  122. """
  123. 清洗函数包裹的字段表达式:DATE(sd.sale_date) -> sd.sale_date, CAST(...) -> ...
  124. """
  125. expr = re.sub(r"CAST\s*\(\s*([\w\.]+)\s+AS\s+\w+\s*\)", r"\1", expr, flags=re.IGNORECASE)
  126. expr = re.sub(r"\b\w+\s*\(\s*([\w\.]+)\s*\)", r"\1", expr)
  127. return expr.strip()
  128. def parse_select_aliases(sql: str) -> Dict[str, str]:
  129. """
  130. 提取 SELECT 中的字段别名映射:原字段 -> 目标别名
  131. """
  132. sql = re.sub(r"\s+", " ", sql)
  133. select_clause_match = re.search(r"SELECT\s+(.*?)\s+FROM", sql, re.IGNORECASE)
  134. if not select_clause_match:
  135. return {}
  136. select_clause = select_clause_match.group(1)
  137. mappings = {}
  138. for expr in select_clause.split(","):
  139. expr = expr.strip()
  140. alias_match = re.match(r"([\w\.]+)\s+AS\s+([\w]+)", expr, re.IGNORECASE)
  141. if alias_match:
  142. source, alias = alias_match.groups()
  143. mappings[source.strip()] = alias.strip()
  144. return mappings
  145. def find_target_date_field(sql: str, jinja_vars: List[str] = ["start_date", "end_date"]) -> Optional[str]:
  146. """
  147. 从 SQL 中找出与模板时间变量绑定的目标表字段(只返回一个)
  148. """
  149. source_fields = extract_source_fields_linked_to_template(sql, jinja_vars)
  150. alias_map = parse_select_aliases(sql)
  151. # 匹配 SELECT 中的映射字段
  152. for src_field in source_fields:
  153. if src_field in alias_map:
  154. return alias_map[src_field] # 源字段映射的目标字段
  155. # 若未通过 AS 映射,可能直接 SELECT sd.sale_date(裸字段)
  156. for src_field in source_fields:
  157. if '.' not in src_field:
  158. return src_field # 裸字段直接作为目标字段名
  159. return None
  160. def generate_delete_sql(sql_content, target_table=None):
  161. """
  162. 根据SQL脚本内容生成用于清理数据的DELETE语句
  163. 参数:
  164. sql_content (str): 原始SQL脚本内容
  165. target_table (str, optional): 目标表名,如果SQL脚本中无法解析出表名时使用
  166. 返回:
  167. str: DELETE语句,用于清理数据
  168. """
  169. logger.info("生成清理SQL语句,实现ETL作业幂等性")
  170. # 如果提供了目标表名,直接使用
  171. if target_table:
  172. logger.info(f"使用提供的目标表名: {target_table}")
  173. delete_stmt = f"""DELETE FROM {target_table}
  174. WHERE summary_date >= '{{{{ start_date }}}}'
  175. AND summary_date < '{{{{ end_date }}}}';"""
  176. logger.info(f"生成的清理SQL: {delete_stmt}")
  177. return delete_stmt
  178. # 尝试从SQL内容中解析出目标表名
  179. try:
  180. # 简单解析,尝试找出INSERT语句的目标表
  181. # 匹配 INSERT INTO xxx 或 INSERT INTO "xxx" 或 INSERT INTO `xxx` 或 INSERT INTO [xxx]
  182. insert_match = re.search(r'INSERT\s+INTO\s+(?:["\[`])?([a-zA-Z0-9_\.]+)(?:["\]`])?', sql_content, re.IGNORECASE)
  183. if insert_match:
  184. table_name = insert_match.group(1)
  185. logger.info(f"从SQL中解析出目标表名: {table_name}")
  186. delete_stmt = f"""DELETE FROM {table_name}
  187. WHERE summary_date >= '{{{{ start_date }}}}'
  188. AND summary_date < '{{{{ end_date }}}}';"""
  189. logger.info(f"生成的清理SQL: {delete_stmt}")
  190. return delete_stmt
  191. else:
  192. logger.warning("无法从SQL中解析出目标表名,无法生成清理SQL")
  193. return None
  194. except Exception as e:
  195. logger.error(f"解析SQL生成清理语句时出错: {str(e)}", exc_info=True)
  196. return None