Преглед изворни кода

做了两种尝试,效果均不满意,手工执行时合并后,查看脚本执行流程图。自动调度合并为一个DAG,并保留作业执行流程图。

wangxq пре 1 месец
родитељ
комит
fd4f7e472c
1 измењених фајлова са 147 додато и 173 уклоњено
  1. 147 173
      dags/dag_dataops_unified_scheduler.py

+ 147 - 173
dags/dag_dataops_unified_scheduler.py

@@ -407,7 +407,7 @@ def prepare_dag_schedule(**kwargs):
     kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
     logger.info(f"准备了执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
     
-    return inserted_count 
+    return inserted_count
 
 #############################################
 # 第二阶段: 数据处理阶段(Data Processing Phase)的函数
@@ -989,187 +989,161 @@ with DAG(
     prepare_group >> data_group >> summary_group
 
     # 实际数据处理任务的动态创建逻辑
-    # 这部分代码在DAG运行时执行,根据数据库数据和执行计划动态创建任务
+    # 这部分代码在DAG对象定义时执行,而不是在DAG运行时执行
     
-    # 从执行计划JSON中获取信息
-    execution_plan_json = '''{"exec_date": "2025-04-12", "resource_tasks": [], "model_tasks": [], "dependencies": {}}'''
-    
-    try:
-        # 尝试从文件中读取最新的执行计划,仅用于构建DAG视图
-        import os
-        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-        if os.path.exists(plan_path):
-            with open(plan_path, 'r') as f:
-                execution_plan_json = f.read()
-    except Exception as e:
-        logger.warning(f"读取执行计划默认值时出错: {str(e)}")
-    
-    # 解析执行计划获取任务信息
-    try:
-        execution_plan = json.loads(execution_plan_json)
-        exec_date = execution_plan.get("exec_date", get_today_date())
-        resource_tasks = execution_plan.get("resource_tasks", [])
-        model_tasks = execution_plan.get("model_tasks", [])
-        dependencies = execution_plan.get("dependencies", {})
-        
-        # 任务字典,用于设置依赖关系
-        task_dict = {}
-        
-        # 1. 创建资源表任务
-        for task_info in resource_tasks:
-            table_name = task_info["target_table"]
-            script_name = task_info["script_name"]
-            exec_mode = task_info.get("script_exec_mode", "append")
-            
-            # 创建安全的任务ID - 直接使用表名作为ID,更简洁易读
-            safe_table_name = table_name.replace(".", "_").replace("-", "_")
-            
-            # 确保所有任务都是data_processing_phase的一部分
-            with data_group:
-                resource_task = PythonOperator(
-                    task_id=f"resource_{safe_table_name}",  # 不需要加前缀,TaskGroup会自动添加
-                    python_callable=process_resource,
-                    op_kwargs={
-                        "target_table": table_name,
-                        "script_name": script_name,
-                        "script_exec_mode": exec_mode,
-                        "exec_date": """{{ ti.xcom_pull(task_ids='prepare_phase.create_execution_plan') }}"""
-                    },
-                    retries=TASK_RETRY_CONFIG["retries"],
-                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
-                )
-            
-            # 将任务添加到字典
-            task_dict[table_name] = resource_task
-            
-            # 设置任务依赖 - 使用正确的引用方式
-            preparation_completed >> resource_task
-        
-        # 创建有向图,用于检测模型表之间的依赖关系
-        G = nx.DiGraph()
-        
-        # 将所有模型表添加为节点
-        for task_info in model_tasks:
-            table_name = task_info["target_table"]
-            G.add_node(table_name)
-        
-        # 添加模型表之间的依赖边
-        for source, deps in dependencies.items():
-            for dep in deps:
-                if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
-                    G.add_edge(dep.get("table_name"), source)  # 依赖方向:依赖项 -> 目标
+    # 创建一个DynamicTaskMapper用于动态创建任务
+    class DynamicTaskMapper(PythonOperator):
+        """用于动态映射任务的特殊PythonOperator。
+        该类作为一个普通任务执行,但会在运行时动态创建下游任务。"""
         
-        # 检测循环依赖并处理
-        cycles = list(nx.simple_cycles(G))
-        if cycles:
-            logger.warning(f"检测到循环依赖: {cycles}")
-            for cycle in cycles:
-                G.remove_edge(cycle[-1], cycle[0])
-                logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
-        
-        # 生成拓扑排序,确定执行顺序
-        try:
-            execution_order = list(nx.topological_sort(G))
-            logger.info(f"计算出的执行顺序: {execution_order}")
-        except Exception as e:
-            logger.error(f"生成拓扑排序失败: {str(e)}, 使用原始顺序")
-            execution_order = [task_info["target_table"] for task_info in model_tasks]
-        
-        # 2. 按拓扑排序顺序创建模型表任务
-        for table_name in execution_order:
-            task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
-            if not task_info:
-                continue
+        def execute(self, context):
+            """在DAG运行时动态创建和执行任务"""
+            try:
+                logger.info("开始动态任务映射...")
                 
-            script_name = task_info["script_name"]
-            exec_mode = task_info.get("script_exec_mode", "append")
-            
-            # 创建安全的任务ID
-            safe_table_name = table_name.replace(".", "_").replace("-", "_")
-            
-            # 确保所有任务都是data_processing_phase的一部分
-            with data_group:
-                model_task = PythonOperator(
-                    task_id=f"model_{safe_table_name}", # 更简洁的ID
-                    python_callable=process_model,
-                    op_kwargs={
-                        "target_table": table_name,
-                        "script_name": script_name,
-                        "script_exec_mode": exec_mode,
-                        "exec_date": """{{ ti.xcom_pull(task_ids='prepare_phase.create_execution_plan') }}"""
-                    },
-                    retries=TASK_RETRY_CONFIG["retries"],
-                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
+                # 从XCom获取执行计划
+                ti = context['ti']
+                execution_plan_json = ti.xcom_pull(
+                    task_ids='prepare_phase.create_execution_plan'
                 )
-            
-            # 将任务添加到字典
-            task_dict[table_name] = model_task
-            
-            # 设置依赖关系
-            deps = dependencies.get(table_name, [])
-            has_dependency = False
-            
-            # 处理模型表之间的依赖
-            for dep in deps:
-                dep_table = dep.get("table_name")
-                dep_type = dep.get("table_type")
                 
-                if dep_table in task_dict:
-                    task_dict[dep_table] >> model_task
-                    has_dependency = True
-                    logger.info(f"设置依赖: {dep_table} >> {table_name}")
-            
-            # 如果没有依赖,则依赖于资源表任务
-            if not has_dependency:
-                # 依赖于prepare_phase的完成
-                preparation_completed >> model_task
+                if not execution_plan_json:
+                    logger.info("尝试从prepare_phase.prepare_dag_schedule获取执行计划")
+                    execution_plan_tmp = ti.xcom_pull(
+                        task_ids='prepare_phase.prepare_dag_schedule', 
+                        key='execution_plan'
+                    )
+                    
+                    if execution_plan_tmp:
+                        execution_plan_json = execution_plan_tmp
+                    else:
+                        logger.error("无法从XCom获取执行计划")
+                        raise ValueError("执行计划未找到")
                 
-                # 同时从所有资源表任务连接
-                for resource_table in resource_tasks:
-                    resource_name = resource_table["target_table"]
-                    if resource_name in task_dict:
-                        task_dict[resource_name] >> model_task
-                        logger.info(f"设置资源依赖: {resource_name} >> {table_name}")
-
-        # 如果没有模型表任务,将所有资源表任务视为终端任务
-        if not model_tasks and resource_tasks:
-            terminal_tasks = [task["target_table"] for task in resource_tasks]
-        else:
-            # 找出所有终端任务(没有下游依赖的任务)
-            terminal_tasks = []
-            
-            # 检查所有模型表任务
-            for table_name in execution_order:
-                # 检查是否有下游任务
-                has_downstream = False
+                # 解析执行计划
+                if isinstance(execution_plan_json, str):
+                    execution_plan = json.loads(execution_plan_json)
+                else:
+                    execution_plan = execution_plan_json
+                
+                # 获取资源任务和模型任务
+                resource_tasks = execution_plan.get("resource_tasks", [])
+                model_tasks = execution_plan.get("model_tasks", [])
+                dependencies = execution_plan.get("dependencies", {})
+                
+                logger.info(f"获取到执行计划: {len(resource_tasks)}个资源任务, {len(model_tasks)}个模型任务")
+                
+                # 处理资源任务
+                logger.info("处理资源任务...")
+                for task_info in resource_tasks:
+                    target_table = task_info["target_table"]
+                    script_name = task_info["script_name"]
+                    script_exec_mode = task_info.get("script_exec_mode", "append")
+                    
+                    logger.info(f"执行资源表任务: {target_table}, 脚本: {script_name}")
+                    try:
+                        process_resource(
+                            target_table=target_table,
+                            script_name=script_name,
+                            script_exec_mode=script_exec_mode,
+                            exec_date=context.get('ds')
+                        )
+                    except Exception as e:
+                        logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
+                
+                # 构建模型表依赖图,确定执行顺序
+                G = nx.DiGraph()
+                
+                # 添加所有模型表节点
+                for task_info in model_tasks:
+                    G.add_node(task_info["target_table"])
+                
+                # 添加依赖边
                 for source, deps in dependencies.items():
-                    if source == table_name:  # 跳过自身
-                        continue
                     for dep in deps:
-                        if dep.get("table_name") == table_name:
-                            has_downstream = True
-                            break
-                    if has_downstream:
-                        break
+                        if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
+                            G.add_edge(dep.get("table_name"), source)  # 依赖方向:依赖项 -> 目标
                 
-                # 如果没有下游任务,添加到终端任务列表
-                if not has_downstream and table_name in task_dict:
-                    terminal_tasks.append(table_name)
-
-        # 如果既没有模型表任务也没有资源表任务,已有默认依赖链
-        if not terminal_tasks:
-            logger.warning("未找到任何任务,使用默认依赖链")
-        else:
-            # 将所有终端任务连接到完成标记
-            for table_name in terminal_tasks:
-                if table_name in task_dict:
-                    task_dict[table_name] >> processing_completed
-                    logger.info(f"设置终端任务: {table_name} >> processing_completed")
-
-    except Exception as e:
-        logger.error(f"构建任务DAG时出错: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
+                # 检测并处理循环依赖
+                cycles = list(nx.simple_cycles(G))
+                if cycles:
+                    logger.warning(f"检测到循环依赖: {cycles}")
+                    for cycle in cycles:
+                        G.remove_edge(cycle[-1], cycle[0])
+                        logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
+                
+                # 生成拓扑排序,确定执行顺序
+                try:
+                    execution_order = list(nx.topological_sort(G))
+                    logger.info(f"计算出的执行顺序: {execution_order}")
+                except Exception as e:
+                    logger.error(f"生成拓扑排序失败: {str(e)}, 使用原始顺序")
+                    execution_order = [task_info["target_table"] for task_info in model_tasks]
+                
+                # 按顺序执行模型表任务
+                for table_name in execution_order:
+                    task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
+                    if not task_info:
+                        continue
+                        
+                    target_table = task_info["target_table"]
+                    script_name = task_info["script_name"]
+                    script_exec_mode = task_info.get("script_exec_mode", "append")
+                    
+                    logger.info(f"执行模型表任务: {target_table}, 脚本: {script_name}")
+                    try:
+                        process_model(
+                            target_table=target_table,
+                            script_name=script_name,
+                            script_exec_mode=script_exec_mode,
+                            exec_date=context.get('ds')
+                        )
+                    except Exception as e:
+                        logger.error(f"处理模型表 {target_table} 时出错: {str(e)}")
+                
+                return f"成功执行 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务"
+                
+            except Exception as e:
+                logger.error(f"动态任务映射失败: {str(e)}")
+                import traceback
+                logger.error(traceback.format_exc())
+                raise
+    
+    # 在data_processing_phase中使用修改后的DynamicTaskMapper
+    with data_group:
+        dynamic_task_mapper = DynamicTaskMapper(
+            task_id="dynamic_task_mapper",
+            python_callable=lambda **kwargs: "Dynamic task mapping placeholder",
+            provide_context=True
+        )
         
+        # 设置依赖关系
+        preparation_completed >> dynamic_task_mapper >> processing_completed
+    
+    # 注意:以下原始代码不再需要,因为我们现在使用DynamicTaskMapper来动态创建和执行任务
+    # 保留的原始try-except块的结尾,确保代码结构完整
+    
+    # 尝试从数据库获取最新的执行计划,用于WebUI展示
+    try:
+        # 使用一个只在DAG加载时执行一次的简单查询来获取表信息
+        # 这只用于UI展示,不影响实际执行
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        try:
+            cursor.execute("""
+                SELECT COUNT(*) FROM airflow_dag_schedule
+            """)
+            count = cursor.fetchone()
+            if count and count[0] > 0:
+                logger.info(f"数据库中有 {count[0]} 条任务记录可用于调度")
+            else:
+                logger.info("数据库中没有找到任务记录,DAG的第一次运行将创建初始计划")
+        except Exception as e:
+            logger.warning(f"查询数据库时出错: {str(e)}, 这不会影响DAG的实际执行")
+        finally:
+            cursor.close()
+            conn.close()
+    except Exception as e:
+        logger.warning(f"初始化DAG时发生错误: {str(e)}, 这不会影响DAG的实际执行")
         # 确保即使出错,也有清晰的执行路径
         # 已经有默认依赖链,不需要额外添加