"""
Business Domain API routes.

RESTful endpoints for business domains: list/detail/save/update/delete,
file upload/download through MinIO, DDL/document parsing, relationship
graph, metadata search, composition and data-label listing.
"""

import io
import json
import logging
import mimetypes
import time
import traceback
import urllib.parse

from flask import current_app, jsonify, request, send_file
from minio import Minio
from minio.error import S3Error

from app.api.business_domain import bp
from app.core.business_domain import (
    business_domain_compose,
    business_domain_graph_all,
    business_domain_label_list,
    business_domain_list,
    business_domain_search_list,
    delete_business_domain,
    get_business_domain_by_id,
    save_business_domain,
    update_business_domain,
)
from app.core.llm.ddl_parser import DDLParser
from app.models.result import failed, success
from app.services.neo4j_driver import neo4j_driver

logger = logging.getLogger("app")


# ----------------------- MinIO helpers -----------------------


def get_minio_client():
    """Build a MinIO client from the Flask app configuration.

    Reads MINIO_HOST, MINIO_USER, MINIO_PASSWORD and MINIO_SECURE from
    ``current_app.config``, so it must be called inside an app context.
    """
    return Minio(
        current_app.config["MINIO_HOST"],
        access_key=current_app.config["MINIO_USER"],
        secret_key=current_app.config["MINIO_PASSWORD"],
        secure=current_app.config["MINIO_SECURE"],
    )


def get_minio_config():
    """Return the MinIO settings used by the upload/download endpoints.

    Returns:
        dict with keys ``MINIO_BUCKET``, ``PREFIX`` (object-name prefix,
        ``BUSINESS_DOMAIN_PREFIX`` or ``"business_domain"`` if unset) and
        ``ALLOWED_EXTENSIONS``.
    """
    return {
        "MINIO_BUCKET": current_app.config["MINIO_BUCKET"],
        "PREFIX": current_app.config.get("BUSINESS_DOMAIN_PREFIX", "business_domain"),
        "ALLOWED_EXTENSIONS": current_app.config["ALLOWED_EXTENSIONS"],
    }


def _get_file_extension(filename: str) -> str:
    """Return the lower-cased extension of *filename* (no dot), or "" if none."""
    if not filename or "." not in filename:
        return ""
    return filename.rsplit(".", 1)[1].lower()


def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    ext = _get_file_extension(filename)
    if not ext:
        return False
    return ext in get_minio_config()["ALLOWED_EXTENSIONS"]


# ----------------------- Request helpers -----------------------


def _parse_domain_id(raw_id):
    """Validate a business-domain id taken from a request body.

    Returns:
        ``(domain_id, None)`` on success, or ``(None, error_response)`` where
        ``error_response`` is a JSON response the view should return as-is.
    """
    if raw_id is None:
        return None, jsonify(failed("业务领域ID不能为空"))
    try:
        return int(raw_id), None
    except (ValueError, TypeError):
        return None, jsonify(failed(f"业务领域ID必须为整数, 收到的是: {raw_id}"))


def _read_paging(data):
    """Read ``current`` (default 1) and ``size`` (default 10) as ints.

    Raises:
        ValueError / TypeError if either value is not int-convertible; the
        calling views turn that into their generic failure response.
    """
    return int(data.get("current", 1)), int(data.get("size", 10))


def _paged_response(records, total, page, page_size):
    """Wrap one page of results in the standard success envelope."""
    return jsonify(
        success(
            {
                "records": records,
                "total": total,
                "size": page_size,
                "current": page,
            }
        )
    )


# ----------------------- Business Domain APIs -----------------------


@bp.route("/list", methods=["POST"])
def bd_list():
    """List business domains with paging and optional filters.

    Body keys: current, size, name_en, name_zh, type (default "all"),
    category, tag.
    """
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        page, page_size = _read_paging(data)
        domains, total_count = business_domain_list(
            page,
            page_size,
            data.get("name_en"),
            data.get("name_zh"),
            data.get("type", "all"),
            data.get("category"),
            data.get("tag"),
        )
        return _paged_response(domains, total_count, page, page_size)
    except Exception as e:
        logger.error(f"获取业务领域列表失败: {str(e)}")
        return jsonify(failed("获取业务领域列表失败", error=str(e)))


