import json import logging import requests from flask import request, jsonify, current_app from app.api.production_line import bp from app.models.result import success, failed from app.api.graph.routes import MyEncoder, connect_graph from app.core.production_line import production_draw_graph # from app.core.production_line.production_line import execute_production_line # 注释掉原来的导入 logger = logging.getLogger(__name__) # 生产线列表 @bp.route('/production/line/list', methods=['POST']) def production_line_list(): """ 获取生产线列表,支持分页和名称过滤 Args (通过JSON请求体): current (int): 当前页码,默认为1 size (int): 每页大小,默认为10 name (str, optional): 名称过滤条件 Returns: JSON: 包含生产线列表和分页信息的响应 """ try: receiver = request.get_json() page = int(receiver.get('current', 1)) page_size = int(receiver.get('size', 10)) name_filter = receiver.get('name', None) # 计算跳过的记录的数量 skip_count = (page - 1) * page_size if name_filter: where_clause = f"n.name CONTAINS'{name_filter}'" else: where_clause = "TRUE" cql = f""" MATCH (n) WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause} WITH id(n) AS id, n.name AS name, labels(n)[0] AS type,n RETURN {{ id: id, name: name, en_name: n.en_name, type: type }} AS result,n.time as time ORDER BY time desc SKIP {skip_count} LIMIT {page_size} """ # 修复:使用正确的session方式执行查询 driver = connect_graph() if not driver: return json.dumps(failed("无法连接到数据库"), ensure_ascii=False, cls=MyEncoder) with driver.session() as session: result = session.run(cql) data = result.data() records = [] for item in data: records.append(item['result']) # 获取总量 total_query = f"MATCH (n) WHERE (n:data_model OR n:data_resource OR n:data_metric) AND {where_clause}" \ f" RETURN COUNT(n) AS total" total_result = session.run(total_query).single()["total"] response_data = {'records': records, 'total': total_result, 'size': page_size, 'current': page} res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 根据生产线列表,传入id,绘制图谱 @bp.route('/production/line/graph', methods=['POST']) def production_line_graph(): """ 根据生产线ID绘制关系图谱 Args (通过JSON请求体): id (int): 节点ID Returns: JSON: 包含图谱数据的响应 """ try: # 获取请求参数 receiver = request.get_json() if not receiver or 'id' not in receiver: return json.dumps(failed("缺少必要参数: id"), ensure_ascii=False, cls=MyEncoder) id = receiver['id'] # 修改: 专门处理ID为0的情况,将ID类型视为整数 if id is None: return json.dumps(failed("节点ID不能为空"), ensure_ascii=False, cls=MyEncoder) # 确保ID是整数类型 try: id = int(id) except (ValueError, TypeError): return json.dumps(failed("节点ID必须是整数"), ensure_ascii=False, cls=MyEncoder) # 修复:使用正确的session方式执行查询 driver = connect_graph() if not driver: return json.dumps(failed("无法连接到数据库"), ensure_ascii=False, cls=MyEncoder) with driver.session() as session: # 检查节点是否存在 check_query = """ MATCH (n) WHERE id(n) = $nodeId RETURN labels(n)[0] as type, n.name as name """ result = session.run(check_query, nodeId=id) record = result.single() if not record: return json.dumps(failed(f"节点不存在: ID={id}"), ensure_ascii=False, cls=MyEncoder) type = record["type"] # 生成图谱 data = production_draw_graph(id, type) # 检查返回结果是否包含错误 if "error" in data: error_msg = data.pop("error") return json.dumps(failed(error_msg, data), ensure_ascii=False, cls=MyEncoder) return json.dumps(success(data, "success"), ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"生成图谱失败: {str(e)}") return json.dumps(failed(str(e)), ensure_ascii=False, cls=MyEncoder) """ Manual execution API endpoint Author: paul Date: 2024-03-20 """ @bp.route('/production/line/execute', methods=['POST']) def production_line_execute(): """ 手动执行数据资源加载 - 通过调用 Airflow REST API Args (通过JSON请求体): id (int): 数据资源ID target_table (str): 目标表名 dependency_level (str): 依赖级别,默认为 "resource" Returns: JSON: 执行结果 """ try: # 获取请求参数 receiver = request.get_json() if not receiver: return jsonify(failed("请求体不能为空")) #resource_id = receiver.get('id') target_table = receiver.get('target_table') dependency_level = receiver.get('dependency_level', 'resource') if not target_table: return jsonify(failed("目标表名不能为空")) # Airflow API 配置 - 从配置文件获取 AIRFLOW_API_ENDPOINT = '/api/v1/dags/dataops_productline_manual_trigger_dag/dagRuns' airflow_base_url = current_app.config['AIRFLOW_BASE_URL'] airflow_url = f"{airflow_base_url}{AIRFLOW_API_ENDPOINT}" # 构建请求数据 payload = { "conf": { "target_table": target_table, "dependency_level": dependency_level } } # 设置认证信息(Basic Auth) - 从配置文件获取 auth = (current_app.config['AIRFLOW_AUTH_USER'], current_app.config['AIRFLOW_AUTH_PASSWORD']) # 设置请求头 headers = { 'Content-Type': 'application/json' } # 记录请求信息到日志 logger.info(f"准备调用 Airflow API: {airflow_url}") logger.info(f"请求参数: {json.dumps(payload, ensure_ascii=False, indent=2)}") # 调用 Airflow API response = requests.post( airflow_url, json=payload, auth=auth, headers=headers, timeout=30 ) # 检查响应状态 if response.status_code == 200: airflow_result = response.json() # 打印Airflow API返回的结果到日志 logger.info(f"Airflow API 返回结果: {json.dumps(airflow_result, ensure_ascii=False, indent=2)}") # 获取状态 state = airflow_result.get("state") # 根据状态设置消息 if state == "queued": message = "触发成功,正在执行" logger.info(f"DAG触发成功,状态: {state}, 运行ID: {airflow_result.get('dag_run_id')}") # 返回成功响应,包含完整的 Airflow 返回结果 return jsonify(success(airflow_result, message)) else: message = "无法触发执行,请联系管理员" logger.warning(f"DAG触发状态异常,状态: {state}, 运行ID: {airflow_result.get('dag_run_id')}") # 即使状态不是 queued,也返回完整的 Airflow 结果,但使用 failed 状态 return jsonify(failed(message, airflow_result)) else: # 处理错误响应 error_msg = f"Airflow API 调用失败,状态码: {response.status_code}" try: error_detail = response.json() error_msg += f",错误详情: {error_detail}" except: error_msg += f",响应内容: {response.text}" logger.error(error_msg) return jsonify(failed(error_msg)) except requests.exceptions.Timeout: error_msg = "调用 Airflow API 超时" logger.error(error_msg) return jsonify(failed(error_msg)) except requests.exceptions.ConnectionError: error_msg = "无法连接到 Airflow 服务器" logger.error(error_msg) return jsonify(failed(error_msg)) except requests.exceptions.RequestException as e: error_msg = f"请求 Airflow API 时发生错误: {str(e)}" logger.error(error_msg) return jsonify(failed(error_msg)) except Exception as e: error_msg = f"执行数据资源加载失败: {str(e)}" logger.error(error_msg) return jsonify(failed(error_msg))