emp_training_stats_table.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. from datetime import datetime
  7. # 配置日志记录器
  8. logging.basicConfig(
  9. level=logging.INFO,
  10. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  11. handlers=[
  12. logging.StreamHandler(sys.stdout)
  13. ]
  14. )
  15. logger = logging.getLogger("emp_training_stats_table")
  16. def process_emp_training_stats(table_name=None, exec_date=None, execution_mode=None, script_name=None, **kwargs):
  17. """处理员工培训统计数据的模拟函数"""
  18. # 获取当前脚本的文件名(如果没有传入)
  19. if script_name is None:
  20. script_name = os.path.basename(__file__)
  21. # 打印所有传入的参数
  22. logger.info(f"===== 传入参数信息 (处理函数内) =====")
  23. logger.info(f"table_name: {table_name}")
  24. logger.info(f"exec_date: {exec_date}")
  25. logger.info(f"execution_mode: {execution_mode}")
  26. logger.info(f"script_name: {script_name}")
  27. # 打印所有可能的额外参数
  28. for key, value in kwargs.items():
  29. logger.info(f"额外参数 - {key}: {value}")
  30. logger.info(f"======================================")
  31. logger.info(f"开始执行员工培训统计数据处理 - 脚本: {script_name}, 表: {table_name}")
  32. try:
  33. # 模拟数据处理过程
  34. logger.info("模拟处理员工培训统计数据...")
  35. # 在实际应用中,这里可以添加具体的数据处理逻辑
  36. logger.info(f"处理日期: {exec_date}, 模式: {execution_mode}")
  37. # 模拟处理成功
  38. logger.info(f"表 {table_name} 数据处理成功")
  39. return True
  40. except Exception as e:
  41. logger.error(f"处理员工培训统计数据时出错: {str(e)}")
  42. return False
  43. def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
  44. """
  45. 统一入口函数,符合Airflow动态脚本调用规范
  46. 参数:
  47. table_name (str): 要处理的表名
  48. execution_mode (str): 执行模式 (append/full_refresh)
  49. exec_date: 执行日期 (可以是字符串 YYYY-MM-DD 或 datetime 对象)
  50. script_name: 脚本名称
  51. **kwargs: 其他可能的参数
  52. 返回:
  53. bool: 执行成功返回True,否则返回False
  54. """
  55. # 打印所有传入的参数 (在入口函数再次打印,确保信息完整)
  56. logger.info(f"===== 传入参数信息 (入口函数 run 内) =====")
  57. logger.info(f"table_name: {table_name}")
  58. logger.info(f"execution_mode: {execution_mode}")
  59. logger.info(f"exec_date: {exec_date} (类型: {type(exec_date)})")
  60. logger.info(f"script_name: {script_name}")
  61. # 打印所有可能的额外参数
  62. for key, value in kwargs.items():
  63. logger.info(f"额外参数 - {key}: {value}")
  64. logger.info(f"=========================================")
  65. # 如果没有提供脚本名,使用当前脚本的文件名
  66. if script_name is None:
  67. script_name = os.path.basename(__file__)
  68. # 记录详细的执行信息
  69. start_time = datetime.now()
  70. logger.info(f"脚本 '{script_name}' 开始执行: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
  71. # 调用实际处理函数
  72. result = process_emp_training_stats(
  73. table_name=table_name,
  74. exec_date=exec_date,
  75. execution_mode=execution_mode,
  76. script_name=script_name,
  77. **kwargs # 将额外参数传递给处理函数
  78. )
  79. end_time = datetime.now()
  80. logger.info(f"脚本 '{script_name}' 结束执行: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
  81. logger.info(f"总耗时: {end_time - start_time}")
  82. logger.info(f"处理结果: {'成功' if result else '失败'}")
  83. return result
  84. if __name__ == "__main__":
  85. # 提供一些默认值以便直接运行脚本进行测试
  86. test_params = {
  87. "table_name": "emp_training_stats",
  88. "execution_mode": "append",
  89. "exec_date": datetime.now().strftime('%Y-%m-%d'),
  90. "script_name": os.path.basename(__file__),
  91. "test_param1": "value1",
  92. "test_param2": 123
  93. }
  94. logger.info(f"以主脚本方式运行,使用测试参数: {test_params}")
  95. run(**test_params)