소스 검색

修正找不到exec_mode的问题

wangxq 3 주 전
부모
커밋
be150f78b0
1개의 변경된 파일, 254개의 추가 및 65개의 삭제
  1. 254 65
      dags/dataops_productline_manual_trigger_dag.py

+ 254 - 65
dags/dataops_productline_manual_trigger_dag.py

@@ -9,8 +9,8 @@
   - 可以同时提供脚本名称和目标表
 - 支持三种依赖级别:
   - 'self':只执行当前脚本,不处理上游依赖
-  - 'dependency':依据脚本之间的直接依赖关系构建执行链
-  - 'full':构建完整依赖链,包括所有间接依赖
+  - 'resource':到Resource层
+  - 'source':到Source层
 - 支持三种脚本类型:
   - 'python_script':执行物理Python脚本文件
   - 'python':从data_transform_scripts表获取Python脚本内容并执行
@@ -29,7 +29,7 @@
 {
     "conf": {
         "script_name": "book_sale_amt_monthly_process.py",
-        "dependency_level": "dependency"
+        "dependency_level": "resource"
     }
 }
 
@@ -37,7 +37,7 @@
 {
     "conf": {
         "target_table": "book_sale_amt_monthly",
-        "dependency_level": "dependency"
+        "dependency_level": "resource"
     }
 }
 
@@ -46,7 +46,7 @@
     "conf": {
         "script_name": "book_sale_amt_monthly_process.py",
         "target_table": "book_sale_amt_monthly",
-        "dependency_level": "dependency"
+        "dependency_level": "resource"
     }
 }
 """
@@ -103,7 +103,7 @@ def get_pg_conn():
 
 def get_execution_mode(table_name):
     """
-    从PostgreSQL获取表的执行模式
+    从Neo4j获取表的执行模式
     
     参数:
         table_name (str): 表名
