"""HTTP routes of the metadata blueprint.

The endpoints cover metadata node CRUD and search in Neo4j, plus upload,
inspection and download of resource files stored in MinIO.
"""
import io
import json
import logging
import os
import time
import urllib.parse

from flask import request, jsonify, send_from_directory, send_file, current_app
from minio import Minio
from minio.error import S3Error

from app.api.meta_data import bp
from app.models.result import success, failed
from app.services.neo4j_driver import neo4j_driver
from app.services.package_function import create_or_get_node, relationship_exists
from app.core.meta_data import (
    translate_and_parse,
    get_formatted_time,
    meta_list,
    meta_kinship_graph,
    meta_impact_graph,
    parse_text,
    parse_entity_relation,
    handle_txt_graph,
    get_file_content,
    text_resource_solve,
    handle_id_unstructured,
    solve_unstructured_data
)
from app.core.system.auth import require_auth

logger = logging.getLogger("app")

# MIME types keyed by lower-case file extension; shared by upload and display
# so that both endpoints report the same content type for a given file.
_MIME_TYPES = {
    'pdf': 'application/pdf',
    'doc': 'application/msword',
    'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    'xls': 'application/vnd.ms-excel',
    'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    'txt': 'text/plain',
    'csv': 'text/csv'
}
_DEFAULT_MIME_TYPE = 'application/octet-stream'


def _is_blank(value):
    """Return True when a request field is missing (None) or an empty string.

    A plain truthiness test is not used for node ids because it would also
    reject the integer id 0.
    """
    return value is None or value == ''


def get_minio_client():
    """Build a MinIO client from the current Flask application config."""
    return Minio(
        current_app.config['MINIO_HOST'],
        access_key=current_app.config['MINIO_USER'],
        secret_key=current_app.config['MINIO_PASSWORD'],
        secure=current_app.config['MINIO_SECURE']
    )


def get_minio_config():
    """Return bucket name, object prefix and allowed extensions from the app config."""
    return {
        'bucket_name': current_app.config['BUCKET_NAME'],
        'prefix': current_app.config['PREFIX'],
        'allowed_extensions': current_app.config['ALLOWED_EXTENSIONS']
    }


def allowed_file(filename):
    """Return True if ``filename`` has an extension listed in ALLOWED_EXTENSIONS."""
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in get_minio_config()['allowed_extensions']


# Metadata list
@bp.route('/node/list', methods=['POST'])
def meta_node_list():
    """Return one page of metadata nodes, filtered by the JSON body fields."""
    try:
        # Paging parameters
        page = int(request.json.get('current', 1))
        page_size = int(request.json.get('size', 10))
        # Search / filter parameters
        search = request.json.get('search', '')
        en_name_filter = request.json.get('en_name', None)
        name_filter = request.json.get('name', None)
        category_filter = request.json.get('category', None)
        time_filter = request.json.get('time', None)
        tag_filter = request.json.get('tag', None)

        # Delegate to the core business logic
        result, total_count = meta_list(
            page, page_size, search,
            en_name_filter, name_filter,
            category_filter, time_filter, tag_filter
        )

        return jsonify(success({
            "records": result,
            "total": total_count,
            "size": page_size,
            "current": page
        }))
    except Exception as e:
        logger.error(f"获取元数据列表失败: {str(e)}")
        return jsonify(failed(str(e)))


# Metadata graph
@bp.route('/node/graph', methods=['POST'])
def meta_node_graph():
    """Return the kinship graph of the node given by ``nodeId`` in the JSON body."""
    try:
        node_id = request.json.get('nodeId')
        result = meta_kinship_graph(node_id)
        return jsonify(success(result))
    except Exception as e:
        logger.error(f"获取元数据图谱失败: {str(e)}")
        return jsonify(failed(str(e)))


