Explorar el Código

修改手工DAG,增加对python/sql/python_script的支持

wangxq hace 3 semanas
padre
commit
e9d59e2fb6
Se han modificado 1 ficheros con 248 adiciones y 15 borrados
  1. 248 15
      dags/dataops_productline_manual_trigger_dag.py

+ 248 - 15
dags/dataops_productline_manual_trigger_dag.py

@@ -1,4 +1,3 @@
-
 # dataops_productline_manual_trigger_dag.py
 """
 手动触发数据产品线脚本执行DAG
@@ -9,6 +8,10 @@
   - 'self':只执行当前脚本,不处理上游依赖
   - 'dependency':依据脚本之间的直接依赖关系构建执行链
   - 'full':构建完整依赖链,包括所有间接依赖
+- 支持三种脚本类型:
+  - 'python_script':执行物理Python脚本文件
+  - 'python':从data_transform_scripts表获取Python脚本内容并执行
+  - 'sql':从data_transform_scripts表获取SQL脚本内容并执行
 
 参数:
 - script_name:目标脚本名称
@@ -166,7 +169,7 @@ def get_script_info_from_neo4j(script_name, target_table):
         'script_id': f"{script_name.replace('.', '_')}_{target_table}",
         'target_table_label': get_table_label(target_table),
         'source_tables': [],
-        'script_type': 'python'  # 默认类型
+        'script_type': 'python_script'  # 默认类型改为python_script,表示物理脚本文件
     }
     
     # 根据表标签类型查询脚本信息和依赖关系
@@ -185,7 +188,7 @@ def get_script_info_from_neo4j(script_name, target_table):
                     source_table = record.get("source_table")
                     source_labels = record.get("source_labels", [])
                     db_script_name = record.get("script_name")
-                    script_type = record.get("script_type", "python")
+                    script_type = record.get("script_type", "python_script")
                     
                     # 验证脚本名称匹配
                     if db_script_name and db_script_name == script_name:
@@ -206,7 +209,7 @@ def get_script_info_from_neo4j(script_name, target_table):
                     source_table = record.get("source_table")
                     source_labels = record.get("source_labels", [])
                     db_script_name = record.get("script_name")
-                    script_type = record.get("script_type", "python")
+                    script_type = record.get("script_type", "python_script")
                     
                     # 验证脚本名称匹配
                     if db_script_name and db_script_name == script_name:
@@ -222,7 +225,7 @@ def get_script_info_from_neo4j(script_name, target_table):
             if script_info['target_table_label'] == 'DataResource':
                 query = """
                     MATCH (n:DataResource {en_name: $table_name})
-                    RETURN n.type AS target_type, n.storage_location AS storage_location
+                    RETURN n.type AS target_type, n.storage_location AS storage_location, n.frequency AS frequency
                 """
                 result = session.run(query, table_name=target_table)
                 record = result.single()
@@ -230,11 +233,14 @@ def get_script_info_from_neo4j(script_name, target_table):
                 if record:
                     target_type = record.get("target_type")
                     storage_location = record.get("storage_location")
+                    frequency = record.get("frequency")
                     
                     if target_type:
                         script_info['target_type'] = target_type
                     if storage_location:
                         script_info['storage_location'] = storage_location
+                    if frequency:
+                        script_info['frequency'] = frequency
     
     except Exception as e:
         logger.error(f"从Neo4j获取脚本 {script_name} 和表 {target_table} 的信息时出错: {str(e)}")
@@ -307,13 +313,13 @@ def get_upstream_script_dependencies(script_info, dependency_level='dependency')
                     if table_label == 'DataModel':
                         query = """
                             MATCH (target:DataModel {en_name: $table_name})<-[rel:DERIVED_FROM]-(source)
