2 Commits 7d2659f2bf ... 7e96cefe90

Author SHA1 Message Date
  wangxq 7e96cefe90 增加一个用于路径测试的DAG. 5 days ago
  wangxq 112c1a543a 给execution_python.py增加参数输出 5 days ago

+ 0 - 0
dags/__init__.py


+ 26 - 0
dags/check_sys_path_dag.py

@@ -0,0 +1,26 @@
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from datetime import datetime
+import sys
+
+def print_sys_path():
+    print("=== sys.path 内容如下 ===")
+    for path in sys.path:
+        print(path)
+
+default_args = {
+    'start_date': datetime(2025, 1, 1),
+}
+
+with DAG(
+    dag_id='check_sys_path_dag',
+    default_args=default_args,
+    schedule_interval=None,
+    catchup=False,
+    tags=['debug'],
+) as dag:
+
+    task_print_path = PythonOperator(
+        task_id='print_sys_path',
+        python_callable=print_sys_path
+    )

+ 3 - 2
dags/dataops_productline_execute_dag.py

@@ -47,9 +47,10 @@ from airflow.models import Variable
 import logging
 import logging
 import networkx as nx
 import networkx as nx
 import json
 import json
-import os
 import pendulum
 import pendulum
 from decimal import Decimal
 from decimal import Decimal