# Delete metadata
@bp.route('/node/delete', methods=['POST'])
def meta_node_delete():
    """Detach-delete the node whose internal id is ``id`` in the JSON body."""
    try:
        node_id = request.json.get('id')
        # Fail with a clear message instead of the TypeError int(None) raises.
        if _is_blank(node_id):
            return jsonify(failed("节点ID不能为空"))

        with neo4j_driver.get_session() as session:
            cypher = "MATCH (n) WHERE id(n) = $node_id DETACH DELETE n"
            session.run(cypher, node_id=int(node_id))

        return jsonify(success({}))
    except Exception as e:
        logger.error(f"删除元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


# Edit metadata
@bp.route('/node/edit', methods=['POST'])
def meta_node_edit():
    """Update name, type, description and properties of an existing node.

    Responds with the updated node's properties, or a failure when no node
    has the given id.
    """
    try:
        node_id = request.json.get('id')
        node_name = request.json.get('name')
        node_type = request.json.get('type')
        node_desc = request.json.get('desc', '')
        # NOTE(review): properties defaults to a dict; confirm the database
        # accepts a map as a property value, otherwise it needs serializing.
        node_properties = request.json.get('properties', {})

        # Fail with a clear message instead of the TypeError int(None) raises.
        if _is_blank(node_id):
            return jsonify(failed("节点ID不能为空"))

        with neo4j_driver.get_session() as session:
            cypher = """
            MATCH (n) WHERE id(n) = $node_id
            SET n.name = $name,
                n.data_type = $type,
                n.desc = $desc,
                n.properties = $properties,
                n.updateTime = $update_time
            RETURN n
            """
            update_time = get_formatted_time()
            result = session.run(
                cypher,
                node_id=int(node_id),
                name=node_name,
                type=node_type,
                desc=node_desc,
                properties=node_properties,
                update_time=update_time
            )

            node = result.single()
            if node:
                return jsonify(success(dict(node["n"])))
            else:
                return jsonify(failed("节点不存在"))
    except Exception as e:
        logger.error(f"编辑元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


# Add metadata
@bp.route('/node/add', methods=['POST'])
def meta_node_add():
    """Create a ``meta_data`` node from the JSON body and return its properties."""
    try:
        node_name = request.json.get('name')
        node_type = request.json.get('type')
        node_desc = request.json.get('desc', '')
        # NOTE(review): properties defaults to a dict; confirm the database
        # accepts a map as a property value, otherwise it needs serializing.
        node_properties = request.json.get('properties', {})

        with neo4j_driver.get_session() as session:
            cypher = """
            CREATE (n:meta_data {
                name: $name,
                data_type: $type,
                desc: $desc,
                properties: $properties,
                createTime: $create_time,
                updateTime: $update_time
            })
            RETURN n
            """
            # Creation and update time are identical for a new node.
            create_time = update_time = get_formatted_time()
            result = session.run(
                cypher,
                name=node_name,
                type=node_type,
                desc=node_desc,
                properties=node_properties,
                create_time=create_time,
                update_time=update_time
            )

            node = result.single()
            return jsonify(success(dict(node["n"])))
    except Exception as e:
        logger.error(f"添加元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


# Search metadata
@bp.route('/search', methods=['GET'])
def search_metadata_route():
    """Return up to 100 ``meta_data`` nodes whose name contains ``keyword``.

    An empty or missing keyword yields an empty list without querying.
    """
    try:
        keyword = request.args.get('keyword', '')
        if not keyword:
            return jsonify(success([]))

        cypher = """
        MATCH (n:meta_data)
        WHERE n.name CONTAINS $keyword
        RETURN n LIMIT 100
        """

        with neo4j_driver.get_session() as session:
            result = session.run(cypher, keyword=keyword)
            metadata_list = [dict(record["n"]) for record in result]

        return jsonify(success(metadata_list))
    except Exception as e:
        logger.error(f"搜索元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


# Full-text search
@bp.route('/full/text/query', methods=['POST'])
def full_text_query():
    """Query the ``meta_dataFulltext`` index and return the top 20 hits with scores."""
    try:
        query = request.json.get('query', '')
        if not query:
            return jsonify(failed("查询条件不能为空"))

        with neo4j_driver.get_session() as session:
            cypher = """
            CALL db.index.fulltext.queryNodes("meta_dataFulltext", $query)
            YIELD node, score
            RETURN node, score
            ORDER BY score DESC
            LIMIT 20
            """

            result = session.run(cypher, query=query)

            # Flatten each hit into its properties plus id and score.
            search_results = []
            for record in result:
                node_data = dict(record["node"])
                node_data["id"] = record["node"].id
                node_data["score"] = record["score"]
                search_results.append(node_data)

            return jsonify(success(search_results))
    except Exception as e:
        logger.error(f"全文检索查询失败: {str(e)}")
        return jsonify(failed(str(e)))


# Unstructured text query
@bp.route('/unstructure/text/query', methods=['POST'])
def unstructure_text_query():
    """Load the document attached to a node from MinIO and return its parsed text.

    The response carries the node data, the parse result and at most the
    first 1000 characters of the raw content.
    """
    try:
        node_id = request.json.get('id')
        # Explicit blank check so that node id 0 is not rejected.
        if _is_blank(node_id):
            return jsonify(failed("节点ID不能为空"))

        node_data = handle_id_unstructured(node_id)
        if not node_data:
            return jsonify(failed("节点不存在"))

        # The node's 'url' field holds the MinIO object name.
        object_name = node_data.get('url')
        if not object_name:
            return jsonify(failed("文档路径不存在"))

        minio_client = get_minio_client()
        config = get_minio_config()
        bucket_name = config['bucket_name']

        file_content = get_file_content(minio_client, bucket_name, object_name)
        parsed_data = parse_text(file_content)

        # Truncate long content so the response stays small.
        if len(file_content) > 1000:
            preview = file_content[:1000] + "..."
        else:
            preview = file_content

        result = {
            "node": node_data,
            "parsed": parsed_data,
            "content": preview
        }

        return jsonify(success(result))
    except Exception as e:
        logger.error(f"非结构化文本查询失败: {str(e)}")
        return jsonify(failed(str(e)))


# File upload
@bp.route('/resource/upload', methods=['POST'])
def upload_file():
    """Store the uploaded ``file`` form part in MinIO under the configured prefix.

    The object name is ``<prefix>/<stem>_<yyyyMMddHHmmss>.<ext>``. Responds
    with the original filename, size, extension and the object name.
    """
    try:
        if 'file' not in request.files:
            return jsonify(failed("没有找到上传的文件"))

        file = request.files['file']

        if not file.filename:
            return jsonify(failed("未选择文件"))

        if not allowed_file(file.filename):
            return jsonify(failed("不支持的文件类型"))

        minio_client = get_minio_client()
        config = get_minio_config()

        file_content = file.read()
        file_size = len(file_content)

        # Keep only the final path component of the client-supplied name so
        # that "../" or absolute paths cannot place the object outside the
        # configured prefix. Backslashes are normalized first because
        # os.path.basename only splits on the host OS separator.
        safe_name = os.path.basename(file.filename.replace('\\', '/'))
        filename_without_ext, extension = safe_name.rsplit('.', 1)
        file_type = extension.lower()

        # Compact timestamp (yyyyMMddHHmmss) to make the object name unique.
        timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        object_name = f"{config['prefix']}/{filename_without_ext}_{timestamp}.{file_type}"

        minio_client.put_object(
            config['bucket_name'],
            object_name,
            io.BytesIO(file_content),
            file_size,
            # Real MIME type rather than the invalid "application/<ext>".
            content_type=_MIME_TYPES.get(file_type, _DEFAULT_MIME_TYPE)
        )

        return jsonify(success({
            "filename": file.filename,
            "size": file_size,
            "type": file_type,
            "url": object_name
        }))
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        return jsonify(failed(str(e)))


# File display info
@bp.route('/resource/display', methods=['POST'])
def upload_file_display():
    """Return name, extension, MIME type, size and download URL of a stored object."""
    try:
        object_name = request.json.get('url')
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        minio_client = get_minio_client()
        config = get_minio_config()

        # Only the size is needed, so ask for the object's metadata instead
        # of downloading its whole content.
        object_stat = minio_client.stat_object(config['bucket_name'], object_name)

        file_name = object_name.split('/')[-1]
        file_extension = file_name.split('.')[-1].lower()
        content_type = _MIME_TYPES.get(file_extension, _DEFAULT_MIME_TYPE)

        # Percent-encode the object name so characters such as '&', '#', '+'
        # or spaces do not corrupt the query string of the download link.
        download_url = f"/api/meta/resource/download?url={urllib.parse.quote(object_name)}"

        return jsonify(success({
            "filename": file_name,
            "type": file_extension,
            "contentType": content_type,
            "size": object_stat.size,
            "url": download_url
        }))
    except S3Error as e:
        logger.error(f"MinIO操作失败: {str(e)}")
        return jsonify(failed(f"文件访问失败: {str(e)}"))
    except Exception as e:
        logger.error(f"文件显示信息获取失败: {str(e)}")
        return jsonify(failed(str(e)))


# File download
@bp.route('/resource/download', methods=['GET'])
def download_file():
    """Send the MinIO object named by the ``url`` query parameter as an attachment."""
    response = None
    try:
        object_name = request.args.get('url')
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        # NOTE(review): request.args is already percent-decoded; this extra
        # unquote is kept for clients that double-encode, but it alters
        # object names that contain a literal '%'.
        object_name = urllib.parse.unquote(object_name)

        # Log the request to help debugging.
        logger.info(f"下载文件请求: {object_name}")

        minio_client = get_minio_client()
        config = get_minio_config()

        try:
            response = minio_client.get_object(config['bucket_name'], object_name)
            file_data = response.read()
        except S3Error as e:
            logger.error(f"MinIO获取文件失败: {str(e)}")
            return jsonify(failed(f"文件获取失败: {str(e)}"))

        file_name = object_name.split('/')[-1]

        # Serve straight from memory; no temporary file is created.
        file_stream = io.BytesIO(file_data)

        return send_file(
            file_stream,
            as_attachment=True,
            download_name=file_name,
            mimetype="application/octet-stream"
        )
    except Exception as e:
        logger.error(f"文件下载失败: {str(e)}")
        return jsonify(failed(str(e)))
    finally:
        # Always hand the HTTP connection back to the MinIO client's pool.
        if response:
            response.close()
            response.release_conn()


# Text resource translation
@bp.route('/resource/translate', methods=['POST'])
def text_resource_translate():
    """Run ``text_resource_solve`` on the ``name`` and ``keyword`` from the JSON body."""
    try:
        name = request.json.get('name', '')
        keyword = request.json.get('keyword', '')

        if not name:
            return jsonify(failed("名称不能为空"))

        result = text_resource_solve(None, name, keyword)

        return jsonify(success(result))
    except Exception as e:
        logger.error(f"文本资源翻译失败: {str(e)}")
        return jsonify(failed(str(e)))


# Create text resource node
@bp.route('/resource/node', methods=['POST'])
def text_resource_node():
    """Create a ``meta_data`` node for a text resource and link it to its tags.

    Each non-empty keyword is merged into a ``Tag`` node and connected to the
    new node with a ``HAS_TAG`` relationship.
    """
    try:
        name = request.json.get('name', '')
        en_name = request.json.get('en_name', '')
        # "or []" also covers an explicit JSON null for either list.
        keywords = request.json.get('keywords', []) or []
        keywords_en = request.json.get('keywords_en', []) or []
        object_name = request.json.get('url', '')

        if not name or not en_name or not object_name:
            return jsonify(failed("参数不完整"))

        with neo4j_driver.get_session() as session:
            # Create the resource node
            cypher = """
            CREATE (n:meta_data {
                name: $name,
                en_name: $en_name,
                keywords: $keywords,
                keywords_en: $keywords_en,
                url: $object_name,
                createTime: $create_time,
                updateTime: $update_time
            })
            RETURN n
            """

            create_time = update_time = get_formatted_time()
            result = session.run(
                cypher,
                name=name,
                en_name=en_name,
                keywords=keywords,
                keywords_en=keywords_en,
                object_name=object_name,
                create_time=create_time,
                update_time=update_time
            )

            node = result.single()["n"]

            tag_cypher = """
            MERGE (t:Tag {name: $name})
            ON CREATE SET t.en_name = $en_name, t.createTime = $create_time
            RETURN t
            """
            # MERGE instead of CREATE: a keyword repeated in the list must
            # not produce duplicate HAS_TAG relationships.
            rel_cypher = """
            MATCH (n), (t)
            WHERE id(n) = $node_id AND id(t) = $tag_id
            MERGE (n)-[r:HAS_TAG]->(t)
            RETURN r
            """

            for i, keyword in enumerate(keywords):
                if not keyword:
                    continue

                # Pair each keyword with its English form when one exists.
                tag_result = session.run(
                    tag_cypher,
                    name=keyword,
                    en_name=keywords_en[i] if i < len(keywords_en) else "",
                    create_time=create_time
                )
                tag_node = tag_result.single()["t"]

                session.run(
                    rel_cypher,
                    node_id=node.id,
                    tag_id=tag_node.id
                )

            return jsonify(success(dict(node)))
    except Exception as e:
        logger.error(f"创建文本资源节点失败: {str(e)}")
        return jsonify(failed(str(e)))


# Process unstructured data
@bp.route('/unstructured/process', methods=['POST'])
def processing_unstructured_data():
    """Run ``solve_unstructured_data`` for the node given by ``id`` in the JSON body."""
    try:
        node_id = request.json.get('id')
        # Explicit blank check so that node id 0 is not rejected.
        if _is_blank(node_id):
            return jsonify(failed("节点ID不能为空"))

        minio_client = get_minio_client()
        config = get_minio_config()
        prefix = config['prefix']

        result = solve_unstructured_data(node_id, minio_client, prefix)

        if result:
            return jsonify(success({"message": "处理成功"}))
        else:
            return jsonify(failed("处理失败"))
    except Exception as e:
        logger.error(f"处理非结构化数据失败: {str(e)}")
        return jsonify(failed(str(e)))


# Create text graph
@bp.route('/text/graph', methods=['POST'])
def create_text_graph():
    """Run ``handle_txt_graph`` with ``id``, ``entity`` and ``entity_en`` from the body."""
    try:
        node_id = request.json.get('id')
        entity = request.json.get('entity')
        entity_en = request.json.get('entity_en')

        # Node id 0 is accepted; the entity fields must be non-empty.
        if _is_blank(node_id) or not entity or not entity_en:
            return jsonify(failed("参数不完整"))

        result = handle_txt_graph(node_id, entity, entity_en)

        if result:
            return jsonify(success({"message": "图谱创建成功"}))
        else:
            return jsonify(failed("图谱创建失败"))
    except Exception as e:
        logger.error(f"创建文本图谱失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/config', methods=['GET'])
@require_auth
def get_meta_config():
    """Return the MinIO bucket name, prefix and allowed extensions as JSON."""
    config = get_minio_config()
    return jsonify({
        'bucket_name': config['bucket_name'],
        'prefix': config['prefix'],
        'allowed_extensions': list(config['allowed_extensions'])
    })