load_file.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import logging
  4. import sys
  5. import os
  6. from datetime import datetime
  7. # 配置日志记录器
  8. logging.basicConfig(
  9. level=logging.INFO,
  10. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  11. handlers=[
  12. logging.StreamHandler(sys.stdout)
  13. ]
  14. )
  15. logger = logging.getLogger("load_file_mock") # 使用 mock 后缀以区分
  16. def mock_load_file(table_name=None, execution_mode='append', exec_date=None,
  17. target_type=None, storage_location=None, frequency=None, script_name=None, **kwargs):
  18. """模拟加载文件数据,仅打印参数"""
  19. # 获取当前脚本的文件名(如果没有传入)
  20. if script_name is None:
  21. script_name = os.path.basename(__file__)
  22. # 打印所有传入的参数
  23. logger.info(f"===== 传入参数信息 (模拟处理函数内) =====")
  24. logger.info(f"table_name: {table_name}")
  25. logger.info(f"execution_mode: {execution_mode}")
  26. logger.info(f"exec_date: {exec_date}")
  27. logger.info(f"target_type: {target_type}")
  28. logger.info(f"storage_location: {storage_location}")
  29. logger.info(f"frequency: {frequency}")
  30. logger.info(f"script_name: {script_name}")
  31. # 打印所有可能的额外参数
  32. for key, value in kwargs.items():
  33. logger.info(f"额外参数 - {key}: {value}")
  34. logger.info(f"=========================================")
  35. logger.info(f"开始模拟文件加载 - 脚本: {script_name}, 表: {table_name}")
  36. try:
  37. logger.info("模拟检查参数...")
  38. if not storage_location:
  39. logger.warning("警告: 未提供 storage_location (文件路径)")
  40. else:
  41. logger.info(f"模拟检查文件是否存在: {storage_location}")
  42. logger.info(f"模拟执行模式: {execution_mode}")
  43. if execution_mode == 'full_refresh':
  44. logger.info(f"模拟: 如果是全量刷新,将清空表 {table_name}")
  45. logger.info("模拟读取和处理文件...")
  46. # 模拟成功
  47. logger.info(f"模拟: 表 {table_name} 文件加载成功")
  48. return True
  49. except Exception as e:
  50. logger.error(f"模拟加载文件时出错: {str(e)}")
  51. return False
  52. def run(table_name, execution_mode='append', exec_date=None, target_type=None,
  53. storage_location=None, frequency=None, script_name=None, **kwargs):
  54. """
  55. 统一入口函数,符合Airflow动态脚本调用规范 (模拟版本)
  56. 参数:
  57. table_name (str): 要处理的表名
  58. execution_mode (str): 执行模式 (append/full_refresh)
  59. exec_date: 执行日期
  60. target_type: 目标类型
  61. storage_location: 文件路径
  62. frequency: 更新频率
  63. script_name: 脚本名称
  64. **kwargs: 其他可能的参数
  65. 返回:
  66. bool: 执行成功返回True,否则返回False
  67. """
  68. # 打印所有传入的参数 (在入口函数再次打印,确保信息完整)
  69. logger.info(f"===== 传入参数信息 (入口函数 run 内) =====")
  70. logger.info(f"table_name: {table_name}")
  71. logger.info(f"execution_mode: {execution_mode}")
  72. logger.info(f"exec_date: {exec_date} (类型: {type(exec_date)}) ")
  73. logger.info(f"target_type: {target_type}")
  74. logger.info(f"storage_location: {storage_location}")
  75. logger.info(f"frequency: {frequency}")
  76. logger.info(f"script_name: {script_name}")
  77. # 打印所有可能的额外参数
  78. for key, value in kwargs.items():
  79. logger.info(f"额外参数 - {key}: {value}")
  80. logger.info(f"=========================================")
  81. # 如果没有提供脚本名,使用当前脚本的文件名
  82. if script_name is None:
  83. script_name = os.path.basename(__file__)
  84. # 记录详细的执行信息
  85. start_time = datetime.now()
  86. logger.info(f"脚本 '{script_name}' (模拟) 开始执行: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
  87. # 调用实际处理函数 (模拟版本)
  88. result = mock_load_file(
  89. table_name=table_name,
  90. execution_mode=execution_mode,
  91. exec_date=exec_date,
  92. target_type=target_type,
  93. storage_location=storage_location,
  94. frequency=frequency,
  95. script_name=script_name,
  96. **kwargs # 将额外参数传递给处理函数
  97. )
  98. end_time = datetime.now()
  99. logger.info(f"脚本 '{script_name}' (模拟) 结束执行: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
  100. logger.info(f"总耗时: {end_time - start_time}")
  101. logger.info(f"处理结果: {'成功' if result else '失败'}")
  102. return result
  103. if __name__ == "__main__":
  104. # 提供一些默认值以便直接运行脚本进行测试
  105. test_params = {
  106. "table_name": "sample_table",
  107. "execution_mode": "full_refresh",
  108. "exec_date": datetime.now().strftime('%Y-%m-%d'),
  109. "target_type": "structure",
  110. "storage_location": "/path/to/mock/file.csv",
  111. "frequency": "daily",
  112. "script_name": os.path.basename(__file__),
  113. "custom_param": "abc",
  114. "another_param": 456
  115. }
  116. logger.info(f"以主脚本方式运行 (模拟),使用测试参数: {test_params}")
  117. run(**test_params)