Przeglądaj źródła

可以正确生成source表数组

wangxq 3 tygodni temu
rodzic
commit
67e158f76d

+ 167 - 35
dags/dag_dataops_pipeline_data_scheduler.py

@@ -144,6 +144,7 @@ def execute_python_script(target_table, script_name, script_exec_mode, exec_date
         script_name: 脚本名称 
         script_exec_mode: 脚本执行模式
         exec_date: 执行日期
+        source_tables: (可选) 源表列表
         
     返回:
         bool: 脚本执行结果
@@ -196,7 +197,7 @@ def execute_python_script(target_table, script_name, script_exec_mode, exec_date
             }
 
             ## 添加可能的额外参数
-            for key in ['target_type', 'storage_location', 'frequency']:
+            for key in ['target_type', 'storage_location', 'frequency', 'source_tables']:
                 if key in kwargs and kwargs[key] is not None:
                     run_params[key] = kwargs[key] 
 
@@ -294,7 +295,8 @@ def get_table_info_from_neo4j(table_name):
             # 查询表标签和状态
             query_table = """
                 MATCH (t {en_name: $table_name})
-                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
+                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency,
+                       t.type AS type, t.storage_location AS storage_location
             """
             result = session.run(query_table, table_name=table_name)
             record = result.single()
@@ -304,41 +306,135 @@ def get_table_info_from_neo4j(table_name):
                 table_info['target_table_label'] = [label for label in labels if label in ["DataResource", "DataModel", "DataSource"]][0] if labels else None
                 table_info['target_table_status'] = record.get("status", True)  # 默认为True
                 table_info['default_update_frequency'] = record.get("frequency")
+                table_info['frequency'] = record.get("frequency")
+                table_info['target_type'] = record.get("type")  # 获取type属性
+                table_info['storage_location'] = record.get("storage_location")  # 获取storage_location属性
                 
                 # 根据标签类型查询关系和脚本信息
                 if "DataResource" in labels:
-                    query_rel = """
-                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
-                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
-                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
-                    """
+                    # 检查是否为structure类型
+                    if table_info.get('target_type') == "structure":
+                        # 对于structure类型,设置默认值,不查询关系
+                        table_info['source_tables'] = []  # 使用空数组表示无源表
+                        table_info['script_name'] = "load_file.py"
+                        table_info['script_type'] = "python"
+                        
+                        # csv类型的DataResource没有上游,使用默认的append模式
+                        table_info['script_exec_mode'] = "append"
+                        logger.info(f"表 {table_name} 为structure类型,使用默认执行模式: append")
+
+                        return table_info
+                    else:
+                        query_rel = """
+                            MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
+                            WITH source, rel, 
+                                 CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
+                                 CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
+                            RETURN source.en_name AS source_table, script_name AS script_name,
+                                   script_type AS script_type, 'append' AS script_exec_mode
+                        """
                 elif "DataModel" in labels:
                     query_rel = """
                         MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
-                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
-                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
+                        WITH source, rel, 
+                             CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
+                             CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
+                        RETURN source.en_name AS source_table, script_name AS script_name,
+                               script_type AS script_type, 'append' AS script_exec_mode
                     """
                 else:
                     logger.warning(f"表 {table_name} 不是DataResource或DataModel类型")
                     return table_info
                 
+                # 收集所有关系记录
                 result = session.run(query_rel, table_name=table_name)
-                record = result.single()
+                # 检查result对象是否有collect方法,否则使用data方法或list直接转换
+                try:
+                    if hasattr(result, 'collect'):
+                        records = result.collect()  # 使用collect()获取所有记录
+                    else:
+                        # 尝试使用其他方法获取记录
+                        logger.info(f"表 {table_name} 的查询结果不支持collect方法,尝试使用其他方法")
+                        try:
+                            records = list(result)  # 直接转换为列表
+                        except Exception as e1:
+                            logger.warning(f"尝试列表转换失败: {str(e1)},尝试使用data方法")
+                            try:
+                                records = result.data()  # 使用data()方法
+                            except Exception as e2:
+                                logger.warning(f"所有方法都失败,使用空列表: {str(e2)}")
+                                records = []
+                except Exception as e:
+                    logger.warning(f"获取查询结果时出错: {str(e)},使用空列表")
+                    records = []
+                
+                # 记录查询到的原始记录
+                logger.info(f"表 {table_name} 查询到 {len(records)} 条关系记录")
+                for idx, rec in enumerate(records):
+                    logger.info(f"关系记录[{idx}]: source_table={rec.get('source_table')}, script_name={rec.get('script_name')}, " 
+                                f"script_type={rec.get('script_type')}, script_exec_mode={rec.get('script_exec_mode')}")
                 
-                if record:
-                    table_info['source_table'] = record.get("source_table")     
+                if records:
+                    # 按脚本名称分组源表
+                    scripts_info = {}
+                    for record in records:
+                        script_name = record.get("script_name")
+                        source_table = record.get("source_table")
+                        script_type = record.get("script_type", "python")
+                        script_exec_mode = record.get("script_exec_mode", "append")
+                        
+                        logger.info(f"处理记录: source_table={source_table}, script_name={script_name}")
 
-                    # 检查script_name是否为空
-                    script_name = record.get("script_name")
-                    if not script_name:
-                        logger.warning(f"表 {table_name} 的关系中没有script_name属性,可能导致后续处理出错")
-                    table_info['script_name'] = script_name
+                        if not script_name:
+                            script_name = f"{table_name}_process.py"
+                            logger.warning(f"表 {table_name} 的关系中没有script_name属性,使用默认值: {script_name}")
+                            
+                        if script_name not in scripts_info:
+                            scripts_info[script_name] = {
+                                "sources": [],
+                                "script_type": script_type,
+                                "script_exec_mode": script_exec_mode
+                            }
+                        
+                        # 确保source_table有值且不为None才添加到sources列表中
+                        if source_table and source_table not in scripts_info[script_name]["sources"]:
+                            scripts_info[script_name]["sources"].append(source_table)
+                            logger.debug(f"为表 {table_name} 的脚本 {script_name} 添加源表: {source_table}")
                     
-                    # 设置默认值,确保即使属性为空也有默认值
-                    table_info['script_type'] = record.get("script_type", "python")  # 默认为python
-                    table_info['script_exec_mode'] = record.get("script_exec_mode", "append")  # 默认为append
+                    # 处理分组信息
+                    if scripts_info:
+                        # 存储完整的脚本信息
+                        table_info['scripts_info'] = scripts_info
+                        
+                        # 如果只有一个脚本,直接使用它
+                        if len(scripts_info) == 1:
+                            script_name = list(scripts_info.keys())[0]
+                            script_info = scripts_info[script_name]
+                            
+                            table_info['source_tables'] = script_info["sources"]  # 使用数组
+                            table_info['script_name'] = script_name
+                            table_info['script_type'] = script_info["script_type"]
+                            table_info['script_exec_mode'] = script_info["script_exec_mode"]
+                            logger.info(f"表 {table_name} 有单个脚本 {script_name},源表: {script_info['sources']}")
+                        else:
+                            # 如果有多个不同脚本,记录多脚本信息
+                            logger.info(f"表 {table_name} 有多个不同脚本: {list(scripts_info.keys())}")
+                            # 暂时使用第一个脚本的信息作为默认值
+                            first_script = list(scripts_info.keys())[0]
+                            table_info['source_tables'] = scripts_info[first_script]["sources"]
+                            table_info['script_name'] = first_script
+                            table_info['script_type'] = scripts_info[first_script]["script_type"]
+                            table_info['script_exec_mode'] = scripts_info[first_script]["script_exec_mode"]
+                    else:
+                        logger.warning(f"表 {table_name} 未找到有效的脚本信息")
+                        table_info['source_tables'] = []  # 使用空数组
+                        # 向下兼容
+                        table_info['source_table'] = None
                 else:
                     logger.warning(f"未找到表 {table_name} 的关系信息")
+                    table_info['source_tables'] = []  # 使用空数组
+                    # 向下兼容
+                    table_info['source_table'] = None
             else:
                 logger.warning(f"在Neo4j中找不到表 {table_name} 的信息")
     except Exception as e:
@@ -381,6 +477,10 @@ def process_dependencies(tables_info):
                             dep_info['is_directly_schedule'] = False
                             
                             # 处理调度频率继承
+                            if not dep_info.get('frequency'):
+                                dep_info['frequency'] = table_info.get('frequency')
+                            
+                            # 确保向下兼容
                             if not dep_info.get('default_update_frequency'):
                                 dep_info['default_update_frequency'] = table_info.get('default_update_frequency')
                             
@@ -507,7 +607,7 @@ def prepare_dag_schedule(**kwargs):
     for table in valid_tables:
         if table.get('target_table_label') == 'DataResource':
             task_info = {
-                "source_table": table.get('source_table'),
+                "source_tables": [table.get('source_table')] if table.get('source_table') else [],
                 "target_table": table['target_table'],
                 "target_table_label": "DataResource",
                 "script_name": table.get('script_name'),
@@ -522,7 +622,7 @@ def prepare_dag_schedule(**kwargs):
             resource_tasks.append(task_info)
         elif table.get('target_table_label') == 'DataModel':
             model_tasks.append({
-                "source_table": table.get('source_table'),
+                "source_tables": [table.get('source_table')] if table.get('source_table') else [],
                 "target_table": table['target_table'],
                 "target_table_label": "DataModel",
                 "script_name": table.get('script_name'),
@@ -758,10 +858,15 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date,**kw
         exec_date = str(exec_date)
         logger.info(f"将exec_date转换为字符串: {exec_date}")
 
-        # 获取额外参数
+    # 获取额外参数
     target_type = kwargs.get('target_type')
     storage_location = kwargs.get('storage_location')
     frequency = kwargs.get('frequency')
+    source_tables = kwargs.get('source_tables', [])
+    
+    # 记录源表信息(如果有)
+    if source_tables and len(source_tables) > 0:
+        logger.info(f"资源表 {target_table} 有 {len(source_tables)} 个源表: {source_tables}")
     
     try:
         # 使用新的函数执行脚本,传递相应参数
@@ -773,7 +878,8 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date,**kw
             "script_name": script_name,
             "script_exec_mode": script_exec_mode,
             "exec_date": exec_date,
-            "frequency": frequency
+            "frequency": frequency,
+            "source_tables": source_tables
         }
         
         # 添加特殊参数(如果有)
@@ -799,8 +905,8 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date,**kw
         logger.info(f"===== 结束执行 {task_id} =====")
 
 
-def process_model(target_table, script_name, script_exec_mode, exec_date):
-    """处理单个模型表"""
+def process_model(target_table, script_name, script_exec_mode, exec_date, source_tables=None):
+    """处理单个模型表,支持多个源表"""
     task_id = f"model_{target_table}"
     logger.info(f"===== 开始执行 {task_id} =====")
     logger.info(f"执行模型表 {target_table} 的脚本 {script_name}")
@@ -810,6 +916,10 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
         exec_date = str(exec_date)
         logger.info(f"将exec_date转换为字符串: {exec_date}")
     
+    # 记录源表信息
+    if source_tables and len(source_tables) > 0:
+        logger.info(f"模型表 {target_table} 有 {len(source_tables)} 个源表: {source_tables}")
+    
     try:
         # 使用新的函数执行脚本,不依赖数据库
         logger.info(f"调用execute_python_script: target_table={target_table}, script_name={script_name}")
@@ -817,7 +927,8 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
             target_table=target_table,
             script_name=script_name,
             script_exec_mode=script_exec_mode,
-            exec_date=exec_date
+            exec_date=exec_date,
+            source_tables=source_tables  # 传递源表列表
         )
         logger.info(f"模型表 {target_table} 处理完成,结果: {result}")
         return result
@@ -1054,6 +1165,7 @@ with DAG(
             table_name = task_info["target_table"]
             script_name = task_info["script_name"]
             exec_mode = task_info.get("script_exec_mode", "append")
+            source_tables = task_info.get("source_tables", [])  # 获取源表数组
             
             # 创建安全的任务ID
             safe_table_name = table_name.replace(".", "_").replace("-", "_")
@@ -1063,7 +1175,8 @@ with DAG(
                 "target_table": table_name,
                 "script_name": script_name,
                 "script_exec_mode": exec_mode,
-                "exec_date": str(exec_date)
+                "exec_date": str(exec_date),
+                "source_tables": source_tables  # 添加源表数组
             }
 
             # 添加特殊参数(如果有)
@@ -1090,6 +1203,13 @@ with DAG(
             
             # 设置与start_processing的依赖
             start_processing >> resource_task
+            
+            # 如果资源表有自己的源表依赖
+            if source_tables and isinstance(source_tables, list):
+                for source_table in source_tables:
+                    if source_table and source_table in task_dict:
+                        task_dict[source_table] >> resource_task
+                        logger.info(f"设置资源表依赖: {source_table} >> {table_name}")
         
         # 创建有向图,用于检测模型表之间的依赖关系
         G = nx.DiGraph()
@@ -1132,6 +1252,7 @@ with DAG(
                 
             script_name = task_info["script_name"]
             exec_mode = task_info.get("script_exec_mode", "append")
+            source_tables = task_info.get("source_tables", [])  # 获取源表数组
             
             # 创建安全的任务ID
             safe_table_name = table_name.replace(".", "_").replace("-", "_")
@@ -1146,7 +1267,8 @@ with DAG(
                         "script_name": script_name,
                         "script_exec_mode": exec_mode,
                         # 确保使用字符串而不是可能是默认(非字符串)格式的执行日期
-                        "exec_date": str(exec_date)
+                        "exec_date": str(exec_date),
+                        "source_tables": source_tables  # 传递源表数组
                     },
                     retries=TASK_RETRY_CONFIG["retries"],
                     retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
@@ -1155,19 +1277,29 @@ with DAG(
             # 将任务添加到字典
             task_dict[table_name] = model_task
             
-            # 设置依赖关系
-            deps = dependencies.get(table_name, [])
+            # 设置依赖关系,基于source_tables和dependencies
             has_dependency = False
             
-            # 处理模型表之间的依赖
+            # 先根据source_tables直接设置依赖
+            if isinstance(source_tables, list):
+                for source_table in source_tables:
+                    if source_table and source_table in task_dict:
+                        task_dict[source_table] >> model_task
+                        has_dependency = True
+                        logger.info(f"根据source_tables设置依赖: {source_table} >> {table_name}")
+            
+            # 然后处理dependencies中的依赖
+            deps = dependencies.get(table_name, [])
             for dep in deps:
                 dep_table = dep.get("table_name")
                 dep_type = dep.get("table_type")
                 
                 if dep_table in task_dict:
-                    task_dict[dep_table] >> model_task
-                    has_dependency = True
-                    logger.info(f"设置依赖: {dep_table} >> {table_name}")
+                    # 避免重复设置依赖
+                    if dep_table not in source_tables:
+                        task_dict[dep_table] >> model_task
+                        has_dependency = True
+                        logger.info(f"根据dependencies设置依赖: {dep_table} >> {table_name}")
             
             # 如果没有依赖,则依赖于start_processing和资源表任务
             if not has_dependency:

+ 195 - 59
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -154,58 +154,123 @@ def get_table_info_from_neo4j(table_name):
                     # 检查是否为structure类型
                     if table_info.get('target_type') == "structure":
                         # 对于structure类型,设置默认值,不查询关系
-                        table_info['source_table'] = None
+                        table_info['source_tables'] = []  # 使用空数组表示无源表
                         table_info['script_name'] = "load_file.py"
                         table_info['script_type'] = "python"
                         
-                        # 获取执行模式,注意csv类型的DataResource,它没有上游,所以exec_mode属性只能被写到节点上。
-                        query_exec_mode = """
-                            MATCH (t {en_name: $table_name})
-                            RETURN t.script_exec_mode AS script_exec_mode
-                        """
-                        result = session.run(query_exec_mode, table_name=table_name)
-                        record = result.single()
-                        if record and record.get("script_exec_mode"):
-                            table_info['script_exec_mode'] = record.get("script_exec_mode")
-                        else:
-                            # 如果没有找到执行模式,使用默认值
-                            table_info['script_exec_mode'] = "append"
-                            logger.info(f"表 {table_name} 未指定执行模式,使用默认值: append")
+                        # csv类型的DataResource没有上游,使用默认的append模式
+                        table_info['script_exec_mode'] = "append"
+                        logger.info(f"表 {table_name} 为structure类型,使用默认执行模式: append")
 
                         return table_info
                     else:
                         query_rel = """
                             MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
-                            RETURN source.en_name AS source_table, rel.script_name AS script_name,
-                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
-                    """
+                            WITH source, rel, 
+                                 CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
+                                 CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
+                            RETURN source.en_name AS source_table, script_name AS script_name,
+                                   script_type AS script_type, 'append' AS script_exec_mode
+                        """
                 elif "DataModel" in labels:
                     query_rel = """
                         MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
-                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
-                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
+                        WITH source, rel, 
+                             CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
+                             CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
+                        RETURN source.en_name AS source_table, script_name AS script_name,
+                               script_type AS script_type, 'append' AS script_exec_mode
                     """
                 else:
                     logger.warning(f"表 {table_name} 不是DataResource或DataModel类型")
                     return table_info
                 
+                # 收集所有关系记录
                 result = session.run(query_rel, table_name=table_name)
-                record = result.single()
+                # 检查result对象是否有collect方法,否则使用data方法或list直接转换
+                try:
+                    if hasattr(result, 'collect'):
+                        records = result.collect()  # 使用collect()获取所有记录
+                    else:
+                        # 尝试使用其他方法获取记录
+                        logger.info(f"表 {table_name} 的查询结果不支持collect方法,尝试使用其他方法")
+                        try:
+                            records = list(result)  # 直接转换为列表
+                        except Exception as e1:
+                            logger.warning(f"尝试列表转换失败: {str(e1)},尝试使用data方法")
+                            try:
+                                records = result.data()  # 使用data()方法
+                            except Exception as e2:
+                                logger.warning(f"所有方法都失败,使用空列表: {str(e2)}")
+                                records = []
+                except Exception as e:
+                    logger.warning(f"获取查询结果时出错: {str(e)},使用空列表")
+                    records = []
                 
-                if record:
-                    table_info['source_table'] = record.get("source_table")     
-
-                    # 检查script_name是否为空
-                    script_name = record.get("script_name")
-                    if not script_name:
-                        logger.warning(f"表 {table_name} 的关系中没有script_name属性,可能导致后续处理出错")
-                    table_info['script_name'] = script_name
+                # 记录查询到的原始记录
+                logger.info(f"表 {table_name} 查询到 {len(records)} 条关系记录")
+                for idx, rec in enumerate(records):
+                    logger.info(f"关系记录[{idx}]: source_table={rec.get('source_table')}, script_name={rec.get('script_name')}, " 
+                                f"script_type={rec.get('script_type')}, script_exec_mode={rec.get('script_exec_mode')}")
+                
+                if records:
+                    # 按脚本名称分组源表
+                    scripts_info = {}
+                    for record in records:
+                        script_name = record.get("script_name")
+                        source_table = record.get("source_table")
+                        script_type = record.get("script_type", "python")
+                        script_exec_mode = record.get("script_exec_mode", "append")
+                        
+                        logger.info(f"处理记录: source_table={source_table}, script_name={script_name}")
+                        
+                        # 如果script_name为空,生成默认的脚本名
+                        if not script_name:
+                            script_name = f"{table_name}_process.py"
+                            logger.warning(f"表 {table_name} 的关系中没有script_name属性,使用默认值: {script_name}")
+                            
+                        if script_name not in scripts_info:
+                            scripts_info[script_name] = {
+                                "sources": [],
+                                "script_type": script_type,
+                                "script_exec_mode": script_exec_mode
+                            }
+                        
+                        # 确保source_table有值且不为None才添加到sources列表中
+                        if source_table and source_table not in scripts_info[script_name]["sources"]:
+                            scripts_info[script_name]["sources"].append(source_table)
+                            logger.debug(f"为表 {table_name} 的脚本 {script_name} 添加源表: {source_table}")
                     
-                    # 设置默认值,确保即使属性为空也有默认值
-                    table_info['script_type'] = record.get("script_type", "python")  # 默认为python
-                    table_info['script_exec_mode'] = record.get("script_exec_mode", "append")  # 默认为append
+                    # 处理分组信息
+                    if scripts_info:
+                        # 存储完整的脚本信息
+                        table_info['scripts_info'] = scripts_info
+                        
+                        # 如果只有一个脚本,直接使用它
+                        if len(scripts_info) == 1:
+                            script_name = list(scripts_info.keys())[0]
+                            script_info = scripts_info[script_name]
+                            
+                            table_info['source_tables'] = script_info["sources"]  # 使用数组
+                            table_info['script_name'] = script_name
+                            table_info['script_type'] = script_info["script_type"]
+                            table_info['script_exec_mode'] = script_info["script_exec_mode"]
+                            logger.info(f"表 {table_name} 有单个脚本 {script_name},源表: {script_info['sources']}")
+                        else:
+                            # 如果有多个不同脚本,记录多脚本信息
+                            logger.info(f"表 {table_name} 有多个不同脚本: {list(scripts_info.keys())}")
+                            # 暂时使用第一个脚本的信息作为默认值
+                            first_script = list(scripts_info.keys())[0]
+                            table_info['source_tables'] = scripts_info[first_script]["sources"]
+                            table_info['script_name'] = first_script
+                            table_info['script_type'] = scripts_info[first_script]["script_type"]
+                            table_info['script_exec_mode'] = scripts_info[first_script]["script_exec_mode"]
+                    else:
+                        logger.warning(f"表 {table_name} 未找到有效的脚本信息")
+                        table_info['source_tables'] = []  # 使用空数组
                 else:
                     logger.warning(f"未找到表 {table_name} 的关系信息")
+                    table_info['source_tables'] = []  # 使用空数组
             else:
                 logger.warning(f"在Neo4j中找不到表 {table_name} 的信息")
     except Exception as e:
@@ -579,10 +644,17 @@ def prepare_pipeline_dag_schedule(**kwargs):
         resource_tasks = []
         model_tasks = []
         
+        # 遍历所有有效表,创建任务信息
         for table in valid_tables:
+            # 确保每个表对象都有source_tables字段且是一个列表
+            if 'source_tables' not in table or not isinstance(table.get('source_tables'), list):
+                logger.warning(f"表 {table['target_table']} 没有source_tables或不是列表,初始化为空列表")
+                table['source_tables'] = []
+            
+            # 处理资源表任务
             if table.get('target_table_label') == 'DataResource':
                 task_info = {
-                    "source_table": table.get('source_table'),
+                    "source_tables": table.get('source_tables', []),  # 使用数组存储源表
                     "target_table": table['target_table'],
                     "target_table_label": "DataResource",
                     "script_name": table.get('script_name'),
@@ -593,50 +665,114 @@ def prepare_pipeline_dag_schedule(**kwargs):
                 if table.get('target_type') == "structure":
                     task_info["target_type"] = "structure"
                     task_info["storage_location"] = table.get('storage_location')  
-                                  
+                              
                 resource_tasks.append(task_info)
-
+            # 处理模型表任务
             elif table.get('target_table_label') == 'DataModel':
-                model_tasks.append({
-                    "source_table": table.get('source_table'),
-                    "target_table": table['target_table'],
-                    "target_table_label": "DataModel",
-                    "script_name": table.get('script_name'),
-                    "script_exec_mode": table.get('script_exec_mode', 'append'),
-                    "frequency": table.get('frequency')
-                })
+                # 检查是否有多个脚本信息
+                if 'scripts_info' in table and len(table['scripts_info']) > 1:
+                    # 处理多脚本情况,为每个脚本创建单独的任务
+                    logger.info(f"表 {table['target_table']} 有多个脚本,单独处理每个脚本")
+                    
+                    for script_name, script_info in table['scripts_info'].items():
+                        model_tasks.append({
+                            "source_tables": script_info.get("sources", []),  # 使用数组存储源表
+                            "target_table": table['target_table'],
+                            "target_table_label": "DataModel",
+                            "script_name": script_name,
+                            "script_exec_mode": script_info.get("script_exec_mode", 'append'),
+                            "script_type": script_info.get("script_type", 'python'),
+                            "frequency": table.get('frequency')
+                        })
+                else:
+                    # 处理单脚本情况
+                    model_tasks.append({
+                        "source_tables": table.get('source_tables', []),  # 使用数组存储源表
+                        "target_table": table['target_table'],
+                        "target_table_label": "DataModel",
+                        "script_name": table.get('script_name'),
+                        "script_exec_mode": table.get('script_exec_mode', 'append'),
+                        "frequency": table.get('frequency')
+                    })
         
-        # 获取依赖关系
-        model_table_names = [t['target_table'] for t in model_tasks]
+        # 获取和处理依赖关系
         dependencies = {}
+        model_table_names = [t['target_table'] for t in model_tasks]
+        
+        # 初始化依赖关系字典
+        for table_name in model_table_names:
+            dependencies[table_name] = []
         
+        # 查询Neo4j获取依赖关系
         driver = get_neo4j_driver()
         try:
             with driver.session() as session:
+                # 为每个模型表查询依赖
                 for table_name in model_table_names:
                     query = """
                         MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
                         RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
                     """
-                    result = session.run(query, table_name=table_name)
-                    
-                    deps = []
-                    for record in result:
-                        target = record.get("target")
-                        target_labels = record.get("target_labels", [])
+                    try:
+                        # 执行查询
+                        result = session.run(query, table_name=table_name)
+                        
+                        # 尝试获取记录
+                        records = []
+                        try:
+                            if hasattr(result, 'collect'):
+                                records = result.collect()
+                            else:
+                                records = list(result)
+                        except Exception as e:
+                            logger.warning(f"获取表 {table_name} 的依赖关系记录失败: {str(e)}")
+                            records = []
+                        
+                        # 源表列表,用于后续更新model_tasks
+                        source_tables_list = []
+                        
+                        # 处理依赖关系记录
+                        for record in records:
+                            target = record.get("target")
+                            target_labels = record.get("target_labels", [])
+                            
+                            if target:
+                                # 确定依赖表类型
+                                table_type = next((label for label in target_labels 
+                                                 if label in ["DataModel", "DataResource"]), None)
+                                
+                                # 添加依赖关系
+                                dependencies[table_name].append({
+                                    "table_name": target,
+                                    "table_type": table_type
+                                })
+                                
+                                # 记录源表
+                                source_tables_list.append(target)
+                                logger.info(f"添加其他依赖: {table_name} -> {target}")
                         
-                        if target:
-                            table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
-                            deps.append({
-                                "table_name": target,
-                                "table_type": table_type
-                            })
+                        # 更新model_tasks中的source_tables
+                        for mt in model_tasks:
+                            if mt['target_table'] == table_name:
+                                # 确保source_tables是数组
+                                if not isinstance(mt.get('source_tables'), list):
+                                    mt['source_tables'] = []
+                                
+                                # 添加依赖的源表
+                                for source_table in source_tables_list:
+                                    if source_table and source_table not in mt['source_tables']:
+                                        mt['source_tables'].append(source_table)
+                                        logger.info(f"从依赖关系中添加源表 {source_table} 到 {table_name}")
                     
-                    dependencies[table_name] = deps
+                    except Exception as e:
+                        logger.error(f"处理表 {table_name} 的依赖关系时出错: {str(e)}")
+                        
+        except Exception as e:
+            logger.error(f"查询Neo4j依赖关系时出错: {str(e)}")
         finally:
             driver.close()
         
-        # 创建执行计划
+        # 创建最终执行计划
         execution_plan = {
             "exec_date": exec_date,
             "resource_tasks": resource_tasks,
@@ -688,7 +824,7 @@ def prepare_pipeline_dag_schedule(**kwargs):
             error_msg = f"保存执行计划到数据库时出错: {str(db_e)}"
             logger.error(error_msg)
             raise Exception(error_msg)
-                
+            
     except Exception as e:
         error_msg = f"创建或保存执行计划时出错: {str(e)}"
         logger.error(error_msg)