| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526 |
- """
- Business Domain API routing module.
- Provides the RESTful API endpoints for business-domain resources.
- """
- import io
- import json
- import time
- import logging
- import traceback
- import urllib.parse
- from flask import request, jsonify, current_app, send_file
- from minio import Minio
- from minio.error import S3Error
- from app.api.business_domain import bp
- from app.models.result import success, failed
- from app.services.neo4j_driver import neo4j_driver
- from app.core.llm.ddl_parser import DDLParser
- from app.core.business_domain import (
- business_domain_list,
- get_business_domain_by_id,
- delete_business_domain,
- update_business_domain,
- save_business_domain,
- business_domain_graph_all,
- business_domain_search_list,
- business_domain_compose,
- business_domain_label_list,
- )
- logger = logging.getLogger("app")
- # ----------------------- MinIO helpers -----------------------
def get_minio_client():
    """Build a MinIO client from the current Flask application's settings.

    Reads ``MINIO_HOST``, ``MINIO_USER``, ``MINIO_PASSWORD`` and
    ``MINIO_SECURE`` from ``current_app.config``; a missing key raises
    ``KeyError``.

    Returns:
        A new ``Minio`` client instance (one is created on every call).
    """
    settings = current_app.config
    endpoint = settings["MINIO_HOST"]
    return Minio(
        endpoint,
        access_key=settings["MINIO_USER"],
        secret_key=settings["MINIO_PASSWORD"],
        secure=settings["MINIO_SECURE"],
    )
def get_minio_config():
    """Collect the storage settings used by the file endpoints.

    Returns:
        A dict with three keys:

        * ``MINIO_BUCKET`` - taken from the app config (required).
        * ``PREFIX`` - the ``BUSINESS_DOMAIN_PREFIX`` setting, or
          ``"business_domain"`` when that setting is absent.
        * ``ALLOWED_EXTENSIONS`` - taken from the app config (required).
    """
    app_config = current_app.config
    bucket = app_config["MINIO_BUCKET"]
    prefix = app_config.get("BUSINESS_DOMAIN_PREFIX", "business_domain")
    extensions = app_config["ALLOWED_EXTENSIONS"]
    return {
        "MINIO_BUCKET": bucket,
        "PREFIX": prefix,
        "ALLOWED_EXTENSIONS": extensions,
    }
def allowed_file(filename):
    """Return whether *filename* has an extension listed in ``ALLOWED_EXTENSIONS``.

    The extension is the text after the last ``.``, compared in lower case
    against the ``ALLOWED_EXTENSIONS`` entry returned by ``get_minio_config()``.

    Args:
        filename: Client-supplied file name; may be ``None`` or empty.

    Returns:
        ``False`` when the name is missing, has no ``.`` or ends with a ``.``
        (empty extension); otherwise the result of the membership test.
    """
    # Reject missing names up front: `"." in None` would raise TypeError.
    if not filename or "." not in filename:
        return False
    ext = filename.rsplit(".", 1)[1].lower()
    # "report." has no real extension; never let "" reach the membership test
    # (it would match any allow-list that happens to be a plain string).
    if not ext:
        return False
    return ext in get_minio_config()["ALLOWED_EXTENSIONS"]
