Преглед изворни кода

pipeline_prepare 同时把plan写成json文件和数据库表

wangxq пре 1 месец
родитељ
комит
935efc4d4b
1 измењених фајлова са 71 додато и 0 уклоњено
  1. 71 0
      dags/dag_dataops_pipeline_prepare_scheduler.py

+ 71 - 0
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -368,6 +368,45 @@ def cleanup_old_execution_plans(keep_count=None):
     
     return deleted_count
 
+def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
+    """
+    将执行计划保存到airflow_exec_plans表
+    
+    参数:
+        execution_plan (dict): 执行计划字典
+        dag_id (str): DAG的ID
+        run_id (str): DAG运行的ID
+        logical_date (datetime): 逻辑日期
+        ds (str): 日期字符串,格式为YYYY-MM-DD
+    
+    返回:
+        bool: 操作是否成功
+    """
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    
+    try:
+        # 将执行计划转换为JSON字符串
+        plan_json = json.dumps(execution_plan)
+        
+        # 插入记录
+        cursor.execute("""
+            INSERT INTO airflow_exec_plans
+            (dag_id, run_id, logical_date, ds, plan)
+            VALUES (%s, %s, %s, %s, %s)
+        """, (dag_id, run_id, logical_date, ds, plan_json))
+        
+        conn.commit()
+        logger.info(f"成功将执行计划保存到airflow_exec_plans表,dag_id={dag_id}, run_id={run_id}, ds={ds}")
+        return True
+    except Exception as e:
+        logger.error(f"保存执行计划到数据库时出错: {str(e)}")
+        conn.rollback()
+        return False
+    finally:
+        cursor.close()
+        conn.close()
+
 def prepare_pipeline_dag_schedule(**kwargs):
     """准备Pipeline DAG调度任务的主函数"""
     # 检查是否是手动触发模式
@@ -546,6 +585,38 @@ def prepare_pipeline_dag_schedule(**kwargs):
             # dag_dataops_pipeline_data_scheduler.py文件的修改日期更新
             touch_data_scheduler_file()
             
+            # 保存执行计划到数据库表
+            try:
+                # 获取DAG运行信息
+                dag_run = kwargs.get('dag_run')
+                if dag_run:
+                    dag_id = dag_run.dag_id
+                    run_id = dag_run.run_id
+                    logical_date = dag_run.logical_date
+                else:
+                    # 如果无法获取dag_run,使用默认值
+                    dag_id = kwargs.get('dag').dag_id if 'dag' in kwargs else "dag_dataops_pipeline_prepare_scheduler"
+                    run_id = f"manual_{datetime.now().strftime('%Y%m%d%H%M%S')}"
+                    logical_date = datetime.now()
+                
+                # 保存到数据库
+                save_result = save_execution_plan_to_db(
+                    execution_plan=execution_plan,
+                    dag_id=dag_id,
+                    run_id=run_id,
+                    logical_date=logical_date,
+                    ds=exec_date
+                )
+                
+                if save_result:
+                    logger.info("执行计划已成功保存到数据库")
+                else:
+                    logger.warning("执行计划保存到数据库失败,但文件已保存成功")
+            except Exception as db_e:
+                # 捕获数据库保存错误,但不影响主流程
+                logger.error(f"保存执行计划到数据库时出错: {str(db_e)}")
+                logger.info("继续执行,因为文件已成功保存")
+            
         except Exception as e:
             logger.error(f"保存执行计划文件或触发DAG重新解析时出错: {str(e)}")
             # 出错时清理临时文件