
pipeline_data no longer depends on the JSON file

wangxq 1 month ago
parent
commit
15bb995811
1 changed file with 273 additions and 261 deletions

+ 273 - 261
dags/dag_dataops_pipeline_data_scheduler.py

@@ -499,21 +499,6 @@ def prepare_dag_schedule(**kwargs):
     kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
     logger.info(f"Prepared execution plan with {len(resource_tasks)} resource table tasks and {len(model_tasks)} model table tasks")
     
-    # Save the execution plan to a file
-    try:
-        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-        with open(plan_path, 'w') as f:
-            json.dump(execution_plan, f, default=json_serial, indent=2)
-        logger.info(f"Saved execution plan to file: {plan_path}")
-        
-        # Create the ready marker file
-        ready_path = f"{plan_path}.ready"
-        with open(ready_path, 'w') as f:
-            f.write(datetime.now().isoformat())
-        logger.info(f"Created ready marker file: {ready_path}")
-    except Exception as file_e:
-        logger.error(f"Error saving execution plan to file: {str(file_e)}")
-    
     return inserted_count
 
 def check_execution_plan_file(**kwargs):
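With the file-save block removed, the plan travels only through XCom: prepare_dag_schedule pushes the serialized plan under the key execution_plan and downstream tasks pull it back in the same run. A minimal sketch of that pattern, with the push side mirroring the code above and a hypothetical puller task:

# Minimal sketch of the XCom handoff that replaces the JSON file.
# consume_plan is illustrative; only the push side appears in this commit.
import json

def prepare_dag_schedule(**kwargs):
    execution_plan = {"exec_date": kwargs["ds"], "resource_tasks": [], "model_tasks": []}
    # Push the serialized plan under the same key the DAG uses
    kwargs["ti"].xcom_push(key="execution_plan", value=json.dumps(execution_plan))

def consume_plan(**kwargs):
    # Pull the plan pushed by prepare_dag_schedule within the same DAG run
    raw = kwargs["ti"].xcom_pull(task_ids="prepare_dag_schedule", key="execution_plan")
    return json.loads(raw) if isinstance(raw, str) else raw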
@@ -732,43 +717,12 @@ def create_execution_plan(**kwargs):
                 "dependencies": dependencies
             }
             
-            # Save the execution plan
+            # Save the execution plan to XCom
             kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(new_execution_plan, default=json_serial))
             logger.info(f"Created a new execution plan with {len(resource_tasks)} resource table tasks and {len(model_tasks)} model table tasks")
             
-            # Save the execution plan to a file
-            try:
-                plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-                with open(plan_path, 'w') as f:
-                    json.dump(new_execution_plan, f, default=json_serial, indent=2)
-                logger.info(f"Saved execution plan to file: {plan_path}")
-                
-                # Create the ready marker file
-                ready_path = f"{plan_path}.ready"
-                with open(ready_path, 'w') as f:
-                    f.write(datetime.now().isoformat())
-                logger.info(f"Created ready marker file: {ready_path}")
-            except Exception as file_e:
-                logger.error(f"Error saving execution plan to file: {str(file_e)}")
-            
             return json.dumps(new_execution_plan, default=json_serial)
         
-        # If the execution plan came from XCom, also save it to a file
-        try:
-            plan_json = json.loads(execution_plan) if isinstance(execution_plan, str) else execution_plan
-            plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-            with open(plan_path, 'w') as f:
-                json.dump(plan_json, f, default=json_serial, indent=2)
-            logger.info(f"Saved execution plan retrieved from XCom to file: {plan_path}")
-            
-            # Create the ready marker file
-            ready_path = f"{plan_path}.ready"
-            with open(ready_path, 'w') as f:
-                f.write(datetime.now().isoformat())
-            logger.info(f"Created ready marker file: {ready_path}")
-        except Exception as file_e:
-            logger.error(f"Error saving execution plan retrieved from XCom to file: {str(file_e)}")
-        
         logger.info("Successfully retrieved the execution plan")
         return execution_plan
     except Exception as e:
@@ -781,21 +735,6 @@ def create_execution_plan(**kwargs):
             "dependencies": {}
         }
         
-        # Try to save the empty execution plan to a file
-        try:
-            plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-            with open(plan_path, 'w') as f:
-                json.dump(empty_plan, f, default=json_serial, indent=2)
-            logger.info(f"Saved empty execution plan to file: {plan_path}")
-            
-            # Create the ready marker file
-            ready_path = f"{plan_path}.ready"
-            with open(ready_path, 'w') as f:
-                f.write(datetime.now().isoformat())
-            logger.info(f"Created ready marker file: {ready_path}")
-        except Exception as file_e:
-            logger.error(f"Error saving empty execution plan to file: {str(file_e)}")
-            
         return json.dumps(empty_plan, default=json_serial)
 
 def process_resource(target_table, script_name, script_exec_mode, exec_date):
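json.dumps is consistently called with default=json_serial, a helper defined elsewhere in this file. A plausible shape for it, shown purely as an assumption, serializes datetime values to ISO-8601 strings so fields like exec_date survive the round-trip:

# Assumed shape of json_serial (the real definition lives elsewhere in this file):
# a json.dumps `default` hook that turns datetime/date values into ISO-8601 strings.
from datetime import date, datetime

def json_serial(obj):
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")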
@@ -1332,6 +1271,83 @@ def summarize_execution(**kwargs):
         # Return a simple error report so the task itself does not fail
         return f"Error while summarizing execution: {str(e)}"
 
+# New function: fetch the execution plan from the database
+def get_execution_plan_from_db(ds):
+    """
+    Fetch the execution plan from the airflow_exec_plans table in the database.
+    
+    Args:
+        ds (str): Execution date in 'YYYY-MM-DD' format
+        
+    Returns:
+        dict: The execution plan, or None if no record is found
+    """
+    logger.info(f"Trying to fetch the execution plan for execution date {ds} from the database")
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    execution_plan = None
+    
+    try:
+        # Query condition a: records whose ds equals the current date; if there are several, take the one with the latest insert_time
+        cursor.execute("""
+            SELECT plan, run_id, insert_time
+            FROM airflow_exec_plans
+            WHERE dag_id = 'dag_dataops_pipeline_prepare_scheduler' AND ds = %s
+            ORDER BY insert_time DESC
+            LIMIT 1
+        """, (ds,))
+        result = cursor.fetchone()
+        
+        if result:
+            # Unpack plan, run_id and insert_time
+            plan_json, run_id, insert_time = result
+            logger.info(f"Found execution plan record for current date ds={ds}, run_id: {run_id}, insert_time: {insert_time}")
+            
+            # plan_json may already be a dict
+            if isinstance(plan_json, dict):
+                execution_plan = plan_json
+            else:
+                execution_plan = json.loads(plan_json)
+                
+            return execution_plan
+        
+        # Query condition b: no record for the current date, so look for the latest record with ds < current ds
+        logger.info(f"No execution plan record found for current date ds={ds}, looking for historical records")
+        cursor.execute("""
+            SELECT plan, run_id, insert_time, ds
+            FROM airflow_exec_plans
+            WHERE dag_id = 'dag_dataops_pipeline_prepare_scheduler' AND ds < %s
+            ORDER BY ds DESC, insert_time DESC
+            LIMIT 1
+        """, (ds,))
+        result = cursor.fetchone()
+        
+        if result:
+            # Unpack plan, run_id, insert_time and ds
+            plan_json, run_id, insert_time, plan_ds = result
+            logger.info(f"Found historical execution plan record, ds: {plan_ds}, run_id: {run_id}, insert_time: {insert_time}")
+            
+            # plan_json may already be a dict
+            if isinstance(plan_json, dict):
+                execution_plan = plan_json
+            else:
+                execution_plan = json.loads(plan_json)
+                
+            return execution_plan
+        
+        # No execution plan records found at all
+        logger.error(f"No execution plan records found in the database, current DAG ds={ds}")
+        return None
+        
+    except Exception as e:
+        logger.error(f"Error fetching the execution plan from the database: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        return None
+    finally:
+        cursor.close()
+        conn.close()
+
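The two queries above read exactly five columns of airflow_exec_plans (dag_id, run_id, ds, plan, insert_time). The table's real DDL is not part of this commit, so the following is an inferred sketch; a json/jsonb plan column would also explain why plan_json can already arrive as a dict:

# Inferred minimal schema for airflow_exec_plans, derived only from the columns
# the queries above read and filter on; the real table may differ (assumption).
DDL = """
CREATE TABLE IF NOT EXISTS airflow_exec_plans (
    dag_id      VARCHAR(255) NOT NULL,
    run_id      VARCHAR(255) NOT NULL,
    ds          DATE         NOT NULL,
    plan        JSONB        NOT NULL,
    insert_time TIMESTAMP    NOT NULL DEFAULT now()
);
"""

conn = get_pg_conn()  # same connection helper used by get_execution_plan_from_db
try:
    with conn.cursor() as cursor:
        cursor.execute(DDL)
    conn.commit()
finally:
    conn.close()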
 # Create the DAG
 with DAG(
     "dag_dataops_pipeline_data_scheduler", 
@@ -1431,210 +1447,206 @@ with DAG(
     # Set the dependencies between the three phases
     prepare_group >> data_group >> summary_group
 
-    # Try to get information from the execution plan file - this part runs at DAG parse time
+    # Try to fetch the execution plan from the database
     try:
-        # Try to read the latest execution plan from file, used to build the DAG graph
-        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-        ready_path = f"{plan_path}.ready"
-        
-        if os.path.exists(plan_path) and os.path.exists(ready_path):
-            try:
-                # Read the timestamp from the ready file
-                with open(ready_path, 'r') as f:
-                    ready_timestamp = f.read().strip()
-                    logger.info(f"Execution plan ready marker time: {ready_timestamp}")
+        # Get the current DAG execution date
+        exec_date = get_today_date()  # use today's date as the default
+        logger.info(f"Current DAG execution date ds={exec_date}, trying to fetch the execution plan from the database")
+        
+        # Fetch the execution plan from the database
+        execution_plan = get_execution_plan_from_db(exec_date)
+        
+        # Check whether an execution plan was fetched successfully
+        if execution_plan is None:
+            error_msg = f"Could not fetch a valid execution plan from the database, current DAG ds={exec_date}"
+            logger.error(error_msg)
+            # Raise an exception so the DAG fails at parse time
+            raise ValueError(error_msg)
+        
+        # An execution plan was fetched; process it
+        logger.info("Successfully fetched the execution plan from the database")
+        
+        # Extract information
+        exec_date = execution_plan.get("exec_date", exec_date)
+        resource_tasks = execution_plan.get("resource_tasks", [])
+        model_tasks = execution_plan.get("model_tasks", [])
+        dependencies = execution_plan.get("dependencies", {})
+        
+        logger.info(f"Execution plan: exec_date={exec_date}, resource_tasks count={len(resource_tasks)}, model_tasks count={len(model_tasks)}")
+        
+        # If the execution plan is empty (no tasks), fail as well
+        if not resource_tasks and not model_tasks:
+            error_msg = f"Execution plan contains no tasks, current DAG ds={exec_date}"
+            logger.error(error_msg)
+            raise ValueError(error_msg)
+        
+        # Dynamically create processing tasks
+        task_dict = {}
+        
+        # 1. Create resource table tasks
+        for task_info in resource_tasks:
+            table_name = task_info["target_table"]
+            script_name = task_info["script_name"]
+            exec_mode = task_info.get("script_exec_mode", "append")
+            
+            # Build a safe task ID
+            safe_table_name = table_name.replace(".", "_").replace("-", "_")
+            
+            # Make sure every task is part of data_processing_phase
+            with data_group:
+                resource_task = PythonOperator(
+                    task_id=f"resource_{safe_table_name}",
+                    python_callable=process_resource,
+                    op_kwargs={
+                        "target_table": table_name,
+                        "script_name": script_name,
+                        "script_exec_mode": exec_mode,
+                        # Make sure the execution date is passed as a string, not the default
+                        # (non-string) format, so execute_with_monitoring can update the database correctly
+                        "exec_date": str(exec_date)
+                    },
+                    retries=TASK_RETRY_CONFIG["retries"],
+                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
+                )
+            
+            # Add the task to the dict
+            task_dict[table_name] = resource_task
+            
+            # Set the dependency on start_processing
+            start_processing >> resource_task
+        
+        # Build a directed graph to detect dependencies between model tables
+        G = nx.DiGraph()
+        
+        # Add every model table as a node
+        for task_info in model_tasks:
+            table_name = task_info["target_table"]
+            G.add_node(table_name)
+        
+        # Add dependency edges between model tables
+        for source, deps in dependencies.items():
+            for dep in deps:
+                if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
+                    G.add_edge(dep.get("table_name"), source)  # edge direction: dependency -> target
+        
+        # Detect and break circular dependencies
+        try:
+            cycles = list(nx.simple_cycles(G))
+            if cycles:
+                logger.warning(f"Detected circular dependencies: {cycles}")
+                for cycle in cycles:
+                    G.remove_edge(cycle[-1], cycle[0])
+                    logger.info(f"Breaking circular dependency: removed edge {cycle[-1]} -> {cycle[0]}")
+        except Exception as e:
+            logger.error(f"Error while detecting circular dependencies: {str(e)}")
+        
+        # Generate a topological sort to determine the execution order
+        execution_order = []
+        try:
+            execution_order = list(nx.topological_sort(G))
+        except Exception as e:
+            logger.error(f"Topological sort failed: {str(e)}")
+            execution_order = [task_info["target_table"] for task_info in model_tasks]
+        
+        # 2. Create model table tasks in topological order
+        for table_name in execution_order:
+            task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
+            if not task_info:
+                continue
                 
-                # Read the execution plan file
-                with open(plan_path, 'r') as f:
-                    execution_plan_json = f.read()
-                    execution_plan = json.loads(execution_plan_json)
-                    logger.info(f"Loaded execution plan from file: {plan_path}")
-                    
-                    # Extract information
-                    exec_date = execution_plan.get("exec_date", get_today_date())
-                    resource_tasks = execution_plan.get("resource_tasks", [])
-                    model_tasks = execution_plan.get("model_tasks", [])
-                    dependencies = execution_plan.get("dependencies", {})
-                    
-                    logger.info(f"Execution plan: exec_date={exec_date}, resource_tasks count={len(resource_tasks)}, model_tasks count={len(model_tasks)}")
-                    
-                    # Dynamically create processing tasks
-                    task_dict = {}
-                    
-                    # 1. Create resource table tasks
-                    for task_info in resource_tasks:
-                        table_name = task_info["target_table"]
-                        script_name = task_info["script_name"]
-                        exec_mode = task_info.get("script_exec_mode", "append")
-                        
-                        # Build a safe task ID
-                        safe_table_name = table_name.replace(".", "_").replace("-", "_")
-                        
-                        # Make sure every task is part of data_processing_phase
-                        with data_group:
-                            resource_task = PythonOperator(
-                                task_id=f"resource_{safe_table_name}",
-                                python_callable=process_resource,
-                                op_kwargs={
-                                    "target_table": table_name,
-                                    "script_name": script_name,
-                                    "script_exec_mode": exec_mode,
-                                    # Make sure the execution date is passed as a string, not the default
-                                    # (non-string) format, so execute_with_monitoring can update the database correctly
-                                    "exec_date": str(exec_date)
-                                },
-                                retries=TASK_RETRY_CONFIG["retries"],
-                                retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
-                            )
-                        
-                        # Add the task to the dict
-                        task_dict[table_name] = resource_task
-                        
-                        # Set the dependency on start_processing
-                        start_processing >> resource_task
-                    
-                    # Build a directed graph to detect dependencies between model tables
-                    G = nx.DiGraph()
-                    
-                    # Add every model table as a node
-                    for task_info in model_tasks:
-                        table_name = task_info["target_table"]
-                        G.add_node(table_name)
-                    
-                    # Add dependency edges between model tables
-                    for source, deps in dependencies.items():
-                        for dep in deps:
-                            if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
-                                G.add_edge(dep.get("table_name"), source)  # edge direction: dependency -> target
-                    
-                    # Detect and break circular dependencies
-                    try:
-                        cycles = list(nx.simple_cycles(G))
-                        if cycles:
-                            logger.warning(f"Detected circular dependencies: {cycles}")
-                            for cycle in cycles:
-                                G.remove_edge(cycle[-1], cycle[0])
-                                logger.info(f"Breaking circular dependency: removed edge {cycle[-1]} -> {cycle[0]}")
-                    except Exception as e:
-                        logger.error(f"Error while detecting circular dependencies: {str(e)}")
-                    
-                    # Generate a topological sort to determine the execution order
-                    execution_order = []
-                    try:
-                        execution_order = list(nx.topological_sort(G))
-                    except Exception as e:
-                        logger.error(f"Topological sort failed: {str(e)}")
-                        execution_order = [task_info["target_table"] for task_info in model_tasks]
-                    
-                    # 2. Create model table tasks in topological order
-                    for table_name in execution_order:
-                        task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
-                        if not task_info:
-                            continue
-                            
-                        script_name = task_info["script_name"]
-                        exec_mode = task_info.get("script_exec_mode", "append")
-                        
-                        # Build a safe task ID
-                        safe_table_name = table_name.replace(".", "_").replace("-", "_")
-                        
-                        # Make sure every task is part of data_processing_phase
-                        with data_group:
-                            model_task = PythonOperator(
-                                task_id=f"model_{safe_table_name}",
-                                python_callable=process_model,
-                                op_kwargs={
-                                    "target_table": table_name,
-                                    "script_name": script_name,
-                                    "script_exec_mode": exec_mode,
-                                    # Make sure the execution date is passed as a string, not the default
-                                    # (non-string) format, so execute_with_monitoring can update the database correctly
-                                    "exec_date": str(exec_date)
-                                },
-                                retries=TASK_RETRY_CONFIG["retries"],
-                                retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
-                            )
-                        
-                        # Add the task to the dict
-                        task_dict[table_name] = model_task
-                        
-                        # Set up dependencies
-                        deps = dependencies.get(table_name, [])
-                        has_dependency = False
-                        
-                        # Handle dependencies between model tables
-                        for dep in deps:
-                            dep_table = dep.get("table_name")
-                            dep_type = dep.get("table_type")
-                            
-                            if dep_table in task_dict:
-                                task_dict[dep_table] >> model_task
-                                has_dependency = True
-                                logger.info(f"Set dependency: {dep_table} >> {table_name}")
-                        
-                        # If there are no dependencies, depend on start_processing and the resource table tasks
-                        if not has_dependency:
-                            # Connect directly from the start_processing task
-                            start_processing >> model_task
-                            
-                            # Also connect from the resource table tasks
-                            resource_count = 0
-                            for resource_table in resource_tasks:
-                                if resource_count >= 5:  # set at most 5 dependencies
-                                    break
-                                
-                                resource_name = resource_table["target_table"]
-                                if resource_name in task_dict:
-                                    task_dict[resource_name] >> model_task
-                                    resource_count += 1
-                    
-                    # Find all terminal tasks (tasks with no downstream dependencies)
-                    terminal_tasks = []
-                    
-                    # Check all model table tasks
-                    for table_name in execution_order:
-                        # Check whether there are downstream tasks
-                        has_downstream = False
-                        for source, deps in dependencies.items():
-                            if source == table_name:  # skip self
-                                continue
-                            for dep in deps:
-                                if dep.get("table_name") == table_name:
-                                    has_downstream = True
-                                    break
-                            if has_downstream:
-                                break
-                        
-                        # If there are no downstream tasks, add to the terminal task list
-                        if not has_downstream and table_name in task_dict:
-                            terminal_tasks.append(table_name)
-                    
-                    # If there are no model table tasks, treat all resource table tasks as terminal tasks
-                    if not model_tasks and resource_tasks:
-                        terminal_tasks = [task["target_table"] for task in resource_tasks]
-                        logger.info(f"No model table tasks; treating all resource table tasks as terminal tasks: {terminal_tasks}")
+            script_name = task_info["script_name"]
+            exec_mode = task_info.get("script_exec_mode", "append")
+            
+            # Build a safe task ID
+            safe_table_name = table_name.replace(".", "_").replace("-", "_")
+            
+            # Make sure every task is part of data_processing_phase
+            with data_group:
+                model_task = PythonOperator(
+                    task_id=f"model_{safe_table_name}",
+                    python_callable=process_model,
+                    op_kwargs={
+                        "target_table": table_name,
+                        "script_name": script_name,
+                        "script_exec_mode": exec_mode,
+                        # Make sure the execution date is passed as a string, not the default
+                        # (non-string) format, so execute_with_monitoring can update the database correctly
+                        "exec_date": str(exec_date)
+                    },
+                    retries=TASK_RETRY_CONFIG["retries"],
+                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
+                )
+            
+            # Add the task to the dict
+            task_dict[table_name] = model_task
+            
+            # Set up dependencies
+            deps = dependencies.get(table_name, [])
+            has_dependency = False
+            
+            # Handle dependencies between model tables
+            for dep in deps:
+                dep_table = dep.get("table_name")
+                dep_type = dep.get("table_type")
+                
+                if dep_table in task_dict:
+                    task_dict[dep_table] >> model_task
+                    has_dependency = True
+                    logger.info(f"Set dependency: {dep_table} >> {table_name}")
+            
+            # If there are no dependencies, depend on start_processing and the resource table tasks
+            if not has_dependency:
+                # Connect directly from the start_processing task
+                start_processing >> model_task
+                
+                # Also connect from the resource table tasks
+                resource_count = 0
+                for resource_table in resource_tasks:
+                    if resource_count >= 5:  # set at most 5 dependencies
+                        break
                     
-                    # If there are neither model table tasks nor resource table tasks, the default dependency chain is already in place
-                    if not terminal_tasks:
-                        logger.warning("No tasks found; using the default dependency chain")
-                    else:
-                        # Connect all terminal tasks to the completion marker
-                        for table_name in terminal_tasks:
-                            if table_name in task_dict:
-                                task_dict[table_name] >> processing_completed
-                                logger.info(f"Set terminal task: {table_name} >> processing_completed")
-            except Exception as plan_e:
-                logger.error(f"Error parsing execution plan file: {str(plan_e)}")
-                import traceback
-                logger.error(traceback.format_exc())
+                    resource_name = resource_table["target_table"]
+                    if resource_name in task_dict:
+                        task_dict[resource_name] >> model_task
+                        resource_count += 1
+        
+        # Find all terminal tasks (tasks with no downstream dependencies)
+        terminal_tasks = []
+        
+        # Check all model table tasks
+        for table_name in execution_order:
+            # Check whether there are downstream tasks
+            has_downstream = False
+            for source, deps in dependencies.items():
+                if source == table_name:  # skip self
+                    continue
+                for dep in deps:
+                    if dep.get("table_name") == table_name:
+                        has_downstream = True
+                        break
+                if has_downstream:
+                    break
+            
+            # If there are no downstream tasks, add to the terminal task list
+            if not has_downstream and table_name in task_dict:
+                terminal_tasks.append(table_name)
+        
+        # If there are no model table tasks, treat all resource table tasks as terminal tasks
+        if not model_tasks and resource_tasks:
+            terminal_tasks = [task["target_table"] for task in resource_tasks]
+            logger.info(f"No model table tasks; treating all resource table tasks as terminal tasks: {terminal_tasks}")
+        
+        # If there are neither model table tasks nor resource table tasks, the default dependency chain is already in place
+        if not terminal_tasks:
+            logger.warning("No tasks found; using the default dependency chain")
         else:
-            if not os.path.exists(plan_path):
-                logger.warning(f"Execution plan file does not exist: {plan_path}")
-            if not os.path.exists(ready_path):
-                logger.warning(f"Execution plan ready marker file does not exist: {ready_path}")
-            logger.warning("Will use the default DAG structure")
+            # Connect all terminal tasks to the completion marker
+            for table_name in terminal_tasks:
+                if table_name in task_dict:
+                    task_dict[table_name] >> processing_completed
+                    logger.info(f"Set terminal task: {table_name} >> processing_completed")
     except Exception as e:
-        logger.error(f"Error loading execution plan file: {str(e)}")
+        logger.error(f"Error loading execution plan: {str(e)}")
         import traceback
         logger.error(traceback.format_exc())
 logger.info("DAG dag_dataops_pipeline_data_scheduler definition complete")
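The parse-time graph handling (break each detected cycle, then topologically sort) can be exercised on its own. A self-contained sketch with hypothetical table names:

# Standalone check of the cycle-break-then-toposort pattern used above.
import networkx as nx

G = nx.DiGraph()
G.add_edges_from([
    ("raw_orders", "dim_orders"),   # hypothetical table names
    ("dim_orders", "fact_sales"),
    ("fact_sales", "dim_orders"),   # deliberate cycle
])

for cycle in list(nx.simple_cycles(G)):
    if G.has_edge(cycle[-1], cycle[0]):
        G.remove_edge(cycle[-1], cycle[0])  # same strategy as the DAG code

print(list(nx.topological_sort(G)))  # e.g. ['raw_orders', 'dim_orders', 'fact_sales']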