소스 검색

给execution_python.py增加参数输出

wangxq 5 일 전
부모
커밋
112c1a543a

+ 3 - 2
dags/dataops_productline_execute_dag.py

@@ -47,9 +47,10 @@ from airflow.models import Variable
 import logging
 import networkx as nx
 import json
-import os
 import pendulum
 from decimal import Decimal
+import os
+import sys
 from utils import (
     get_pg_conn, 
     get_neo4j_driver,
@@ -59,7 +60,7 @@ from utils import (
 from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG,SCHEDULE_TABLE_SCHEMA
 import pytz
 import pandas as pd
-import sys
+
 from airflow.utils.trigger_rule import TriggerRule
 
 # 创建日志记录器

+ 4 - 1
dags/dataops_productline_finalize_dag.py

@@ -15,11 +15,14 @@ import logging
 import json
 import pendulum
 import pytz
+import os
+import sys
+from utils import get_today_date
 from airflow.models import DagRun, TaskInstance
 from airflow.utils.state import State
 from sqlalchemy import desc
 from airflow import settings
-from utils import get_today_date
+
 from decimal import Decimal
 
 # 创建日志记录器

+ 2 - 5
dags/dataops_productline_manual_trigger_dag.py

@@ -62,14 +62,12 @@ from neo4j import GraphDatabase
 import psycopg2
 import networkx as nx
 import json
-from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG
 import traceback
 import pendulum
-import pytz
+import sys
 from utils import get_pg_conn, get_cn_exec_date, check_script_exists, get_complete_script_info, get_table_label
 from airflow.exceptions import AirflowException
-from config import AIRFLOW_BASE_PATH, SCRIPTS_BASE_PATH, SCHEDULE_TABLE_SCHEMA
-import sys
+from config import AIRFLOW_BASE_PATH, NEO4J_CONFIG, SCRIPTS_BASE_PATH, SCHEDULE_TABLE_SCHEMA
 
 # 设置logger
 logger = logging.getLogger(__name__)
@@ -1071,7 +1069,6 @@ with DAG(
     description='script_name和target_table可以二选一,支持三种依赖级别:self(仅当前表或脚本)、resource(到Resource层)、source(到Source层)',
     schedule_interval=None,  # 设置为None表示只能手动触发
     catchup=False,
-    is_paused_upon_creation=False,
     params={
         "script_name": "",
         "target_table": "",

+ 12 - 7
dags/dataops_productline_prepare_dag.py

@@ -13,6 +13,7 @@ import glob
 from pathlib import Path
 import hashlib
 import pendulum
+import sys
 from utils import (
     get_pg_conn, 
     get_neo4j_driver,
@@ -774,15 +775,19 @@ def check_execution_plan_in_db(**kwargs):
                 # 删除历史执行计划,只保留最近N条
                 if EXECUTION_PLAN_KEEP_COUNT > 0:
                     cursor.execute(f"""
-                        DELETE FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans 
-                        WHERE dag_id = %s AND exec_date = %s
-                        AND id NOT IN (
-                            SELECT id FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
+                        WITH to_keep AS (
+                            SELECT dag_id, run_id, exec_date, insert_time
+                            FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
                             WHERE dag_id = %s AND exec_date = %s
-                            ORDER BY id DESC
-                            LIMIT {EXECUTION_PLAN_KEEP_COUNT - 1}
+                            ORDER BY insert_time DESC
+                            LIMIT %s
+                        )
+                        DELETE FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
+                        WHERE dag_id = %s AND exec_date = %s
+                        AND (dag_id, run_id, exec_date, insert_time) NOT IN (
+                            SELECT dag_id, run_id, exec_date, insert_time FROM to_keep
                         )
-                    """, (dag_id, ds, dag_id, ds))
+                    """, (dag_id, ds, EXECUTION_PLAN_KEEP_COUNT, dag_id, ds))
                     
                     deleted_rows = cursor.rowcount
                     conn.commit()

+ 3 - 2
dags/utils.py

@@ -1,12 +1,13 @@
 # utils.py
 import psycopg2
-from neo4j import GraphDatabase
+import os
+import sys
 from config import PG_CONFIG, NEO4J_CONFIG, SCRIPTS_BASE_PATH
+from neo4j import GraphDatabase
 import logging
 import importlib.util
 from pathlib import Path
 import networkx as nx
-import os
 from airflow.exceptions import AirflowFailException
 from datetime import datetime, timedelta, date
 import functools

+ 0 - 97
dataops_scripts/book_sale_amt_2weekly_process.py

@@ -1,97 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_2weekly_process")
-
-def process_2weekly_book_sales():
-    """处理双周图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行双周图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按双周期汇总销售额...")
-        # 模拟处理步骤
-        today = datetime.now()
-        # 计算当前所在的周数
-        week_number = int(today.strftime("%U"))
-        # 计算当前双周期
-        biweekly_number = (week_number // 2) + 1
-        # 计算双周期的开始和结束日期
-        start_day = datetime.strptime(f"{today.year}-W{biweekly_number*2-1}-1", "%Y-W%W-%w")
-        end_day = start_day + timedelta(days=13)  # 两周共14天
-        date_range = f"{start_day.strftime('%Y-%m-%d')} 至 {end_day.strftime('%Y-%m-%d')}"
-        
-        logger.info(f"正在处理第 {biweekly_number} 个双周期 ({date_range}) 的数据")
-        
-        logger.info("汇总统计双周期内每天的销售数据...")
-        logger.info("计算与上一个双周期的对比...")
-        logger.info("计算热销书籍排行榜...")
-        logger.info("生成双周期销售趋势分析...")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理双周图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-        logger.info("获取过去12个双周期的历史数据进行分析...")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新双周期数据")
-    
-    # 调用实际处理函数
-    result = process_2weekly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_2weekly", execution_mode="append") 

+ 0 - 92
dataops_scripts/book_sale_amt_2yearly_process.py

@@ -1,92 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_2yearly_process")
-
-def process_2yearly_book_sales():
-    """处理两年度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行两年度图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按两年周期汇总销售额...")
-        # 模拟处理步骤
-        current_year = int(datetime.now().strftime("%Y"))
-        # 计算当前两年周期
-        cycle_start = current_year - (current_year % 2)
-        cycle_end = cycle_start + 1
-        logger.info(f"正在处理 {cycle_start}-{cycle_end} 两年周期的数据")
-        
-        logger.info("计算与上一个两年周期的对比...")
-        logger.info("计算每年在两年周期中的占比...")
-        logger.info("生成两年周期销售趋势分析...")
-        logger.info("生成中长期销售预测...")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理两年度图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-        # 两年周期数据通常需要处理多个周期的历史数据进行比较
-        logger.info("获取过去4个两年周期的历史数据进行分析...")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新两年周期数据")
-    
-    # 调用实际处理函数
-    result = process_2yearly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_2yearly", execution_mode="append") 

+ 0 - 166
dataops_scripts/book_sale_amt_daily_clean.py

@@ -1,166 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-import time
-import random
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_daily_clean")
-
-def clean_daily_book_sales(table_name=None, exec_date=None, execution_mode=None, script_name=None):
-    """清洗日度图书销售额数据的函数"""
-    # 获取当前脚本的文件名(如果没有传入)
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-    
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 (处理函数内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"script_name: {script_name}")
-    logger.info(f"======================================")
-    
-    logger.info(f"开始执行日度图书销售额数据清洗 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("执行数据清洗流程...")
-        
-        # 尝试使用传入的日期,如果没有则使用昨天
-        if exec_date:
-            if isinstance(exec_date, str):
-                try:
-                    date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
-                    date_str = exec_date
-                except ValueError:
-                    today = datetime.now()
-                    yesterday = today - timedelta(days=1)
-                    date_str = yesterday.strftime('%Y-%m-%d')
-                    logger.warning(f"无法解析传入的exec_date: {exec_date},使用昨天日期: {date_str}")
-            else:
-                try:
-                    date_str = exec_date.strftime('%Y-%m-%d')
-                except:
-                    today = datetime.now()
-                    yesterday = today - timedelta(days=1)
-                    date_str = yesterday.strftime('%Y-%m-%d')
-                    logger.warning(f"无法格式化传入的exec_date,使用昨天日期: {date_str}")
-        else:
-            today = datetime.now()
-            yesterday = today - timedelta(days=1)
-            date_str = yesterday.strftime('%Y-%m-%d')
-            logger.info(f"未传入exec_date,使用昨天日期: {date_str}")
-        
-        logger.info(f"正在清洗 {date_str} 的数据")
-        
-        logger.info("检查数据完整性...")
-        logger.info("检测并处理异常值...")
-        logger.info("填充缺失数据...")
-        logger.info("标准化数据格式...")
-        logger.info("去除重复记录...")
-        
-        logger.info("数据清洗完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        # 模拟处理时间
-        processing_time = random.uniform(0.5, 2.0)
-        logger.info(f"开始处理数据,预计需要 {processing_time:.2f} 秒")
-        time.sleep(processing_time)
-        
-        # 模拟处理逻辑
-        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-        logger.info(f"数据处理中... 当前时间: {current_time}")
-        
-        # 模拟数据清洗操作
-        logger.info(f"执行数据清洗操作: 移除异常值、填充缺失值、标准化格式")
-        time.sleep(processing_time)
-        
-        # 模拟写入数据库
-        success_rate = random.random()
-        if success_rate > 0.1:  # 90%的成功率
-            logger.info(f"表 {table_name} 数据清洗成功,处理日期: {date_str}")
-            return True
-        else:
-            logger.error(f"表 {table_name} 数据清洗或写入过程中出现随机错误")
-            return False
-    except Exception as e:
-        logger.error(f"清洗日度图书销售额数据时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        exec_date: 执行日期
-        script_name: 脚本名称
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 (入口函数内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"script_name: {script_name}")
-    
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"======================================")
-    
-    # 如果没有提供脚本名,使用当前脚本的文件名
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-        logger.info("获取过去30天的历史数据进行清洗...")
-    else:  # append
-        logger.info("执行增量模式 - 只清洗最新一天的数据")
-    
-    # 调用实际处理函数
-    result = clean_daily_book_sales(
-        table_name=table_name, 
-        exec_date=exec_date,
-        execution_mode=execution_mode, 
-        script_name=script_name
-    )
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数,带上所有参数作为测试
-    run(
-        table_name="book_sale_amt_daily", 
-        execution_mode="append",
-        exec_date=datetime.now().strftime('%Y-%m-%d'),
-        script_name=os.path.basename(__file__)
-    ) 

+ 0 - 90
dataops_scripts/book_sale_amt_half_yearly_process.py

@@ -1,90 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_half_yearly_process")
-
-def process_half_yearly_book_sales():
-    """处理半年度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行半年度图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按半年汇总销售额...")
-        # 模拟处理步骤
-        current_year = datetime.now().strftime("%Y")
-        current_month = datetime.now().month
-        half_year = "上半年" if current_month <= 6 else "下半年"
-        logger.info(f"正在处理 {current_year} 年 {half_year} 的数据")
-        
-        logger.info("计算同比增长率...")
-        logger.info("计算各月份在半年度中的占比...")
-        logger.info("生成半年度销售趋势分析...")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理半年度图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-        # 半年度数据通常需要处理多个半年度的历史数据进行比较
-        logger.info("获取过去3年的半年度历史数据进行分析...")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新半年度数据")
-    
-    # 调用实际处理函数
-    result = process_half_yearly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_half_yearly", execution_mode="append") 

+ 0 - 82
dataops_scripts/book_sale_amt_monthly_process.py

@@ -1,82 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_monthly_process")
-
-def process_monthly_book_sales():
-    """处理月度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行月度图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按月汇总销售额...")
-        # 模拟处理步骤
-        current_month = datetime.now().strftime("%Y-%m")
-        logger.info(f"正在处理 {current_month} 的数据")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理月度图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, update_mode='append', **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        update_mode (str): 更新模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {update_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {update_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if update_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新数据")
-    
-    # 调用实际处理函数
-    result = process_monthly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_monthly", update_mode="append") 

+ 0 - 87
dataops_scripts/book_sale_amt_weekly_process.py

@@ -1,87 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_weekly_process")
-
-def process_weekly_book_sales():
-    """处理周度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行周度图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按周汇总销售额...")
-        # 模拟处理步骤
-        today = datetime.now()
-        # 计算本周的开始日期(周一)和结束日期(周日)
-        start_of_week = today - timedelta(days=today.weekday())
-        end_of_week = start_of_week + timedelta(days=6)
-        week_range = f"{start_of_week.strftime('%Y-%m-%d')} 至 {end_of_week.strftime('%Y-%m-%d')}"
-        
-        logger.info(f"正在处理周期 {week_range} 的数据")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理周度图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新数据")
-    
-    # 调用实际处理函数
-    result = process_weekly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_weekly", execution_mode="append") 

+ 0 - 88
dataops_scripts/book_sale_amt_yearly_process.py

@@ -1,88 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_yearly_process")
-
-def process_yearly_book_sales():
-    """处理年度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行年度图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按年汇总销售额...")
-        # 模拟处理步骤
-        current_year = datetime.now().strftime("%Y")
-        logger.info(f"正在处理 {current_year} 年的数据")
-        
-        logger.info("计算同比增长率...")
-        logger.info("计算各月份占比...")
-        logger.info("生成年度销售趋势分析...")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理年度图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-        # 年度数据通常需要处理多年的历史数据进行比较
-        logger.info("获取过去5年的历史数据进行分析...")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新年度数据")
-    
-    # 调用实际处理函数
-    result = process_yearly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_yearly", execution_mode="append") 

+ 0 - 91
dataops_scripts/books_total_process.py

@@ -1,91 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_total_process")
-
-def process_book_data(table_name=None, execution_date=None, execution_mode=None, script_name=None):
-    """处理图书数据的示例函数"""
-    # 获取当前脚本的文件名(如果没有传入)
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-    
-    # 使用print输出所有参数
-    print(f"===== 参数信息 (print输出) =====")
-    print(f"table_name: {table_name}")
-    print(f"exec_date: {execution_date}")
-    print(f"execution_mode: {execution_mode}")
-    print(f"script_name: {script_name}")
-    print(f"================================")
-    
-    # 使用logger.info输出所有参数
-    logger.info(f"===== 参数信息 (logger输出) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"exec_date: {execution_date}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"script_name: {script_name}")
-    logger.info(f"================================")
-    
-    return True
-
-def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        exec_date: 执行日期
-        script_name: 脚本名称
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"开始执行脚本...")
-    
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"script_name: {script_name}")
-    
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"========================")
-    
-    # 如果没有传入script_name,使用当前脚本名
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-    
-    # 实际调用内部处理函数
-    return process_book_data(
-        table_name=table_name,
-        execution_date=exec_date,
-        execution_mode=execution_mode,
-        script_name=script_name
-    )
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数,传入测试参数
-    run(
-        table_name="books", 
-        execution_mode="full_refresh",
-        exec_date=datetime.now(),
-        script_name=os.path.basename(__file__)
-    )

+ 0 - 112
dataops_scripts/emp_training_stats_table.py

@@ -1,112 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("emp_training_stats_table")
-
-def process_emp_training_stats(table_name=None, exec_date=None, execution_mode=None, script_name=None, **kwargs):
-    """处理员工培训统计数据的模拟函数"""
-    # 获取当前脚本的文件名(如果没有传入)
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 (处理函数内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"script_name: {script_name}")
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"======================================")
-
-    logger.info(f"开始执行员工培训统计数据处理 - 脚本: {script_name}, 表: {table_name}")
-
-    try:
-        # 模拟数据处理过程
-        logger.info("模拟处理员工培训统计数据...")
-        # 在实际应用中,这里可以添加具体的数据处理逻辑
-        logger.info(f"处理日期: {exec_date}, 模式: {execution_mode}")
-        
-        # 模拟处理成功
-        logger.info(f"表 {table_name} 数据处理成功")
-        return True
-    except Exception as e:
-        logger.error(f"处理员工培训统计数据时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        exec_date: 执行日期 (可以是字符串 YYYY-MM-DD 或 datetime 对象)
-        script_name: 脚本名称
-        **kwargs: 其他可能的参数
-
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    # 打印所有传入的参数 (在入口函数再次打印,确保信息完整)
-    logger.info(f"===== 传入参数信息 (入口函数 run 内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"exec_date: {exec_date} (类型: {type(exec_date)})")
-    logger.info(f"script_name: {script_name}")
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"=========================================")
-
-    # 如果没有提供脚本名,使用当前脚本的文件名
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-
-    # 记录详细的执行信息
-    start_time = datetime.now()
-    logger.info(f"脚本 '{script_name}' 开始执行: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
-
-    # 调用实际处理函数
-    result = process_emp_training_stats(
-        table_name=table_name,
-        exec_date=exec_date,
-        execution_mode=execution_mode,
-        script_name=script_name,
-        **kwargs  # 将额外参数传递给处理函数
-    )
-
-    end_time = datetime.now()
-    logger.info(f"脚本 '{script_name}' 结束执行: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"总耗时: {end_time - start_time}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-
-    return result
-
-if __name__ == "__main__":
-    # 提供一些默认值以便直接运行脚本进行测试
-    test_params = {
-        "table_name": "emp_training_stats",
-        "execution_mode": "append",
-        "exec_date": datetime.now().strftime('%Y-%m-%d'),
-        "script_name": os.path.basename(__file__),
-        "test_param1": "value1",
-        "test_param2": 123
-    }
-    logger.info(f"以主脚本方式运行,使用测试参数: {test_params}")
-    run(**test_params) 

+ 6 - 0
dataops_scripts/execution_python.py

@@ -381,6 +381,12 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, s
             **kwargs
         }
 
+        # 打印exec_locals的参数列表
+        logger.info("-- 代码片段执行的 exec_locals参数列表 --")
+        for key, value in exec_locals.items():
+            logger.info(f"参数: {key} = {value}")
+
+
         # 安全执行Python片段
         try:
             # 开始执行 Python 片段...

+ 0 - 134
dataops_scripts/load_file_test.py

@@ -1,134 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-import logging
-import sys
-import os
-from datetime import datetime
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("load_file_mock") # 使用 mock 后缀以区分
-
-def mock_load_file(table_name=None, update_mode='append', exec_date=None, 
-                   target_type=None, storage_location=None, frequency=None, script_name=None, **kwargs):
-    """模拟加载文件数据,仅打印参数"""
-    # 获取当前脚本的文件名(如果没有传入)
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 (模拟处理函数内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"update_mode: {update_mode}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"target_type: {target_type}")
-    logger.info(f"storage_location: {storage_location}")
-    logger.info(f"frequency: {frequency}")
-    logger.info(f"script_name: {script_name}")
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"=========================================")
-
-    logger.info(f"开始模拟文件加载 - 脚本: {script_name}, 表: {table_name}")
-
-    try:
-        logger.info("模拟检查参数...")
-        if not storage_location:
-            logger.warning("警告: 未提供 storage_location (文件路径)")
-        else:
-            logger.info(f"模拟检查文件是否存在: {storage_location}")
-
-        logger.info(f"模拟更新模式: {update_mode}")
-        if update_mode == 'full_refresh':
-            logger.info(f"模拟: 如果是全量刷新,将清空表 {table_name}")
-        
-        logger.info("模拟读取和处理文件...")
-        # 模拟成功
-        logger.info(f"模拟: 表 {table_name} 文件加载成功")
-        return True
-    except Exception as e:
-        logger.error(f"模拟加载文件时出错: {str(e)}")
-        return False
-
-def run(table_name, update_mode='append', exec_date=None, target_type=None, 
-        storage_location=None, frequency=None, script_name=None, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范 (模拟版本)
-
-    参数:
-        table_name (str): 要处理的表名
-        update_mode (str): 更新模式 (append/full_refresh)
-        exec_date: 执行日期
-        target_type: 目标类型
-        storage_location: 文件路径
-        frequency: 更新频率
-        script_name: 脚本名称
-        **kwargs: 其他可能的参数
-
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    # 打印所有传入的参数 (在入口函数再次打印,确保信息完整)
-    logger.info(f"===== 传入参数信息 (入口函数 run 内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"update_mode: {update_mode}")
-    logger.info(f"exec_date: {exec_date} (类型: {type(exec_date)}) ")
-    logger.info(f"target_type: {target_type}")
-    logger.info(f"storage_location: {storage_location}")
-    logger.info(f"frequency: {frequency}")
-    logger.info(f"script_name: {script_name}")
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"=========================================")
-
-    # 如果没有提供脚本名,使用当前脚本的文件名
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-
-    # 记录详细的执行信息
-    start_time = datetime.now()
-    logger.info(f"脚本 '{script_name}' (模拟) 开始执行: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
-
-    # 调用实际处理函数 (模拟版本)
-    result = mock_load_file(
-        table_name=table_name,
-        update_mode=update_mode,
-        exec_date=exec_date,
-        target_type=target_type,
-        storage_location=storage_location,
-        frequency=frequency,
-        script_name=script_name,
-        **kwargs  # 将额外参数传递给处理函数
-    )
-
-    end_time = datetime.now()
-    logger.info(f"脚本 '{script_name}' (模拟) 结束执行: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"总耗时: {end_time - start_time}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-
-    return result
-
-if __name__ == "__main__":
-    # 提供一些默认值以便直接运行脚本进行测试
-    test_params = {
-        "table_name": "sample_table",
-        "update_mode": "full_refresh",
-        "exec_date": datetime.now().strftime('%Y-%m-%d'),
-        "target_type": "structure",
-        "storage_location": "/path/to/mock/file.csv",
-        "frequency": "daily",
-        "script_name": os.path.basename(__file__),
-        "custom_param": "abc",
-        "another_param": 456
-    }
-    logger.info(f"以主脚本方式运行 (模拟),使用测试参数: {test_params}")
-    run(**test_params)