book_sale_amt_daily_clean.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. from datetime import datetime, timedelta
  7. # 配置日志记录器
  8. logging.basicConfig(
  9. level=logging.INFO,
  10. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  11. handlers=[
  12. logging.StreamHandler(sys.stdout)
  13. ]
  14. )
  15. logger = logging.getLogger("book_sale_amt_daily_clean")
  16. def clean_daily_book_sales():
  17. """清洗日度图书销售额数据的函数"""
  18. # 获取当前脚本的文件名
  19. script_name = os.path.basename(__file__)
  20. logger.info(f"开始执行日度图书销售额数据清洗 - 脚本: {script_name}")
  21. try:
  22. # 模拟数据处理过程
  23. logger.info("从数据源获取原始销售数据...")
  24. # 实际应用中这里会连接到数据库或其他数据源
  25. logger.info("执行数据清洗流程...")
  26. # 模拟处理步骤
  27. today = datetime.now()
  28. yesterday = today - timedelta(days=1)
  29. date_str = yesterday.strftime('%Y-%m-%d')
  30. logger.info(f"正在清洗 {date_str} 的数据")
  31. logger.info("检查数据完整性...")
  32. logger.info("检测并处理异常值...")
  33. logger.info("填充缺失数据...")
  34. logger.info("标准化数据格式...")
  35. logger.info("去除重复记录...")
  36. logger.info("数据清洗完成,准备保存结果...")
  37. # 实际应用中这里会将结果保存到数据库
  38. return True
  39. except Exception as e:
  40. logger.error(f"清洗日度图书销售额数据时出错: {str(e)}")
  41. return False
  42. def run(table_name, execution_mode, **kwargs):
  43. """
  44. 统一入口函数,符合Airflow动态脚本调用规范
  45. 参数:
  46. table_name (str): 要处理的表名
  47. execution_mode (str): 执行模式 (append/full_refresh)
  48. **kwargs: 其他可能的参数
  49. 返回:
  50. bool: 执行成功返回True,否则返回False
  51. """
  52. logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
  53. # 获取当前脚本的文件名
  54. script_name = os.path.basename(__file__)
  55. # 记录详细的执行信息
  56. logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
  57. logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  58. # 根据执行模式判断处理逻辑
  59. if execution_mode == "full_refresh":
  60. logger.info("执行完全刷新模式 - 将处理所有历史数据")
  61. logger.info("获取过去30天的历史数据进行清洗...")
  62. else: # append
  63. logger.info("执行增量模式 - 只清洗最新一天的数据")
  64. # 调用实际处理函数
  65. result = clean_daily_book_sales()
  66. logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  67. logger.info(f"处理结果: {'成功' if result else '失败'}")
  68. return result
  69. if __name__ == "__main__":
  70. # 直接执行时调用统一入口函数
  71. run(table_name="book_sale_amt_daily", execution_mode="append")