@bp.route("/detail", methods=["POST"])
def bd_detail():
    """Return a single business domain by integer ``id``."""
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        domain_id, error = _parse_domain_id(data.get("id"))
        if error is not None:
            return error

        domain_data = get_business_domain_by_id(domain_id)
        if not domain_data:
            return jsonify(failed("业务领域不存在"))

        return jsonify(success(domain_data))
    except Exception as e:
        logger.error(f"获取业务领域详情失败: {str(e)}")
        return jsonify(failed("获取业务领域详情失败", error=str(e)))


@bp.route("/delete", methods=["POST"])
def bd_delete():
    """Delete a business domain by integer ``id``."""
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        # Validate/convert the id the same way /detail, /graphall and /search do.
        domain_id, error = _parse_domain_id(data.get("id"))
        if error is not None:
            return error

        if delete_business_domain(domain_id):
            return jsonify(success({"message": "业务领域删除成功"}))
        return jsonify(failed("业务领域删除失败"))
    except Exception as e:
        logger.error(f"删除业务领域失败: {str(e)}")
        return jsonify(failed("删除业务领域失败", error=str(e)))


@bp.route("/save", methods=["POST"])
def bd_save():
    """Create (no ``id``) or update (with ``id``) a business domain.

    When creating, both ``name_zh`` and ``name_en`` are required.
    """
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        if not data.get("id") and (not data.get("name_zh") or not data.get("name_en")):
            return jsonify(failed("新建时 name_zh 和 name_en 为必填项"))

        saved_data = save_business_domain(data)
        return jsonify(success(saved_data))
    except Exception as e:
        logger.error(f"保存业务领域失败: {str(e)}")
        return jsonify(failed("保存业务领域失败", error=str(e)))


@bp.route("/update", methods=["POST"])
def bd_update():
    """Update an existing business domain; the body must contain ``id``."""
    try:
        data = request.json
        if not data or "id" not in data:
            return jsonify(failed("参数不完整"))

        updated_data = update_business_domain(data)
        return jsonify(success(updated_data))
    except Exception as e:
        logger.error(f"更新业务领域失败: {str(e)}")
        return jsonify(failed("更新业务领域失败", error=str(e)))


@bp.route("/upload", methods=["POST"])
def bd_upload():
    """Upload a business-domain file (multipart field ``file``) to MinIO.

    The object is stored as ``<PREFIX>/<name>_<YYYYmmddHHMMSS>.<ext>``; that
    object name is returned as ``url`` and is what ``/download`` expects.
    """
    try:
        if "file" not in request.files:
            return jsonify(failed("没有找到上传的文件"))

        file = request.files["file"]
        # Covers both an empty string and a missing (None) filename.
        if not file.filename:
            return jsonify(failed("未选择文件"))

        if not allowed_file(file.filename):
            return jsonify(failed("不支持的文件类型"))

        minio_client = get_minio_client()
        config = get_minio_config()

        file_content = file.read()
        file_size = len(file_content)

        # The client-supplied name may carry directory parts ("../x.sql",
        # "C:\\dir\\x.sql"); keep only the base name so the object stays
        # under the configured prefix.
        base_name = file.filename.replace("\\", "/").rsplit("/", 1)[-1]
        filename_without_ext, file_type = base_name.rsplit(".", 1)
        file_type = file_type.lower()
        timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        object_name = (
            f"{config['PREFIX']}/{filename_without_ext}_{timestamp}.{file_type}"
        )

        # "application/<ext>" is not a valid MIME type for most extensions
        # (e.g. xlsx, sql); use the registered type when known.
        content_type = mimetypes.guess_type(base_name)[0] or "application/octet-stream"

        minio_client.put_object(
            config["MINIO_BUCKET"],
            object_name,
            io.BytesIO(file_content),
            file_size,
            content_type=content_type,
        )

        logger.info(f"文件上传成功: {object_name}, 大小: {file_size}")

        return jsonify(
            success(
                {
                    "filename": file.filename,
                    "size": file_size,
                    "type": file_type,
                    "url": object_name,
                }
            )
        )
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        return jsonify(failed("文件上传失败", error=str(e)))


