book_sale_amt_daily_clean.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. from datetime import datetime, timedelta
  7. import time
  8. import random
  9. # 配置日志记录器
  10. logging.basicConfig(
  11. level=logging.INFO,
  12. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  13. handlers=[
  14. logging.StreamHandler(sys.stdout)
  15. ]
  16. )
  17. logger = logging.getLogger("book_sale_amt_daily_clean")
  18. def clean_daily_book_sales():
  19. """清洗日度图书销售额数据的函数"""
  20. # 获取当前脚本的文件名
  21. script_name = os.path.basename(__file__)
  22. logger.info(f"开始执行日度图书销售额数据清洗 - 脚本: {script_name}")
  23. try:
  24. # 模拟数据处理过程
  25. logger.info("从数据源获取原始销售数据...")
  26. # 实际应用中这里会连接到数据库或其他数据源
  27. logger.info("执行数据清洗流程...")
  28. # 模拟处理步骤
  29. today = datetime.now()
  30. yesterday = today - timedelta(days=1)
  31. date_str = yesterday.strftime('%Y-%m-%d')
  32. logger.info(f"正在清洗 {date_str} 的数据")
  33. logger.info("检查数据完整性...")
  34. logger.info("检测并处理异常值...")
  35. logger.info("填充缺失数据...")
  36. logger.info("标准化数据格式...")
  37. logger.info("去除重复记录...")
  38. logger.info("数据清洗完成,准备保存结果...")
  39. # 实际应用中这里会将结果保存到数据库
  40. # 模拟处理时间
  41. processing_time = random.uniform(0.5, 2.0)
  42. logger.info(f"开始处理数据,预计需要 {processing_time:.2f} 秒")
  43. time.sleep(processing_time)
  44. # 模拟处理逻辑
  45. current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  46. logger.info(f"数据处理中... 当前时间: {current_time}")
  47. # 模拟数据清洗操作
  48. logger.info(f"执行数据清洗操作: 移除异常值、填充缺失值、标准化格式")
  49. time.sleep(processing_time)
  50. # 模拟写入数据库
  51. success_rate = random.random()
  52. if success_rate > 0.1: # 90%的成功率
  53. logger.info(f"表 {date_str} 数据清洗成功,已处理并写入")
  54. return True
  55. else:
  56. logger.error(f"表 {date_str} 数据清洗或写入过程中出现随机错误")
  57. return False
  58. except Exception as e:
  59. logger.error(f"清洗日度图书销售额数据时出错: {str(e)}")
  60. return False
  61. def run(table_name, execution_mode, **kwargs):
  62. """
  63. 统一入口函数,符合Airflow动态脚本调用规范
  64. 参数:
  65. table_name (str): 要处理的表名
  66. execution_mode (str): 执行模式 (append/full_refresh)
  67. **kwargs: 其他可能的参数
  68. 返回:
  69. bool: 执行成功返回True,否则返回False
  70. """
  71. logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
  72. # 获取当前脚本的文件名
  73. script_name = os.path.basename(__file__)
  74. # 记录详细的执行信息
  75. logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
  76. logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  77. # 根据执行模式判断处理逻辑
  78. if execution_mode == "full_refresh":
  79. logger.info("执行完全刷新模式 - 将处理所有历史数据")
  80. logger.info("获取过去30天的历史数据进行清洗...")
  81. else: # append
  82. logger.info("执行增量模式 - 只清洗最新一天的数据")
  83. # 调用实际处理函数
  84. result = clean_daily_book_sales()
  85. logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  86. logger.info(f"处理结果: {'成功' if result else '失败'}")
  87. return result
  88. if __name__ == "__main__":
  89. # 直接执行时调用统一入口函数
  90. run(table_name="book_sale_amt_daily", execution_mode="append")