book_sale_amt_daily_clean.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. from datetime import datetime, timedelta
  7. import time
  8. import random
  9. # 配置日志记录器
  10. logging.basicConfig(
  11. level=logging.INFO,
  12. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  13. handlers=[
  14. logging.StreamHandler(sys.stdout)
  15. ]
  16. )
  17. logger = logging.getLogger("book_sale_amt_daily_clean")
  18. def clean_daily_book_sales(table_name=None, exec_date=None, execution_mode=None, script_name=None):
  19. """清洗日度图书销售额数据的函数"""
  20. # 获取当前脚本的文件名(如果没有传入)
  21. if script_name is None:
  22. script_name = os.path.basename(__file__)
  23. # 打印所有传入的参数
  24. logger.info(f"===== 传入参数信息 (处理函数内) =====")
  25. logger.info(f"table_name: {table_name}")
  26. logger.info(f"exec_date: {exec_date}")
  27. logger.info(f"execution_mode: {execution_mode}")
  28. logger.info(f"script_name: {script_name}")
  29. logger.info(f"======================================")
  30. logger.info(f"开始执行日度图书销售额数据清洗 - 脚本: {script_name}")
  31. try:
  32. # 模拟数据处理过程
  33. logger.info("从数据源获取原始销售数据...")
  34. # 实际应用中这里会连接到数据库或其他数据源
  35. logger.info("执行数据清洗流程...")
  36. # 尝试使用传入的日期,如果没有则使用昨天
  37. if exec_date:
  38. if isinstance(exec_date, str):
  39. try:
  40. date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
  41. date_str = exec_date
  42. except ValueError:
  43. today = datetime.now()
  44. yesterday = today - timedelta(days=1)
  45. date_str = yesterday.strftime('%Y-%m-%d')
  46. logger.warning(f"无法解析传入的exec_date: {exec_date},使用昨天日期: {date_str}")
  47. else:
  48. try:
  49. date_str = exec_date.strftime('%Y-%m-%d')
  50. except:
  51. today = datetime.now()
  52. yesterday = today - timedelta(days=1)
  53. date_str = yesterday.strftime('%Y-%m-%d')
  54. logger.warning(f"无法格式化传入的exec_date,使用昨天日期: {date_str}")
  55. else:
  56. today = datetime.now()
  57. yesterday = today - timedelta(days=1)
  58. date_str = yesterday.strftime('%Y-%m-%d')
  59. logger.info(f"未传入exec_date,使用昨天日期: {date_str}")
  60. logger.info(f"正在清洗 {date_str} 的数据")
  61. logger.info("检查数据完整性...")
  62. logger.info("检测并处理异常值...")
  63. logger.info("填充缺失数据...")
  64. logger.info("标准化数据格式...")
  65. logger.info("去除重复记录...")
  66. logger.info("数据清洗完成,准备保存结果...")
  67. # 实际应用中这里会将结果保存到数据库
  68. # 模拟处理时间
  69. processing_time = random.uniform(0.5, 2.0)
  70. logger.info(f"开始处理数据,预计需要 {processing_time:.2f} 秒")
  71. time.sleep(processing_time)
  72. # 模拟处理逻辑
  73. current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  74. logger.info(f"数据处理中... 当前时间: {current_time}")
  75. # 模拟数据清洗操作
  76. logger.info(f"执行数据清洗操作: 移除异常值、填充缺失值、标准化格式")
  77. time.sleep(processing_time)
  78. # 模拟写入数据库
  79. success_rate = random.random()
  80. if success_rate > 0.1: # 90%的成功率
  81. logger.info(f"表 {table_name} 数据清洗成功,处理日期: {date_str}")
  82. return True
  83. else:
  84. logger.error(f"表 {table_name} 数据清洗或写入过程中出现随机错误")
  85. return False
  86. except Exception as e:
  87. logger.error(f"清洗日度图书销售额数据时出错: {str(e)}")
  88. return False
  89. def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
  90. """
  91. 统一入口函数,符合Airflow动态脚本调用规范
  92. 参数:
  93. table_name (str): 要处理的表名
  94. execution_mode (str): 执行模式 (append/full_refresh)
  95. exec_date: 执行日期
  96. script_name: 脚本名称
  97. **kwargs: 其他可能的参数
  98. 返回:
  99. bool: 执行成功返回True,否则返回False
  100. """
  101. # 打印所有传入的参数
  102. logger.info(f"===== 传入参数信息 (入口函数内) =====")
  103. logger.info(f"table_name: {table_name}")
  104. logger.info(f"execution_mode: {execution_mode}")
  105. logger.info(f"exec_date: {exec_date}")
  106. logger.info(f"script_name: {script_name}")
  107. # 打印所有可能的额外参数
  108. for key, value in kwargs.items():
  109. logger.info(f"额外参数 - {key}: {value}")
  110. logger.info(f"======================================")
  111. # 如果没有提供脚本名,使用当前脚本的文件名
  112. if script_name is None:
  113. script_name = os.path.basename(__file__)
  114. # 记录详细的执行信息
  115. logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  116. # 根据执行模式判断处理逻辑
  117. if execution_mode == "full_refresh":
  118. logger.info("执行完全刷新模式 - 将处理所有历史数据")
  119. logger.info("获取过去30天的历史数据进行清洗...")
  120. else: # append
  121. logger.info("执行增量模式 - 只清洗最新一天的数据")
  122. # 调用实际处理函数
  123. result = clean_daily_book_sales(
  124. table_name=table_name,
  125. exec_date=exec_date,
  126. execution_mode=execution_mode,
  127. script_name=script_name
  128. )
  129. logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  130. logger.info(f"处理结果: {'成功' if result else '失败'}")
  131. return result
  132. if __name__ == "__main__":
  133. # 直接执行时调用统一入口函数,带上所有参数作为测试
  134. run(
  135. table_name="book_sale_amt_daily",
  136. execution_mode="append",
  137. exec_date=datetime.now().strftime('%Y-%m-%d'),
  138. script_name=os.path.basename(__file__)
  139. )