books_total_process.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. from datetime import datetime
  7. # 配置日志记录器
  8. logging.basicConfig(
  9. level=logging.INFO,
  10. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  11. handlers=[
  12. logging.StreamHandler(sys.stdout)
  13. ]
  14. )
  15. logger = logging.getLogger("book_total_process")
  16. def process_book_data(table_name=None, execution_date=None, execution_mode=None, script_name=None):
  17. """处理图书数据的示例函数"""
  18. # 获取当前脚本的文件名(如果没有传入)
  19. if script_name is None:
  20. script_name = os.path.basename(__file__)
  21. # 使用print输出所有参数
  22. print(f"===== 参数信息 (print输出) =====")
  23. print(f"table_name: {table_name}")
  24. print(f"exec_date: {execution_date}")
  25. print(f"execution_mode: {execution_mode}")
  26. print(f"script_name: {script_name}")
  27. print(f"================================")
  28. # 使用logger.info输出所有参数
  29. logger.info(f"===== 参数信息 (logger输出) =====")
  30. logger.info(f"table_name: {table_name}")
  31. logger.info(f"exec_date: {execution_date}")
  32. logger.info(f"execution_mode: {execution_mode}")
  33. logger.info(f"script_name: {script_name}")
  34. logger.info(f"================================")
  35. return True
  36. def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
  37. """
  38. 统一入口函数,符合Airflow动态脚本调用规范
  39. 参数:
  40. table_name (str): 要处理的表名
  41. execution_mode (str): 执行模式 (append/full_refresh)
  42. exec_date: 执行日期
  43. script_name: 脚本名称
  44. **kwargs: 其他可能的参数
  45. 返回:
  46. bool: 执行成功返回True,否则返回False
  47. """
  48. logger.info(f"开始执行脚本...")
  49. # 打印所有传入的参数
  50. logger.info(f"===== 传入参数信息 =====")
  51. logger.info(f"table_name: {table_name}")
  52. logger.info(f"execution_mode: {execution_mode}")
  53. logger.info(f"exec_date: {exec_date}")
  54. logger.info(f"script_name: {script_name}")
  55. # 打印所有可能的额外参数
  56. for key, value in kwargs.items():
  57. logger.info(f"额外参数 - {key}: {value}")
  58. logger.info(f"========================")
  59. # 如果没有传入script_name,使用当前脚本名
  60. if script_name is None:
  61. script_name = os.path.basename(__file__)
  62. # 实际调用内部处理函数
  63. return process_book_data(
  64. table_name=table_name,
  65. execution_date=exec_date,
  66. execution_mode=execution_mode,
  67. script_name=script_name
  68. )
  69. if __name__ == "__main__":
  70. # 直接执行时调用统一入口函数,传入测试参数
  71. run(
  72. table_name="books",
  73. execution_mode="full_refresh",
  74. exec_date=datetime.now(),
  75. script_name=os.path.basename(__file__)
  76. )