|
- from flask import request, jsonify, send_from_directory, send_file, current_app
- from app.api.meta_data import bp
- from app.models.result import success, failed
- import logging
- import json
- import io
- import os
- from minio import Minio
- from minio.error import S3Error
- from app.services.neo4j_driver import neo4j_driver
- from app.services.package_function import create_or_get_node, relationship_exists
- from app.core.meta_data import (
- translate_and_parse,
- get_formatted_time,
- meta_list,
- meta_kinship_graph,
- meta_impact_graph,
- parse_text,
- parse_entity_relation,
- handle_txt_graph,
- get_file_content,
- text_resource_solve,
- handle_id_unstructured,
- solve_unstructured_data
- )
- from app.core.system.auth import require_auth
- logger = logging.getLogger("app")
def get_minio_client():
    """Build a MinIO client from the current Flask application's config.

    Reads ``MINIO_HOST``, ``MINIO_USER``, ``MINIO_PASSWORD`` and
    ``MINIO_SECURE`` from ``current_app.config`` and constructs a new
    ``Minio`` instance on every call.
    """
    settings = current_app.config
    endpoint = settings['MINIO_HOST']
    user = settings['MINIO_USER']
    password = settings['MINIO_PASSWORD']
    use_tls = settings['MINIO_SECURE']
    return Minio(endpoint, access_key=user, secret_key=password, secure=use_tls)
def get_minio_config():
    """Return the MinIO-related settings of the current Flask application.

    The result is a dict with the keys ``bucket_name``, ``prefix`` and
    ``allowed_extensions``, taken from the ``BUCKET_NAME``, ``PREFIX`` and
    ``ALLOWED_EXTENSIONS`` config entries respectively.
    """
    settings = current_app.config
    return dict(
        bucket_name=settings['BUCKET_NAME'],
        prefix=settings['PREFIX'],
        allowed_extensions=settings['ALLOWED_EXTENSIONS'],
    )
def allowed_file(filename):
    """Tell whether ``filename`` has an extension listed in the config.

    The extension is the text after the last ``.``, compared in lower case
    against ``get_minio_config()['allowed_extensions']``.  A name without
    any dot is never allowed.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in get_minio_config()['allowed_extensions']
# Metadata list
@bp.route('/node/list', methods=['POST'])
def meta_node_list():
    """Return one page of metadata nodes, optionally filtered.

    Reads ``current`` (page number, default 1) and ``size`` (page size,
    default 10) from the JSON body, together with the optional ``search``,
    ``en_name``, ``name``, ``category``, ``time`` and ``tag`` filters, and
    delegates the query to ``meta_list``.  Any exception is logged and
    turned into a failure payload.
    """
    try:
        payload = request.json

        # Paging parameters
        page = int(payload.get('current', 1))
        page_size = int(payload.get('size', 10))

        # Free-text search plus the per-field filters, in the positional
        # order meta_list expects them
        search = payload.get('search', '')
        filters = [
            payload.get(field)
            for field in ('en_name', 'name', 'category', 'time', 'tag')
        ]

        records, total_count = meta_list(page, page_size, search, *filters)

        body = {
            "records": records,
            "total": total_count,
            "size": page_size,
            "current": page,
        }
        return jsonify(success(body))
    except Exception as exc:
        logger.error(f"获取元数据列表失败: {exc!s}")
        return jsonify(failed(str(exc)))
# Metadata graph
@bp.route('/node/graph', methods=['POST'])
def meta_node_graph():
    """Return the kinship graph for the node named by ``nodeId``.

    The id is read from the JSON body and handed to ``meta_kinship_graph``
    unchanged; whatever that returns is wrapped in a success payload.
    Any exception is logged and turned into a failure payload.
    """
    try:
        graph = meta_kinship_graph(request.json.get('nodeId'))
        return jsonify(success(graph))
    except Exception as exc:
        logger.error(f"获取元数据图谱失败: {exc!s}")
        return jsonify(failed(str(exc)))
# Delete metadata
@bp.route('/node/delete', methods=['POST'])
def meta_node_delete():
    """Delete the node whose Neo4j internal id is given in the JSON body.

    Expects ``{"id": <int or numeric string>}``.  The node is removed
    together with all of its relationships (``DETACH DELETE``).  Returns an
    empty success payload, or a failure payload when the id is missing,
    not numeric, or the database call raises.
    """
    try:
        payload = request.get_json(silent=True) or {}
        node_id = payload.get('id')

        # 0 is a legitimate Neo4j id, so only reject a truly absent value
        # instead of letting int(None) surface as an opaque TypeError.
        if node_id is None or node_id == '':
            return jsonify(failed("节点ID不能为空"))

        with neo4j_driver.get_session() as session:
            cypher = "MATCH (n) WHERE id(n) = $node_id DETACH DELETE n"
            session.run(cypher, node_id=int(node_id))

        return jsonify(success({}))
    except Exception as e:
        logger.error(f"删除元数据失败: {str(e)}")
        return jsonify(failed(str(e)))
# Edit metadata
@bp.route('/node/edit', methods=['POST'])
def meta_node_edit():
    """Update name, type, description and properties of an existing node.

    Expects a JSON body with ``id`` (Neo4j internal node id), ``name``,
    ``type`` and optionally ``desc`` (default ``''``) and ``properties``
    (default ``{}``).  ``updateTime`` is refreshed with
    ``get_formatted_time()``.  Returns the updated node's properties, or a
    failure payload when the id is missing, no node matches, or the
    database call raises.
    """
    try:
        payload = request.get_json(silent=True) or {}
        node_id = payload.get('id')

        # 0 is a legitimate Neo4j id, so only reject a truly absent value
        # instead of letting int(None) surface as an opaque TypeError.
        if node_id is None or node_id == '':
            return jsonify(failed("节点ID不能为空"))

        node_properties = payload.get('properties', {})
        # Neo4j property values must be scalars or lists of scalars; a map
        # is rejected by the server, so a JSON object is stored as its
        # JSON text.  Any other value is passed through unchanged.
        if isinstance(node_properties, dict):
            node_properties = json.dumps(node_properties, ensure_ascii=False)

        cypher = """
            MATCH (n) WHERE id(n) = $node_id
            SET n.name = $name, n.type = $type, n.desc = $desc,
                n.properties = $properties, n.updateTime = $update_time
            RETURN n
        """

        with neo4j_driver.get_session() as session:
            result = session.run(
                cypher,
                node_id=int(node_id),
                name=payload.get('name'),
                type=payload.get('type'),
                desc=payload.get('desc', ''),
                properties=node_properties,
                update_time=get_formatted_time(),
            )

            # single() yields None when the MATCH found no node.
            record = result.single()
            if record is None:
                return jsonify(failed("节点不存在"))
            return jsonify(success(dict(record["n"])))
    except Exception as e:
        logger.error(f"编辑元数据失败: {str(e)}")
        return jsonify(failed(str(e)))
# Add metadata
@bp.route('/node/add', methods=['POST'])
def meta_node_add():
    """Create a new ``Metadata`` node from the JSON body.

    Reads ``name``, ``type`` and optionally ``desc`` (default ``''``) and
    ``properties`` (default ``{}``).  ``createTime`` and ``updateTime`` are
    both set to ``get_formatted_time()``.  Returns the created node's
    properties, or a failure payload when the database call raises.
    """
    try:
        payload = request.get_json(silent=True) or {}

        node_properties = payload.get('properties', {})
        # Neo4j property values must be scalars or lists of scalars; a map
        # is rejected by the server, so a JSON object is stored as its
        # JSON text.  Any other value is passed through unchanged.
        if isinstance(node_properties, dict):
            node_properties = json.dumps(node_properties, ensure_ascii=False)

        cypher = """
            CREATE (n:Metadata {name: $name, type: $type, desc: $desc,
                                properties: $properties,
                                createTime: $create_time,
                                updateTime: $update_time})
            RETURN n
        """

        with neo4j_driver.get_session() as session:
            # Creation and last-update timestamps start out identical.
            create_time = update_time = get_formatted_time()
            result = session.run(
                cypher,
                name=payload.get('name'),
                type=payload.get('type'),
                desc=payload.get('desc', ''),
                properties=node_properties,
                create_time=create_time,
                update_time=update_time,
            )

            node = result.single()
            return jsonify(success(dict(node["n"])))
    except Exception as e:
        logger.error(f"添加元数据失败: {str(e)}")
        return jsonify(failed(str(e)))
# Search metadata
@bp.route('/search', methods=['GET'])
def search_metadata_route():
    """Find ``Metadata`` nodes whose name contains the ``keyword`` argument.

    An empty or missing keyword yields an empty list without touching the
    database.  Otherwise at most 100 matching nodes are returned, each as a
    dict of its properties.  Any exception is logged and turned into a
    failure payload.
    """
    try:
        keyword = request.args.get('keyword', '')
        if not keyword:
            return jsonify(success([]))

        name_match = """
            MATCH (n:Metadata)
            WHERE n.name CONTAINS $keyword
            RETURN n LIMIT 100
        """

        matches = []
        with neo4j_driver.get_session() as session:
            # Records are consumed while the session is still open
            for record in session.run(name_match, keyword=keyword):
                matches.append(dict(record["n"]))

        return jsonify(success(matches))
    except Exception as exc:
        logger.error(f"搜索元数据失败: {exc!s}")
        return jsonify(failed(str(exc)))
# Full-text search
@bp.route('/full/text/query', methods=['POST'])
def full_text_query():
    """Run a query against the ``metadataFulltext`` Neo4j full-text index.

    Expects ``{"query": <search expression>}`` in the JSON body.  Returns
    up to 20 hits ordered by descending score; each hit is the node's
    property dict extended with ``id`` (Neo4j internal id) and ``score``.
    An empty query, or any exception, yields a failure payload.
    """
    try:
        payload = request.get_json(silent=True) or {}
        query = payload.get('query', '')
        if not query:
            return jsonify(failed("查询条件不能为空"))

        cypher = """
            CALL db.index.fulltext.queryNodes("metadataFulltext", $query)
            YIELD node, score
            RETURN node, score
            ORDER BY score DESC
            LIMIT 20
        """

        search_results = []
        with neo4j_driver.get_session() as session:
            # The Cypher parameter is called "query", which is also the
            # name of Session.run's first positional argument in driver
            # 4.x/5.x.  Passing it as a keyword would raise "got multiple
            # values for argument 'query'", so use a parameter dict.
            result = session.run(cypher, {"query": query})

            for record in result:
                node = record["node"]
                node_data = dict(node)
                node_data["id"] = node.id
                node_data["score"] = record["score"]
                search_results.append(node_data)

        return jsonify(success(search_results))
    except Exception as e:
        logger.error(f"全文检索查询失败: {str(e)}")
        return jsonify(failed(str(e)))
# Unstructured text query
@bp.route('/unstructure/text/query', methods=['POST'])
def unstructure_text_query():
    """Load and parse the document attached to an unstructured-data node.

    Expects ``{"id": <node id>}`` in the JSON body.  The node is looked up
    with ``handle_id_unstructured``; its ``url`` entry is used as the MinIO
    object name, the object is fetched with ``get_file_content`` and run
    through ``parse_text``.  The response carries the node data, the parse
    result and the content, cut to 1000 characters plus ``"..."`` when it
    is longer.  Missing id, node or ``url``, and any exception, yield a
    failure payload.
    """
    try:
        payload = request.get_json(silent=True) or {}
        node_id = payload.get('id')
        if not node_id:
            return jsonify(failed("节点ID不能为空"))

        node_data = handle_id_unstructured(node_id)
        if not node_data:
            return jsonify(failed("节点不存在"))

        object_name = node_data.get('url')
        if not object_name:
            return jsonify(failed("文档路径不存在"))

        # The client and bucket are not module-level names; build them from
        # the application config like the other MinIO handlers do.
        minio_client = get_minio_client()
        bucket_name = get_minio_config()['bucket_name']

        file_content = get_file_content(minio_client, bucket_name, object_name)
        parsed_data = parse_text(file_content)

        # Only a preview of long documents is returned.
        if len(file_content) > 1000:
            preview = file_content[:1000] + "..."
        else:
            preview = file_content

        result = {
            "node": node_data,
            "parsed": parsed_data,
            "content": preview,
        }
        return jsonify(success(result))
    except Exception as e:
        logger.error(f"非结构化文本查询失败: {str(e)}")
        return jsonify(failed(str(e)))
# File upload
@bp.route('/resource/upload', methods=['POST'])
def upload_file():
    """Store an uploaded file in MinIO and report where it was put.

    Expects a multipart form with a ``file`` part whose extension passes
    ``allowed_file``.  The object is written to
    ``<prefix>/<name>_<yyyyMMddHHmmss>.<ext>`` in the configured bucket.
    Returns the original filename, size in bytes, lower-cased extension
    and the object name (``url``).  Validation errors and any exception
    yield a failure payload.
    """
    try:
        if 'file' not in request.files:
            return jsonify(failed("没有找到上传的文件"))

        file = request.files['file']

        # The filename is client-controlled and may be empty, None, or a
        # full path.  Keep only the last path component (either separator
        # style) so the object key cannot leave the configured prefix.
        safe_name = os.path.basename((file.filename or '').replace('\\', '/'))
        if not safe_name:
            return jsonify(failed("未选择文件"))

        if not allowed_file(safe_name):
            return jsonify(failed("不支持的文件类型"))

        minio_client = get_minio_client()
        config = get_minio_config()

        file_content = file.read()
        file_size = len(file_content)

        # allowed_file guarantees a dot, so the split has two parts.
        filename_without_ext, extension = safe_name.rsplit('.', 1)
        file_type = extension.lower()

        # Compact timestamp (yyyyMMddHHmmss) keeps repeated uploads of the
        # same name from overwriting each other.
        import time
        timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())

        object_name = f"{config['prefix']}/{filename_without_ext}_{timestamp}.{file_type}"

        minio_client.put_object(
            config['bucket_name'],
            object_name,
            io.BytesIO(file_content),
            file_size,
            content_type=f"application/{file_type}"
        )

        return jsonify(success({
            "filename": file.filename,
            "size": file_size,
            "type": file_type,
            "url": object_name
        }))
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        return jsonify(failed(str(e)))
# File display information
@bp.route('/resource/display', methods=['POST'])
def upload_file_display():
    """Describe a stored object without transferring its content.

    Expects ``{"url": <object name>}`` in the JSON body.  Returns the file
    name (last ``/``-separated component), its lower-cased extension, a
    MIME type looked up from a fixed table (``application/octet-stream``
    when unknown), the object size in bytes and a download URL pointing at
    the download endpoint.  MinIO errors and any other exception yield a
    failure payload.
    """
    try:
        payload = request.get_json(silent=True) or {}
        object_name = payload.get('url')
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        minio_client = get_minio_client()
        config = get_minio_config()

        # Only the size is needed, so ask for the object's metadata rather
        # than downloading the whole object into memory.
        stat = minio_client.stat_object(config['bucket_name'], object_name)

        file_name = object_name.split('/')[-1]
        file_extension = file_name.split('.')[-1].lower()

        mime_types = {
            'pdf': 'application/pdf',
            'doc': 'application/msword',
            'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            'xls': 'application/vnd.ms-excel',
            'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            'txt': 'text/plain',
            'csv': 'text/csv'
        }
        content_type = mime_types.get(file_extension, 'application/octet-stream')

        # Percent-encode the object name so characters such as '&', '#',
        # '+' or spaces survive being placed in a query string.
        from urllib.parse import quote
        download_url = f"/api/meta/resource/download?url={quote(object_name, safe='/')}"

        return jsonify(success({
            "filename": file_name,
            "type": file_extension,
            "contentType": content_type,
            "size": stat.size,
            "url": download_url
        }))
    except S3Error as e:
        logger.error(f"MinIO操作失败: {str(e)}")
        return jsonify(failed(f"文件访问失败: {str(e)}"))
    except Exception as e:
        logger.error(f"文件显示信息获取失败: {str(e)}")
        return jsonify(failed(str(e)))
# File download endpoint
@bp.route('/resource/download', methods=['GET'])
def download_file():
    """Send a stored MinIO object to the client as an attachment.

    The object name comes from the ``url`` query argument and is
    URL-unquoted before use.  The object is read fully into memory and
    returned with ``application/octet-stream``, using the last
    ``/``-separated component as the download name.  A missing argument,
    a MinIO error or any other exception yields a failure payload.  The
    MinIO response, if one was opened, is always closed and released.
    """
    response = None
    try:
        object_name = request.args.get('url')
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        # NOTE(review): request.args is presumably already URL-decoded by
        # the framework, so this is a second decode - confirm it is wanted.
        import urllib.parse
        object_name = urllib.parse.unquote(object_name)

        # Log the request to ease debugging
        logger.info(f"下载文件请求: {object_name}")

        bucket = get_minio_config
        client = get_minio_client()
        settings = bucket()

        try:
            response = client.get_object(settings['bucket_name'], object_name)
            payload = response.read()
        except S3Error as s3_exc:
            logger.error(f"MinIO获取文件失败: {s3_exc!s}")
            return jsonify(failed(f"文件获取失败: {s3_exc!s}"))

        # Serve straight from memory; no temporary file is created
        download_name = object_name.split('/')[-1]
        return send_file(
            io.BytesIO(payload),
            as_attachment=True,
            download_name=download_name,
            mimetype="application/octet-stream",
        )
    except Exception as exc:
        logger.error(f"文件下载失败: {exc!s}")
        return jsonify(failed(str(exc)))
    finally:
        if response:
            response.close()
            response.release_conn()
# Text resource translation
@bp.route('/resource/translate', methods=['POST'])
def text_resource_translate():
    """Pass a resource name and keyword to ``text_resource_solve``.

    Reads ``name`` and ``keyword`` (both default ``''``) from the JSON
    body.  An empty name is rejected; otherwise the result of
    ``text_resource_solve(None, name, keyword)`` is returned in a success
    payload.  Any exception is logged and turned into a failure payload.
    """
    try:
        body = request.json
        name = body.get('name', '')
        keyword = body.get('keyword', '')

        if name:
            solved = text_resource_solve(None, name, keyword)
            return jsonify(success(solved))

        return jsonify(failed("名称不能为空"))
    except Exception as exc:
        logger.error(f"文本资源翻译失败: {exc!s}")
        return jsonify(failed(str(exc)))
# Create a text resource node
@bp.route('/resource/node', methods=['POST'])
def text_resource_node():
    """Create a ``TextResource`` node and link it to one ``Tag`` per keyword.

    Reads ``name``, ``en_name``, ``keywords``, ``keywords_en`` and ``url``
    from the JSON body; ``name``, ``en_name`` and ``url`` are required.
    Each non-empty keyword is merged into a ``Tag`` node (its English name
    is the entry at the same index in ``keywords_en``, or ``""`` when there
    is none) and connected with a ``HAS_TAG`` relationship.  Returns the
    created resource node's properties, or a failure payload on missing
    parameters or any exception.
    """
    try:
        payload = request.get_json(silent=True) or {}
        name = payload.get('name', '')
        en_name = payload.get('en_name', '')
        # "or []" also covers an explicit JSON null, which would otherwise
        # break the iteration below.
        keywords = payload.get('keywords') or []
        keywords_en = payload.get('keywords_en') or []
        object_name = payload.get('url', '')

        if not name or not en_name or not object_name:
            return jsonify(failed("参数不完整"))

        resource_cypher = """
            CREATE (n:TextResource {
                name: $name,
                en_name: $en_name,
                keywords: $keywords,
                keywords_en: $keywords_en,
                url: $object_name,
                createTime: $create_time,
                updateTime: $update_time
            })
            RETURN n
        """
        tag_cypher = """
            MERGE (t:Tag {name: $name})
            ON CREATE SET t.en_name = $en_name, t.createTime = $create_time
            RETURN t
        """
        # MERGE rather than CREATE: a keyword listed twice resolves to the
        # same Tag node and must not produce a second HAS_TAG relationship.
        rel_cypher = """
            MATCH (n), (t)
            WHERE id(n) = $node_id AND id(t) = $tag_id
            MERGE (n)-[r:HAS_TAG]->(t)
            RETURN r
        """

        with neo4j_driver.get_session() as session:
            create_time = update_time = get_formatted_time()
            node = session.run(
                resource_cypher,
                name=name,
                en_name=en_name,
                keywords=keywords,
                keywords_en=keywords_en,
                object_name=object_name,
                create_time=create_time,
                update_time=update_time,
            ).single()["n"]

            for index, keyword in enumerate(keywords):
                if not keyword:
                    continue

                tag_node = session.run(
                    tag_cypher,
                    name=keyword,
                    en_name=keywords_en[index] if index < len(keywords_en) else "",
                    create_time=create_time,
                ).single()["t"]

                session.run(rel_cypher, node_id=node.id, tag_id=tag_node.id)

            return jsonify(success(dict(node)))
    except Exception as e:
        logger.error(f"创建文本资源节点失败: {str(e)}")
        return jsonify(failed(str(e)))
# Process unstructured data
@bp.route('/unstructured/process', methods=['POST'])
def processing_unstructured_data():
    """Run ``solve_unstructured_data`` for the node given in the JSON body.

    Expects ``{"id": <node id>}``.  The MinIO client and object prefix are
    taken from the application config.  A truthy result from
    ``solve_unstructured_data`` yields a success message, a falsy one a
    failure payload; a missing id or any exception also yields a failure
    payload.
    """
    try:
        payload = request.get_json(silent=True) or {}
        node_id = payload.get('id')
        if not node_id:
            return jsonify(failed("节点ID不能为空"))

        # The client and prefix are not module-level names; build them from
        # the application config like the other MinIO handlers do.
        minio_client = get_minio_client()
        prefix = get_minio_config()['prefix']

        result = solve_unstructured_data(node_id, minio_client, prefix)

        if result:
            return jsonify(success({"message": "处理成功"}))
        return jsonify(failed("处理失败"))
    except Exception as e:
        logger.error(f"处理非结构化数据失败: {str(e)}")
        return jsonify(failed(str(e)))
# Create text graph
@bp.route('/text/graph', methods=['POST'])
def create_text_graph():
    """Build a text graph for a node via ``handle_txt_graph``.

    Reads ``id``, ``entity`` and ``entity_en`` from the JSON body; all
    three must be truthy.  A truthy result from ``handle_txt_graph`` yields
    a success message, a falsy one a failure payload.  Any exception is
    logged and turned into a failure payload.
    """
    try:
        body = request.json
        node_id = body.get('id')
        entity = body.get('entity')
        entity_en = body.get('entity_en')

        if not (node_id and entity and entity_en):
            return jsonify(failed("参数不完整"))

        created = handle_txt_graph(node_id, entity, entity_en)
        if not created:
            return jsonify(failed("图谱创建失败"))

        return jsonify(success({"message": "图谱创建成功"}))
    except Exception as exc:
        logger.error(f"创建文本图谱失败: {exc!s}")
        return jsonify(failed(str(exc)))
@bp.route('/config', methods=['GET'])
@require_auth
def get_meta_config():
    """Expose the MinIO bucket name, prefix and allowed extensions as JSON.

    ``allowed_extensions`` is converted to a list before serialisation.
    Access is guarded by ``require_auth``.
    """
    settings = get_minio_config()
    exposed = {key: settings[key] for key in ('bucket_name', 'prefix')}
    exposed['allowed_extensions'] = list(settings['allowed_extensions'])
    return jsonify(exposed)
|