@@ -112,37 +112,76 @@ def get_execution_mode(table_name):
         str: 执行模式,如果未找到则返回"append"作为默认值
     """
     try:
-        conn = get_pg_conn()
-        cursor = conn.cursor()
-        cursor.execute("""
-            SELECT execution_mode 
-            FROM table_schedule 
-            WHERE table_name = %s
-        """, (table_name,))
-        result = cursor.fetchone()
-        cursor.close()
-        conn.close()
+        driver = GraphDatabase.driver(
+            NEO4J_CONFIG['uri'], 
+            auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
+        )
         
-        if result:
-            return result[0]
-        else:
-            logger.warning(f"未找到表 {table_name} 的执行模式,使用默认值 'append'")
+        # 先检查是否为structure类型的DataResource
+        with driver.session() as session:
+            query_structure = """
+                MATCH (n:DataResource {en_name: $table_name})
+                RETURN n.type AS type
+            """
+            
+            result = session.run(query_structure, table_name=table_name)
+            record = result.single()
+            
+            if record and record.get("type") == "structure":
+                logger.info(f"表 {table_name} 是structure类型的DataResource,使用默认执行模式'append'")
+                return "append"
+        
+        # 查询执行模式,分别尝试DataModel和DataResource
+        with driver.session() as session:
+            # 首先检查DataModel类型表
+            query_model = """
+                MATCH (n:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->()
+                RETURN rel.exec_mode AS execution_mode LIMIT 1
+            """
+            
+            result = session.run(query_model, table_name=table_name)
+            record = result.single()
+            
+            if record and record.get("execution_mode"):
+                return record.get("execution_mode")
+            
+            # 然后检查DataResource类型表
+            query_resource = """
+                MATCH (n:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->()
+                RETURN rel.exec_mode AS execution_mode LIMIT 1
+            """
+            
+            result = session.run(query_resource, table_name=table_name)
+            record = result.single()
+            
+            if record and record.get("execution_mode"):
+                return record.get("execution_mode")
+            
+            # 如果上面两种方式都找不到,使用默认值
+            logger.warning(f"未在Neo4j中找到表 {table_name} 的执行模式,使用默认值 'append'")
             return "append"
+            
     except Exception as e:
         logger.error(f"获取表 {table_name} 的执行模式时出错: {str(e)}")
         return "append"
+    finally:
+        if driver:
+            driver.close()
 
 def get_dag_params(**context):
     """获取DAG运行参数"""
-    params = context.get('params', {})
-    script_name = params.get('script_name')
-    target_table = params.get('target_table')
+    #params = context.get('params', {})
+    dag_run = context.get('dag_run')
+    params = dag_run.conf if dag_run and dag_run.conf else {}
+    
+    script_name = params.get('script_name', '')
+    target_table = params.get('target_table', '')
     
     # 记录原始参数信息
     logger.info(f"接收到的原始参数: {params}")
     
     # 获取依赖级别参数
-    dependency_level = params.get('dependency_level')
+    dependency_level = params.get('dependency_level', 'self')
     logger.info(f"获取的依赖级别值: {dependency_level}")
 
     # 获取 logical_date
@@ -153,12 +192,13 @@ def get_dag_params(**context):
 
     # 验证参数组合
     if not script_name and not target_table:
+        logger.error("必须至少提供script_name或target_table参数之一")
         raise ValueError("必须至少提供script_name或target_table参数之一")
     
     # 验证dependency_level参数
-    if dependency_level not in ['self', 'dependency', 'full']:
-        logger.warning(f"无效的依赖级别参数: {dependency_level},使用默认值'dependency'")
-        dependency_level = 'dependency'
+    if dependency_level not in ['self', 'resource', 'source']:
+        logger.warning(f"无效的依赖级别参数: {dependency_level},使用默认值'self'")
+        dependency_level = 'self'
     
     logger.info(f"最终使用的参数 - 脚本名称: {script_name}, 目标表: {target_table}, 依赖级别: {dependency_level}, 执行日期: {exec_date}")
     return script_name, target_table, dependency_level, exec_date, logical_date
@@ -265,6 +305,23 @@ def find_scripts_for_table(table_name):
     
     try:
         with driver.session() as session:
+            # 先检查是否为structure类型的DataResource
+            if table_label == "DataResource":
+                query_type = """
+                    MATCH (n:DataResource {en_name: $table_name})
+                    RETURN n.type AS type
+                """
+                result = session.run(query_type, table_name=table_name)
+                record = result.single()
+                
+                if record and record.get("type") == "structure":
+                    logger.info(f"表 {table_name} 是structure类型的DataResource,使用默认脚本'load_file.py'")
+                    scripts.append({
+                        "script_name": "load_file.py",
+                        "script_type": "python_script"
+                    })
+                    return scripts
+                
             if table_label == "DataModel":
                 # 查询DataModel的所有脚本关系
                 query = """
@@ -331,15 +388,53 @@ def get_script_info_from_neo4j(script_name, target_table):
         auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
     )
     
+    # 获取表的标签类型
+    table_label = get_table_label(target_table)
+    
     script_info = {
         'script_name': script_name,
         'target_table': target_table,
         'script_id': f"{script_name.replace('.', '_')}_{target_table}",
-        'target_table_label': get_table_label(target_table),
+        'target_table_label': table_label,
         'source_tables': [],
         'script_type': 'python_script'  # 默认类型改为python_script,表示物理脚本文件
     }
     
+    # 检查是否为structure类型的DataResource
+    try:
+        with driver.session() as session:
+            if table_label == 'DataResource':
+                query_structure = """
+                    MATCH (n:DataResource {en_name: $table_name})
+                    RETURN n.type AS type, n.storage_location AS storage_location, n.frequency AS frequency
+                """
+                result = session.run(query_structure, table_name=target_table)
+                record = result.single()
+                
+                if record and record.get("type") == "structure":
+                    logger.info(f"表 {target_table} 是structure类型的DataResource")
+                    
+                    # 设置特殊属性
+                    script_info['target_type'] = 'structure'
+                    storage_location = record.get("storage_location")
+                    frequency = record.get("frequency", "daily")
+                    
+                    if storage_location:
+                        script_info['storage_location'] = storage_location
+                    script_info['frequency'] = frequency
+                    
+                    # 如果没有指定脚本名称或指定的是default,则设置为load_file.py
+                    if not script_name or script_name.lower() == 'default' or script_name == 'load_file.py':
+                        script_info['script_name'] = 'load_file.py'
+                        script_info['script_id'] = f"load_file_py_{target_table}"
+                        script_info['execution_mode'] = "append"
+                        logger.info(f"对于structure类型的DataResource表 {target_table},使用默认脚本'load_file.py'")
+                        return script_info
+                    
+    except Exception as e:
+        logger.error(f"检查表 {target_table} 是否为structure类型时出错: {str(e)}")
+        logger.error(traceback.format_exc())
+    
     # 根据表标签类型查询脚本信息和依赖关系
     try:
         with driver.session() as session:
@@ -347,8 +442,7 @@ def get_script_info_from_neo4j(script_name, target_table):
                 # 查询DataModel的上游依赖
                 query = """
                     MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
-                    RETURN source.en_name AS source_table, labels(source) AS source_labels, 
-                           rel.script_name AS script_name, rel.script_type AS script_type
+                    RETURN source.en_name AS source_table, rel.script_name AS script_name, rel.script_type AS script_type
                 """
                 result = session.run(query, table_name=target_table)
                 
@@ -368,8 +462,7 @@ def get_script_info_from_neo4j(script_name, target_table):
                 # 查询DataResource的上游依赖
                 query = """
                     MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
-                    RETURN source.en_name AS source_table, labels(source) AS source_labels, 
-                           rel.script_name AS script_name, rel.script_type AS script_type
+                    RETURN source.en_name AS source_table, rel.script_name AS script_name, rel.script_type AS script_type
                 """
                 result = session.run(query, table_name=target_table)
                 
@@ -409,6 +502,13 @@ def get_script_info_from_neo4j(script_name, target_table):
                         script_info['storage_location'] = storage_location
                     if frequency:
                         script_info['frequency'] = frequency
+                    
+                    # 如果是structure类型,再次检查是否应使用默认脚本
+                    if target_type == 'structure' and (not script_name or script_name.lower() == 'default' or script_name == 'load_file.py'):
+                        script_info['script_name'] = 'load_file.py'
+                        script_info['script_id'] = f"load_file_py_{target_table}"
+                        script_info['execution_mode'] = "append"
+                        logger.info(f"对于structure类型的DataResource表 {target_table},使用默认脚本'load_file.py'")
     
     except Exception as e:
         logger.error(f"从Neo4j获取脚本 {script_name} 和表 {target_table} 的信息时出错: {str(e)}")
@@ -422,7 +522,7 @@ def get_script_info_from_neo4j(script_name, target_table):
     logger.info(f"获取到脚本信息: {script_info}")
     return script_info
 
-def get_upstream_script_dependencies(script_info, dependency_level='dependency'):
+def get_upstream_script_dependencies(script_info, dependency_level='resource'):
     """
     获取脚本的上游依赖
 
@@ -430,8 +530,8 @@ def get_upstream_script_dependencies(script_info, dependency_level='dependency')
         script_info: 脚本信息
         dependency_level: 依赖级别
             - self: 只考虑当前脚本
-            - dependency: 考虑直接依赖
-            - full: 考虑所有间接依赖
+            - resource: 到Resource层
+            - source: 到Source层
     
     返回:
         list: 依赖链脚本列表
@@ -475,19 +575,76 @@ def get_upstream_script_dependencies(script_info, dependency_level='dependency')
             
             try:
                 with driver.session() as session:
-                    # 根据表的类型,查询不同的关系
+                    # 获取表的类型
                     table_label = get_table_label(source_table)
                     
+                    # 检查依赖级别和表类型
+                    # 如果是resource级别且表类型是DataSource,则不继续处理
+                    if dependency_level == 'resource' and table_label == 'DataSource':
+                        logger.info(f"依赖级别为'resource',跳过DataSource类型表: {source_table}")
+                        continue
+                    
+                    # 如果是DataResource表,检查是否为structure类型
+                    if table_label == 'DataResource':
+                        query_structure = """
+                            MATCH (n:DataResource {en_name: $table_name})
+                            RETURN n.type AS type, n.storage_location AS storage_location, n.frequency AS frequency
+                        """
+                        result = session.run(query_structure, table_name=source_table)
+                        record = result.single()
+                        
+                        if record and record.get("type") == "structure":
+                            logger.info(f"上游表 {source_table} 是structure类型的DataResource,使用默认脚本'load_file.py'")
+                            
+                            # 构建structure类型表的脚本信息
+                            upstream_script_name = "load_file.py"
+                            upstream_target_table = source_table
+                            upstream_id = f"load_file_py_{upstream_target_table}"
+                            
+                            # 如果还没有处理过这个脚本
+                            if upstream_id not in visited:
+                                # 创建脚本信息
+                                upstream_info = {
+                                    'script_name': upstream_script_name,
+                                    'target_table': upstream_target_table,
+                                    'script_id': upstream_id,
+                                    'target_table_label': 'DataResource',
+                                    'target_type': 'structure',
+                                    'source_tables': [],
+                                    'script_type': 'python_script',
+                                    'execution_mode': 'append',
+                                    'frequency': record.get("frequency", "daily")
+                                }
+                                
+                                # 添加storage_location如果存在
+                                if record.get("storage_location"):
+                                    upstream_info["storage_location"] = record.get("storage_location")
+                                
+                                # 添加到图中
+                                G.add_node(upstream_id, **upstream_info)
+                                G.add_edge(current_id, upstream_id)
+                                
+                                # 记录处理过的脚本
+                                processed_scripts[upstream_id] = upstream_info
+                                
+                                # 添加到已访问集合
+                                visited.add(upstream_id)
+                                queue.append(upstream_info)
+                            
+                            # 处理完structure类型表后继续下一个源表
+                            continue
+                    
+                    # 根据表的类型,查询不同的关系
                     if table_label == 'DataModel':
                         query = """
-                            MATCH (target:DataModel {en_name: $table_name})<-[rel:DERIVED_FROM]-(source)
-                            RETURN source.en_name AS target_table, rel.script_name AS script_name, rel.script_type AS script_type
+                            MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
+                            RETURN source.en_name AS source_table, rel.script_name AS script_name, rel.script_type AS script_type
                             LIMIT 1
                         """
                     elif table_label == 'DataResource':
                         query = """
-                            MATCH (target:DataResource {en_name: $table_name})<-[rel:ORIGINATES_FROM]-(source)
-                            RETURN source.en_name AS target_table, rel.script_name AS script_name, rel.script_type AS script_type
+                            MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
+                            RETURN source.en_name AS source_table, rel.script_name AS script_name, rel.script_type AS script_type
                             LIMIT 1
                         """
                     else:
@@ -501,12 +658,16 @@ def get_upstream_script_dependencies(script_info, dependency_level='dependency')
                         upstream_script_name = record.get("script_name")
                         upstream_target_table = source_table
                         upstream_script_type = record.get("script_type", "python_script")
+                        source_table_from_db = record.get("source_table")
+                        
+                        # 记录源表信息
+                        if source_table_from_db:
+                            logger.info(f"表 {source_table} 的上游源表: {source_table_from_db}")
                         
                         # 构建上游脚本ID
                         upstream_id = f"{upstream_script_name.replace('.', '_')}_{upstream_target_table}"
                         
-                        # 如果是full级别,或者是dependency级别且是直接依赖,则处理
-                        if dependency_level == 'full' or (dependency_level == 'dependency' and upstream_id not in visited):
+                        if upstream_id not in visited:
                             # 获取完整的上游脚本信息
                             upstream_info = get_script_info_from_neo4j(upstream_script_name, upstream_target_table)
                             
@@ -517,10 +678,9 @@ def get_upstream_script_dependencies(script_info, dependency_level='dependency')
                             # 记录处理过的脚本
                             processed_scripts[upstream_id] = upstream_info
                             
-                            # 只有full级别才继续递归处理
-                            if dependency_level == 'full' and upstream_id not in visited:
-                                visited.add(upstream_id)
-                                queue.append(upstream_info)
+                            # 继续递归处理,除非遇到限制条件
+                            visited.add(upstream_id)
+                            queue.append(upstream_info)
                     else:
                         logger.warning(f"未找到表 {source_table} 对应的脚本信息")
             
@@ -876,6 +1036,11 @@ def prepare_script_info(script_name=None, target_table=None, dependency_level=No
     """
     all_script_infos = []
     
+    # 如果script_name和target_table都为空或None
+    if not script_name and not target_table:
+        logger.error("script_name和target_table参数都为空,无法确定要执行的脚本")
+        raise ValueError("必须至少提供script_name或target_table参数之一")
+    
     # 情况1: 同时提供脚本名称和目标表名
     if script_name and target_table:
         logger.info(f"方案1: 同时提供了脚本名称和目标表名")
@@ -898,10 +1063,47 @@ def prepare_script_info(script_name=None, target_table=None, dependency_level=No
     # 情况3: 只提供目标表名,查找并处理相关的脚本
     elif not script_name and target_table:
         logger.info(f"方案3: 只提供了目标表名,查找相关的脚本")
+        
+        # 首先检查是否为structure类型的DataResource表
+        table_label = get_table_label(target_table)
+        
+        if table_label == 'DataResource':
+            driver = GraphDatabase.driver(
+                NEO4J_CONFIG['uri'], 
+                auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
+            )
+            
+            try:
+                with driver.session() as session:
+                    query = """
+                        MATCH (n:DataResource {en_name: $table_name})
+                        RETURN n.type AS type
+                    """
+                    result = session.run(query, table_name=target_table)
+                    record = result.single()
+                    
+                    if record and record.get("type") == "structure":
+                        logger.info(f"表 {target_table} 是structure类型的DataResource,使用默认脚本'load_file.py'")
+                        script_info = get_script_info_from_neo4j('load_file.py', target_table)
+                        if script_info:
+                            all_script_infos.append(script_info)
+                            return all_script_infos
+            finally:
+                driver.close()
+        
+        # 如果不是structure类型,使用原有逻辑查找脚本
         scripts = find_scripts_for_table(target_table)
         
         if not scripts:
             logger.warning(f"未找到表 {target_table} 关联的脚本")
+            
+            # 如果是DataResource的表,再次检查是否为structure类型
+            if table_label == 'DataResource':
+                logger.info(f"尝试使用默认脚本'load_file.py'处理表 {target_table}")
+                script_info = get_script_info_from_neo4j('load_file.py', target_table)
+                if script_info:
+                    all_script_infos.append(script_info)
+            
             return all_script_infos
         
         # 查看是否所有脚本名称都相同
@@ -935,8 +1137,8 @@ def prepare_dependency_chain(**context):
     # 记录依赖级别信息
     logger.info(f"依赖级别说明:")
     logger.info(f"- self: 只执行当前脚本,不处理上游依赖")
-    logger.info(f"- dependency: 处理直接依赖")
-    logger.info(f"- full: 处理所有间接依赖")
+    logger.info(f"- resource: 到Resource层")
+    logger.info(f"- source: 到Source层")
     logger.info(f"当前依赖级别: {dependency_level}")
     logger.info(f"执行日期: {exec_date}")
     
@@ -1127,27 +1329,14 @@ def generate_execution_report(**context):
 with DAG(
     'dataops_productline_manual_trigger_dag',
     default_args=default_args,
-    description='手动触发指定脚本的执行,支持三种依赖级别:self(仅当前脚本)、dependency(直接依赖)、full(所有间接依赖)',
+    description='script_name和target_table可以二选一,支持三种依赖级别:self(仅当前表或脚本)、resource(到Resource层)、source(到Source层)',
     schedule_interval=None,  # 设置为None表示只能手动触发
     catchup=False,
     is_paused_upon_creation=False,
     params={
-        'script_name': {
-            'type': 'string',
-            'default': '',
-            'description': '[可选] 目标脚本名称'
-        },
-        'target_table': {
-            'type': 'string',
-            'default': '',
-            'description': '[可选] 目标表名'
-        },
-        'dependency_level': {
-            'type': 'string',
-            'enum': ['self', 'dependency', 'full'],
-            'default': 'dependency',
-            'description': '依赖级别: self-仅当前脚本, dependency-直接依赖, full-所有间接依赖'
-        }
+        "script_name": "",
+        "target_table": "",
+        "dependency_level": "self"
     },
 ) as dag: