"""
Business Domain API routing module.

Exposes the RESTful endpoints for business domains: listing, detail,
save/update/delete, file upload/download through MinIO, graph retrieval,
DDL parsing, metadata search, composition and label listing.
"""

import io
import json
import time
import logging
import traceback
import urllib.parse

from flask import request, jsonify, current_app, send_file
from minio import Minio
from minio.error import S3Error

from app.api.business_domain import bp
from app.models.result import success, failed
from app.services.neo4j_driver import neo4j_driver
from app.core.llm.ddl_parser import DDLParser
from app.core.business_domain import (
    business_domain_list,
    get_business_domain_by_id,
    delete_business_domain,
    update_business_domain,
    save_business_domain,
    business_domain_graph_all,
    business_domain_search_list,
    business_domain_compose,
    business_domain_label_list,
)

logger = logging.getLogger("app")


# ----------------------- MinIO helpers -----------------------
def get_minio_client():
    """Build a MinIO client from the current Flask app configuration."""
    return Minio(
        current_app.config["MINIO_HOST"],
        access_key=current_app.config["MINIO_USER"],
        secret_key=current_app.config["MINIO_PASSWORD"],
        secure=current_app.config["MINIO_SECURE"],
    )


def get_minio_config():
    """Return the MinIO settings used by this module.

    Returns:
        dict with keys ``MINIO_BUCKET``, ``PREFIX`` (defaults to
        ``"business_domain"`` when ``BUSINESS_DOMAIN_PREFIX`` is not
        configured) and ``ALLOWED_EXTENSIONS``.
    """
    return {
        "MINIO_BUCKET": current_app.config["MINIO_BUCKET"],
        "PREFIX": current_app.config.get(
            "BUSINESS_DOMAIN_PREFIX", "business_domain"
        ),
        "ALLOWED_EXTENSIONS": current_app.config["ALLOWED_EXTENSIONS"],
    }


def allowed_file(filename):
    """Return True when *filename* has an extension listed in the config.

    A missing (``None``), empty, or extension-less filename is rejected
    instead of raising.
    """
    if not filename or "." not in filename:
        return False
    ext = filename.rsplit(".", 1)[1].lower()
    return ext in get_minio_config()["ALLOWED_EXTENSIONS"]


# ----------------------- Internal helpers -----------------------
def _parse_domain_id(raw_id):
    """Validate a business domain id taken from a request body.

    Returns:
        ``(domain_id, None)`` on success, or ``(None, error_message)`` when
        the id is missing or cannot be converted to ``int``.
    """
    if raw_id is None:
        return None, "业务领域ID不能为空"
    try:
        return int(raw_id), None
    except (ValueError, TypeError):
        return None, f"业务领域ID必须为整数, 收到的是: {raw_id}"


def _lookup_existing_domains(names):
    """Map each name to whether a BusinessDomain node with that name_en exists.

    Best-effort: any failure is logged and an empty mapping is returned so
    the caller keeps its default ``exist = False`` flags.
    """
    if not names:
        return {}
    table_query = """
        UNWIND $names AS name
        OPTIONAL MATCH (n:BusinessDomain {name_en: name})
        RETURN name, n IS NOT NULL AS exists
    """
    try:
        with neo4j_driver.get_session() as session:
            return {
                record["name"]: record["exists"]
                for record in session.run(table_query, names=names)
            }
    except Exception as e:
        logger.error(f"检查业务领域存在状态失败: {str(e)}")
        return {}


def _annotate_existing(ddl_list):
    """Set an ``exist`` flag on every parsed table entry, in place.

    Handles both shapes the route deals with: a list of dicts carrying
    ``table_info.name_en``, or a dict keyed by table name.
    """
    if isinstance(ddl_list, list):
        table_names = []
        for table_item in ddl_list:
            if not isinstance(table_item, dict):
                continue
            table_item["exist"] = False
            if "table_info" in table_item:
                table_name = table_item["table_info"].get("name_en")
                if table_name:
                    table_names.append(table_name)

        exist_map = _lookup_existing_domains(table_names)
        for table_item in ddl_list:
            if isinstance(table_item, dict) and "table_info" in table_item:
                t_name = table_item["table_info"].get("name_en")
                if t_name and t_name in exist_map:
                    table_item["exist"] = exist_map[t_name]

    elif isinstance(ddl_list, dict):
        for table_name, table_value in ddl_list.items():
            if isinstance(table_value, dict):
                table_value["exist"] = False
            else:
                logger.warning(
                    f"表 {table_name} 的值不是字典类型: "
                    f"{type(ddl_list[table_name])}"
                )

        exist_map = _lookup_existing_domains(list(ddl_list))
        for t_name, exists in exist_map.items():
            if isinstance(ddl_list.get(t_name), dict):
                ddl_list[t_name]["exist"] = exists


# ----------------------- Business Domain APIs -----------------------
@bp.route("/list", methods=["POST"])
def bd_list():
    """Return a paginated list of business domains.

    Reads ``current``, ``size``, ``name_en``, ``name_zh``, ``type``,
    ``category`` and ``tag`` from the JSON body.
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        page = int(request.json.get("current", 1))
        page_size = int(request.json.get("size", 10))
        name_en_filter = request.json.get("name_en")
        name_zh_filter = request.json.get("name_zh")
        type_filter = request.json.get("type", "all")
        category_filter = request.json.get("category")
        tag_filter = request.json.get("tag")

        domains, total_count = business_domain_list(
            page,
            page_size,
            name_en_filter,
            name_zh_filter,
            type_filter,
            category_filter,
            tag_filter,
        )

        return jsonify(
            success(
                {
                    "records": domains,
                    "total": total_count,
                    "size": page_size,
                    "current": page,
                }
            )
        )
    except Exception as e:
        logger.error(f"获取业务领域列表失败: {str(e)}")
        return jsonify(failed("获取业务领域列表失败", error=str(e)))


@bp.route("/detail", methods=["POST"])
def bd_detail():
    """Return the detail of one business domain identified by ``id``."""
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        domain_id, id_error = _parse_domain_id(request.json.get("id"))
        if id_error:
            return jsonify(failed(id_error))

        domain_data = get_business_domain_by_id(domain_id)
        if not domain_data:
            return jsonify(failed("业务领域不存在"))

        return jsonify(success(domain_data))
    except Exception as e:
        logger.error(f"获取业务领域详情失败: {str(e)}")
        return jsonify(failed("获取业务领域详情失败", error=str(e)))


@bp.route("/delete", methods=["POST"])
def bd_delete():
    """Delete the business domain identified by ``id``."""
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        # Validate/convert the id the same way the other id-based routes do.
        domain_id, id_error = _parse_domain_id(request.json.get("id"))
        if id_error:
            return jsonify(failed(id_error))

        result = delete_business_domain(domain_id)
        if result:
            return jsonify(success({"message": "业务领域删除成功"}))
        return jsonify(failed("业务领域删除失败"))
    except Exception as e:
        logger.error(f"删除业务领域失败: {str(e)}")
        return jsonify(failed("删除业务领域失败", error=str(e)))


@bp.route("/save", methods=["POST"])
def bd_save():
    """Create or update a business domain.

    When the body carries no truthy ``id`` the request is treated as a
    creation and both ``name_zh`` and ``name_en`` are required.
    """
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        is_new = not data.get("id")
        if is_new and not (data.get("name_zh") and data.get("name_en")):
            return jsonify(failed("新建时 name_zh 和 name_en 为必填项"))

        saved_data = save_business_domain(data)
        return jsonify(success(saved_data))
    except Exception as e:
        logger.error(f"保存业务领域失败: {str(e)}")
        return jsonify(failed("保存业务领域失败", error=str(e)))


@bp.route("/update", methods=["POST"])
def bd_update():
    """Update a business domain; the body must contain an ``id`` key."""
    try:
        data = request.json
        if not data or "id" not in data:
            return jsonify(failed("参数不完整"))

        updated_data = update_business_domain(data)
        return jsonify(success(updated_data))
    except Exception as e:
        logger.error(f"更新业务领域失败: {str(e)}")
        return jsonify(failed("更新业务领域失败", error=str(e)))


@bp.route("/upload", methods=["POST"])
def bd_upload():
    """Upload a file (form field ``file``) to MinIO.

    The object is stored as ``<PREFIX>/<name>_<timestamp>.<ext>``; the
    object name is returned in the ``url`` field of the response.
    """
    try:
        if "file" not in request.files:
            return jsonify(failed("没有找到上传的文件"))

        upload = request.files["file"]
        if not upload.filename:
            return jsonify(failed("未选择文件"))

        # Keep only the last path component so a client-supplied name with
        # directory parts cannot shape the object key outside the prefix.
        safe_name = upload.filename.replace("\\", "/").rsplit("/", 1)[-1]
        if not allowed_file(safe_name):
            return jsonify(failed("不支持的文件类型"))

        minio_client = get_minio_client()
        config = get_minio_config()

        file_content = upload.read()
        file_size = len(file_content)
        filename_without_ext, raw_ext = safe_name.rsplit(".", 1)
        file_type = raw_ext.lower()
        timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())

        object_name = (
            f"{config['PREFIX']}/"
            f"{filename_without_ext}_{timestamp}.{file_type}"
        )

        # NOTE(review): "application/<ext>" is not a registered MIME type for
        # most extensions; kept as-is because downloads always force
        # application/octet-stream. Consider mimetypes.guess_type.
        minio_client.put_object(
            config["MINIO_BUCKET"],
            object_name,
            io.BytesIO(file_content),
            file_size,
            content_type=f"application/{file_type}",
        )

        logger.info(f"文件上传成功: {object_name}, 大小: {file_size}")

        return jsonify(
            success(
                {
                    "filename": upload.filename,
                    "size": file_size,
                    "type": file_type,
                    "url": object_name,
                }
            )
        )
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        return jsonify(failed("文件上传失败", error=str(e)))


@bp.route("/download", methods=["GET"])
def bd_download():
    """Download an object from MinIO; its name is given by query arg ``url``.

    The object is read fully into memory and sent as an attachment named
    after the last path component of the object name.
    """
    response = None
    try:
        object_name = request.args.get("url")
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        object_name = urllib.parse.unquote(object_name)
        logger.info(f"下载文件请求: {object_name}")

        minio_client = get_minio_client()
        config = get_minio_config()

        # NOTE(review): object_name is caller-controlled and is not checked
        # against the configured prefix, so any object in the bucket can be
        # requested here - confirm whether that is intended.
        try:
            response = minio_client.get_object(
                config["MINIO_BUCKET"], object_name
            )
            file_data = response.read()
        except S3Error as e:
            logger.error(f"MinIO获取文件失败: {str(e)}")
            return jsonify(failed(f"文件获取失败: {str(e)}"))

        file_name = object_name.split("/")[-1]
        file_stream = io.BytesIO(file_data)

        return send_file(
            file_stream,
            as_attachment=True,
            download_name=file_name,
            mimetype="application/octet-stream",
        )
    except Exception as e:
        logger.error(f"文件下载失败: {str(e)}")
        return jsonify(failed("文件下载失败", error=str(e)))
    finally:
        # Always hand the HTTP connection back, whatever path was taken.
        if response is not None:
            response.close()
            response.release_conn()


@bp.route("/graphall", methods=["POST"])
def bd_graph_all():
    """Return the full relationship graph of one business domain.

    Reads ``id`` (required) and ``meta`` (defaults to ``True``) from the
    JSON body.
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        include_meta = request.json.get("meta", True)
        domain_id, id_error = _parse_domain_id(request.json.get("id"))
        if id_error:
            return jsonify(failed(id_error))

        graph_data = business_domain_graph_all(domain_id, include_meta)
        return jsonify(success(graph_data))
    except Exception as e:
        logger.error(f"获取业务领域图谱失败: {str(e)}")
        return jsonify(failed("获取业务领域图谱失败", error=str(e)))


