load_data.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. from datetime import datetime
  7. # 配置日志记录器
  8. logging.basicConfig(
  9. level=logging.INFO,
  10. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  11. handlers=[
  12. logging.StreamHandler(sys.stdout)
  13. ]
  14. )
  15. logger = logging.getLogger("load_data")
  16. def load_data_from_source(source_name="default", execution_date=None):
  17. """从数据源加载数据的示例函数"""
  18. if execution_date is None:
  19. execution_date = datetime.now()
  20. # 获取当前脚本的文件名
  21. script_name = os.path.basename(__file__)
  22. # 使用print输出脚本名称
  23. print(f"当前脚本名称是 {script_name} - 来自print输出 - 正在处理{source_name}数据")
  24. # 使用logger.info输出脚本名称
  25. logger.info(f"当前脚本名称是 {script_name} - 来自logger.info输出 - 执行日期: {execution_date}")
  26. return True
  27. def run(table_name, execution_mode, **kwargs):
  28. """
  29. 统一入口函数,符合Airflow动态脚本调用规范
  30. 参数:
  31. table_name (str): 要处理的表名
  32. execution_mode (str): 执行模式 (append/full_refresh)
  33. **kwargs: 其他可能的参数
  34. 返回:
  35. bool: 执行成功返回True,否则返回False
  36. """
  37. logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
  38. # 获取当前脚本的文件名
  39. script_name = os.path.basename(__file__)
  40. logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
  41. # 实际调用内部处理函数
  42. return load_data_from_source(source_name=table_name)
  43. if __name__ == "__main__":
  44. # 直接执行时调用统一入口函数
  45. run(table_name="test_table", execution_mode="append")