"""Metadata blueprint routes: Neo4j-backed metadata nodes and MinIO-backed resources."""
import io
import json
import logging
import os
import time
import urllib.parse

from flask import request, jsonify, send_from_directory, send_file, current_app
from app.api.meta_data import bp
from app.models.result import success, failed
from minio import Minio
from minio.error import S3Error
from app.services.neo4j_driver import neo4j_driver
from app.core.graph.graph_operations import create_or_get_node, relationship_exists
from app.core.meta_data import (
    translate_and_parse,
    get_formatted_time,
    meta_list,
    meta_kinship_graph,
    meta_impact_graph,
    parse_text,
    parse_entity_relation,
    handle_txt_graph,
    get_file_content,
    text_resource_solve,
    handle_id_unstructured,
    solve_unstructured_data,
)
from app.core.system.auth import require_auth

logger = logging.getLogger("app")

# MIME types for the file extensions this module knows how to label.
# Shared by the upload route (object content type) and the display route.
_MIME_TYPES = {
    'pdf': 'application/pdf',
    'doc': 'application/msword',
    'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    'xls': 'application/vnd.ms-excel',
    'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    'txt': 'text/plain',
    'csv': 'text/csv',
}

# Properties that /node/update may overwrite. The names are interpolated into
# the Cypher text, so they must only ever come from this fixed tuple and never
# from the request.
_UPDATABLE_FIELDS = ('name', 'category', 'alias', 'affiliation', 'type', 'describe', 'status')


def get_minio_client():
    """Build a MinIO client from the current Flask app configuration."""
    return Minio(
        current_app.config['MINIO_HOST'],
        access_key=current_app.config['MINIO_USER'],
        secret_key=current_app.config['MINIO_PASSWORD'],
        secure=current_app.config['MINIO_SECURE'],
    )


def get_minio_config():
    """Return the MinIO bucket, object prefix and allowed extensions from app config."""
    return {
        'MINIO_BUCKET': current_app.config['MINIO_BUCKET'],
        'PREFIX': current_app.config['PREFIX'],
        'ALLOWED_EXTENSIONS': current_app.config['ALLOWED_EXTENSIONS'],
    }


def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in get_minio_config()['ALLOWED_EXTENSIONS']


def _status_to_bool(value):
    """Normalize a stored node status to a boolean.

    The stored value can take several shapes:
    - /node/add stores ``str(status)``, e.g. "1" by default, or "True" for a
      JSON ``true``.
    - /node/update stores the raw JSON value (bool, int or str).
    - Nodes may have no status property at all; those were treated as
      enabled, i.e. default "true".
    """
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() in ("true", "1")


# Metadata list
@bp.route('/node/list', methods=['POST'])
def meta_node_list():
    """Return a page of metadata nodes filtered by the optional search fields."""
    try:
        # Pagination parameters
        page = int(request.json.get('current', 1))
        page_size = int(request.json.get('size', 10))

        # Search / filter parameters
        search = request.json.get('search', '')
        en_name_filter = request.json.get('en_name', None)
        name_filter = request.json.get('name', None)
        category_filter = request.json.get('category', None)
        time_filter = request.json.get('time', None)
        tag_filter = request.json.get('tag', None)

        result, total_count = meta_list(
            page,
            page_size,
            search,
            en_name_filter,
            name_filter,
            category_filter,
            time_filter,
            tag_filter,
        )

        return jsonify(success({
            "records": result,
            "total": total_count,
            "size": page_size,
            "current": page,
        }))
    except Exception as e:
        logger.exception(f"获取元数据列表失败: {str(e)}")
        return jsonify(failed(str(e)))


# Metadata graph
@bp.route('/node/graph', methods=['POST'])
def meta_node_graph():
    """Return the kinship graph around the node given by ``nodeId``."""
    try:
        node_id = request.json.get('nodeId')
        result = meta_kinship_graph(node_id)
        return jsonify(success(result))
    except Exception as e:
        logger.exception(f"获取元数据图谱失败: {str(e)}")
        return jsonify(failed(str(e)))