-                            RETURN source.en_name AS target_table, rel.script_name AS script_name
+                            RETURN source.en_name AS target_table, rel.script_name AS script_name, rel.script_type AS script_type
                             LIMIT 1
                         """
                     elif table_label == 'DataResource':
                         query = """
                             MATCH (target:DataResource {en_name: $table_name})<-[rel:ORIGINATES_FROM]-(source)
-                            RETURN source.en_name AS target_table, rel.script_name AS script_name
+                            RETURN source.en_name AS target_table, rel.script_name AS script_name, rel.script_type AS script_type
                             LIMIT 1
                         """
                     else:
@@ -326,6 +332,7 @@ def get_upstream_script_dependencies(script_info, dependency_level='dependency')
                     if record and record.get("script_name"):
                         upstream_script_name = record.get("script_name")
                         upstream_target_table = source_table
+                        upstream_script_type = record.get("script_type", "python_script")
                         
                         # 构建上游脚本ID
                         upstream_id = f"{upstream_script_name.replace('.', '_')}_{upstream_target_table}"
@@ -412,13 +419,15 @@ def execute_python_script(script_info):
     execution_mode = script_info.get('execution_mode', 'append')
     target_table_label = script_info.get('target_table_label')
     source_tables = script_info.get('source_tables', [])
+    frequency = script_info.get('frequency', 'daily')
     
     # 记录开始执行
-    logger.info(f"===== 开始执行脚本: {script_name} =====")
+    logger.info(f"===== 开始执行物理Python脚本文件: {script_name} =====")
     logger.info(f"目标表: {target_table}")
     logger.info(f"执行模式: {execution_mode}")
     logger.info(f"表标签: {target_table_label}")
     logger.info(f"源表: {source_tables}")
+    logger.info(f"频率: {frequency}")
     
     # 检查脚本文件是否存在
     exists, script_path = check_script_exists(script_name)
@@ -441,14 +450,19 @@ def execute_python_script(script_info):
             # 构建函数参数
             run_kwargs = {
                 "table_name": target_table,
-                "execution_mode": execution_mode
+                "execution_mode": execution_mode,
+                "frequency": frequency,
+                "exec_date": datetime.now().strftime('%Y-%m-%d')
             }
             
             # 如果是structure类型,添加特殊参数
             if target_table_label == 'DataResource' and script_info.get('target_type') == 'structure':
                 run_kwargs["target_type"] = script_info.get('target_type')
                 run_kwargs["storage_location"] = script_info.get('storage_location')
-                run_kwargs["frequency"] = script_info.get('frequency', 'daily')
+            
+            # 添加源表
+            if source_tables:
+                run_kwargs["source_tables"] = source_tables
             
             # 执行脚本
             result = module.run(**run_kwargs)
@@ -472,6 +486,205 @@ def execute_python_script(script_info):
         logger.error(traceback.format_exc())
         return False
 
+def execute_sql(script_info):
+    """
+    Run a SQL-type script through the shared ``execution_sql.py`` runner.
+    
+    The runner module is loaded dynamically from SCRIPTS_BASE_PATH and its
+    ``run()`` entry point is called with keyword arguments built from
+    ``script_info``. Per the original note, the SQL text comes from the
+    data_transform_scripts table; that lookup is not performed in this
+    function (presumably the runner does it -- TODO confirm).
+    
+    Args:
+        script_info: dict describing the script. Keys read here:
+            script_name, target_table, execution_mode (default 'append'),
+            target_table_label, frequency (default 'daily'), target_type,
+            storage_location, source_tables.
+        
+    Returns:
+        bool: truthiness of the value returned by ``run()``; False when the
+        runner file is missing, the module has no ``run()``, or any
+        exception is raised (the exception is logged, not propagated).
+    """
+    script_name = script_info.get('script_name')
+    target_table = script_info.get('target_table')
+    execution_mode = script_info.get('execution_mode', 'append')
+    target_table_label = script_info.get('target_table_label')
+    frequency = script_info.get('frequency', 'daily')
+    
+    # Log the start of execution together with the key parameters
+    logger.info(f"===== 开始执行SQL脚本: {script_name} =====")
+    logger.info(f"目标表: {target_table}")
+    logger.info(f"执行模式: {execution_mode}")
+    logger.info(f"表标签: {target_table_label}")
+    logger.info(f"频率: {frequency}")
+    
+    try:
+        # Record the start time so the duration can be reported afterwards
+        start_time = datetime.now()
+        
+        # Locate the execution_sql runner; bail out early if it is missing
+        exec_sql_path = os.path.join(SCRIPTS_BASE_PATH, "execution_sql.py")
+        if not os.path.exists(exec_sql_path):
+            logger.error(f"SQL执行脚本文件不存在: {exec_sql_path}")
+            return False
+        
+        # Load execution_sql from its file path (re-executed on every call)
+        spec = importlib.util.spec_from_file_location("execution_sql", exec_sql_path)
+        exec_sql_module = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(exec_sql_module)
+        
+        # The runner must expose the standard entry point run()
+        if hasattr(exec_sql_module, "run"):
+            logger.info(f"调用SQL执行脚本的标准入口函数 run()")
+            
+            # Build the keyword arguments passed to run();
+            # exec_date is today's local date, not an Airflow logical date
+            run_kwargs = {
+                "script_type": "sql",
+                "target_table": target_table,
+                "script_name": script_name,
+                "exec_date": datetime.now().strftime('%Y-%m-%d'),
+                "frequency": frequency,
+                "target_table_label": target_table_label,
+                "execution_mode": execution_mode
+            }
+            
+            # 'structure'-type DataResource targets get two extra arguments
+            if target_table_label == 'DataResource' and script_info.get('target_type') == 'structure':
+                run_kwargs["target_type"] = script_info.get('target_type')
+                run_kwargs["storage_location"] = script_info.get('storage_location')
+            
+            # Pass the source tables only when the list is non-empty
+            if 'source_tables' in script_info and script_info['source_tables']:
+                run_kwargs["source_tables"] = script_info['source_tables']
+            
+            # Run the script
+            result = exec_sql_module.run(**run_kwargs)
+            
+            # Record the end time and compute the elapsed seconds
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            
+            # Coerce the result to a boolean (a None return becomes False)
+            if not isinstance(result, bool):
+                result = bool(result)
+            
+            logger.info(f"SQL脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
+            return result
+        else:
+            logger.error(f"SQL执行模块没有定义标准入口函数 run()")
+            return False
+            
+    except Exception as e:
+        logger.error(f"执行SQL脚本 {script_name} 时出错: {str(e)}")
+        logger.error(traceback.format_exc())
+        return False
+
+def execute_python(script_info):
+    """
+    Run a Python-type script through the shared ``execution_python.py`` runner.
+    
+    The runner module is loaded dynamically from SCRIPTS_BASE_PATH and its
+    ``run()`` entry point is called with keyword arguments built from
+    ``script_info``. Per the original note, the Python source comes from the
+    data_transform_scripts table; that lookup is not performed in this
+    function (presumably the runner does it -- TODO confirm).
+    
+    Args:
+        script_info: dict describing the script. Keys read here:
+            script_name, target_table, execution_mode (default 'append'),
+            target_table_label, frequency (default 'daily'), target_type,
+            storage_location, source_tables.
+        
+    Returns:
+        bool: truthiness of the value returned by ``run()``; False when the
+        runner file is missing, the module has no ``run()``, or any
+        exception is raised (the exception is logged, not propagated).
+    """
+    script_name = script_info.get('script_name')
+    target_table = script_info.get('target_table')
+    execution_mode = script_info.get('execution_mode', 'append')
+    target_table_label = script_info.get('target_table_label')
+    frequency = script_info.get('frequency', 'daily')
+    
+    # Log the start of execution together with the key parameters
+    logger.info(f"===== 开始执行Python脚本(data_transform_scripts): {script_name} =====")
+    logger.info(f"目标表: {target_table}")
+    logger.info(f"执行模式: {execution_mode}")
+    logger.info(f"表标签: {target_table_label}")
+    logger.info(f"频率: {frequency}")
+    
+    try:
+        # Record the start time so the duration can be reported afterwards
+        start_time = datetime.now()
+        
+        # Locate the execution_python runner; bail out early if it is missing
+        exec_python_path = os.path.join(SCRIPTS_BASE_PATH, "execution_python.py")
+        if not os.path.exists(exec_python_path):
+            logger.error(f"Python执行脚本文件不存在: {exec_python_path}")
+            return False
+        
+        # Load execution_python from its file path (re-executed on every call)
+        spec = importlib.util.spec_from_file_location("execution_python", exec_python_path)
+        exec_python_module = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(exec_python_module)
+        
+        # The runner must expose the standard entry point run()
+        if hasattr(exec_python_module, "run"):
+            logger.info(f"调用Python执行脚本的标准入口函数 run()")
+            
+            # Build the keyword arguments passed to run();
+            # exec_date is today's local date, not an Airflow logical date
+            run_kwargs = {
+                "script_type": "python",
+                "target_table": target_table,
+                "script_name": script_name,
+                "exec_date": datetime.now().strftime('%Y-%m-%d'),
+                "frequency": frequency,
+                "target_table_label": target_table_label,
+                "execution_mode": execution_mode
+            }
+            
+            # 'structure'-type DataResource targets get two extra arguments
+            if target_table_label == 'DataResource' and script_info.get('target_type') == 'structure':
+                run_kwargs["target_type"] = script_info.get('target_type')
+                run_kwargs["storage_location"] = script_info.get('storage_location')
+            
+            # Pass the source tables only when the list is non-empty
+            if 'source_tables' in script_info and script_info['source_tables']:
+                run_kwargs["source_tables"] = script_info['source_tables']
+            
+            # Run the script
+            result = exec_python_module.run(**run_kwargs)
+            
+            # Record the end time and compute the elapsed seconds
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            
+            # Coerce the result to a boolean (a None return becomes False)
+            if not isinstance(result, bool):
+                result = bool(result)
+            
+            logger.info(f"Python脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
+            return result
+        else:
+            logger.error(f"Python执行模块没有定义标准入口函数 run()")
+            return False
+            
+    except Exception as e:
+        logger.error(f"执行Python脚本 {script_name} 时出错: {str(e)}")
+        logger.error(traceback.format_exc())
+        return False
+
+def choose_executor(script_info):
+    """
+    Pick the executor function that matches a script's type.
+    
+    Dispatch rules (script_type is compared case-insensitively):
+      - 'sql' with target label 'DataModel'    -> execute_sql
+      - 'python' with target label 'DataModel' -> execute_python
+      - 'python_script'                        -> execute_python_script
+      - anything else, including 'sql'/'python' on a non-DataModel target,
+        falls back to execute_python_script and logs a warning
+    
+    Args:
+        script_info: dict; reads script_type, target_table_label and
+            script_id (script_id is only used in log messages).
+        
+    Returns:
+        function: a callable that takes ``script_info`` and returns bool.
+    """
+    # The key may be present with a None/empty value (e.g. a null Neo4j
+    # property read via record.get); dict.get's default does not cover that
+    # case, so fall back explicitly instead of crashing on None.lower().
+    script_type = (script_info.get('script_type') or 'python_script').lower()
+    target_table_label = script_info.get('target_table_label')
+    
+    # Choose the executor from the script type and the target table label
+    if script_type == 'sql' and target_table_label == 'DataModel':
+        # SQL content executed through execute_sql
+        logger.info(f"脚本 {script_info['script_id']} 是SQL类型且目标表标签为DataModel,使用execute_sql函数执行")
+        return execute_sql
+    elif script_type == 'python' and target_table_label == 'DataModel':
+        # Python content executed through execute_python
+        logger.info(f"脚本 {script_info['script_id']} 是Python类型且目标表标签为DataModel,使用execute_python函数执行")
+        return execute_python
+    elif script_type == 'python_script':
+        # Physical Python script file executed through execute_python_script
+        logger.info(f"脚本 {script_info['script_id']} 是python_script类型,使用execute_python_script函数执行")
+        return execute_python_script
+    else:
+        # Fallback: treat it as a physical Python script file
+        logger.warning(f"未识别的脚本类型 {script_type},使用默认execute_python_script函数执行")
+        return execute_python_script
+
 def prepare_dependency_chain(**context):
     """
     准备依赖链并保存到XCom