- # ----------------------- Business Domain APIs -----------------------
@bp.route("/list", methods=["POST"])
def bd_list():
    """Return one page of business domains matching the request filters.

    JSON body fields (all optional, but the body itself must be non-empty):
    ``current`` (page number, default 1), ``size`` (page size, default 10),
    ``name_en``, ``name_zh``, ``type`` (default ``"all"``), ``category`` and
    ``tag``.

    Returns:
        A JSON ``success`` envelope holding ``records``, ``total``, ``size``
        and ``current``; a ``failed`` envelope on an empty body or any error.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        # Paging values are converted first, so a bad number fails the request
        # before any query is issued.
        page = int(payload.get("current", 1))
        page_size = int(payload.get("size", 10))

        filters = (
            payload.get("name_en"),
            payload.get("name_zh"),
            payload.get("type", "all"),
            payload.get("category"),
            payload.get("tag"),
        )
        domains, total_count = business_domain_list(page, page_size, *filters)

        page_body = {
            "records": domains,
            "total": total_count,
            "size": page_size,
            "current": page,
        }
        return jsonify(success(page_body))
    except Exception as e:
        detail = str(e)
        logger.error(f"获取业务领域列表失败: {detail}")
        return jsonify(failed("获取业务领域列表失败", error=detail))
@bp.route("/detail", methods=["POST"])
def bd_detail():
    """Return the details of a single business domain.

    JSON body: ``id`` (required; must be convertible to ``int``).

    Returns:
        A JSON ``success`` envelope with the domain data, or a ``failed``
        envelope when the body is empty, the id is missing or not an integer,
        the domain is not found, or an error occurs.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        raw_id = payload.get("id")
        if raw_id is None:
            return jsonify(failed("业务领域ID不能为空"))

        try:
            domain_id = int(raw_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {raw_id}"))

        domain_data = get_business_domain_by_id(domain_id)
        if domain_data:
            return jsonify(success(domain_data))
        return jsonify(failed("业务领域不存在"))
    except Exception as e:
        detail = str(e)
        logger.error(f"获取业务领域详情失败: {detail}")
        return jsonify(failed("获取业务领域详情失败", error=detail))
@bp.route("/delete", methods=["POST"])
def bd_delete():
    """Delete a business domain.

    JSON body: ``id`` (required). The value is handed to
    ``delete_business_domain`` as received, without type conversion.

    Returns:
        A JSON ``success`` envelope with a confirmation message when the
        delete call returns a truthy value; otherwise a ``failed`` envelope.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        domain_id = payload.get("id")
        if domain_id is None:
            return jsonify(failed("业务领域ID不能为空"))

        if delete_business_domain(domain_id):
            return jsonify(success({"message": "业务领域删除成功"}))
        return jsonify(failed("业务领域删除失败"))
    except Exception as e:
        detail = str(e)
        logger.error(f"删除业务领域失败: {detail}")
        return jsonify(failed("删除业务领域失败", error=detail))
@bp.route("/save", methods=["POST"])
def bd_save():
    """Create or update a business domain from the JSON body.

    A body without a truthy ``id`` is treated as a create request and must
    carry both ``name_zh`` and ``name_en``. The whole body is passed to
    ``save_business_domain``.

    Returns:
        A JSON ``success`` envelope with the saved data, or a ``failed``
        envelope on validation failure or error.
    """
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        is_new = not data.get("id")
        has_names = bool(data.get("name_zh")) and bool(data.get("name_en"))
        if is_new and not has_names:
            return jsonify(failed("新建时 name_zh 和 name_en 为必填项"))

        return jsonify(success(save_business_domain(data)))
    except Exception as e:
        detail = str(e)
        logger.error(f"保存业务领域失败: {detail}")
        return jsonify(failed("保存业务领域失败", error=detail))
@bp.route("/update", methods=["POST"])
def bd_update():
    """Update an existing business domain.

    The JSON body must be non-empty and contain an ``id`` key; it is passed
    unchanged to ``update_business_domain``.

    Returns:
        A JSON ``success`` envelope with the updated data, or a ``failed``
        envelope when the body is incomplete or an error occurs.
    """
    try:
        data = request.json
        if data and "id" in data:
            return jsonify(success(update_business_domain(data)))
        return jsonify(failed("参数不完整"))
    except Exception as e:
        detail = str(e)
        logger.error(f"更新业务领域失败: {detail}")
        return jsonify(failed("更新业务领域失败", error=detail))
@bp.route("/upload", methods=["POST"])
def bd_upload():
    """Upload a business-domain file to MinIO.

    Expects a multipart form with a ``file`` part whose extension passes
    ``allowed_file``. The object is stored under the key
    ``<PREFIX>/<name>_<YYYYmmddHHMMSS>.<ext>`` (local time), where ``<name>``
    and ``<ext>`` come from the last path component of the uploaded filename.

    Returns:
        A JSON ``success`` envelope with ``filename`` (as sent by the client),
        ``size`` in bytes, ``type`` (lower-cased extension) and ``url`` (the
        object key); a ``failed`` envelope on validation failure or error.
    """
    try:
        if "file" not in request.files:
            return jsonify(failed("没有找到上传的文件"))

        file = request.files["file"]
        # The filename can be None as well as "", so test truthiness.
        raw_name = file.filename or ""
        if not raw_name:
            return jsonify(failed("未选择文件"))

        # The filename is client-controlled. Keep only its last path component
        # (either separator style) so directory parts cannot leak into the
        # object key, and validate the sanitized name rather than the raw one.
        filename = raw_name.replace("\\", "/").rsplit("/", 1)[-1]
        if not allowed_file(filename):
            return jsonify(failed("不支持的文件类型"))

        minio_client = get_minio_client()
        config = get_minio_config()

        # The whole upload is buffered in memory because put_object needs the
        # payload length up front.
        file_content = file.read()
        file_size = len(file_content)

        # allowed_file() guarantees the name contains a "." at this point.
        filename_without_ext, file_type = filename.rsplit(".", 1)
        file_type = file_type.lower()

        timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        object_name = (
            f"{config['PREFIX']}/"
            f"{filename_without_ext}_{timestamp}.{file_type}"
        )

        minio_client.put_object(
            config["MINIO_BUCKET"],
            object_name,
            io.BytesIO(file_content),
            file_size,
            content_type=f"application/{file_type}",
        )
        logger.info(f"文件上传成功: {object_name}, 大小: {file_size}")

        return jsonify(
            success(
                {
                    "filename": file.filename,
                    "size": file_size,
                    "type": file_type,
                    "url": object_name,
                }
            )
        )
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        return jsonify(failed("文件上传失败", error=str(e)))
@bp.route("/download", methods=["GET"])
def bd_download():
    """Download a stored file from MinIO as an attachment.

    Query parameter ``url`` is the object key; it is passed through
    ``urllib.parse.unquote`` before use. The object is read fully into memory
    and sent back as ``application/octet-stream``, named after the last
    ``/``-separated component of the key.

    Returns:
        The file response, or a JSON ``failed`` envelope when the key is
        missing, MinIO raises ``S3Error``, or any other error occurs.
    """
    response = None
    try:
        requested = request.args.get("url")
        if not requested:
            return jsonify(failed("文件路径不能为空"))

        object_name = urllib.parse.unquote(requested)
        logger.info(f"下载文件请求: {object_name}")

        minio_client = get_minio_client()
        config = get_minio_config()

        try:
            response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
            file_data = response.read()
        except S3Error as e:
            logger.error(f"MinIO获取文件失败: {str(e)}")
            return jsonify(failed(f"文件获取失败: {str(e)}"))

        return send_file(
            io.BytesIO(file_data),
            as_attachment=True,
            download_name=object_name.rsplit("/", 1)[-1],
            mimetype="application/octet-stream",
        )
    except Exception as e:
        detail = str(e)
        logger.error(f"文件下载失败: {detail}")
        return jsonify(failed("文件下载失败", error=detail))
    finally:
        # Hand the MinIO connection back to the pool on every exit path.
        if response:
            response.close()
            response.release_conn()
@bp.route("/graphall", methods=["POST"])
def bd_graph_all():
    """Return the full relationship graph of a business domain.

    JSON body: ``id`` (required; must be convertible to ``int``) and ``meta``
    (optional, default ``True``), which is forwarded to
    ``business_domain_graph_all`` as its second argument.

    Returns:
        A JSON ``success`` envelope with the graph data, or a ``failed``
        envelope on an empty body, a missing or non-integer id, or an error.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        raw_id = payload.get("id")
        include_meta = payload.get("meta", True)
        if raw_id is None:
            return jsonify(failed("业务领域ID不能为空"))

        try:
            domain_id = int(raw_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {raw_id}"))

        return jsonify(success(business_domain_graph_all(domain_id, include_meta)))
    except Exception as e:
        detail = str(e)
        logger.error(f"获取业务领域图谱失败: {detail}")
        return jsonify(failed("获取业务领域图谱失败", error=detail))
def _bd_lookup_existing_names(names):
    """Map each name to whether a BusinessDomain node with that name_en exists.

    Best effort: if the Neo4j lookup fails, the error is logged and an empty
    mapping is returned, so callers keep their default ``exist`` flags.
    """
    if not names:
        return {}
    query = """
        UNWIND $names AS name
        OPTIONAL MATCH (n:BusinessDomain {name_en: name})
        RETURN name, n IS NOT NULL AS exists
    """
    try:
        with neo4j_driver.get_session() as session:
            # Consume the result while the session is still open.
            return {
                record["name"]: record["exists"]
                for record in session.run(query, names=names)
            }
    except Exception as e:
        logger.error(f"检查业务领域存在状态失败: {str(e)}")
        return {}


def _bd_flag_existing_tables(ddl_list):
    """Set an ``exist`` flag, in place, on every parsed table entry.

    Handles both parser output shapes seen by the endpoint: a list of dicts
    carrying ``table_info["name_en"]``, or a dict keyed by table name.
    Entries default to ``False`` and are set from the Neo4j lookup result.
    """
    if isinstance(ddl_list, list):
        named_entries = []
        for entry in ddl_list:
            if not isinstance(entry, dict):
                continue
            entry["exist"] = False
            if "table_info" in entry:
                name = entry["table_info"].get("name_en")
                if name:
                    named_entries.append((name, entry))

        exist_map = _bd_lookup_existing_names([name for name, _ in named_entries])
        for name, entry in named_entries:
            if name in exist_map:
                entry["exist"] = exist_map[name]
    elif isinstance(ddl_list, dict):
        for table_name, table in ddl_list.items():
            if isinstance(table, dict):
                table["exist"] = False
            else:
                logger.warning(
                    f"表 {table_name} 的值不是字典类型: "
                    f"{type(table)}"
                )

        exist_map = _bd_lookup_existing_names(list(ddl_list))
        for table_name, exists in exist_map.items():
            table = ddl_list.get(table_name)
            if isinstance(table, dict):
                table["exist"] = exists


@bp.route("/ddlparse", methods=["POST"])
def bd_ddl_parse():
    """Parse DDL statements for business-domain creation.

    The SQL text comes from an uploaded ``file`` part (must end in ``.sql``,
    decoded as UTF-8) or, when no ``file`` part is present, from the ``sql``
    field of a JSON body. It is parsed with ``DDLParser.parse_ddl`` and each
    resulting table is flagged with ``exist`` according to whether a
    ``BusinessDomain`` node with the same ``name_en`` is found in Neo4j.

    Returns:
        A JSON ``success`` envelope with the annotated parse result, or a
        ``failed`` envelope when no SQL is supplied, the file is not ``.sql``,
        nothing parseable is found, or an error occurs.
    """
    try:
        sql_content = ""
        if "file" in request.files:
            file = request.files["file"]
            if file and file.filename:
                if not file.filename.lower().endswith(".sql"):
                    return jsonify(failed("只接受SQL文件"))
                sql_content = file.read().decode("utf-8")
                logger.info(f"从上传的文件中读取SQL内容,文件名: {file.filename}")
        elif request.is_json and request.json:
            sql_content = request.json.get("sql", "")

        if not sql_content:
            return jsonify(failed("SQL内容不能为空,请上传SQL文件或提供SQL内容"))

        ddl_list = DDLParser().parse_ddl(sql_content)
        if not ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        _bd_flag_existing_tables(ddl_list)

        # Only build the JSON dump when it will actually be logged; this keeps
        # a purely diagnostic serialization from costing time or failing the
        # request when DEBUG logging is off.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}"
            )
        return jsonify(success(ddl_list))
    except Exception as e:
        logger.error(f"解析DDL语句失败: {str(e)}")
        logger.error(traceback.format_exc())
        return jsonify(failed("解析DDL语句失败", error=str(e)))
@bp.route("/search", methods=["POST"])
def bd_search():
    """Return one page of metadata associated with a business domain.

    JSON body: ``id`` (required; must be convertible to ``int``), plus
    optional ``current`` (default 1), ``size`` (default 10), ``name_en``,
    ``name_zh``, ``category`` and ``tag`` filters.

    Returns:
        A JSON ``success`` envelope holding ``records``, ``total``, ``size``
        and ``current``; a ``failed`` envelope on invalid input or error.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        # Paging values are converted before the id check, so a malformed
        # number is reported through the generic error path below.
        page = int(payload.get("current", 1))
        page_size = int(payload.get("size", 10))

        raw_id = payload.get("id")
        filters = (
            payload.get("name_en"),
            payload.get("name_zh"),
            payload.get("category"),
            payload.get("tag"),
        )

        if raw_id is None:
            return jsonify(failed("业务领域ID不能为空"))
        try:
            domain_id = int(raw_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {raw_id}"))

        metadata_list, total_count = business_domain_search_list(
            domain_id, page, page_size, *filters
        )

        page_body = {
            "records": metadata_list,
            "total": total_count,
            "size": page_size,
            "current": page,
        }
        return jsonify(success(page_body))
    except Exception as e:
        detail = str(e)
        logger.error(f"业务领域关联元数据搜索失败: {detail}")
        return jsonify(failed("业务领域关联元数据搜索失败", error=detail))
@bp.route("/compose", methods=["POST"])
def bd_compose():
    """Create a new business domain by composing existing ones.

    The JSON body must contain truthy ``name_zh`` and ``id_list`` values; the
    whole body is passed to ``business_domain_compose``.

    Returns:
        A JSON ``success`` envelope whose data is
        ``{"business_domain": <compose result>}``, or a ``failed`` envelope
        on validation failure or error.
    """
    required_fields = (
        ("name_zh", "name_zh 为必填项"),
        ("id_list", "id_list 为必填项"),
    )
    try:
        data = request.json
        if not data:
            return jsonify(failed("请求数据不能为空"))

        # Report the first missing field, in declaration order.
        for field, message in required_fields:
            if not data.get(field):
                return jsonify(failed(message))

        composed = business_domain_compose(data)
        return jsonify(success({"business_domain": composed}))
    except Exception as e:
        detail = str(e)
        logger.error(f"组合创建业务领域失败: {detail}")
        return jsonify(failed("组合创建业务领域失败", error=detail))
@bp.route("/labellist", methods=["POST"])
def bd_label_list():
    """Return one page of data labels (used for business-domain association).

    JSON body fields (all optional, but the body itself must be non-empty):
    ``current`` (default 1), ``size`` (default 10), ``name_en``, ``name_zh``,
    ``category`` and ``group``.

    Returns:
        A JSON ``success`` envelope holding ``records``, ``total``, ``size``
        and ``current``; a ``failed`` envelope on an empty body or any error.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        page = int(payload.get("current", 1))
        page_size = int(payload.get("size", 10))

        filters = (
            payload.get("name_en"),
            payload.get("name_zh"),
            payload.get("category"),
            payload.get("group"),
        )
        labels, total_count = business_domain_label_list(page, page_size, *filters)

        page_body = {
            "records": labels,
            "total": total_count,
            "size": page_size,
            "current": page,
        }
        return jsonify(success(page_body))
    except Exception as e:
        detail = str(e)
        logger.error(f"获取标签列表失败: {detail}")
        return jsonify(failed("获取标签列表失败", error=detail))
|