book_sale_amt_2weekly_process.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. from datetime import datetime, timedelta
  7. # 配置日志记录器
  8. logging.basicConfig(
  9. level=logging.INFO,
  10. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  11. handlers=[
  12. logging.StreamHandler(sys.stdout)
  13. ]
  14. )
  15. logger = logging.getLogger("book_sale_amt_2weekly_process")
  16. def process_2weekly_book_sales():
  17. """处理双周图书销售额数据的函数"""
  18. # 获取当前脚本的文件名
  19. script_name = os.path.basename(__file__)
  20. logger.info(f"开始执行双周图书销售额处理 - 脚本: {script_name}")
  21. try:
  22. # 模拟数据处理过程
  23. logger.info("从数据源获取原始销售数据...")
  24. # 实际应用中这里会连接到数据库或其他数据源
  25. logger.info("按双周期汇总销售额...")
  26. # 模拟处理步骤
  27. today = datetime.now()
  28. # 计算当前所在的周数
  29. week_number = int(today.strftime("%U"))
  30. # 计算当前双周期
  31. biweekly_number = (week_number // 2) + 1
  32. # 计算双周期的开始和结束日期
  33. start_day = datetime.strptime(f"{today.year}-W{biweekly_number*2-1}-1", "%Y-W%W-%w")
  34. end_day = start_day + timedelta(days=13) # 两周共14天
  35. date_range = f"{start_day.strftime('%Y-%m-%d')} 至 {end_day.strftime('%Y-%m-%d')}"
  36. logger.info(f"正在处理第 {biweekly_number} 个双周期 ({date_range}) 的数据")
  37. logger.info("汇总统计双周期内每天的销售数据...")
  38. logger.info("计算与上一个双周期的对比...")
  39. logger.info("计算热销书籍排行榜...")
  40. logger.info("生成双周期销售趋势分析...")
  41. logger.info("数据处理完成,准备保存结果...")
  42. # 实际应用中这里会将结果保存到数据库
  43. return True
  44. except Exception as e:
  45. logger.error(f"处理双周图书销售额时出错: {str(e)}")
  46. return False
  47. def run(table_name, execution_mode, **kwargs):
  48. """
  49. 统一入口函数,符合Airflow动态脚本调用规范
  50. 参数:
  51. table_name (str): 要处理的表名
  52. execution_mode (str): 执行模式 (append/full_refresh)
  53. **kwargs: 其他可能的参数
  54. 返回:
  55. bool: 执行成功返回True,否则返回False
  56. """
  57. logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
  58. # 获取当前脚本的文件名
  59. script_name = os.path.basename(__file__)
  60. # 记录详细的执行信息
  61. logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
  62. logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  63. # 根据执行模式判断处理逻辑
  64. if execution_mode == "full_refresh":
  65. logger.info("执行完全刷新模式 - 将处理所有历史数据")
  66. logger.info("获取过去12个双周期的历史数据进行分析...")
  67. else: # append
  68. logger.info("执行增量模式 - 只处理最新双周期数据")
  69. # 调用实际处理函数
  70. result = process_2weekly_book_sales()
  71. logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  72. logger.info(f"处理结果: {'成功' if result else '失败'}")
  73. return result
  74. if __name__ == "__main__":
  75. # 直接执行时调用统一入口函数
  76. run(table_name="book_sale_amt_2weekly", execution_mode="append")