@bp.route("/download", methods=["GET"])
def bd_download():
    """Download a file from MinIO by object name (query parameter ``url``).

    NOTE(review): any object in the bucket can be requested; consider
    restricting ``object_name`` to ``config["PREFIX"]``.
    """
    response = None
    try:
        object_name = request.args.get("url")
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        # request.args is already percent-decoded; this second unquote is
        # presumably for clients that double-encode the object name.
        object_name = urllib.parse.unquote(object_name)
        logger.info(f"下载文件请求: {object_name}")

        minio_client = get_minio_client()
        config = get_minio_config()

        try:
            response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
            file_data = response.read()
        except S3Error as e:
            logger.error(f"MinIO获取文件失败: {str(e)}")
            return jsonify(failed(f"文件获取失败: {str(e)}"))

        file_name = object_name.split("/")[-1]
        return send_file(
            io.BytesIO(file_data),
            as_attachment=True,
            download_name=file_name,
            mimetype="application/octet-stream",
        )
    except Exception as e:
        logger.error(f"文件下载失败: {str(e)}")
        return jsonify(failed("文件下载失败", error=str(e)))
    finally:
        # The MinIO response holds a pooled HTTP connection; always return it.
        if response:
            response.close()
            response.release_conn()


@bp.route("/graphall", methods=["POST"])
def bd_graph_all():
    """Return the relationship graph of a business domain.

    Body keys: ``id`` (required, int) and ``meta`` (default True), which is
    forwarded to ``business_domain_graph_all`` as ``include_meta``.
    """
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        include_meta = data.get("meta", True)
        domain_id, error = _parse_domain_id(data.get("id"))
        if error is not None:
            return error

        graph_data = business_domain_graph_all(domain_id, include_meta)
        return jsonify(success(graph_data))
    except Exception as e:
        logger.error(f"获取业务领域图谱失败: {str(e)}")
        return jsonify(failed("获取业务领域图谱失败", error=str(e)))


def _check_table_existence(table_list: list) -> list:
    """Set each item's ``exist`` flag from Neo4j.

    Every dict item gets ``exist = False``. Items whose
    ``table_info.name_en`` matches the ``name_en`` of a ``BusinessDomain``
    node are then set to True. Neo4j errors are logged and leave the flags
    False (best effort).

    Args:
        table_list: parsed table entries, each expected to look like
            ``{"table_info": {"name_en": ...}, ...}``.

    Returns:
        The same list, mutated in place.
    """
    table_names = []
    for table_item in table_list:
        if not isinstance(table_item, dict):
            continue
        table_item["exist"] = False
        info = table_item.get("table_info")
        if isinstance(info, dict) and info.get("name_en"):
            table_names.append(info["name_en"])

    if not table_names:
        return table_list

    try:
        with neo4j_driver.get_session() as session:
            # Aggregate to one row per name; avoid the `exists` keyword as alias.
            table_query = """
            UNWIND $names AS name
            OPTIONAL MATCH (n:BusinessDomain {name_en: name})
            RETURN name, count(n) > 0 AS found
            """
            exist_map = {
                record["name"]: record["found"]
                for record in session.run(table_query, names=table_names)
            }

        for table_item in table_list:
            if not isinstance(table_item, dict):
                continue
            info = table_item.get("table_info")
            if not isinstance(info, dict):
                continue
            t_name = info.get("name_en")
            if t_name and t_name in exist_map:
                table_item["exist"] = exist_map[t_name]
    except Exception as e:
        logger.error(f"检查业务领域存在状态失败: {str(e)}")

    return table_list


# File types accepted by /ddlparse.
ALLOWED_DDL_EXTENSIONS = {"sql", "xlsx", "xls", "docx", "doc", "pdf"}