# Delete metadata
@bp.route('/node/delete', methods=['POST'])
def meta_node_delete():
    """Detach-delete the node whose internal Neo4j id is ``id``."""
    try:
        node_id = request.json.get('id')
        # Validate up front (like the sibling routes) instead of letting
        # int(None) surface a raw TypeError message to the client.
        if not node_id:
            return jsonify(failed("节点ID不能为空"))

        with neo4j_driver.get_session() as session:
            cypher = "MATCH (n) WHERE id(n) = $node_id DETACH DELETE n"
            session.run(cypher, node_id=int(node_id))

        return jsonify(success({}))
    except Exception as e:
        logger.exception(f"删除元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


# Edit metadata (fetch one node for the edit form)
@bp.route('/node/edit', methods=['POST'])
def meta_node_edit():
    """Return a one-element list describing a meta_data node, its tag and master data."""
    try:
        node_id = request.json.get('id')
        if not node_id:
            return jsonify(failed("节点ID不能为空"))

        with neo4j_driver.get_session() as session:
            cypher = """
            MATCH (n:meta_data)
            WHERE id(n) = $node_id
            RETURN n
            """
            result = session.run(cypher, node_id=int(node_id))
            node = result.single()

            if not node or not node["n"]:
                return jsonify(failed("节点不存在"))

            node_data = dict(node["n"])
            node_data["id"] = node["n"].id

            # Attached tag (data_label via :label)
            tag_cypher = """
            MATCH (n:meta_data)-[:label]->(t:data_label)
            WHERE id(n) = $node_id
            RETURN t
            """
            tag_result = session.run(tag_cypher, node_id=int(node_id))
            tag = tag_result.single()

            # Attached master data (via :master_data)
            master_data_cypher = """
            MATCH (n:meta_data)-[:master_data]->(m:master_data)
            WHERE id(n) = $node_id
            RETURN m
            """
            master_data_result = session.run(master_data_cypher, node_id=int(node_id))
            master_data = master_data_result.single()

            response_data = [{
                "master_data": master_data["m"].id if master_data and master_data["m"] else None,
                "name": node_data.get("name", ""),
                "en_name": node_data.get("en_name", ""),
                "time": node_data.get("updateTime", ""),
                # Stored status may be "true", "1", "True", True, ... depending
                # on which route wrote it.
                "status": _status_to_bool(node_data.get("status", "true")),
                # /node/add stores these as data_type / desc while /node/update
                # stores type / describe; prefer the latter, fall back to the former.
                "type": node_data.get("type", node_data.get("data_type", "")),
                "tag": {
                    "name": tag["t"].get("name", "") if tag and tag["t"] else None,
                    "id": tag["t"].id if tag and tag["t"] else None,
                },
                "affiliation": node_data.get("affiliation"),
                "category": node_data.get("category"),
                "alias": node_data.get("alias"),
                "describe": node_data.get("describe", node_data.get("desc")),
            }]

            logger.info(f"成功获取元数据节点: ID={node_data['id']}")
            return jsonify(success(response_data))
    except Exception as e:
        logger.exception(f"获取元数据节点失败: {str(e)}")
        return jsonify(failed(str(e)))


# Add metadata
@bp.route('/node/add', methods=['POST'])
def meta_node_add():
    """Create (or update, keyed by name) a meta_data node and optionally link a tag."""
    try:
        node_name = request.json.get('name')
        node_type = request.json.get('type')
        node_category = request.json.get('category')
        node_alias = request.json.get('alias')
        node_affiliation = request.json.get('affiliation')
        node_tag = request.json.get('tag')
        node_desc = request.json.get('describe')
        node_status = request.json.get('status', 1)

        if not node_name:
            return jsonify(failed("节点名称不能为空"))
        if not node_type:
            return jsonify(failed("节点类型不能为空"))

        node_en_name = translate_and_parse(node_name)

        with neo4j_driver.get_session() as session:
            # MERGE on name: an existing node with the same name is updated
            # (createTime is only written on creation).
            cypher = """
            MERGE (n:meta_data {name: $name})
            ON CREATE SET n.data_type = $type, n.category = $category, n.alias = $alias,
                n.affiliation = $affiliation, n.desc = $desc, n.createTime = $create_time,
                n.updateTime = $update_time, n.status = $status, n.en_name = $en_name
            ON MATCH SET n.data_type = $type, n.category = $category, n.alias = $alias,
                n.affiliation = $affiliation, n.desc = $desc, n.updateTime = $update_time,
                n.status = $status, n.en_name = $en_name
            RETURN n
            """
            create_time = update_time = get_formatted_time()
            result = session.run(
                cypher,
                name=node_name,
                type=node_type,
                category=node_category,
                alias=node_alias,
                affiliation=node_affiliation,
                desc=node_desc,
                create_time=create_time,
                update_time=update_time,
                status=str(node_status),
                en_name=node_en_name,
            )

            node = result.single()
            if node and node["n"]:
                node_data = dict(node["n"])
                node_data["id"] = node["n"].id

                # Link the tag when a tag id was supplied
                if node_tag:
                    tag_cypher = """
                    MATCH (n:meta_data), (t:data_label)
                    WHERE id(n) = $node_id AND id(t) = $tag_id
                    MERGE (n)-[r:label]->(t)
                    RETURN r
                    """
                    session.run(tag_cypher, node_id=node["n"].id, tag_id=int(node_tag))

                logger.info(f"成功创建或更新元数据节点: ID={node_data['id']}, name={node_name}")
                return jsonify(success(node_data))

            logger.error(f"创建元数据节点失败: {node_name}")
            return jsonify(failed("创建元数据节点失败"))
    except Exception as e:
        logger.exception(f"添加元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


# Search metadata
@bp.route('/search', methods=['GET'])
def search_metadata_route():
    """Return up to 100 meta_data nodes whose name contains ``keyword``."""
    try:
        keyword = request.args.get('keyword', '')
        if not keyword:
            return jsonify(success([]))

        cypher = """
        MATCH (n:meta_data)
        WHERE n.name CONTAINS $keyword
        RETURN n LIMIT 100
        """

        with neo4j_driver.get_session() as session:
            result = session.run(cypher, keyword=keyword)
            metadata_list = [dict(record["n"]) for record in result]

        return jsonify(success(metadata_list))
    except Exception as e:
        logger.exception(f"搜索元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


# Full-text search
@bp.route('/full/text/query', methods=['POST'])
def full_text_query():
    """Query the ``meta_dataFulltext`` index and return the top 20 hits with scores."""
    try:
        query = request.json.get('query', '')
        if not query:
            return jsonify(failed("查询条件不能为空"))

        with neo4j_driver.get_session() as session:
            cypher = """
            CALL db.index.fulltext.queryNodes("meta_dataFulltext", $query)
            YIELD node, score
            RETURN node, score
            ORDER BY score DESC
            LIMIT 20
            """
            result = session.run(cypher, query=query)

            search_results = []
            for record in result:
                node_data = dict(record["node"])
                node_data["id"] = record["node"].id
                node_data["score"] = record["score"]
                search_results.append(node_data)

        return jsonify(success(search_results))
    except Exception as e:
        logger.exception(f"全文检索查询失败: {str(e)}")
        return jsonify(failed(str(e)))


# Unstructured text query
@bp.route('/unstructure/text/query', methods=['POST'])
def unstructure_text_query():
    """Load a node's document from MinIO, parse it and return a preview of its content."""
    try:
        node_id = request.json.get('id')
        if not node_id:
            return jsonify(failed("节点ID不能为空"))

        node_data = handle_id_unstructured(node_id)
        if not node_data:
            return jsonify(failed("节点不存在"))

        # The node's ``url`` property holds the MinIO object name
        object_name = node_data.get('url')
        if not object_name:
            return jsonify(failed("文档路径不存在"))

        minio_client = get_minio_client()
        config = get_minio_config()
        bucket_name = config['MINIO_BUCKET']

        file_content = get_file_content(minio_client, bucket_name, object_name)
        parsed_data = parse_text(file_content)

        result = {
            "node": node_data,
            "parsed": parsed_data,
            # Only a 1000-character preview of the content is returned
            "content": file_content[:1000] + "..." if len(file_content) > 1000 else file_content,
        }
        return jsonify(success(result))
    except Exception as e:
        logger.exception(f"非结构化文本查询失败: {str(e)}")
        return jsonify(failed(str(e)))


# File upload
@bp.route('/resource/upload', methods=['POST'])
def upload_file():
    """Store the uploaded ``file`` in MinIO under ``PREFIX/<name>_<timestamp>.<ext>``."""
    try:
        if 'file' not in request.files:
            return jsonify(failed("没有找到上传的文件"))

        file = request.files['file']
        if file.filename == '':
            return jsonify(failed("未选择文件"))
        if not allowed_file(file.filename):
            return jsonify(failed("不支持的文件类型"))

        minio_client = get_minio_client()
        config = get_minio_config()

        file_content = file.read()
        file_size = len(file_content)
        filename_without_ext, file_type = file.filename.rsplit('.', 1)
        file_type = file_type.lower()

        # Compact timestamp (yyyyMMddHHmmss) keeps object names unique per second
        timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        object_name = f"{config['PREFIX']}/{filename_without_ext}_{timestamp}.{file_type}"

        # Use a real MIME type where known (e.g. text/plain for txt); keep the
        # previous "application/<ext>" form only as the fallback.
        content_type = _MIME_TYPES.get(file_type, f"application/{file_type}")

        minio_client.put_object(
            config['MINIO_BUCKET'],
            object_name,
            io.BytesIO(file_content),
            file_size,
            content_type=content_type,
        )

        return jsonify(success({
            "filename": file.filename,
            "size": file_size,
            "type": file_type,
            "url": object_name,
        }))
    except Exception as e:
        logger.exception(f"文件上传失败: {str(e)}")
        return jsonify(failed(str(e)))


# File display info
@bp.route('/resource/display', methods=['POST'])
def upload_file_display():
    """Return name, type, MIME type, size and a download link for a stored object."""
    try:
        object_name = request.json.get('url')
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        minio_client = get_minio_client()
        config = get_minio_config()

        # Only metadata is needed: stat the object instead of downloading it
        # (raises S3Error for a missing object, just like get_object).
        stat = minio_client.stat_object(config['MINIO_BUCKET'], object_name)

        file_name = object_name.split('/')[-1]
        file_extension = file_name.split('.')[-1].lower()
        content_type = _MIME_TYPES.get(file_extension, 'application/octet-stream')

        return jsonify(success({
            "filename": file_name,
            "type": file_extension,
            "contentType": content_type,
            "size": stat.size,
            # Percent-encode so names containing &, #, spaces or non-ASCII
            # characters survive as a single query parameter.
            "url": f"/api/meta/resource/download?url={urllib.parse.quote(object_name, safe='/')}",
        }))
    except S3Error as e:
        logger.exception(f"MinIO操作失败: {str(e)}")
        return jsonify(failed(f"文件访问失败: {str(e)}"))
    except Exception as e:
        logger.exception(f"文件显示信息获取失败: {str(e)}")
        return jsonify(failed(str(e)))


# File download
@bp.route('/resource/download', methods=['GET'])
def download_file():
    """Stream a MinIO object back to the client as an attachment."""
    response = None
    try:
        object_name = request.args.get('url')
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        # NOTE(review): request.args is already decoded by Flask; this second
        # unquote tolerates clients that double-encode, but would mangle object
        # names that literally contain '%'.
        object_name = urllib.parse.unquote(object_name)
        logger.info(f"下载文件请求: {object_name}")

        minio_client = get_minio_client()
        config = get_minio_config()

        try:
            response = minio_client.get_object(config['MINIO_BUCKET'], object_name)
            file_data = response.read()
        except S3Error as e:
            logger.exception(f"MinIO获取文件失败: {str(e)}")
            return jsonify(failed(f"文件获取失败: {str(e)}"))

        file_name = object_name.split('/')[-1]

        # Serve from memory; no temporary file is created
        return send_file(
            io.BytesIO(file_data),
            as_attachment=True,
            download_name=file_name,
            mimetype="application/octet-stream",
        )
    except Exception as e:
        logger.exception(f"文件下载失败: {str(e)}")
        return jsonify(failed(str(e)))
    finally:
        # Return the pooled HTTP connection held by get_object
        if response:
            response.close()
            response.release_conn()


# Text resource translation
@bp.route('/resource/translate', methods=['POST'])
def text_resource_translate():
    """Delegate ``name``/``keyword`` to text_resource_solve and return its result."""
    try:
        name = request.json.get('name', '')
        keyword = request.json.get('keyword', '')

        if not name:
            return jsonify(failed("名称不能为空"))

        result = text_resource_solve(None, name, keyword)
        return jsonify(success(result))
    except Exception as e:
        logger.exception(f"文本资源翻译失败: {str(e)}")
        return jsonify(failed(str(e)))


# Create text resource node
@bp.route('/resource/node', methods=['POST'])
def text_resource_node():
    """Create a meta_data resource node and link it to a :Tag node per keyword."""
    try:
        name = request.json.get('name', '')
        en_name = request.json.get('en_name', '')
        keywords = request.json.get('keywords', [])
        keywords_en = request.json.get('keywords_en', [])
        object_name = request.json.get('url', '')

        if not name or not en_name or not object_name:
            return jsonify(failed("参数不完整"))

        with neo4j_driver.get_session() as session:
            cypher = """
            CREATE (n:meta_data {
                name: $name,
                en_name: $en_name,
                keywords: $keywords,
                keywords_en: $keywords_en,
                url: $object_name,
                createTime: $create_time,
                updateTime: $update_time
            })
            RETURN n
            """
            create_time = update_time = get_formatted_time()
            result = session.run(
                cypher,
                name=name,
                en_name=en_name,
                keywords=keywords,
                keywords_en=keywords_en,
                object_name=object_name,
                create_time=create_time,
                update_time=update_time,
            )
            node = result.single()["n"]

            tag_cypher = """
            MERGE (t:Tag {name: $name})
            ON CREATE SET t.en_name = $en_name, t.createTime = $create_time
            RETURN t
            """
            # MERGE (not CREATE) so a keyword repeated in the list does not
            # produce duplicate HAS_TAG relationships to the same tag.
            rel_cypher = """
            MATCH (n), (t)
            WHERE id(n) = $node_id AND id(t) = $tag_id
            MERGE (n)-[r:HAS_TAG]->(t)
            RETURN r
            """
            for i, keyword in enumerate(keywords):
                if not keyword:
                    continue
                tag_result = session.run(
                    tag_cypher,
                    name=keyword,
                    # keywords_en is matched positionally and may be shorter
                    en_name=keywords_en[i] if i < len(keywords_en) else "",
                    create_time=create_time,
                )
                tag_node = tag_result.single()["t"]
                session.run(rel_cypher, node_id=node.id, tag_id=tag_node.id)

            return jsonify(success(dict(node)))
    except Exception as e:
        logger.exception(f"创建文本资源节点失败: {str(e)}")
        return jsonify(failed(str(e)))


# Process unstructured data
@bp.route('/unstructured/process', methods=['POST'])
def processing_unstructured_data():
    """Run solve_unstructured_data for node ``id`` and report success or failure."""
    try:
        node_id = request.json.get('id')
        if not node_id:
            return jsonify(failed("节点ID不能为空"))

        minio_client = get_minio_client()
        prefix = get_minio_config()['PREFIX']

        if solve_unstructured_data(node_id, minio_client, prefix):
            return jsonify(success({"message": "处理成功"}))
        return jsonify(failed("处理失败"))
    except Exception as e:
        logger.exception(f"处理非结构化数据失败: {str(e)}")
        return jsonify(failed(str(e)))


# Create text graph
@bp.route('/text/graph', methods=['POST'])
def create_text_graph():
    """Build a text graph for node ``id`` from ``entity``/``entity_en``."""
    try:
        node_id = request.json.get('id')
        entity = request.json.get('entity')
        entity_en = request.json.get('entity_en')

        if not all([node_id, entity, entity_en]):
            return jsonify(failed("参数不完整"))

        if handle_txt_graph(node_id, entity, entity_en):
            return jsonify(success({"message": "图谱创建成功"}))
        return jsonify(failed("图谱创建失败"))
    except Exception as e:
        logger.exception(f"创建文本图谱失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/config', methods=['GET'])
@require_auth
def get_meta_config():
    """Return the MinIO bucket, prefix and allowed upload extensions."""
    config = get_minio_config()
    return jsonify({
        'bucket_name': config['MINIO_BUCKET'],
        'prefix': config['PREFIX'],
        'allowed_extensions': list(config['ALLOWED_EXTENSIONS']),
    })


# Update metadata
@bp.route('/node/update', methods=['POST'])
def meta_node_update():
    """Update the provided properties (and optionally the tag) of a meta_data node.

    Only the fields present (non-None) in the request are written, plus
    updateTime. A truthy ``name`` also refreshes ``en_name``. A non-None ``tag``
    replaces the node's :label relationship; a dict with an ``id`` links that
    data_label, anything else just removes the old link.
    """
    try:
        node_id = request.json.get('id')
        if not node_id:
            return jsonify(failed("节点ID不能为空"))

        with neo4j_driver.get_session() as session:
            check_cypher = """
            MATCH (n:meta_data)
            WHERE id(n) = $node_id
            RETURN n
            """
            result = session.run(check_cypher, node_id=int(node_id))
            node = result.single()
            if not node or not node["n"]:
                return jsonify(failed("节点不存在"))

            set_clauses = ["n.updateTime = $update_time"]
            update_params = {
                'node_id': int(node_id),
                'update_time': get_formatted_time(),
            }

            # Unprovided fields are simply left untouched (no need to rewrite
            # them with a possibly stale copy of the current value).
            for field in _UPDATABLE_FIELDS:
                new_value = request.json.get(field)
                if new_value is not None:
                    set_clauses.append(f"n.{field} = ${field}")
                    update_params[field] = new_value

            new_name = request.json.get('name')
            if new_name:
                set_clauses.append("n.en_name = $en_name")
                update_params['en_name'] = translate_and_parse(new_name)

            update_cypher = (
                "MATCH (n:meta_data) WHERE id(n) = $node_id "
                f"SET {', '.join(set_clauses)} "
                "RETURN n"
            )
            result = session.run(update_cypher, **update_params)
            updated_node = result.single()

            if updated_node and updated_node["n"]:
                node_data = dict(updated_node["n"])
                node_data["id"] = updated_node["n"].id

                tag = request.json.get('tag')
                if tag is not None:
                    # Drop the existing tag relationship first
                    delete_tag_cypher = """
                    MATCH (n:meta_data)-[r:label]->(t:data_label)
                    WHERE id(n) = $node_id
                    DELETE r
                    """
                    session.run(delete_tag_cypher, node_id=int(node_id))

                    if tag and isinstance(tag, dict) and 'id' in tag:
                        create_tag_cypher = """
                        MATCH (n:meta_data), (t:data_label)
                        WHERE id(n) = $node_id AND id(t) = $tag_id
                        MERGE (n)-[r:label]->(t)
                        RETURN r
                        """
                        session.run(create_tag_cypher, node_id=int(node_id), tag_id=int(tag['id']))

                logger.info(f"成功更新元数据节点: ID={node_data['id']}")
                return jsonify(success(node_data))

            logger.error(f"更新元数据节点失败: ID={node_id}")
            return jsonify(failed("更新元数据节点失败"))
    except Exception as e:
        logger.exception(f"更新元数据失败: {str(e)}")
        return jsonify(failed(str(e)))