books_total_process.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. # 配置日志记录器
  7. logging.basicConfig(
  8. level=logging.INFO,
  9. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  10. handlers=[
  11. logging.StreamHandler(sys.stdout)
  12. ]
  13. )
  14. logger = logging.getLogger("book_total_process")
  15. def process_book_data():
  16. """处理图书数据的示例函数"""
  17. # 获取当前脚本的文件名
  18. script_name = os.path.basename(__file__)
  19. # 使用logger.info输出脚本名称
  20. logger.info(f"当前脚本名称是 {script_name} - 来自logger.info输出")
  21. return True
  22. def run(table_name, execution_mode, **kwargs):
  23. """
  24. 统一入口函数,符合Airflow动态脚本调用规范
  25. 参数:
  26. table_name (str): 要处理的表名
  27. execution_mode (str): 执行模式 (append/full_refresh)
  28. **kwargs: 其他可能的参数
  29. 返回:
  30. bool: 执行成功返回True,否则返回False
  31. """
  32. logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
  33. # 获取当前脚本的文件名
  34. script_name = os.path.basename(__file__)
  35. # 同时使用print和logger输出以便比较
  36. logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
  37. # 实际调用内部处理函数
  38. return process_book_data()
  39. if __name__ == "__main__":
  40. # 直接执行时调用统一入口函数
  41. run(table_name="books", execution_mode="full_refresh")