@@ -522,7 +735,7 @@ def execute_script_chain(**context):
     # 记录依赖链信息
     logger.info(f"准备执行依赖链中的 {len(dependency_chain)} 个脚本")
     for idx, script_info in enumerate(dependency_chain, 1):
-        logger.info(f"脚本[{idx}]: {script_info['script_name']} -> {script_info['target_table']}")
+        logger.info(f"脚本[{idx}]: {script_info['script_name']} -> {script_info['target_table']} (类型: {script_info['script_type']})")
     
     # 逐个执行脚本
     all_success = True
@@ -531,16 +744,21 @@ def execute_script_chain(**context):
     for idx, script_info in enumerate(dependency_chain, 1):
         script_name = script_info['script_name']
         target_table = script_info['target_table']
+        script_type = script_info.get('script_type', 'python_script')
         
-        logger.info(f"===== 执行脚本 {idx}/{len(dependency_chain)}: {script_name} -> {target_table} =====")
+        logger.info(f"===== 执行脚本 {idx}/{len(dependency_chain)}: {script_name} -> {target_table} (类型: {script_type}) =====")
+        
+        # 根据脚本类型选择执行函数
+        executor = choose_executor(script_info)
         
         # 执行脚本
-        success = execute_python_script(script_info)
+        success = executor(script_info)
         
         # 记录结果
         result = {
             "script_name": script_name,
             "target_table": target_table,
+            "script_type": script_type,
             "success": success
         }
         results.append(result)