@bp.route("/ddlparse", methods=["POST"])
def bd_ddl_parse():
    """Parse DDL statements for business domain creation.

    The SQL comes either from an uploaded ``.sql`` file (form field
    ``file``) or from the ``sql`` key of a JSON body. Each parsed table is
    annotated with an ``exist`` flag telling whether a BusinessDomain node
    with the same ``name_en`` is already present.
    """
    try:
        sql_content = ""

        if "file" in request.files:
            upload = request.files["file"]
            if upload and upload.filename:
                if not upload.filename.lower().endswith(".sql"):
                    return jsonify(failed("只接受SQL文件"))
                # utf-8-sig drops a leading BOM and is otherwise identical
                # to utf-8, so BOM-prefixed exports parse cleanly.
                sql_content = upload.read().decode("utf-8-sig")
                logger.info(f"从上传的文件中读取SQL内容,文件名: {upload.filename}")
        elif request.is_json and request.json:
            sql_content = request.json.get("sql", "")

        if not sql_content:
            return jsonify(failed("SQL内容不能为空,请上传SQL文件或提供SQL内容"))

        parser = DDLParser()
        ddl_list = parser.parse_ddl(sql_content)

        if not ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        _annotate_existing(ddl_list)

        # Serialise only when DEBUG is on, and never let a value that
        # json cannot encode natively fail an otherwise successful request.
        if logger.isEnabledFor(logging.DEBUG):
            dumped = json.dumps(ddl_list, ensure_ascii=False, default=str)
            logger.debug(f"识别到的DDL语句: {dumped}")

        return jsonify(success(ddl_list))
    except Exception as e:
        logger.error(f"解析DDL语句失败: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify(failed("解析DDL语句失败", error=str(e)))


@bp.route("/search", methods=["POST"])
def bd_search():
    """Search the metadata associated with one business domain (paginated).

    Reads ``current``, ``size``, ``id`` (required), ``name_en``,
    ``name_zh``, ``category`` and ``tag`` from the JSON body.
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        page = int(request.json.get("current", 1))
        page_size = int(request.json.get("size", 10))
        name_en_filter = request.json.get("name_en")
        name_zh_filter = request.json.get("name_zh")
        category_filter = request.json.get("category")
        tag_filter = request.json.get("tag")

        domain_id, id_error = _parse_domain_id(request.json.get("id"))
        if id_error:
            return jsonify(failed(id_error))

        metadata_list, total_count = business_domain_search_list(
            domain_id,
            page,
            page_size,
            name_en_filter,
            name_zh_filter,
            category_filter,
            tag_filter,
        )

        return jsonify(
            success(
                {
                    "records": metadata_list,
                    "total": total_count,
                    "size": page_size,
                    "current": page,
                }
            )
        )
    except Exception as e:
        logger.error(f"业务领域关联元数据搜索失败: {str(e)}")
        return jsonify(failed("业务领域关联元数据搜索失败", error=str(e)))


@bp.route("/compose", methods=["POST"])
def bd_compose():
    """Create a new business domain composed from existing ones.

    The JSON body must carry a non-empty ``name_zh`` and ``id_list``.
    """
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        if not data.get("name_zh"):
            return jsonify(failed("name_zh 为必填项"))

        if not data.get("id_list"):
            return jsonify(failed("id_list 为必填项"))

        result_data = business_domain_compose(data)
        response_data = {"business_domain": result_data}

        return jsonify(success(response_data))
    except Exception as e:
        logger.error(f"组合创建业务领域失败: {str(e)}")
        return jsonify(failed("组合创建业务领域失败", error=str(e)))


@bp.route("/labellist", methods=["POST"])
def bd_label_list():
    """Return a paginated list of data labels for business domain linking.

    Reads ``current``, ``size``, ``name_en``, ``name_zh``, ``category``
    and ``group`` from the JSON body.
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        page = int(request.json.get("current", 1))
        page_size = int(request.json.get("size", 10))
        name_en_filter = request.json.get("name_en")
        name_zh_filter = request.json.get("name_zh")
        category_filter = request.json.get("category")
        group_filter = request.json.get("group")

        labels, total_count = business_domain_label_list(
            page,
            page_size,
            name_en_filter,
            name_zh_filter,
            category_filter,
            group_filter,
        )

        return jsonify(
            success(
                {
                    "records": labels,
                    "total": total_count,
                    "size": page_size,
                    "current": page,
                }
            )
        )
    except Exception as e:
        logger.error(f"获取标签列表失败: {str(e)}")
        return jsonify(failed("获取标签列表失败", error=str(e)))