@bp.route("/ddlparse", methods=["POST"])
def bd_ddl_parse():
    """Parse an uploaded file and extract table definitions.

    Supported types:
        - .sql: DDL CREATE TABLE statements (UTF-8, optional BOM)
        - .xlsx / .xls: table structures described in a spreadsheet
        - .docx: table structures described in a Word document (.doc is rejected)
        - .pdf: table structures described in a PDF

    Returns:
        A JSON list of table entries, each with an ``exist`` flag set by
        looking up ``BusinessDomain`` nodes in Neo4j.
    """
    try:
        if "file" not in request.files:
            return jsonify(failed("没有找到上传的文件,请上传一个文件"))

        file = request.files["file"]
        if not file or not file.filename:
            return jsonify(failed("未选择文件"))

        filename = file.filename
        file_ext = _get_file_extension(filename)

        if file_ext not in ALLOWED_DDL_EXTENSIONS:
            # Sorted so the message is stable across runs (set order is not).
            return jsonify(
                failed(
                    f"不支持的文件类型: .{file_ext},"
                    f"支持的类型: {', '.join('.' + ext for ext in sorted(ALLOWED_DDL_EXTENSIONS))}"
                )
            )

        file_content = file.read()
        logger.info(f"接收到文件上传,文件名: {filename}, 类型: {file_ext}")

        parser = DDLParser()
        ddl_list = []

        if file_ext == "sql":
            # utf-8-sig strips a leading BOM that would otherwise precede the DDL.
            try:
                sql_content = file_content.decode("utf-8-sig")
            except UnicodeDecodeError:
                return jsonify(failed("SQL 文件编码无法识别,请使用 UTF-8 编码后重新上传"))
            ddl_list = parser.parse_ddl(sql_content)
        elif file_ext in {"xlsx", "xls"}:
            ddl_list = parser.parse_excel_content(file_content)
        elif file_ext in {"docx", "doc"}:
            if file_ext == "doc":
                return jsonify(
                    failed("暂不支持 .doc 格式,请转换为 .docx 格式后重新上传")
                )
            ddl_list = parser.parse_word_content(file_content)
        elif file_ext == "pdf":
            ddl_list = parser.parse_pdf_content(file_content)

        if not ddl_list:
            return jsonify(failed("未找到有效的数据表定义信息"))

        # Normalise to a list of table entries.
        if isinstance(ddl_list, dict):
            if "table_info" in ddl_list:
                ddl_list = [ddl_list]
            else:
                # Legacy format: {table_name: table_data, ...}
                converted_list = []
                for table_data in ddl_list.values():
                    if isinstance(table_data, dict):
                        table_data["exist"] = False
                    converted_list.append(table_data)
                ddl_list = converted_list

        ddl_list = _check_table_existence(ddl_list)

        # Only serialise for the debug log when it will actually be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"识别到的数据表: {json.dumps(ddl_list, ensure_ascii=False)}")

        return jsonify(success(ddl_list))

    except ValueError as e:
        logger.error(f"文件解析失败: {str(e)}")
        return jsonify(failed(str(e)))
    except Exception as e:
        logger.error(f"解析文件失败: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify(failed("解析文件失败", error=str(e)))


@bp.route("/search", methods=["POST"])
def bd_search():
    """Search metadata linked to a business domain, with paging.

    Body keys: id (required, int), current, size, name_en, name_zh,
    category, tag.
    """
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        page, page_size = _read_paging(data)
        domain_id, error = _parse_domain_id(data.get("id"))
        if error is not None:
            return error

        metadata_list, total_count = business_domain_search_list(
            domain_id,
            page,
            page_size,
            data.get("name_en"),
            data.get("name_zh"),
            data.get("category"),
            data.get("tag"),
        )
        return _paged_response(metadata_list, total_count, page, page_size)
    except Exception as e:
        logger.error(f"业务领域关联元数据搜索失败: {str(e)}")
        return jsonify(failed("业务领域关联元数据搜索失败", error=str(e)))


@bp.route("/compose", methods=["POST"])
def bd_compose():
    """Create a new business domain composed from existing ones.

    Requires ``name_zh`` and a non-empty ``id_list``.
    """
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        if not data.get("name_zh"):
            return jsonify(failed("name_zh 为必填项"))

        if not data.get("id_list"):
            return jsonify(failed("id_list 为必填项"))

        result_data = business_domain_compose(data)
        return jsonify(success({"business_domain": result_data}))
    except Exception as e:
        logger.error(f"组合创建业务领域失败: {str(e)}")
        return jsonify(failed("组合创建业务领域失败", error=str(e)))


@bp.route("/labellist", methods=["POST"])
def bd_label_list():
    """List data labels (for linking to business domains), with paging.

    Body keys: current, size, name_en, name_zh, category, group.
    """
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        page, page_size = _read_paging(data)
        labels, total_count = business_domain_label_list(
            page,
            page_size,
            data.get("name_en"),
            data.get("name_zh"),
            data.get("category"),
            data.get("group"),
        )
        return _paged_response(labels, total_count, page, page_size)
    except Exception as e:
        logger.error(f"获取标签列表失败: {str(e)}")
        return jsonify(failed("获取标签列表失败", error=str(e)))