+import os
+import sys
 from utils import (
 from utils import (
     get_pg_conn, 
     get_pg_conn, 
     get_neo4j_driver,
     get_neo4j_driver,
@@ -59,7 +60,7 @@ from utils import (
 from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG,SCHEDULE_TABLE_SCHEMA
 from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG,SCHEDULE_TABLE_SCHEMA
 import pytz
 import pytz
 import pandas as pd
 import pandas as pd
-import sys
+
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.trigger_rule import TriggerRule
 
 
 # 创建日志记录器
 # 创建日志记录器

+ 4 - 1
dags/dataops_productline_finalize_dag.py

@@ -15,11 +15,14 @@ import logging
 import json
 import json
 import pendulum
 import pendulum
 import pytz
 import pytz
+import os
+import sys
+from utils import get_today_date
 from airflow.models import DagRun, TaskInstance
 from airflow.models import DagRun, TaskInstance
 from airflow.utils.state import State
 from airflow.utils.state import State
 from sqlalchemy import desc
 from sqlalchemy import desc
 from airflow import settings
 from airflow import settings
-from utils import get_today_date
+
 from decimal import Decimal
 from decimal import Decimal
 
 
 # 创建日志记录器
 # 创建日志记录器

+ 2 - 5
dags/dataops_productline_manual_trigger_dag.py

@@ -62,14 +62,12 @@ from neo4j import GraphDatabase
 import psycopg2
 import psycopg2
 import networkx as nx
 import networkx as nx
 import json
 import json
-from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG
 import traceback
 import traceback
 import pendulum
 import pendulum
-import pytz
+import sys
 from utils import get_pg_conn, get_cn_exec_date, check_script_exists, get_complete_script_info, get_table_label
 from utils import get_pg_conn, get_cn_exec_date, check_script_exists, get_complete_script_info, get_table_label
 from airflow.exceptions import AirflowException
 from airflow.exceptions import AirflowException
-from config import AIRFLOW_BASE_PATH, SCRIPTS_BASE_PATH, SCHEDULE_TABLE_SCHEMA
-import sys
+from config import AIRFLOW_BASE_PATH, NEO4J_CONFIG, SCRIPTS_BASE_PATH, SCHEDULE_TABLE_SCHEMA
 
 
 # 设置logger
 # 设置logger
 logger = logging.getLogger(__name__)
 logger = logging.getLogger(__name__)
@@ -1071,7 +1069,6 @@ with DAG(
     description='script_name和target_table可以二选一,支持三种依赖级别:self(仅当前表或脚本)、resource(到Resource层)、source(到Source层)',
     description='script_name和target_table可以二选一,支持三种依赖级别:self(仅当前表或脚本)、resource(到Resource层)、source(到Source层)',
     schedule_interval=None,  # 设置为None表示只能手动触发
     schedule_interval=None,  # 设置为None表示只能手动触发
     catchup=False,
     catchup=False,
-    is_paused_upon_creation=False,
     params={
     params={
         "script_name": "",
         "script_name": "",
         "target_table": "",
         "target_table": "",

+ 12 - 7
dags/dataops_productline_prepare_dag.py

@@ -13,6 +13,7 @@ import glob
 from pathlib import Path
 from pathlib import Path
 import hashlib
 import hashlib
 import pendulum
 import pendulum
+import sys
 from utils import (
 from utils import (
     get_pg_conn, 
     get_pg_conn, 
     get_neo4j_driver,
     get_neo4j_driver,
@@ -774,15 +775,19 @@ def check_execution_plan_in_db(**kwargs):
                 # 删除历史执行计划,只保留最近N条
                 # 删除历史执行计划,只保留最近N条
                 if EXECUTION_PLAN_KEEP_COUNT > 0:
                 if EXECUTION_PLAN_KEEP_COUNT > 0:
                     cursor.execute(f"""
                     cursor.execute(f"""
-                        DELETE FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans 
-                        WHERE dag_id = %s AND exec_date = %s
-                        AND id NOT IN (
-                            SELECT id FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
+                        WITH to_keep AS (
+                            SELECT dag_id, run_id, exec_date, insert_time
+                            FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
                             WHERE dag_id = %s AND exec_date = %s
                             WHERE dag_id = %s AND exec_date = %s
-                            ORDER BY id DESC
-                            LIMIT {EXECUTION_PLAN_KEEP_COUNT - 1}
+                            ORDER BY insert_time DESC
+                            LIMIT %s
+                        )
+                        DELETE FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
+                        WHERE dag_id = %s AND exec_date = %s
+                        AND (dag_id, run_id, exec_date, insert_time) NOT IN (
+                            SELECT dag_id, run_id, exec_date, insert_time FROM to_keep
                         )
                         )
-                    """, (dag_id, ds, dag_id, ds))
+                    """, (dag_id, ds, EXECUTION_PLAN_KEEP_COUNT, dag_id, ds))
                     
                     
                     deleted_rows = cursor.rowcount
                     deleted_rows = cursor.rowcount
                     conn.commit()
                     conn.commit()

+ 3 - 2
dags/utils.py

@@ -1,12 +1,13 @@
 # utils.py
 # utils.py
 import psycopg2
 import psycopg2
-from neo4j import GraphDatabase
+import os
+import sys
 from config import PG_CONFIG, NEO4J_CONFIG, SCRIPTS_BASE_PATH
 from config import PG_CONFIG, NEO4J_CONFIG, SCRIPTS_BASE_PATH
+from neo4j import GraphDatabase
 import logging
 import logging
 import importlib.util
 import importlib.util
 from pathlib import Path
 from pathlib import Path
 import networkx as nx
 import networkx as nx
-import os
 from airflow.exceptions import AirflowFailException
 from airflow.exceptions import AirflowFailException
 from datetime import datetime, timedelta, date
 from datetime import datetime, timedelta, date
 import functools
 import functools

+ 0 - 97
dataops_scripts/book_sale_amt_2weekly_process.py

@@ -1,97 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_2weekly_process")
-
-def process_2weekly_book_sales():
-    """处理双周图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行双周图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按双周期汇总销售额...")
-        # 模拟处理步骤
-        today = datetime.now()
-        # 计算当前所在的周数
-        week_number = int(today.strftime("%U"))
-        # 计算当前双周期
-        biweekly_number = (week_number // 2) + 1
-        # 计算双周期的开始和结束日期
-        start_day = datetime.strptime(f"{today.year}-W{biweekly_number*2-1}-1", "%Y-W%W-%w")
-        end_day = start_day + timedelta(days=13)  # 两周共14天
-        date_range = f"{start_day.strftime('%Y-%m-%d')} 至 {end_day.strftime('%Y-%m-%d')}"
-        
-        logger.info(f"正在处理第 {biweekly_number} 个双周期 ({date_range}) 的数据")
-        
-        logger.info("汇总统计双周期内每天的销售数据...")
-        logger.info("计算与上一个双周期的对比...")
-        logger.info("计算热销书籍排行榜...")
-        logger.info("生成双周期销售趋势分析...")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理双周图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-        logger.info("获取过去12个双周期的历史数据进行分析...")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新双周期数据")
-    
-    # 调用实际处理函数
-    result = process_2weekly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_2weekly", execution_mode="append") 

+ 0 - 92
dataops_scripts/book_sale_amt_2yearly_process.py

@@ -1,92 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_2yearly_process")
-
-def process_2yearly_book_sales():
-    """处理两年度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行两年度图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按两年周期汇总销售额...")
-        # 模拟处理步骤
-        current_year = int(datetime.now().strftime("%Y"))
-        # 计算当前两年周期
-        cycle_start = current_year - (current_year % 2)
-        cycle_end = cycle_start + 1
-        logger.info(f"正在处理 {cycle_start}-{cycle_end} 两年周期的数据")
-        
-        logger.info("计算与上一个两年周期的对比...")
-        logger.info("计算每年在两年周期中的占比...")
-        logger.info("生成两年周期销售趋势分析...")
-        logger.info("生成中长期销售预测...")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理两年度图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-        # 两年周期数据通常需要处理多个周期的历史数据进行比较
-        logger.info("获取过去4个两年周期的历史数据进行分析...")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新两年周期数据")
-    
-    # 调用实际处理函数
-    result = process_2yearly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_2yearly", execution_mode="append") 

+ 0 - 166
dataops_scripts/book_sale_amt_daily_clean.py

@@ -1,166 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-import time
-import random
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_daily_clean")
-
-def clean_daily_book_sales(table_name=None, exec_date=None, execution_mode=None, script_name=None):
-    """清洗日度图书销售额数据的函数"""
-    # 获取当前脚本的文件名(如果没有传入)
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-    
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 (处理函数内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"script_name: {script_name}")
-    logger.info(f"======================================")
-    
-    logger.info(f"开始执行日度图书销售额数据清洗 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("执行数据清洗流程...")
-        
-        # 尝试使用传入的日期,如果没有则使用昨天
-        if exec_date:
-            if isinstance(exec_date, str):
-                try:
-                    date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
-                    date_str = exec_date
-                except ValueError:
-                    today = datetime.now()
-                    yesterday = today - timedelta(days=1)
-                    date_str = yesterday.strftime('%Y-%m-%d')
-                    logger.warning(f"无法解析传入的exec_date: {exec_date},使用昨天日期: {date_str}")
-            else:
-                try:
-                    date_str = exec_date.strftime('%Y-%m-%d')
-                except:
-                    today = datetime.now()
-                    yesterday = today - timedelta(days=1)
-                    date_str = yesterday.strftime('%Y-%m-%d')
-                    logger.warning(f"无法格式化传入的exec_date,使用昨天日期: {date_str}")
-        else:
-            today = datetime.now()
-            yesterday = today - timedelta(days=1)
-            date_str = yesterday.strftime('%Y-%m-%d')
-            logger.info(f"未传入exec_date,使用昨天日期: {date_str}")
-        
-        logger.info(f"正在清洗 {date_str} 的数据")
-        
-        logger.info("检查数据完整性...")
-        logger.info("检测并处理异常值...")
-        logger.info("填充缺失数据...")
-        logger.info("标准化数据格式...")
-        logger.info("去除重复记录...")
-        
-        logger.info("数据清洗完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        # 模拟处理时间
-        processing_time = random.uniform(0.5, 2.0)
-        logger.info(f"开始处理数据,预计需要 {processing_time:.2f} 秒")
-        time.sleep(processing_time)
-        
-        # 模拟处理逻辑
-        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-        logger.info(f"数据处理中... 当前时间: {current_time}")
-        
-        # 模拟数据清洗操作
-        logger.info(f"执行数据清洗操作: 移除异常值、填充缺失值、标准化格式")
-        time.sleep(processing_time)
-        
-        # 模拟写入数据库
-        success_rate = random.random()
-        if success_rate > 0.1:  # 90%的成功率
-            logger.info(f"表 {table_name} 数据清洗成功,处理日期: {date_str}")
-            return True
-        else:
-            logger.error(f"表 {table_name} 数据清洗或写入过程中出现随机错误")
-            return False
-    except Exception as e:
-        logger.error(f"清洗日度图书销售额数据时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        exec_date: 执行日期
-        script_name: 脚本名称
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 (入口函数内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"script_name: {script_name}")
-    
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"======================================")
-    
-    # 如果没有提供脚本名,使用当前脚本的文件名
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-        logger.info("获取过去30天的历史数据进行清洗...")
-    else:  # append
-        logger.info("执行增量模式 - 只清洗最新一天的数据")
-    
-    # 调用实际处理函数
-    result = clean_daily_book_sales(
-        table_name=table_name, 
-        exec_date=exec_date,
-        execution_mode=execution_mode, 
-        script_name=script_name
-    )
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数,带上所有参数作为测试
-    run(
-        table_name="book_sale_amt_daily", 
-        execution_mode="append",
-        exec_date=datetime.now().strftime('%Y-%m-%d'),
-        script_name=os.path.basename(__file__)
-    ) 

+ 0 - 90
dataops_scripts/book_sale_amt_half_yearly_process.py

@@ -1,90 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_half_yearly_process")
-
-def process_half_yearly_book_sales():
-    """处理半年度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行半年度图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按半年汇总销售额...")
-        # 模拟处理步骤
-        current_year = datetime.now().strftime("%Y")
-        current_month = datetime.now().month
-        half_year = "上半年" if current_month <= 6 else "下半年"
-        logger.info(f"正在处理 {current_year} 年 {half_year} 的数据")
-        
-        logger.info("计算同比增长率...")
-        logger.info("计算各月份在半年度中的占比...")
-        logger.info("生成半年度销售趋势分析...")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理半年度图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-        # 半年度数据通常需要处理多个半年度的历史数据进行比较
-        logger.info("获取过去3年的半年度历史数据进行分析...")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新半年度数据")
-    
-    # 调用实际处理函数
-    result = process_half_yearly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_half_yearly", execution_mode="append") 

+ 0 - 82
dataops_scripts/book_sale_amt_monthly_process.py

@@ -1,82 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_monthly_process")
-
-def process_monthly_book_sales():
-    """处理月度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行月度图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按月汇总销售额...")
-        # 模拟处理步骤
-        current_month = datetime.now().strftime("%Y-%m")
-        logger.info(f"正在处理 {current_month} 的数据")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理月度图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, update_mode='append', **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        update_mode (str): 更新模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {update_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {update_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if update_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新数据")
-    
-    # 调用实际处理函数
-    result = process_monthly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_monthly", update_mode="append") 

+ 0 - 87
dataops_scripts/book_sale_amt_weekly_process.py

@@ -1,87 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_weekly_process")
-
-def process_weekly_book_sales():
-    """处理周度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行周度图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按周汇总销售额...")
-        # 模拟处理步骤
-        today = datetime.now()
-        # 计算本周的开始日期(周一)和结束日期(周日)
-        start_of_week = today - timedelta(days=today.weekday())
-        end_of_week = start_of_week + timedelta(days=6)
-        week_range = f"{start_of_week.strftime('%Y-%m-%d')} 至 {end_of_week.strftime('%Y-%m-%d')}"
-        
-        logger.info(f"正在处理周期 {week_range} 的数据")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理周度图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新数据")
-    
-    # 调用实际处理函数
-    result = process_weekly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_weekly", execution_mode="append") 

+ 0 - 88
dataops_scripts/book_sale_amt_yearly_process.py

@@ -1,88 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime, timedelta
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_sale_amt_yearly_process")
-
-def process_yearly_book_sales():
-    """处理年度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    logger.info(f"开始执行年度图书销售额处理 - 脚本: {script_name}")
-    
-    try:
-        # 模拟数据处理过程
-        logger.info("从数据源获取原始销售数据...")
-        # 实际应用中这里会连接到数据库或其他数据源
-        
-        logger.info("按年汇总销售额...")
-        # 模拟处理步骤
-        current_year = datetime.now().strftime("%Y")
-        logger.info(f"正在处理 {current_year} 年的数据")
-        
-        logger.info("计算同比增长率...")
-        logger.info("计算各月份占比...")
-        logger.info("生成年度销售趋势分析...")
-        
-        logger.info("数据处理完成,准备保存结果...")
-        # 实际应用中这里会将结果保存到数据库
-        
-        return True
-    except Exception as e:
-        logger.error(f"处理年度图书销售额时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
-    
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    
-    # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
-    logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    
-    # 根据执行模式判断处理逻辑
-    if execution_mode == "full_refresh":
-        logger.info("执行完全刷新模式 - 将处理所有历史数据")
-        # 年度数据通常需要处理多年的历史数据进行比较
-        logger.info("获取过去5年的历史数据进行分析...")
-    else:  # append
-        logger.info("执行增量模式 - 只处理最新年度数据")
-    
-    # 调用实际处理函数
-    result = process_yearly_book_sales()
-    
-    logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-    
-    return result
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_yearly", execution_mode="append") 

+ 0 - 91
dataops_scripts/books_total_process.py

@@ -1,91 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("book_total_process")
-
-def process_book_data(table_name=None, execution_date=None, execution_mode=None, script_name=None):
-    """处理图书数据的示例函数"""
-    # 获取当前脚本的文件名(如果没有传入)
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-    
-    # 使用print输出所有参数
-    print(f"===== 参数信息 (print输出) =====")
-    print(f"table_name: {table_name}")
-    print(f"exec_date: {execution_date}")
-    print(f"execution_mode: {execution_mode}")
-    print(f"script_name: {script_name}")
-    print(f"================================")
-    
-    # 使用logger.info输出所有参数
-    logger.info(f"===== 参数信息 (logger输出) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"exec_date: {execution_date}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"script_name: {script_name}")
-    logger.info(f"================================")
-    
-    return True
-
-def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-    
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        exec_date: 执行日期
-        script_name: 脚本名称
-        **kwargs: 其他可能的参数
-    
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    logger.info(f"开始执行脚本...")
-    
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"script_name: {script_name}")
-    
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"========================")
-    
-    # 如果没有传入script_name,使用当前脚本名
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-    
-    # 实际调用内部处理函数
-    return process_book_data(
-        table_name=table_name,
-        execution_date=exec_date,
-        execution_mode=execution_mode,
-        script_name=script_name
-    )
-
-if __name__ == "__main__":
-    # 直接执行时调用统一入口函数,传入测试参数
-    run(
-        table_name="books", 
-        execution_mode="full_refresh",
-        exec_date=datetime.now(),
-        script_name=os.path.basename(__file__)
-    )

+ 0 - 112
dataops_scripts/emp_training_stats_table.py

@@ -1,112 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import logging
-import sys
-import os
-from datetime import datetime
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("emp_training_stats_table")
-
-def process_emp_training_stats(table_name=None, exec_date=None, execution_mode=None, script_name=None, **kwargs):
-    """处理员工培训统计数据的模拟函数"""
-    # 获取当前脚本的文件名(如果没有传入)
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 (处理函数内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"script_name: {script_name}")
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"======================================")
-
-    logger.info(f"开始执行员工培训统计数据处理 - 脚本: {script_name}, 表: {table_name}")
-
-    try:
-        # 模拟数据处理过程
-        logger.info("模拟处理员工培训统计数据...")
-        # 在实际应用中,这里可以添加具体的数据处理逻辑
-        logger.info(f"处理日期: {exec_date}, 模式: {execution_mode}")
-        
-        # 模拟处理成功
-        logger.info(f"表 {table_name} 数据处理成功")
-        return True
-    except Exception as e:
-        logger.error(f"处理员工培训统计数据时出错: {str(e)}")
-        return False
-
-def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范
-
-    参数:
-        table_name (str): 要处理的表名
-        execution_mode (str): 执行模式 (append/full_refresh)
-        exec_date: 执行日期 (可以是字符串 YYYY-MM-DD 或 datetime 对象)
-        script_name: 脚本名称
-        **kwargs: 其他可能的参数
-
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    # 打印所有传入的参数 (在入口函数再次打印,确保信息完整)
-    logger.info(f"===== 传入参数信息 (入口函数 run 内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"execution_mode: {execution_mode}")
-    logger.info(f"exec_date: {exec_date} (类型: {type(exec_date)})")
-    logger.info(f"script_name: {script_name}")
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"=========================================")
-
-    # 如果没有提供脚本名,使用当前脚本的文件名
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-
-    # 记录详细的执行信息
-    start_time = datetime.now()
-    logger.info(f"脚本 '{script_name}' 开始执行: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
-
-    # 调用实际处理函数
-    result = process_emp_training_stats(
-        table_name=table_name,
-        exec_date=exec_date,
-        execution_mode=execution_mode,
-        script_name=script_name,
-        **kwargs  # 将额外参数传递给处理函数
-    )
-
-    end_time = datetime.now()
-    logger.info(f"脚本 '{script_name}' 结束执行: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"总耗时: {end_time - start_time}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-
-    return result
-
-if __name__ == "__main__":
-    # 提供一些默认值以便直接运行脚本进行测试
-    test_params = {
-        "table_name": "emp_training_stats",
-        "execution_mode": "append",
-        "exec_date": datetime.now().strftime('%Y-%m-%d'),
-        "script_name": os.path.basename(__file__),
-        "test_param1": "value1",
-        "test_param2": 123
-    }
-    logger.info(f"以主脚本方式运行,使用测试参数: {test_params}")
-    run(**test_params) 

+ 6 - 0
dataops_scripts/execution_python.py

@@ -381,6 +381,12 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, s
             **kwargs
             **kwargs
         }
         }
 
 
+        # 打印exec_locals的参数列表
+        logger.info("-- 代码片段执行的 exec_locals参数列表 --")
+        for key, value in exec_locals.items():
+            logger.info(f"参数: {key} = {value}")
+
+
         # 安全执行Python片段
         # 安全执行Python片段
         try:
         try:
             # 开始执行 Python 片段...
             # 开始执行 Python 片段...

+ 0 - 134
dataops_scripts/load_file_test.py

@@ -1,134 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-import logging
-import sys
-import os
-from datetime import datetime
-
-# 配置日志记录器
-logging.basicConfig(
-    level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    handlers=[
-        logging.StreamHandler(sys.stdout)
-    ]
-)
-
-logger = logging.getLogger("load_file_mock") # 使用 mock 后缀以区分
-
-def mock_load_file(table_name=None, update_mode='append', exec_date=None, 
-                   target_type=None, storage_location=None, frequency=None, script_name=None, **kwargs):
-    """模拟加载文件数据,仅打印参数"""
-    # 获取当前脚本的文件名(如果没有传入)
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-
-    # 打印所有传入的参数
-    logger.info(f"===== 传入参数信息 (模拟处理函数内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"update_mode: {update_mode}")
-    logger.info(f"exec_date: {exec_date}")
-    logger.info(f"target_type: {target_type}")
-    logger.info(f"storage_location: {storage_location}")
-    logger.info(f"frequency: {frequency}")
-    logger.info(f"script_name: {script_name}")
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"=========================================")
-
-    logger.info(f"开始模拟文件加载 - 脚本: {script_name}, 表: {table_name}")
-
-    try:
-        logger.info("模拟检查参数...")
-        if not storage_location:
-            logger.warning("警告: 未提供 storage_location (文件路径)")
-        else:
-            logger.info(f"模拟检查文件是否存在: {storage_location}")
-
-        logger.info(f"模拟更新模式: {update_mode}")
-        if update_mode == 'full_refresh':
-            logger.info(f"模拟: 如果是全量刷新,将清空表 {table_name}")
-        
-        logger.info("模拟读取和处理文件...")
-        # 模拟成功
-        logger.info(f"模拟: 表 {table_name} 文件加载成功")
-        return True
-    except Exception as e:
-        logger.error(f"模拟加载文件时出错: {str(e)}")
-        return False
-
-def run(table_name, update_mode='append', exec_date=None, target_type=None, 
-        storage_location=None, frequency=None, script_name=None, **kwargs):
-    """
-    统一入口函数,符合Airflow动态脚本调用规范 (模拟版本)
-
-    参数:
-        table_name (str): 要处理的表名
-        update_mode (str): 更新模式 (append/full_refresh)
-        exec_date: 执行日期
-        target_type: 目标类型
-        storage_location: 文件路径
-        frequency: 更新频率
-        script_name: 脚本名称
-        **kwargs: 其他可能的参数
-
-    返回:
-        bool: 执行成功返回True,否则返回False
-    """
-    # 打印所有传入的参数 (在入口函数再次打印,确保信息完整)
-    logger.info(f"===== 传入参数信息 (入口函数 run 内) =====")
-    logger.info(f"table_name: {table_name}")
-    logger.info(f"update_mode: {update_mode}")
-    logger.info(f"exec_date: {exec_date} (类型: {type(exec_date)}) ")
-    logger.info(f"target_type: {target_type}")
-    logger.info(f"storage_location: {storage_location}")
-    logger.info(f"frequency: {frequency}")
-    logger.info(f"script_name: {script_name}")
-    # 打印所有可能的额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}")
-    logger.info(f"=========================================")
-
-    # 如果没有提供脚本名,使用当前脚本的文件名
-    if script_name is None:
-        script_name = os.path.basename(__file__)
-
-    # 记录详细的执行信息
-    start_time = datetime.now()
-    logger.info(f"脚本 '{script_name}' (模拟) 开始执行: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
-
-    # 调用实际处理函数 (模拟版本)
-    result = mock_load_file(
-        table_name=table_name,
-        update_mode=update_mode,
-        exec_date=exec_date,
-        target_type=target_type,
-        storage_location=storage_location,
-        frequency=frequency,
-        script_name=script_name,
-        **kwargs  # 将额外参数传递给处理函数
-    )
-
-    end_time = datetime.now()
-    logger.info(f"脚本 '{script_name}' (模拟) 结束执行: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
-    logger.info(f"总耗时: {end_time - start_time}")
-    logger.info(f"处理结果: {'成功' if result else '失败'}")
-
-    return result
-
-if __name__ == "__main__":
-    # 提供一些默认值以便直接运行脚本进行测试
-    test_params = {
-        "table_name": "sample_table",
-        "update_mode": "full_refresh",
-        "exec_date": datetime.now().strftime('%Y-%m-%d'),
-        "target_type": "structure",
-        "storage_location": "/path/to/mock/file.csv",
-        "frequency": "daily",
-        "script_name": os.path.basename(__file__),
-        "custom_param": "abc",
-        "another_param": 456
-    }
-    logger.info(f"以主脚本方式运行 (模拟),使用测试参数: {test_params}")
-    run(**test_params)