1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import logging
- import sys
- import os
- from datetime import datetime
- # 配置日志记录器
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- handlers=[
- logging.StreamHandler(sys.stdout)
- ]
- )
- logger = logging.getLogger("load_data")
- def load_data_from_source(source_name="default", execution_date=None, execution_mode=None, script_name=None):
- """从数据源加载数据的示例函数"""
- if execution_date is None:
- execution_date = datetime.now()
-
- # 获取当前脚本的文件名(如果没有传入)
- if script_name is None:
- script_name = os.path.basename(__file__)
-
-
- # 使用logger.info输出所有参数
- logger.info(f"===== 参数信息 (logger输出) =====")
- logger.info(f"table_name: {source_name}")
- logger.info(f"exec_date: {execution_date}")
- logger.info(f"execution_mode: {execution_mode}")
- logger.info(f"script_name: {script_name}")
- logger.info(f"================================")
-
- return True
- def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
- """
- 统一入口函数,符合Airflow动态脚本调用规范
-
- 参数:
- table_name (str): 要处理的表名
- execution_mode (str): 执行模式 (append/full_refresh)
- exec_date: 执行日期
- script_name: 脚本名称
- **kwargs: 其他可能的参数
-
- 返回:
- bool: 执行成功返回True,否则返回False
- """
- logger.info(f"开始执行脚本...")
-
- # 打印所有传入的参数
- logger.info(f"===== 传入参数信息 =====")
- logger.info(f"table_name: {table_name}")
- logger.info(f"execution_mode: {execution_mode}")
- logger.info(f"exec_date: {exec_date}")
- logger.info(f"script_name: {script_name}")
-
- # 打印所有可能的额外参数
- for key, value in kwargs.items():
- logger.info(f"额外参数 - {key}: {value}")
- logger.info(f"========================")
-
- # 实际调用内部处理函数
- return load_data_from_source(
- source_name=table_name,
- execution_date=exec_date,
- execution_mode=execution_mode,
- script_name=script_name
- )
- if __name__ == "__main__":
- # 直接执行时调用统一入口函数,传入测试参数
- run(
- table_name="test_table",
- execution_mode="append",
- exec_date=datetime.now(),
- script_name=os.path.basename(__file__)
- )
|