Przeglądaj źródła

修改production_line的list api 增加en_name;修改producttion_line的production_line_execute,使用airflow rest api执行手工调度。

wangxq 1 miesiąc temu
rodzic
commit
2b71248c67
2 zmienionych plików z 112 dodań i 14 usunięć
  1. 101 14
      app/api/production_line/routes.py
  2. 11 0
      app/config/config.py

+ 101 - 14
app/api/production_line/routes.py

@@ -1,11 +1,12 @@
 import json
 import logging
-from flask import request, jsonify
+import requests
+from flask import request, jsonify, current_app
 from app.api.production_line import bp
 from app.models.result import success, failed
 from app.api.graph.routes import MyEncoder, connect_graph
 from app.core.production_line import production_draw_graph
-from app.core.production_line.production_line import execute_production_line
+# from app.core.production_line.production_line import execute_production_line  # 注释掉原来的导入
 
 
 logger = logging.getLogger(__name__)
@@ -44,6 +45,7 @@ def production_line_list():
         RETURN  {{
             id: id,
             name: name,
+            en_name: n.en_name,
             type: type
         }} AS result,n.time as time
         ORDER BY time desc
@@ -149,28 +151,113 @@ Date: 2024-03-20
 @bp.route('/production/line/execute', methods=['POST'])
 def production_line_execute():
     """
-    手动执行数据资源加载
+    手动执行数据资源加载 - 通过调用 Airflow REST API
     
     Args (通过JSON请求体):
         id (int): 数据资源ID
+        target_table (str): 目标表名
+        dependency_level (str): 依赖级别,默认为 "resource"
         
     Returns:
         JSON: 执行结果
     """
     try:
-        # 获取资源ID
-        resource_id = request.json.get('id')
-        if resource_id is None:  # 修改检查逻辑,只有当ID为None时才报错
-            return jsonify(failed("资源ID不能为空"))
+        # 获取请求参数
+        receiver = request.get_json()
+        if not receiver:
+            return jsonify(failed("请求体不能为空"))
+            
+        #resource_id = receiver.get('id')
+        target_table = receiver.get('target_table')
+        dependency_level = receiver.get('dependency_level', 'resource')
             
-        # 执行加载
-        result = execute_production_line(resource_id)
+        if not target_table:
+            return jsonify(failed("目标表名不能为空"))
+        
+        # Airflow API 配置 - 从配置文件获取
+        AIRFLOW_API_ENDPOINT = '/api/v1/dags/dataops_productline_manual_trigger_dag/dagRuns'
+        airflow_base_url = current_app.config['AIRFLOW_BASE_URL']
+        airflow_url = f"{airflow_base_url}{AIRFLOW_API_ENDPOINT}"
+        
+        # 构建请求数据
+        payload = {
+            "conf": {
+                "target_table": target_table,
+                "dependency_level": dependency_level
+            }
+        }
+        
+        # 设置认证信息(Basic Auth) - 从配置文件获取
+        auth = (current_app.config['AIRFLOW_AUTH_USER'], current_app.config['AIRFLOW_AUTH_PASSWORD'])
+        
+        # 设置请求头
+        headers = {
+            'Content-Type': 'application/json'
+        }
+        
+        # 记录请求信息到日志
+        logger.info(f"准备调用 Airflow API: {airflow_url}")
+        logger.info(f"请求参数: {json.dumps(payload, ensure_ascii=False, indent=2)}")
+        
+        # 调用 Airflow API
+        response = requests.post(
+            airflow_url,
+            json=payload,
+            auth=auth,
+            headers=headers,
+            timeout=30
+        )
         
-        if result['status'] == 'success':
-            return jsonify(success(result))
+        # 检查响应状态
+        if response.status_code == 200:
+            airflow_result = response.json()
+            
+            # 打印Airflow API返回的结果到日志
+            logger.info(f"Airflow API 返回结果: {json.dumps(airflow_result, ensure_ascii=False, indent=2)}")
+            
+            # 获取状态
+            state = airflow_result.get("state")
+            
+            # 根据状态设置消息
+            if state == "queued":
+                message = "触发成功,正在执行"
+                logger.info(f"DAG触发成功,状态: {state}, 运行ID: {airflow_result.get('dag_run_id')}")
+                # 返回成功响应,包含完整的 Airflow 返回结果
+                return jsonify(success(airflow_result, message))
+            else:
+                message = "无法触发执行,请联系管理员"
+                logger.warning(f"DAG触发状态异常,状态: {state}, 运行ID: {airflow_result.get('dag_run_id')}")
+                # 即使状态不是 queued,也返回完整的 Airflow 结果,但使用 failed 状态
+                return jsonify(failed(message, airflow_result))
+            
         else:
-            return jsonify(failed(result['message']))
+            # 处理错误响应
+            error_msg = f"Airflow API 调用失败,状态码: {response.status_code}"
+            try:
+                error_detail = response.json()
+                error_msg += f",错误详情: {error_detail}"
+            except:
+                error_msg += f",响应内容: {response.text}"
+                
+            logger.error(error_msg)
+            return jsonify(failed(error_msg))
             
+    except requests.exceptions.Timeout:
+        error_msg = "调用 Airflow API 超时"
+        logger.error(error_msg)
+        return jsonify(failed(error_msg))
+        
+    except requests.exceptions.ConnectionError:
+        error_msg = "无法连接到 Airflow 服务器"
+        logger.error(error_msg)
+        return jsonify(failed(error_msg))
+        
+    except requests.exceptions.RequestException as e:
+        error_msg = f"请求 Airflow API 时发生错误: {str(e)}"
+        logger.error(error_msg)
+        return jsonify(failed(error_msg))
+        
     except Exception as e:
-        logger.error(f"执行数据资源加载失败: {str(e)}")
-        return jsonify(failed(str(e))) 
+        error_msg = f"执行数据资源加载失败: {str(e)}"
+        logger.error(error_msg)
+        return jsonify(failed(error_msg)) 

+ 11 - 0
app/config/config.py

@@ -54,6 +54,7 @@ class BaseConfig:
     LOG_FORMAT = '%(asctime)s - %(levelname)s - %(filename)s - %(funcName)s - %(lineno)s - %(message)s'
     LOG_ENCODING = 'UTF-8'
     LOG_ENABLED = True
+    
 
 class DevelopmentConfig(BaseConfig):
     """Windows 开发环境配置"""
@@ -87,6 +88,11 @@ class DevelopmentConfig(BaseConfig):
     LOG_LEVEL = 'DEBUG'
     LOG_FILE = 'flask_development.log'
     LOG_TO_CONSOLE = True
+    
+    # 开发环境 Airflow 配置
+    AIRFLOW_BASE_URL = 'http://localhost:8080'
+    AIRFLOW_AUTH_USER = 'admin'
+    AIRFLOW_AUTH_PASSWORD = 'admin'
 
 class ProductionConfig(BaseConfig):
     """Linux 生产环境配置"""
@@ -120,6 +126,11 @@ class ProductionConfig(BaseConfig):
     LOG_LEVEL = 'INFO'
     LOG_FILE = 'flask_production.log'
     LOG_TO_CONSOLE = False
+    
+    # 生产环境 Airflow 配置
+    AIRFLOW_BASE_URL = os.environ.get('AIRFLOW_BASE_URL', 'http://192.168.3.143:8080')
+    AIRFLOW_AUTH_USER = os.environ.get('AIRFLOW_AUTH_USER', 'admin')
+    AIRFLOW_AUTH_PASSWORD = os.environ.get('AIRFLOW_AUTH_PASSWORD', 'admin')
 
 # 配置字典
 config = {