| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780 |
- """
- Business Domain API 路由模块
- 提供业务领域相关的 RESTful API 接口
- """
- import io
- import json
- import time
- import logging
- import traceback
- import urllib.parse
- from flask import request, jsonify, current_app, send_file
- from minio import Minio
- from minio.error import S3Error
- from app.api.business_domain import bp
- from app.models.result import success, failed
- from app.services.neo4j_driver import neo4j_driver
- from app.core.llm.ddl_parser import DDLParser
- from app.core.business_domain import (
- business_domain_list,
- get_business_domain_by_id,
- delete_business_domain,
- update_business_domain,
- save_business_domain,
- business_domain_graph_all,
- business_domain_search_list,
- business_domain_compose,
- business_domain_label_list
- )
- logger = logging.getLogger("app")
- def get_minio_client():
- """获取 MinIO 客户端实例"""
- return Minio(
- current_app.config['MINIO_HOST'],
- access_key=current_app.config['MINIO_USER'],
- secret_key=current_app.config['MINIO_PASSWORD'],
- secure=current_app.config['MINIO_SECURE']
- )
- def get_minio_config():
- """获取 MinIO 配置"""
- return {
- 'MINIO_BUCKET': current_app.config['MINIO_BUCKET'],
- 'PREFIX': current_app.config.get(
- 'BUSINESS_DOMAIN_PREFIX', 'business_domain'
- ),
- 'ALLOWED_EXTENSIONS': current_app.config['ALLOWED_EXTENSIONS']
- }
- def allowed_file(filename):
- """检查文件扩展名是否允许"""
- if '.' not in filename:
- return False
- ext = filename.rsplit('.', 1)[1].lower()
- return ext in get_minio_config()['ALLOWED_EXTENSIONS']
- @bp.route('/list', methods=['POST'])
- def bd_list():
- """
- 获取业务领域列表
-
- 请求参数 (JSON):
- - current: 当前页码,默认1
- - size: 每页大小,默认10
- - name_en: 英文名称过滤条件(可选)
- - name_zh: 中文名称过滤条件(可选)
- - type: 类型过滤条件,默认'all'表示不过滤(可选)
- - category: 分类过滤条件(可选)
- - tag: 标签过滤条件(可选)
-
- 返回:
- - success: 是否成功
- - message: 消息
- - data:
- - records: 业务领域列表
- - total: 总数量
- - size: 每页大小
- - current: 当前页码
- """
- try:
- # 获取分页和筛选参数
- if not request.json:
- return jsonify(failed('请求数据不能为空'))
-
- page = int(request.json.get('current', 1))
- page_size = int(request.json.get('size', 10))
- name_en_filter = request.json.get('name_en')
- name_zh_filter = request.json.get('name_zh')
- type_filter = request.json.get('type', 'all')
- category_filter = request.json.get('category')
- tag_filter = request.json.get('tag')
-
- # 调用业务逻辑查询业务领域列表
- domains, total_count = business_domain_list(
- page,
- page_size,
- name_en_filter,
- name_zh_filter,
- type_filter,
- category_filter,
- tag_filter
- )
-
- # 返回结果
- return jsonify(success({
- "records": domains,
- "total": total_count,
- "size": page_size,
- "current": page
- }))
- except Exception as e:
- logger.error(f"获取业务领域列表失败: {str(e)}")
- return jsonify(failed(str(e)))
- @bp.route('/detail', methods=['POST'])
- def bd_detail():
- """
- 获取业务领域详情
-
- 请求参数 (JSON):
- - id: 业务领域节点ID(必填)
-
- 返回:
- - success: 是否成功
- - message: 消息
- - data: 业务领域详情
- """
- try:
- # 获取参数
- if not request.json:
- return jsonify(failed('请求数据不能为空'))
-
- domain_id = request.json.get('id')
-
- if domain_id is None:
- return jsonify(failed("业务领域ID不能为空"))
-
- # 确保传入的ID为整数
- try:
- domain_id = int(domain_id)
- except (ValueError, TypeError):
- return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {domain_id}"))
-
- # 调用业务逻辑查询业务领域详情
- domain_data = get_business_domain_by_id(domain_id)
-
- if not domain_data:
- return jsonify(failed("业务领域不存在"))
-
- return jsonify(success(domain_data))
- except Exception as e:
- logger.error(f"获取业务领域详情失败: {str(e)}")
- return jsonify(failed(str(e)))
- @bp.route('/delete', methods=['POST'])
- def bd_delete():
- """
- 删除业务领域
-
- 请求参数 (JSON):
- - id: 业务领域节点ID(必填)
-
- 返回:
- - success: 是否成功
- - message: 消息
- - data: 删除结果
- """
- try:
- # 获取参数
- if not request.json:
- return jsonify(failed("请求数据不能为空"))
-
- domain_id = request.json.get('id')
- if domain_id is None:
- return jsonify(failed("业务领域ID不能为空"))
-
- # 调用业务逻辑删除业务领域
- result = delete_business_domain(domain_id)
-
- if result:
- return jsonify(success({"message": "业务领域删除成功"}))
- else:
- return jsonify(failed("业务领域删除失败"))
- except Exception as e:
- logger.error(f"删除业务领域失败: {str(e)}")
- return jsonify(failed(str(e)))
- @bp.route('/save', methods=['POST'])
- def bd_save():
- """
- 保存业务领域(新建或更新)
- 请求参数 (JSON):
- - id: 业务领域节点ID(可选,有则更新,无则新建)
- - name_zh: 中文名称(新建时必填)
- - name_en: 英文名称(新建时必填)
- - describe: 描述(可选)
- - type: 类型(可选)
- - category: 分类(可选)
- - tag: 标签ID(可选)
- - data_source: 数据源ID(可选)
- - 其他属性字段...
- 返回:
- - success: 是否成功
- - message: 消息
- - data: 保存后的业务领域数据
- """
- try:
- # 获取保存数据
- data = request.json
- if not data:
- return jsonify(failed("请求数据不能为空"))
- # 新建时校验必填字段
- if not data.get("id"):
- if not data.get("name_zh") or not data.get("name_en"):
- return jsonify(failed("新建时 name_zh 和 name_en 为必填项"))
- # 调用业务逻辑保存业务领域
- saved_data = save_business_domain(data)
- return jsonify(success(saved_data))
- except Exception as e:
- logger.error(f"保存业务领域失败: {str(e)}")
- return jsonify(failed(str(e)))
- @bp.route('/update', methods=['POST'])
- def bd_update():
- """
- 更新业务领域
- 请求参数 (JSON):
- - id: 业务领域节点ID(必填)
- - name_zh: 中文名称(可选)
- - name_en: 英文名称(可选)
- - describe: 描述(可选)
- - tag: 标签ID(可选)
- - data_source: 数据源ID(可选)
- - 其他属性字段...
- 返回:
- - success: 是否成功
- - message: 消息
- - data: 更新后的业务领域数据
- """
- try:
- # 获取更新数据
- data = request.json
- if not data or "id" not in data:
- return jsonify(failed("参数不完整"))
- # 调用业务逻辑更新业务领域
- updated_data = update_business_domain(data)
- return jsonify(success(updated_data))
- except Exception as e:
- logger.error(f"更新业务领域失败: {str(e)}")
- return jsonify(failed(str(e)))
- @bp.route('/upload', methods=['POST'])
- def bd_upload():
- """
- 上传业务领域相关文件
- 请求参数 (multipart/form-data):
- - file: 上传的文件(必填)
- 返回:
- - success: 是否成功
- - message: 消息
- - data:
- - filename: 原始文件名
- - size: 文件大小(字节)
- - type: 文件类型
- - url: 文件存储路径
- """
- try:
- # 检查请求中是否有文件
- if 'file' not in request.files:
- return jsonify(failed("没有找到上传的文件"))
- file = request.files['file']
- # 检查文件名
- if file.filename == '':
- return jsonify(failed("未选择文件"))
- # 检查文件类型
- if not allowed_file(file.filename):
- return jsonify(failed("不支持的文件类型"))
- # 获取 MinIO 配置
- minio_client = get_minio_client()
- config = get_minio_config()
- # 读取文件内容
- file_content = file.read()
- file_size = len(file_content)
- filename = file.filename or ''
- file_type = filename.rsplit('.', 1)[1].lower()
- # 提取文件名(不包含扩展名)
- filename_without_ext = filename.rsplit('.', 1)[0]
- # 生成紧凑的时间戳 (yyyyMMddHHmmss)
- timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
- # 生成唯一文件名
- prefix = config['PREFIX']
- object_name = (
- f"{prefix}/{filename_without_ext}_{timestamp}.{file_type}"
- )
- # 上传文件到 MinIO
- minio_client.put_object(
- config['MINIO_BUCKET'],
- object_name,
- io.BytesIO(file_content),
- file_size,
- content_type=f"application/{file_type}"
- )
- logger.info(f"文件上传成功: {object_name}, 大小: {file_size}")
- # 返回结果
- return jsonify(success({
- "filename": file.filename,
- "size": file_size,
- "type": file_type,
- "url": object_name
- }))
- except Exception as e:
- logger.error(f"文件上传失败: {str(e)}")
- return jsonify(failed(str(e)))
- @bp.route('/download', methods=['GET'])
- def bd_download():
- """
- 下载业务领域相关文件
- 请求参数 (URL Query):
- - url: 文件存储路径(必填)
- 返回:
- - 文件流(作为附件下载)
- """
- response = None
- try:
- # 获取文件路径参数
- object_name = request.args.get('url')
- if not object_name:
- return jsonify(failed("文件路径不能为空"))
- # URL解码,处理特殊字符
- object_name = urllib.parse.unquote(object_name)
- # 记录下载请求信息,便于调试
- logger.info(f"下载文件请求: {object_name}")
- # 获取 MinIO 配置
- minio_client = get_minio_client()
- config = get_minio_config()
- # 获取文件
- try:
- response = minio_client.get_object(
- config['MINIO_BUCKET'], object_name
- )
- file_data = response.read()
- except S3Error as e:
- logger.error(f"MinIO获取文件失败: {str(e)}")
- return jsonify(failed(f"文件获取失败: {str(e)}"))
- # 获取文件名
- file_name = object_name.split('/')[-1]
- # 直接从内存返回文件,不创建临时文件
- file_stream = io.BytesIO(file_data)
- # 返回文件
- return send_file(
- file_stream,
- as_attachment=True,
- download_name=file_name,
- mimetype="application/octet-stream"
- )
- except Exception as e:
- logger.error(f"文件下载失败: {str(e)}")
- return jsonify(failed(str(e)))
- finally:
- if response:
- response.close()
- response.release_conn()
- @bp.route('/graphall', methods=['POST'])
- def bd_graph_all():
- """
- 获取业务领域完整关系图谱
- 请求参数 (JSON):
- - id: 业务领域节点ID(必填)
- - meta: 是否包含元数据节点,默认True(可选)
- 返回:
- - success: 是否成功
- - message: 消息
- - data:
- - nodes: 节点列表
- - lines: 关系列表
- """
- try:
- # 获取参数
- if not request.json:
- return jsonify(failed('请求数据不能为空'))
- domain_id = request.json.get('id')
- include_meta = request.json.get('meta', True)
- if domain_id is None:
- return jsonify(failed("业务领域ID不能为空"))
- # 确保传入的ID为整数
- try:
- domain_id = int(domain_id)
- except (ValueError, TypeError):
- return jsonify(failed(
- f"业务领域ID必须为整数, 收到的是: {domain_id}"
- ))
- # 调用业务逻辑获取完整图谱
- graph_data = business_domain_graph_all(domain_id, include_meta)
- return jsonify(success(graph_data))
- except Exception as e:
- logger.error(f"获取业务领域图谱失败: {str(e)}")
- return jsonify(failed(str(e)))
- @bp.route('/ddlparse', methods=['POST'])
- def bd_ddl_parse():
- """
- 解析DDL语句,用于业务领域创建
- 请求参数:
- - file: SQL文件(multipart/form-data,可选)
- - sql: SQL内容(JSON,可选)
- 至少提供其中一种方式
- 返回:
- - success: 是否成功
- - message: 消息
- - data: 解析后的DDL列表,包含表信息和字段信息
- """
- try:
- # 获取参数 - 支持两种方式:上传文件或JSON
- sql_content = ''
- # 检查是否有文件上传
- if 'file' in request.files:
- file = request.files['file']
- # 检查文件是否存在且文件名不为空
- if file and file.filename:
- # 检查是否是SQL文件
- if not file.filename.lower().endswith('.sql'):
- return jsonify(failed("只接受SQL文件"))
- # 读取文件内容
- sql_content = file.read().decode('utf-8')
- logger.info(
- f"从上传的文件中读取SQL内容,文件名: {file.filename}"
- )
- # 如果没有文件上传,检查是否有JSON输入
- elif request.is_json and request.json:
- sql_content = request.json.get('sql', '')
- # 如果两种方式都没有提供SQL内容,则返回错误
- if not sql_content:
- return jsonify(failed(
- "SQL内容不能为空,请上传SQL文件或提供SQL内容"
- ))
- parser = DDLParser()
- # 提取创建表的DDL语句
- ddl_list = parser.parse_ddl(sql_content)
- if not ddl_list:
- return jsonify(failed("未找到有效的CREATE TABLE语句"))
- # 处理表的存在状态
- if isinstance(ddl_list, list):
- # 新格式:数组格式
- # 获取所有表名
- table_names = []
- for table_item in ddl_list:
- if isinstance(table_item, dict) and 'table_info' in table_item:
- table_name = table_item['table_info'].get('name_en')
- if table_name:
- table_names.append(table_name)
- # 首先为所有表设置默认的exist状态
- for table_item in ddl_list:
- if isinstance(table_item, dict):
- table_item["exist"] = False
- if table_names:
- try:
- # 查询业务领域是否存在
- with neo4j_driver.get_session() as session:
- table_query = """
- UNWIND $names AS name
- OPTIONAL MATCH (n:BusinessDomain {name_en: name})
- RETURN name, n IS NOT NULL AS exists
- """
- table_results = session.run(
- table_query, names=table_names
- )
- # 创建存在状态映射
- exist_map = {}
- for record in table_results:
- table_name = record["name"]
- exists = record["exists"]
- exist_map[table_name] = exists
- # 更新存在的表的状态
- for table_item in ddl_list:
- if (isinstance(table_item, dict)
- and 'table_info' in table_item):
- info = table_item['table_info']
- t_name = info.get('name_en')
- if t_name and t_name in exist_map:
- table_item["exist"] = exist_map[t_name]
- except Exception as e:
- logger.error(f"检查业务领域存在状态失败: {str(e)}")
- # 如果查询失败,所有表保持默认的False状态
- elif isinstance(ddl_list, dict):
- # 兼容旧格式:字典格式(以表名为key)
- table_names = list(ddl_list.keys())
- # 首先为所有表设置默认的exist状态
- for table_name in table_names:
- if isinstance(ddl_list[table_name], dict):
- ddl_list[table_name]["exist"] = False
- else:
- logger.warning(
- f"表 {table_name} 的值不是字典类型: "
- f"{type(ddl_list[table_name])}"
- )
- if table_names:
- try:
- # 查询业务领域是否存在
- with neo4j_driver.get_session() as session:
- table_query = """
- UNWIND $names AS name
- OPTIONAL MATCH (n:BusinessDomain {name_en: name})
- RETURN name, n IS NOT NULL AS exists
- """
- table_results = session.run(
- table_query, names=table_names
- )
- # 更新存在的表的状态
- for record in table_results:
- table_name = record["name"]
- exists = record["exists"]
- is_valid = (
- table_name in ddl_list
- and isinstance(ddl_list[table_name], dict)
- )
- if is_valid:
- ddl_list[table_name]["exist"] = exists
- except Exception as e:
- logger.error(f"检查业务领域存在状态失败: {str(e)}")
- # 如果查询失败,所有表保持默认的False状态
- logger.debug(
- f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}"
- )
- return jsonify(success(ddl_list))
- except Exception as e:
- logger.error(f"解析DDL语句失败: {str(e)}")
- logger.error(traceback.format_exc())
- return jsonify(failed(str(e)))
- @bp.route('/search', methods=['POST'])
- def bd_search():
- """
- 搜索业务领域关联的元数据
- 请求参数 (JSON):
- - id: 业务领域节点ID(必填)
- - current: 当前页码,默认1
- - size: 每页大小,默认10
- - name_en: 英文名称过滤条件(可选)
- - name_zh: 中文名称过滤条件(可选)
- - category: 分类过滤条件(可选)
- - tag: 标签过滤条件(可选)
- 返回:
- - success: 是否成功
- - message: 消息
- - data:
- - records: 元数据列表
- - total: 总数量
- - size: 每页大小
- - current: 当前页码
- """
- try:
- # 获取分页和筛选参数
- if not request.json:
- return jsonify(failed('请求数据不能为空'))
- page = int(request.json.get('current', 1))
- page_size = int(request.json.get('size', 10))
- domain_id = request.json.get('id')
- name_en_filter = request.json.get('name_en')
- name_zh_filter = request.json.get('name_zh')
- category_filter = request.json.get('category')
- tag_filter = request.json.get('tag')
- if domain_id is None:
- return jsonify(failed("业务领域ID不能为空"))
- # 确保传入的ID为整数
- try:
- domain_id = int(domain_id)
- except (ValueError, TypeError):
- return jsonify(failed(
- f"业务领域ID必须为整数, 收到的是: {domain_id}"
- ))
- # 记录请求信息
- logger.info(f"获取业务领域关联元数据请求,ID: {domain_id}")
- # 调用业务逻辑查询关联元数据
- metadata_list, total_count = business_domain_search_list(
- domain_id,
- page,
- page_size,
- name_en_filter,
- name_zh_filter,
- category_filter,
- tag_filter
- )
- # 返回结果
- return jsonify(success({
- "records": metadata_list,
- "total": total_count,
- "size": page_size,
- "current": page
- }))
- except Exception as e:
- logger.error(f"业务领域关联元数据搜索失败: {str(e)}")
- return jsonify(failed(str(e)))
- @bp.route('/compose', methods=['POST'])
- def bd_compose():
- """
- 从已有业务领域中组合创建新的业务领域
- 请求参数 (JSON):
- - name_zh: 中文名称(必填)
- - name_en: 英文名称(可选,不提供则自动翻译)
- - id_list: 关联的业务领域和元数据列表(必填)
- 格式: [{"domain_id": 123, "metaData": [{"id": 456}, ...]}]
- - describe: 描述(可选)
- - type: 类型(可选)
- - category: 分类(可选)
- - tag: 标签ID(可选)
- - data_source: 数据源ID(可选)
- 返回:
- - success: 是否成功
- - message: 消息
- - data: 创建后的业务领域数据
- """
- try:
- # 获取请求数据
- data = request.json
- if not data:
- return jsonify(failed("请求数据不能为空"))
- # 校验必填字段
- if not data.get("name_zh"):
- return jsonify(failed("name_zh 为必填项"))
- if not data.get("id_list"):
- return jsonify(failed("id_list 为必填项"))
- # 调用业务逻辑组合创建业务领域
- result_data = business_domain_compose(data)
- # 构建响应数据
- response_data = {
- "business_domain": result_data
- }
- return jsonify(success(response_data))
- except Exception as e:
- logger.error(f"组合创建业务领域失败: {str(e)}")
- return jsonify(failed(str(e)))
- @bp.route('/labellist', methods=['POST'])
- def bd_label_list():
- """
- 获取数据标签列表(用于业务领域关联)
- 请求参数 (JSON):
- - current: 当前页码,默认1
- - size: 每页大小,默认10
- - name_en: 英文名称过滤条件(可选)
- - name_zh: 中文名称过滤条件(可选)
- - category: 分类过滤条件(可选)
- - group: 分组过滤条件(可选)
- 返回:
- - success: 是否成功
- - message: 消息
- - data:
- - records: 标签列表
- - total: 总数量
- - size: 每页大小
- - current: 当前页码
- """
- try:
- # 获取分页和筛选参数
- if not request.json:
- return jsonify(failed('请求数据不能为空'))
- page = int(request.json.get('current', 1))
- page_size = int(request.json.get('size', 10))
- name_en_filter = request.json.get('name_en')
- name_zh_filter = request.json.get('name_zh')
- category_filter = request.json.get('category')
- group_filter = request.json.get('group')
- # 调用业务逻辑查询标签列表
- labels, total_count = business_domain_label_list(
- page,
- page_size,
- name_en_filter,
- name_zh_filter,
- category_filter,
- group_filter
- )
- # 返回结果
- return jsonify(success({
- "records": labels,
- "total": total_count,
- "size": page_size,
- "current": page
- }))
- except Exception as e:
- logger.error(f"获取标签列表失败: {str(e)}")
- return jsonify(failed(str(e)))
|