
Add a schema parameter to the configuration file.

wangxq, 6 days ago
parent commit 7d2659f2bf

+ 3 - 0
dags/config.py

@@ -49,3 +49,6 @@ EXECUTION_PLAN_KEEP_COUNT = 5
 
 # ETL作业幂等性开关
 ENABLE_ETL_IDEMPOTENCY = True
+
+# dataops中的SCHEDULE相关的表 schema
+SCHEDULE_TABLE_SCHEMA = "dags"
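Note: the hunks below consume this new constant by interpolating it into table references. A minimal sketch of the pattern, assuming the existing get_pg_conn helper from utils (fetch_latest_plan is an illustrative name, not part of the commit; the schema comes from trusted config and is formatted into the identifier, while values still go through %s parameter binding):

    from config import SCHEDULE_TABLE_SCHEMA   # "dags" by default
    from utils import get_pg_conn

    def fetch_latest_plan(exec_date):
        # Schema-qualified table reference; exec_date is bound as a query parameter.
        conn = get_pg_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(f"""
                SELECT plan
                FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
                WHERE exec_date = %s
                ORDER BY logical_date DESC
                LIMIT 1
            """, (exec_date,))
            row = cursor.fetchone()
            return row[0] if row else None
        finally:
            cursor.close()
            conn.close()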

+ 16 - 24
dags/dataops_productline_execute_dag.py

@@ -56,10 +56,11 @@ from utils import (
     get_today_date,
     get_cn_exec_date
 )
-from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
+from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG, SCHEDULE_TABLE_SCHEMA
 import pytz
 import pandas as pd
 import sys
+from airflow.utils.trigger_rule import TriggerRule
 
 # 创建日志记录器
 logger = logging.getLogger(__name__)
@@ -479,33 +480,24 @@ def execute_python(script_id, script_name, target_table, update_mode, schedule_f
 
 def get_execution_plan_from_db(ds):
     """
-    从数据库获取产品线执行计划
+    从数据库获取特定执行日期的执行计划
     
     参数:
-        ds (str): 执行日期,格式为'YYYY-MM-DD'
+        ds (str): 执行日期,格式为YYYY-MM-DD
         
     返回:
-        dict: 执行计划字典,如果找返回None
+        dict: 执行计划字典,如果找不到返回None
     """
-    # 记录输入参数详细信息
-    if isinstance(ds, datetime):
-        if ds.tzinfo:
-            logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 带时区: {ds.tzinfo}")
-        else:
-            logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 无时区")
-    else:
-        logger.debug(f"【执行日期】get_execution_plan_from_db接收到: {ds}, 类型: {type(ds)}")
-    
-    logger.info(f"尝试从数据库获取执行日期 {ds} 的产品线执行计划")
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    execution_plan = None
+    logger.info(f"从数据库获取执行日期 exec_date={ds} 的执行计划")
     
     try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        
         # 查询条件a: 当前日期=表的exec_date,如果有多条记录,取logical_date最大的一条
-        cursor.execute("""
+        cursor.execute(f"""
             SELECT plan
-            FROM airflow_exec_plans
+            FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
             WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date = %s
             ORDER BY logical_date DESC
             LIMIT 1
@@ -527,9 +519,9 @@ def get_execution_plan_from_db(ds):
         
         # 查询条件b: 找不到当前日期的记录,查找exec_date<当前ds的最新记录
         logger.info(f"未找到当前日期 exec_date={ds} 的执行计划记录,尝试查找历史记录")
-        cursor.execute("""
+        cursor.execute(f"""
             SELECT plan, exec_date
-            FROM airflow_exec_plans
+            FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
             WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date < %s
             ORDER BY exec_date DESC, logical_date DESC
             LIMIT 1
@@ -646,14 +638,14 @@ def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
             local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
             
             # 插入记录
-            cursor.execute("""
-                INSERT INTO airflow_exec_plans
+            cursor.execute(f"""
+                INSERT INTO {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
                 (dag_id, run_id, logical_date, local_logical_date, exec_date, plan)
                 VALUES (%s, %s, %s, %s, %s, %s)
             """, (dag_id, run_id, logical_date, local_logical_date, ds, plan_json))
             
             conn.commit()
-            logger.info(f"成功将执行计划保存到airflow_exec_plans表,dag_id={dag_id}, run_id={run_id}, exec_date={ds}")
+            logger.info(f"成功将执行计划保存到{SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans表,dag_id={dag_id}, run_id={run_id}, exec_date={ds}")
             return True
         except Exception as e:
             logger.error(f"保存执行计划到数据库时出错: {str(e)}")

+ 7 - 3
dags/dataops_productline_manual_trigger_dag.py

@@ -68,6 +68,8 @@ import pendulum
 import pytz
 from utils import get_pg_conn, get_cn_exec_date, check_script_exists, get_complete_script_info, get_table_label
 from airflow.exceptions import AirflowException
+from config import AIRFLOW_BASE_PATH, SCRIPTS_BASE_PATH, SCHEDULE_TABLE_SCHEMA
+import sys
 
 # 设置logger
 logger = logging.getLogger(__name__)
@@ -83,6 +85,8 @@ default_args = {
     'retry_delay': timedelta(minutes=1),
 }
 
+
+
 def get_dag_params(**context):
     """获取DAG运行参数"""
     #params = context.get('params', {})
@@ -542,7 +546,7 @@ def execute_python_script(script_info):
 
 def execute_sql(script_info):
     """
-    执行SQL脚本(从data_transform_scripts表获取)
+    执行SQL脚本(从{SCHEDULE_TABLE_SCHEMA}.data_transform_scripts表获取)
     
     参数:
         script_info: 脚本信息字典
@@ -632,7 +636,7 @@ def execute_sql(script_info):
 
 def execute_python(script_info):
     """
-    执行Python脚本(从data_transform_scripts表获取)
+    执行Python脚本(从{SCHEDULE_TABLE_SCHEMA}.data_transform_scripts表获取)
     
     参数:
         script_info: 脚本信息字典
@@ -650,7 +654,7 @@ def execute_python(script_info):
     exec_date = script_info.get('exec_date', datetime.now().strftime('%Y-%m-%d'))
     
     # 记录开始执行
-    logger.info(f"===== 开始执行Python脚本(data_transform_scripts): {script_name} =====")
+    logger.info(f"===== 开始执行Python脚本({SCHEDULE_TABLE_SCHEMA}.data_transform_scripts): {script_name} =====")
     logger.info(f"目标表: {target_table}")
     logger.info(f"更新模式: {update_mode}")
     logger.info(f"表标签: {target_table_label}")

+ 65 - 57
dags/dataops_productline_prepare_dag.py

@@ -18,7 +18,8 @@ from utils import (
     get_neo4j_driver,
     get_cn_exec_date
 )
-from config import PG_CONFIG, NEO4J_CONFIG, DATAOPS_DAGS_PATH
+from config import PG_CONFIG, NEO4J_CONFIG, DATAOPS_DAGS_PATH, EXECUTION_PLAN_KEEP_COUNT, AIRFLOW_BASE_PATH, SCHEDULE_TABLE_SCHEMA
+from airflow.utils.trigger_rule import TriggerRule
 
 # 创建日志记录器
 logger = logging.getLogger(__name__)
@@ -720,72 +721,79 @@ def touch_product_scheduler_file():
 
 def check_execution_plan_in_db(**kwargs):
     """
-    检查当天的执行计划是否存在于数据库中
-    返回False将阻止所有下游任务执行
+    检查数据库中是否已存在指定DAG运行的执行计划
+    
+    参数:
+        **kwargs: Airflow上下文参数
+        
+    返回:
+        bool: 如果存在执行计划返回True,否则返回False
     """
-    # 获取执行日期
-    dag_run = kwargs.get('dag_run')
-    logger.info(f"This DAG run was triggered via: {dag_run.run_type}")
-    logical_date = dag_run.logical_date    
-    exec_date, local_logical_date = get_cn_exec_date(logical_date)
-    logger.info(f"logical_date: {logical_date} ")
-    logger.info(f"local_logical_date: {local_logical_date} ")
-    logger.info(f"检查执行日期 exec_date: {exec_date} 的执行计划是否存在于数据库中")
-   
-    # 检查数据库中是否存在执行计划
+    # 从上下文中获取dag_id, run_id和执行日期
+    dag_id = kwargs.get('dag_run').dag_id
+    run_id = kwargs.get('dag_run').run_id
+    logical_date = kwargs.get('logical_date')
+    
+    # 转换为本地时间
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    
+    # 获取执行日期字符串
+    ds = kwargs.get('ds')  # YYYY-MM-DD格式
+    
+    logger.info(f"检查数据库中是否存在执行计划")
+    logger.info(f"DAG ID: {dag_id}")
+    logger.info(f"运行ID: {run_id}")
+    logger.info(f"逻辑日期: {logical_date}")
+    logger.info(f"本地逻辑日期: {local_logical_date}")
+    logger.info(f"执行日期: {ds}")
+    
     try:
         conn = get_pg_conn()
         cursor = conn.cursor()
+        
         try:
-            cursor.execute("""
-                SELECT plan
-                FROM airflow_exec_plans
-                WHERE exec_date = %s AND dag_id = 'dataops_productline_prepare_dag'
-                ORDER BY logical_date DESC
-                LIMIT 1
-            """, (exec_date,))
+            # 查询是否存在相应的执行计划
+            cursor.execute(f"""
+                SELECT COUNT(*) 
+                FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans 
+                WHERE dag_id = %s AND run_id = %s AND exec_date = %s
+            """, (dag_id, run_id, ds))
             
             result = cursor.fetchone()
-            if not result:
-                logger.error(f"数据库中不存在执行日期 {exec_date} 的执行计划")
-                return False
+            count = result[0] if result else 0
             
-            # 检查执行计划内容是否有效
-            try:
-                # PostgreSQL的jsonb类型会被psycopg2自动转换为Python字典,无需再使用json.loads
-                plan_data = result[0]            
-                # 检查必要字段
-                if "exec_date" not in plan_data:
-                    logger.error("执行计划缺少exec_date字段")
-                    return False
-                    
-                if not isinstance(plan_data.get("scripts", []), list):
-                    logger.error("执行计划的scripts字段无效")
-                    return False
-                    
-                if not isinstance(plan_data.get("resource_scripts", []), list):
-                    logger.error("执行计划的resource_scripts字段无效")
-                    return False
-
-                if not isinstance(plan_data.get("model_scripts", []), list):
-                    logger.error("执行计划的model_scripts字段无效")
-                    return False
-                
-                # 检查是否有脚本数据
-                scripts = plan_data.get("scripts", [])
-                resource_scripts = plan_data.get("resource_scripts", [])
-                model_scripts = plan_data.get("model_scripts", [])
-                
-                logger.info(f"执行计划验证成功: 包含 {len(scripts)} 个脚本,{len(resource_scripts)} 个资源脚本和 {len(model_scripts)} 个模型脚本")
+            logger.info(f"找到 {count} 条执行计划记录")
+            
+            # 如果存在记录,返回True
+            if count > 0:
+                logger.info(f"数据库中已存在此次运行的执行计划")
                 return True
+            else:
+                logger.info(f"数据库中不存在此次运行的执行计划")
+                
+                # 删除历史执行计划,只保留最近N条
+                if EXECUTION_PLAN_KEEP_COUNT > 0:
+                    cursor.execute(f"""
+                        DELETE FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans 
+                        WHERE dag_id = %s AND exec_date = %s
+                        AND id NOT IN (
+                            SELECT id FROM {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
+                            WHERE dag_id = %s AND exec_date = %s
+                            ORDER BY id DESC
+                            LIMIT {EXECUTION_PLAN_KEEP_COUNT - 1}
+                        )
+                    """, (dag_id, ds, dag_id, ds))
+                    
+                    deleted_rows = cursor.rowcount
+                    conn.commit()
+                    logger.info(f"已删除 {deleted_rows} 条旧的执行计划记录")
                 
-            except Exception as je:
-                logger.error(f"处理执行计划数据时出错: {str(je)}")
                 return False
-            
+                
         except Exception as e:
-            logger.error(f"检查数据库中执行计划时出错: {str(e)}")
-            raise Exception(f"PostgreSQL查询执行计划失败: {str(e)}")
+            logger.error(f"查询执行计划时出错: {str(e)}")
+            conn.rollback()
+            raise Exception(f"查询执行计划失败: {str(e)}")
         finally:
             cursor.close()
             conn.close()
@@ -819,8 +827,8 @@ def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
             local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
             
             # 插入记录
-            cursor.execute("""
-                INSERT INTO airflow_exec_plans
+            cursor.execute(f"""
+                INSERT INTO {SCHEDULE_TABLE_SCHEMA}.airflow_exec_plans
                 (dag_id, run_id, logical_date, local_logical_date, exec_date, plan)
                 VALUES (%s, %s, %s, %s, %s, %s)
             """, (dag_id, run_id, logical_date, local_logical_date, ds, plan_json))

+ 8 - 2
dataops_scripts/execution_python.py

@@ -7,7 +7,13 @@ from datetime import datetime, timedelta
 import psycopg2
 import textwrap
 from airflow.exceptions import AirflowException
+import importlib
+import time
+import traceback
+import re
 
+from script_utils import get_config_param
+SCHEDULE_TABLE_SCHEMA = get_config_param("SCHEDULE_TABLE_SCHEMA")
 # 配置日志
 logging.basicConfig(
     level=logging.INFO,
@@ -73,9 +79,9 @@ def get_python_script(target_table, script_name):
         
         query = """
             SELECT script_content, target_dt_column
-            FROM data_transform_scripts
+            FROM {}.data_transform_scripts
             WHERE target_table = %s AND script_name = %s LIMIT 1
-        """
+        """.format(SCHEDULE_TABLE_SCHEMA)
         
         logger.info(f"执行SQL查询: {query}")
         logger.info(f"查询参数: target_table={target_table}, script_name={script_name}")

+ 10 - 2
dataops_scripts/execution_sql.py

@@ -9,6 +9,14 @@ import jinja2
 # 修改导入方式,避免使用相对导入
 import sys
 import os
+import traceback
+import time
+from datetime import datetime, timedelta
+import pytz
+import re
+
+from script_utils import get_config_param
+SCHEDULE_TABLE_SCHEMA = get_config_param("SCHEDULE_TABLE_SCHEMA")
 
 # 配置日志记录器
 logging.basicConfig(
@@ -97,10 +105,10 @@ def get_script_content(target_table, script_name):
         
         query = """
             SELECT script_content, target_dt_column 
-            FROM data_transform_scripts 
+            FROM {}.data_transform_scripts 
             WHERE target_table = %s AND script_name = %s 
             LIMIT 1
-        """
+        """.format(SCHEDULE_TABLE_SCHEMA)
         
         logger.info(f"执行SQL查询: {query}")
         logger.info(f"查询参数: target_table={target_table}, script_name={script_name}")

+ 7 - 3
dataops_scripts/load_file.py

@@ -5,12 +5,16 @@ import sys
 import os
 import pandas as pd
 import psycopg2
-from datetime import datetime
+from datetime import datetime, timedelta
 import csv
 import glob
 import shutil
 import re
 import argparse
+import psycopg2.extras
+
+from script_utils import get_config_param
+SCHEDULE_TABLE_SCHEMA = get_config_param("SCHEDULE_TABLE_SCHEMA")
 
 # 修改Python导入路径,确保能找到同目录下的script_utils模块
 current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -462,10 +466,10 @@ def run(table_name, update_mode='append', exec_date=None, target_type=None,
                         logger.info(f"查询表 {table_name} 的target_dt_column...")
                         target_dt_query = """
                             SELECT target_dt_column 
-                            FROM data_transform_scripts 
+                            FROM {}.data_transform_scripts 
                             WHERE target_table = %s 
                             LIMIT 1
-                        """
+                        """.format(SCHEDULE_TABLE_SCHEMA)
                         cursor.execute(target_dt_query, (table_name,))
                         result = cursor.fetchone()
                         

+ 24 - 2
dataops_scripts/script_utils.py

@@ -16,6 +16,9 @@ from datetime import datetime, timedelta
 import pytz
 import re  # 添加re模块以支持正则表达式
 
+# SCHEDULE_TABLE_SCHEMA 不直接从 dags.config 导入,改为通过下方的 get_config_param 动态读取
+# from dags.config import SCHEDULE_TABLE_SCHEMA
+
 # 导入Airflow相关包
 try:
     from airflow.models import Variable
@@ -479,10 +482,11 @@ def get_target_dt_column(table_name, script_name=None):
             return None
         
         # 查询data_transform_scripts表
+        schema = get_config_param("SCHEDULE_TABLE_SCHEMA")
         try:
             query = f"""
                 SELECT target_dt_column
-                FROM data_transform_scripts
+                FROM {schema}.data_transform_scripts
                 WHERE target_table = '{table_name}'
             """
             if script_name:
@@ -511,4 +515,22 @@ def get_target_dt_column(table_name, script_name=None):
         return None
     finally:
         if 'driver' in locals() and driver:
-            driver.close()
+            driver.close()
+
+def get_config_param(param_name, default_value=None):
+    """
+    从config模块动态获取配置参数
+    
+    参数:
+        param_name (str): 参数名
+        default_value: 默认值
+        
+    返回:
+        参数值,如果不存在则返回默认值
+    """
+    try:
+        config_module = load_config_module()
+        return getattr(config_module, param_name)
+    except Exception as e:
+        logger.warning(f"获取配置参数 {param_name} 失败: {str(e)},使用默认值: {default_value}")
+        return default_value
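
A brief usage sketch for the new helper, mirroring how the dataops_scripts modules above call it; the "public" fallback is illustrative only and assumes load_config_module() locates dags/config.py as before:

    # Resolve the schema once, falling back if config.py cannot be loaded.
    SCHEDULE_TABLE_SCHEMA = get_config_param("SCHEDULE_TABLE_SCHEMA", "public")

    query = """
        SELECT target_dt_column
        FROM {}.data_transform_scripts
        WHERE target_table = %s
        LIMIT 1
    """.format(SCHEDULE_TABLE_SCHEMA)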