Bläddra i källkod

准备增加传入logical date的功能

wangxq 3 veckor sedan
förälder
incheckning
bd617b4665
1 ändrade filer med 257 tillägg och 22 borttagningar
  1. 257 22
      dags/dataops_productline_manual_trigger_dag.py

+ 257 - 22
dags/dataops_productline_manual_trigger_dag.py

@@ -3,7 +3,10 @@
 手动触发数据产品线脚本执行DAG
 
 功能:
-- 根据指定的脚本名称和目标表名,构建并执行其上游依赖链
+- 支持灵活的参数组合:
+  - 可以只提供脚本名称,自动查找目标表
+  - 可以只提供目标表,智能处理对应的脚本组合
+  - 可以同时提供脚本名称和目标表
 - 支持三种依赖级别:
   - 'self':只执行当前脚本,不处理上游依赖
   - 'dependency':依据脚本之间的直接依赖关系构建执行链
@@ -14,11 +17,28 @@
   - 'sql':从data_transform_scripts表获取SQL脚本内容并执行
 
 参数:
-- script_name:目标脚本名称
-- target_table:目标表名
+- script_name:[可选] 目标脚本名称
+- target_table:[可选] 目标表名
 - dependency_level:依赖级别
 
 使用示例:
+1. 只提供脚本名称:
+{
+    "conf": {
+        "script_name": "book_sale_amt_monthly_process.py",
+        "dependency_level": "dependency"
+    }
+}
+
+2. 只提供目标表:
+{
+    "conf": {
+        "target_table": "book_sale_amt_monthly",
+        "dependency_level": "dependency"
+    }
+}
+
+3. 同时提供脚本名称和目标表:
 {
     "conf": {
         "script_name": "book_sale_amt_monthly_process.py",
@@ -104,11 +124,9 @@ def get_dag_params(**context):
     dependency_level = params.get('dependency_level')
     logger.info(f"获取的依赖级别值: {dependency_level}")
 
-    if not script_name:
-        raise ValueError("必须提供script_name参数")
-        
-    if not target_table:
-        raise ValueError("必须提供target_table参数")
+    # 验证参数组合
+    if not script_name and not target_table:
+        raise ValueError("必须至少提供script_name或target_table参数之一")
     
     # 验证dependency_level参数
     if dependency_level not in ['self', 'dependency', 'full']:
@@ -147,6 +165,129 @@ def get_table_label(table_name):
     finally:
         driver.close()
 
+def find_target_table_for_script(script_name):
+    """
+    根据脚本名称查找对应的目标表
+    
+    参数:
+        script_name (str): 脚本名称
+        
+    返回:
+        str: 目标表名,如果找不到则返回None
+    """
+    driver = GraphDatabase.driver(
+        NEO4J_CONFIG['uri'], 
+        auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
+    )
+    
+    # 首先查找DataModel表中的脚本关系
+    query_datamodel = """
+        MATCH (source)-[rel:DERIVED_FROM]->(target)
+        WHERE rel.script_name = $script_name
+        RETURN source.en_name AS target_table LIMIT 1
+    """
+    
+    # 如果在DataModel中找不到,尝试查找DataResource表
+    query_dataresource = """
+        MATCH (source)-[rel:ORIGINATES_FROM]->(target)
+        WHERE rel.script_name = $script_name
+        RETURN source.en_name AS target_table LIMIT 1
+    """
+    
+    try:
+        with driver.session() as session:
+            # 先查找DataModel
+            result = session.run(query_datamodel, script_name=script_name)
+            record = result.single()
+            
+            if record and record.get("target_table"):
+                return record.get("target_table")
+            
+            # 如果在DataModel中找不到,尝试DataResource
+            result = session.run(query_dataresource, script_name=script_name)
+            record = result.single()
+            
+            if record and record.get("target_table"):
+                return record.get("target_table")
+            
+            logger.warning(f"未找到脚本 {script_name} 对应的目标表")
+            return None
+    except Exception as e:
+        logger.error(f"查找脚本 {script_name} 对应的目标表时出错: {str(e)}")
+        return None
+    finally:
+        driver.close()
+
+def find_scripts_for_table(table_name):
+    """
+    根据表名查找对应的脚本信息
+    
+    参数:
+        table_name (str): 表名
+        
+    返回:
+        list: 脚本信息列表,如果找不到则返回空列表
+    """
+    driver = GraphDatabase.driver(
+        NEO4J_CONFIG['uri'], 
+        auth=(NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
+    )
+    
+    table_label = get_table_label(table_name)
+    scripts = []
+    
+    try:
+        with driver.session() as session:
+            if table_label == "DataModel":
+                # 查询DataModel的所有脚本关系
+                query = """
+                    MATCH (target:DataModel {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
+                    RETURN rel.script_name AS script_name, rel.script_type AS script_type
+                """
+            elif table_label == "DataResource":
+                # 查询DataResource的所有脚本关系
+                query = """
+                    MATCH (target:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
+                    RETURN rel.script_name AS script_name, rel.script_type AS script_type
+                """
+            else:
+                logger.warning(f"表 {table_name} 不是DataModel或DataResource类型")
+                return scripts
+            
+            result = session.run(query, table_name=table_name)
+            
+            for record in result:
+                script_name = record.get("script_name")
+                script_type = record.get("script_type", "python_script")
+                
+                if script_name:
+                    scripts.append({
+                        "script_name": script_name,
+                        "script_type": script_type
+                    })
+            
+            # 如果找不到脚本,使用表名作为基础生成默认脚本名
+            if not scripts:
+                logger.warning(f"表 {table_name} 没有关联的脚本,尝试使用默认脚本名")
+                
+                # 尝试查找可能的默认脚本文件
+                default_script_name = f"{table_name}_process.py"
+                script_path = os.path.join(SCRIPTS_BASE_PATH, default_script_name)
+                
+                if os.path.exists(script_path):
+                    logger.info(f"发现默认脚本文件: {default_script_name}")
+                    scripts.append({
+                        "script_name": default_script_name,
+                        "script_type": "python_script"
+                    })
+    except Exception as e:
+        logger.error(f"查找表 {table_name} 对应的脚本时出错: {str(e)}")
+    finally:
+        driver.close()
+    
+    logger.info(f"表 {table_name} 关联的脚本: {scripts}")
+    return scripts
+
 def get_script_info_from_neo4j(script_name, target_table):
     """
     从Neo4j获取脚本和表的详细信息
@@ -685,6 +826,69 @@ def choose_executor(script_info):
         logger.warning(f"未识别的脚本类型 {script_type},使用默认execute_python_script函数执行")
         return execute_python_script
 
+def prepare_script_info(script_name=None, target_table=None, dependency_level=None):
+    """
+    准备脚本信息,根据输入的参数组合智能处理
+    
+    参数:
+        script_name: [可选] 脚本名称
+        target_table: [可选] 目标表名
+        dependency_level: 依赖级别
+        
+    返回:
+        list: 脚本信息列表
+    """
+    all_script_infos = []
+    
+    # 情况1: 同时提供脚本名称和目标表名
+    if script_name and target_table:
+        logger.info(f"方案1: 同时提供了脚本名称和目标表名")
+        script_info = get_script_info_from_neo4j(script_name, target_table)
+        if script_info:
+            all_script_infos.append(script_info)
+    
+    # 情况2: 只提供脚本名称,自动查找目标表
+    elif script_name and not target_table:
+        logger.info(f"方案2: 只提供了脚本名称,自动查找目标表")
+        target_table = find_target_table_for_script(script_name)
+        if target_table:
+            logger.info(f"找到脚本 {script_name} 对应的目标表: {target_table}")
+            script_info = get_script_info_from_neo4j(script_name, target_table)
+            if script_info:
+                all_script_infos.append(script_info)
+        else:
+            logger.error(f"未找到脚本 {script_name} 对应的目标表")
+    
+    # 情况3: 只提供目标表名,查找并处理相关的脚本
+    elif not script_name and target_table:
+        logger.info(f"方案3: 只提供了目标表名,查找相关的脚本")
+        scripts = find_scripts_for_table(target_table)
+        
+        if not scripts:
+            logger.warning(f"未找到表 {target_table} 关联的脚本")
+            return all_script_infos
+        
+        # 查看是否所有脚本名称都相同
+        script_names = set(script['script_name'] for script in scripts)
+        
+        if len(script_names) == 1:
+            # 如果只有一个不同的脚本名称,处理为单个脚本
+            single_script_name = next(iter(script_names))
+            logger.info(f"表 {target_table} 只关联了一个脚本: {single_script_name}")
+            script_info = get_script_info_from_neo4j(single_script_name, target_table)
+            if script_info:
+                all_script_infos.append(script_info)
+        else:
+            # 如果有多个不同的脚本名称,分别处理每个脚本
+            logger.info(f"表 {target_table} 关联了多个不同脚本: {script_names}")
+            for script in scripts:
+                script_name = script['script_name']
+                script_info = get_script_info_from_neo4j(script_name, target_table)
+                if script_info:
+                    all_script_infos.append(script_info)
+    
+    return all_script_infos
+
 def prepare_dependency_chain(**context):
     """
     准备依赖链并保存到XCom
@@ -699,25 +903,49 @@ def prepare_dependency_chain(**context):
     logger.info(f"- full: 处理所有间接依赖")
     logger.info(f"当前依赖级别: {dependency_level}")
     
-    # 获取脚本信息
-    script_info = get_script_info_from_neo4j(script_name, target_table)
+    # 准备脚本信息
+    script_infos = prepare_script_info(script_name, target_table, dependency_level)
     
-    # 验证脚本信息
-    if not script_info.get('target_table_label'):
-        logger.warning(f"未能确定表 {target_table} 的类型")
+    if not script_infos:
+        logger.error(f"未能获取有效的脚本信息")
         return False
     
-    # 获取脚本依赖链
-    dependency_chain = get_upstream_script_dependencies(script_info, dependency_level)
+    # 获取完整的依赖链
+    all_dependencies = []
     
-    if not dependency_chain:
-        logger.warning(f"没有找到脚本 {script_name} 的依赖链")
+    for script_info in script_infos:
+        # 验证脚本信息
+        if not script_info.get('target_table_label'):
+            logger.warning(f"未能确定表 {script_info.get('target_table')} 的类型")
+            continue
+        
+        # 获取脚本依赖链
+        dependency_chain = get_upstream_script_dependencies(script_info, dependency_level)
+        
+        if dependency_chain:
+            all_dependencies.extend(dependency_chain)
+        else:
+            logger.warning(f"没有找到脚本 {script_info.get('script_name')} 的依赖链")
+    
+    # 去重
+    unique_dependencies = []
+    seen_script_ids = set()
+    
+    for dep in all_dependencies:
+        script_id = dep.get('script_id')
+        if script_id and script_id not in seen_script_ids:
+            seen_script_ids.add(script_id)
+            unique_dependencies.append(dep)
+    
+    if not unique_dependencies:
+        logger.error("没有找到任何有效的依赖链")
         return False
     
     # 保存依赖链到XCom
     ti = context['ti']
-    ti.xcom_push(key='dependency_chain', value=dependency_chain)
+    ti.xcom_push(key='dependency_chain', value=unique_dependencies)
     
+    logger.info(f"成功准备了 {len(unique_dependencies)} 个脚本的依赖链")
     return True
 
 def execute_script_chain(**context):
@@ -846,8 +1074,16 @@ with DAG(
     catchup=False,
     is_paused_upon_creation=False,
     params={
-        'script_name': '',
-        'target_table': '',
+        'script_name': {
+            'type': 'string',
+            'default': '',
+            'description': '[可选] 目标脚本名称'
+        },
+        'target_table': {
+            'type': 'string',
+            'default': '',
+            'description': '[可选] 目标表名'
+        },
         'dependency_level': {
             'type': 'string',
             'enum': ['self', 'dependency', 'full'],
@@ -886,5 +1122,4 @@ with DAG(
     )
     
     # 设置任务依赖关系
-    prepare_task >> execute_task >> report_task >> completed_task
-        # 使用Python脚
+    prepare_task >> execute_task >> report_task >> completed_task