Browse Source

从pipeline_prepare删除了与airflow_dag_schedule相关的操作

wangxq 1 month ago
parent
commit
2e8886e1b7
1 changed files with 2 additions and 130 deletions
  1. 2 130
      dags/dag_dataops_pipeline_prepare_scheduler.py

+ 2 - 130
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -236,122 +236,6 @@ def filter_invalid_tables(tables_info):
     
     
     return valid_tables
     return valid_tables
 
 
-def check_if_date_exists(exec_date):
-    """
-    检查指定日期是否已存在于airflow_dag_schedule表中
-    
-    参数:
-        exec_date (str): 执行日期,格式为'YYYY-MM-DD'
-        
-    返回:
-        bool: 如果日期已存在返回True,否则返回False
-    """
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s
-        """, (exec_date,))
-        result = cursor.fetchone()
-        count = result[0] if result else 0
-        exists = count > 0
-        
-        if exists:
-            logger.info(f"执行日期 {exec_date} 已存在于airflow_dag_schedule表中,共有 {count} 条记录")
-        else:
-            logger.info(f"执行日期 {exec_date} 不存在于airflow_dag_schedule表中")
-            
-        return exists
-    except Exception as e:
-        logger.error(f"检查日期 {exec_date} 是否存在时出错: {str(e)}")
-        return False
-    finally:
-        cursor.close()
-        conn.close()
-
-def clear_existing_records(exec_date):
-    """
-    清除指定日期的所有记录
-    
-    参数:
-        exec_date (str): 执行日期,格式为'YYYY-MM-DD'
-        
-    返回:
-        int: 清除的记录数量
-    """
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s
-        """, (exec_date,))
-        result = cursor.fetchone()
-        count = result[0] if result else 0
-        
-        if count > 0:
-            cursor.execute("""
-                DELETE FROM airflow_dag_schedule 
-                WHERE exec_date = %s
-            """, (exec_date,))
-            conn.commit()
-            logger.info(f"已清除日期 {exec_date} 的 {count} 条记录")
-        else:
-            logger.info(f"日期 {exec_date} 没有记录需要清除")
-        
-        return count
-    except Exception as e:
-        logger.error(f"清除日期 {exec_date} 的记录时出错: {str(e)}")
-        conn.rollback()
-        return 0
-    finally:
-        cursor.close()
-        conn.close()
-
-def write_to_airflow_dag_schedule(exec_date, tables_info):
-    """将表信息写入airflow_dag_schedule表"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    
-    try:
-        # 批量插入新数据
-        inserted_count = 0
-        for table in tables_info:
-            cursor.execute("""
-                INSERT INTO airflow_dag_schedule (
-                    exec_date, source_table, target_table, target_table_label,
-                    target_table_status, is_directly_schedule, default_update_frequency,
-                    script_name, script_type, script_exec_mode
-                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
-            """, (
-                exec_date,
-                table.get('source_table'),
-                table['target_table'],
-                table.get('target_table_label'),
-                table.get('target_table_status', True),
-                table.get('is_directly_schedule', False),
-                table.get('default_update_frequency'),
-                table.get('script_name'),
-                table.get('script_type', 'python'),
-                table.get('script_exec_mode', 'append')
-            ))
-            inserted_count += 1
-        
-        conn.commit()
-        logger.info(f"成功插入 {inserted_count} 条记录到 airflow_dag_schedule 表")
-        return inserted_count
-    except Exception as e:
-        logger.error(f"写入 airflow_dag_schedule 表时出错: {str(e)}")
-        conn.rollback()
-        # 不要返回0,而是重新抛出异常,确保错误被正确传播
-        raise
-    finally:
-        cursor.close()
-        conn.close()
-
 def touch_data_scheduler_file():
 def touch_data_scheduler_file():
     """
     """
     更新数据调度器DAG文件的修改时间,触发重新解析
     更新数据调度器DAG文件的修改时间,触发重新解析
@@ -541,9 +425,6 @@ def prepare_pipeline_dag_schedule(**kwargs):
         return 0
         return 0
     
     
     # 继续处理,创建新的执行计划
     # 继续处理,创建新的执行计划
-    # 清除数据库中的现有记录
-    clear_existing_records(exec_date)
-    
     # 1. 获取启用的表
     # 1. 获取启用的表
     enabled_tables = get_enabled_tables()
     enabled_tables = get_enabled_tables()
     logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
     logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
@@ -569,16 +450,7 @@ def prepare_pipeline_dag_schedule(**kwargs):
     valid_tables = filter_invalid_tables(enriched_tables)
     valid_tables = filter_invalid_tables(enriched_tables)
     logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
     logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
     
     
-    # 5. 写入airflow_dag_schedule表
-    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)
-    
-    # 6. 检查插入操作是否成功,如果失败则抛出异常
-    if inserted_count == 0 and valid_tables:
-        error_msg = f"插入操作失败,无记录被插入到airflow_dag_schedule表,但有{len(valid_tables)}个有效表需要处理"
-        logger.error(error_msg)
-        raise Exception(error_msg)
-    
-    # 7. 保存最新执行计划,供DAG读取使用
+    # 保存最新执行计划,供DAG读取使用
     try:
     try:
         # 构建执行计划
         # 构建执行计划
         resource_tasks = []
         resource_tasks = []
@@ -691,7 +563,7 @@ def prepare_pipeline_dag_schedule(**kwargs):
         # 强制抛出异常,确保任务失败,阻止下游DAG执行
         # 强制抛出异常,确保任务失败,阻止下游DAG执行
         raise Exception(error_msg)
         raise Exception(error_msg)
     
     
-    return inserted_count
+    return len(valid_tables)  # 返回有效表数量代替插入记录数
 
 
 def check_execution_plan_file(**kwargs):
 def check_execution_plan_file(**kwargs):
     """
     """