@@ -575,6 +793,14 @@ def generate_execution_report(**context):
     success_count = sum(1 for r in results if r['success'])
     fail_count = total - success_count
     
+    # 统计不同类型脚本数量
+    script_types = {}
+    for result in results:
+        script_type = result.get('script_type', 'python_script')
+        if script_type not in script_types:
+            script_types[script_type] = 0
+        script_types[script_type] += 1
+    
     # 构建报告
     report = []
     report.append("\n========== 脚本执行报告 ==========")
@@ -584,13 +810,19 @@ def generate_execution_report(**context):
     report.append(f"失败数: {fail_count}")
     report.append(f"成功率: {success_count / total * 100:.2f}%")
     
+    # 添加脚本类型统计
+    report.append("\n--- 脚本类型统计 ---")
+    for script_type, count in script_types.items():
+        report.append(f"{script_type}: {count} 个")
+    
     report.append("\n--- 执行详情 ---")
     for idx, result in enumerate(results, 1):
         script_name = result['script_name']
         target_table = result['target_table']
+        script_type = result.get('script_type', 'python_script')
         success = result['success']
         status = "✓ 成功" if success else "✗ 失败"
-        report.append(f"{idx}. {script_name} -> {target_table}: {status}")
+        report.append(f"{idx}. {script_name} -> {target_table} ({script_type}): {status}")
     
     report.append("\n========== 报告结束 ==========")
     
@@ -654,4 +886,5 @@ with DAG(
     )
     
     # 设置任务依赖关系
-    prepare_task >> execute_task >> report_task >> completed_task
+    prepare_task >> execute_task >> report_task >> completed_task
+        # 使用Python脚