"""HTTP routes for metadata management.

Covers DataMeta node CRUD and search in Neo4j, document upload/download via
MinIO, unstructured-text processing, and the metadata review workflow stored
in the relational database.
"""

import io
import logging
import time
import urllib.parse

from flask import current_app, jsonify, request, send_file
from minio import Minio
from minio.error import S3Error
from sqlalchemy import or_

from app import db
from app.api.meta_data import bp
from app.core.meta_data import (
    check_redundancy_for_add,
    check_redundancy_for_update,
    get_file_content,
    get_formatted_time,
    handle_id_unstructured,
    handle_txt_graph,
    meta_impact_graph,
    meta_kinship_graph,
    meta_list,
    normalize_tag_inputs,
    parse_text,
    solve_unstructured_data,
    text_resource_solve,
)
from app.core.system.auth import require_auth
from app.models.metadata_review import (
    MetadataReviewRecord,
    MetadataVersionHistory,
    update_review_record_resolution,
)
from app.models.result import failed, success
from app.services.neo4j_driver import neo4j_driver

logger = logging.getLogger("app")

# MIME types for the document formats this API serves. Callers choose their
# own fallback for extensions that are not listed here.
_MIME_TYPES = {
    "pdf": "application/pdf",
    "doc": "application/msword",
    "docx": (
        "application/vnd.openxmlformats-"
        "officedocument.wordprocessingml.document"
    ),
    "xls": "application/vnd.ms-excel",
    "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "txt": "text/plain",
    "csv": "text/csv",
}


def _to_int(value, default):
    """Return ``int(value)``, or *default* when the conversion fails."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _is_missing(value):
    """Return True when a request value is ``None`` or a blank string.

    A plain truthiness test would also reject ``0``, which Neo4j can assign
    as an internal node id, so id parameters are checked with this instead.
    """
    return value is None or (isinstance(value, str) and not value.strip())


def _parse_id(value):
    """Return *value* as an integer id, or ``None`` if absent or not an integer."""
    if _is_missing(value) or isinstance(value, bool):
        return None
    return _to_int(value, None)


def get_minio_client():
    """Build a MinIO client from the current Flask app configuration."""
    return Minio(
        current_app.config["MINIO_HOST"],
        access_key=current_app.config["MINIO_USER"],
        secret_key=current_app.config["MINIO_PASSWORD"],
        secure=current_app.config["MINIO_SECURE"],
    )


def get_minio_config():
    """Return the bucket, key prefix and allowed extensions from app config."""
    return {
        "MINIO_BUCKET": current_app.config["MINIO_BUCKET"],
        "PREFIX": current_app.config["PREFIX"],
        "ALLOWED_EXTENSIONS": current_app.config["ALLOWED_EXTENSIONS"],
    }


def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    if "." not in filename:
        return False
    ext = filename.rsplit(".", 1)[1].lower()
    return ext in get_minio_config()["ALLOWED_EXTENSIONS"]


# Metadata list
@bp.route("/node/list", methods=["POST"])
def meta_node_list():
    """Return one page of metadata nodes, optionally filtered.

    JSON body (all optional): ``current`` (page, default 1), ``size`` (page
    size, default 10), ``name_en``, ``name_zh``, ``category``, ``time`` and
    ``tag`` (list; any other type is ignored).
    """
    try:
        payload = request.get_json() or {}
        if not isinstance(payload, dict):
            return jsonify(failed("请求数据格式错误,应为 JSON 对象"))

        # Paging parameters; clamp so a zero/negative value cannot produce a
        # negative offset downstream.
        page = max(_to_int(payload.get("current", 1), 1), 1)
        page_size = max(_to_int(payload.get("size", 10), 10), 1)

        # Filter parameters; empty strings are treated as "no filter".
        name_en_filter = payload.get("name_en") or None
        name_zh_filter = payload.get("name_zh") or None
        category_filter = payload.get("category") or None
        time_filter = payload.get("time") or None

        logger.info(
            f"[node/list] 过滤参数: name_zh={name_zh_filter}, "
            f"name_en={name_en_filter}, category={category_filter}"
        )

        tag_filter = payload.get("tag")
        if tag_filter is not None and not isinstance(tag_filter, list):
            tag_filter = None

        result, total_count = meta_list(
            page,
            page_size,
            "",
            name_en_filter,
            name_zh_filter,
            category_filter,
            time_filter,
            tag_filter,
        )

        return jsonify(
            success(
                {
                    "records": result,
                    "total": total_count,
                    "size": page_size,
                    "current": page,
                }
            )
        )
    except Exception as e:
        logger.exception("获取元数据列表失败: %s", e)
        return jsonify(failed(str(e)))


# Metadata graph
@bp.route("/node/graph", methods=["POST"])
def meta_node_graph():
    """Return a node, its related nodes and their relationships.

    JSON body: ``nodeId`` (integer, required).
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        node_id = request.json.get("nodeId")
        if node_id is None:
            return jsonify(failed("nodeId 不能为空"))
        try:
            node_id_int = int(node_id)
        except (TypeError, ValueError):
            return jsonify(failed("nodeId 必须为整数"))

        graph = meta_kinship_graph(node_id_int)
        is_dict = isinstance(graph, dict)
        nodes = graph.get("nodes", []) if is_dict else []
        relationships = graph.get("relationships", []) if is_dict else []

        # Properties of the requested node itself.
        node_info = next(
            (n for n in nodes if n.get("id") == node_id_int),
            {},
        )
        # Related nodes keep their properties so the client can show names.
        related_nodes = [n for n in nodes if n.get("id") != node_id_int]

        payload = {
            "node": node_info,
            "related_nodes": related_nodes,
            "relationships": relationships,
        }
        return jsonify(success(payload))
    except Exception as e:
        logger.exception("获取元数据图谱失败: %s", e)
        return jsonify(failed(str(e)))


# Delete metadata
@bp.route("/node/delete", methods=["POST"])
def meta_node_delete():
    """Detach-delete the node whose internal id is given.

    JSON body: ``id`` (integer, required).
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        node_id = _parse_id(request.json.get("id"))
        if node_id is None:
            return jsonify(failed("节点ID不能为空"))

        with neo4j_driver.get_session() as session:
            cypher = "MATCH (n) WHERE id(n) = $node_id DETACH DELETE n"
            session.run(cypher, node_id=node_id)

        return jsonify(success({}))
    except Exception as e:
        logger.exception("删除元数据失败: %s", e)
        return jsonify(failed(str(e)))


# Edit metadata (fetch the current values for the edit form)
@bp.route("/node/edit", methods=["POST"])
def meta_node_edit():
    """Return a DataMeta node with its tags and master-data link.

    JSON body: ``id`` (integer, required). The response data is a
    single-element list.
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        node_id = _parse_id(request.json.get("id"))
        if node_id is None:
            return jsonify(failed("节点ID不能为空"))

        with neo4j_driver.get_session() as session:
            cypher = """
            MATCH (n:DataMeta)
            WHERE id(n) = $node_id
            RETURN n
            """
            result = session.run(cypher, node_id=node_id)
            node = result.single()

            if not node or not node["n"]:
                return jsonify(failed("节点不存在"))

            node_data = dict(node["n"])
            node_data["id"] = node["n"].id

            # Tags attached to the node.
            tag_cypher = """
            MATCH (n:DataMeta)-[:LABEL]->(t:DataLabel)
            WHERE id(n) = $node_id
            RETURN t
            """
            tag_result = session.run(tag_cypher, node_id=node_id)
            tags: list[dict] = []
            for record in tag_result:
                tag_node = record.get("t")
                if tag_node:
                    tags.append(
                        {
                            "id": tag_node.id,
                            "name_zh": tag_node.get("name_zh", ""),
                            "name_en": tag_node.get("name_en", ""),
                        }
                    )

            # Linked master data, if any.
            master_data_cypher = """
            MATCH (n:DataMeta)-[:master_data]->(m:master_data)
            WHERE id(n) = $node_id
            RETURN m
            """
            master_data_result = session.run(master_data_cypher, node_id=node_id)
            master_data = master_data_result.single()

            response_data = [
                {
                    "master_data": (
                        master_data["m"].id
                        if master_data and master_data["m"]
                        else None
                    ),
                    "name_zh": node_data.get("name_zh", ""),
                    "name_en": node_data.get("name_en", ""),
                    "create_time": node_data.get("create_time", ""),
                    # The write paths in this module store the timestamp as
                    # `updateTime`; fall back to it so the value is not lost.
                    "update_time": (
                        node_data.get("update_time")
                        or node_data.get("updateTime", "")
                    ),
                    "status": bool(node_data.get("status", True)),
                    "data_type": node_data.get("data_type", ""),
                    "tag": tags,
                    "affiliation": node_data.get("affiliation"),
                    "category": node_data.get("category"),
                    "alias": node_data.get("alias"),
                    "describe": node_data.get("describe"),
                }
            ]

            logger.info(f"成功获取元数据节点: ID={node_data['id']}")
            return jsonify(success(response_data))
    except Exception as e:
        logger.exception("获取元数据节点失败: %s", e)
        return jsonify(failed(str(e)))


# Add metadata
@bp.route("/check", methods=["GET"])
def meta_check():
    """Check whether a DataMeta node with the given Chinese name exists.

    Query parameters:
        - name_zh: Chinese name of the metadata item.

    Returns:
        - exists: true/false.
    """
    try:
        name_zh = request.args.get("name_zh")
        if not name_zh:
            return jsonify(failed("缺少name_zh参数"))

        with neo4j_driver.get_session() as session:
            cypher = """
            MATCH (n:DataMeta {name_zh: $name_zh})
            RETURN count(n) > 0 as exists
            """
            result = session.run(cypher, name_zh=name_zh)
            record = result.single()

            if record:
                exists = record["exists"]
                logger.info(f"检查元数据 '{name_zh}': {'存在' if exists else '不存在'}")
                return jsonify(
                    success({"exists": exists, "name_zh": name_zh}, "查询成功")
                )
            return jsonify(
                success({"exists": False, "name_zh": name_zh}, "查询成功")
            )
    except Exception as e:
        logger.exception("检查元数据失败: %s", e)
        return jsonify(failed(f"检查失败: {str(e)}"))


@bp.route("/node/add", methods=["POST"])
def meta_node_add():
    """Create a metadata node, after a redundancy check.

    Unless ``force_create`` is set:
        - an exact match returns the existing node;
        - a suspected duplicate (review record created) returns a failure
          message pointing to the review page;
        - otherwise the node is created (MERGE on ``name_zh``).
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        node_name_zh = request.json.get("name_zh")
        node_type = request.json.get("data_type")
        node_category = request.json.get("category")
        node_alias = request.json.get("alias")
        node_affiliation = request.json.get("affiliation")
        node_tag = request.json.get("tag")
        node_desc = request.json.get("describe")
        node_status = bool(request.json.get("status", True))
        node_name_en = request.json.get("name_en")
        # Skip the redundancy check when explicitly requested.
        force_create = bool(request.json.get("force_create", False))

        if not node_name_zh:
            return jsonify(failed("节点名称不能为空"))

        if not node_type:
            return jsonify(failed("节点类型不能为空"))

        # Normalise the tag input into a list of tag ids.
        tag_ids = normalize_tag_inputs(node_tag)

        # ========== Redundancy check ==========
        if not force_create:
            redundancy_result = check_redundancy_for_add(
                name_zh=node_name_zh,
                name_en=node_name_en or "",
                data_type=node_type,
                tag_ids=tag_ids,
            )

            # An identical metadata item already exists.
            if redundancy_result["has_exact_match"]:
                exact_id = redundancy_result["exact_match_id"]
                logger.info(
                    f"元数据已存在(完全匹配): name_zh={node_name_zh}, "
                    f"existing_id={exact_id}"
                )
                with neo4j_driver.get_session() as session:
                    existing = session.run(
                        "MATCH (n:DataMeta) WHERE id(n) = $id RETURN n",
                        {"id": exact_id},
                    ).single()
                    if existing and existing["n"]:
                        existing_data = dict(existing["n"])
                        existing_data["id"] = existing["n"].id
                        return jsonify(
                            success(existing_data, message="元数据已存在,返回已有节点")
                        )
                return jsonify(failed(f"元数据已存在(ID={exact_id}),请勿重复创建"))

            # Suspected duplicates: a review record has been created.
            if redundancy_result["review_created"]:
                candidates = redundancy_result["candidates"]
                candidate_names = [c.get("name_zh", "") for c in candidates[:3]]
                logger.info(
                    f"发现疑似重复元数据: name_zh={node_name_zh}, "
                    f"candidates={candidate_names}"
                )
                return jsonify(
                    failed(
                        f"发现疑似重复元数据,已创建审核记录。"
                        f"疑似重复: {', '.join(candidate_names)}。"
                        f"请前往元数据审核页面处理,或使用 force_create=true 强制创建。"
                    )
                )

        # ========== Create the node ==========
        with neo4j_driver.get_session() as session:
            cypher = """
            MERGE (n:DataMeta {name_zh: $name_zh})
            ON CREATE SET n.name_en = $name_en,
                         n.data_type = $data_type,
                         n.category = $category,
                         n.alias = $alias,
                         n.affiliation = $affiliation,
                         n.describe = $describe,
                         n.create_time = $create_time,
                         n.updateTime = $update_time,
                         n.status = $status
            ON MATCH SET n.data_type = $data_type,
                        n.category = $category,
                        n.alias = $alias,
                        n.affiliation = $affiliation,
                        n.describe = $describe,
                        n.updateTime = $update_time,
                        n.status = $status,
                        n.name_en = $name_en
            RETURN n
            """
            create_time = update_time = get_formatted_time()
            result = session.run(
                cypher,
                name_zh=node_name_zh,
                data_type=node_type,
                category=node_category,
                alias=node_alias,
                affiliation=node_affiliation,
                describe=node_desc,
                create_time=create_time,
                update_time=update_time,
                status=node_status,
                name_en=node_name_en,
            )

            node = result.single()
            if node and node["n"]:
                node_data = dict(node["n"])
                node_data["id"] = node["n"].id

                # Link every tag that actually exists; unknown ids are
                # logged and skipped.
                tag_nodes = []
                for tag_id in tag_ids or []:
                    tag_fetch = session.run(
                        "MATCH (t:DataLabel) WHERE id(t) = $tag_id RETURN t",
                        tag_id=tag_id,
                    ).single()
                    if not tag_fetch or not tag_fetch.get("t"):
                        logger.warning(f"未找到标签节点: {tag_id}")
                        continue
                    tag_node = tag_fetch["t"]
                    tag_nodes.append(
                        {
                            "id": tag_node.id,
                            "name_zh": tag_node.get("name_zh", ""),
                            "name_en": tag_node.get("name_en", ""),
                        }
                    )

                    tag_cypher = """
                    MATCH (n:DataMeta), (t:DataLabel)
                    WHERE id(n) = $node_id AND id(t) = $tag_id
                    MERGE (n)-[r:LABEL]->(t)
                    RETURN r
                    """
                    session.run(tag_cypher, node_id=node["n"].id, tag_id=tag_id)
                node_data["tag"] = tag_nodes

                logger.info(
                    f"成功创建或更新元数据节点: "
                    f"ID={node_data['id']}, name={node_name_zh}"
                )
                return jsonify(success(node_data))

            logger.error(f"创建元数据节点失败: {node_name_zh}")
            return jsonify(failed("创建元数据节点失败"))
    except Exception as e:
        logger.exception("添加元数据失败: %s", e)
        return jsonify(failed(str(e)))


# Search metadata
@bp.route("/search", methods=["GET"])
def search_metadata_route():
    """Return up to 100 DataMeta nodes whose ``name_zh`` contains ``keyword``.

    An empty or missing ``keyword`` yields an empty list.
    """
    try:
        keyword = request.args.get("keyword", "")
        if not keyword:
            return jsonify(success([]))

        cypher = """
        MATCH (n:DataMeta)
        WHERE n.name_zh CONTAINS $keyword
        RETURN n LIMIT 100
        """

        with neo4j_driver.get_session() as session:
            result = session.run(cypher, keyword=keyword)
            metadata_list = [dict(record["n"]) for record in result]

        return jsonify(success(metadata_list))
    except Exception as e:
        logger.exception("搜索元数据失败: %s", e)
        return jsonify(failed(str(e)))


# Full-text search
@bp.route("/full/text/query", methods=["POST"])
def full_text_query():
    """Query the ``DataMetaFulltext`` index and return the top 20 hits.

    JSON body: ``query`` (required). Each hit carries its node properties
    plus ``id`` and ``score``.
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        search_term = request.json.get("query", "")
        if not search_term:
            return jsonify(failed("查询条件不能为空"))

        with neo4j_driver.get_session() as session:
            cypher = """
            CALL db.index.fulltext.queryNodes("DataMetaFulltext", $term)
            YIELD node, score
            RETURN node, score
            ORDER BY score DESC
            LIMIT 20
            """
            result = session.run(cypher, term=search_term)

            search_results = []
            for record in result:
                node_data = dict(record["node"])
                node_data["id"] = record["node"].id
                node_data["score"] = record["score"]
                search_results.append(node_data)

        return jsonify(success(search_results))
    except Exception as e:
        logger.exception("全文检索查询失败: %s", e)
        return jsonify(failed(str(e)))


# Unstructured text query
@bp.route("/unstructure/text/query", methods=["POST"])
def unstructure_text_query():
    """Load the document attached to a node and return its parsed content.

    JSON body: ``id`` (required). The response holds the node, the parsed
    data and at most the first 1000 characters of the raw content.
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        node_id = request.json.get("id")
        if _is_missing(node_id):
            return jsonify(failed("节点ID不能为空"))

        node_data = handle_id_unstructured(node_id)
        if not node_data:
            return jsonify(failed("节点不存在"))

        # The node's `url` property is the MinIO object key.
        object_name = node_data.get("url")
        if not object_name:
            return jsonify(failed("文档路径不存在"))

        minio_client = get_minio_client()
        config = get_minio_config()
        bucket_name = config["MINIO_BUCKET"]

        file_content = get_file_content(minio_client, bucket_name, object_name)
        parsed_data = parse_text(file_content)

        result = {
            "node": node_data,
            "parsed": parsed_data,
            "content": (
                file_content[:1000] + "..."
                if len(file_content) > 1000
                else file_content
            ),
        }
        return jsonify(success(result))
    except Exception as e:
        logger.exception("非结构化文本查询失败: %s", e)
        return jsonify(failed(str(e)))


# File upload
@bp.route("/resource/upload", methods=["POST"])
def upload_file():
    """Upload the multipart ``file`` field to MinIO.

    The object key is ``<PREFIX>/<name>_<yyyyMMddHHmmss>.<ext>``. Returns the
    original filename, the size in bytes, the extension and the object key.
    """
    try:
        if "file" not in request.files:
            return jsonify(failed("没有找到上传的文件"))

        file = request.files["file"]
        if not file.filename:
            return jsonify(failed("未选择文件"))

        # The filename is client-supplied: keep only its last path component
        # so it cannot inject extra segments into the object key.
        filename = file.filename.replace("\\", "/").rsplit("/", 1)[-1]
        if not filename:
            return jsonify(failed("未选择文件"))

        if not allowed_file(filename):
            return jsonify(failed("不支持的文件类型"))

        minio_client = get_minio_client()
        config = get_minio_config()

        file_content = file.read()
        file_size = len(file_content)

        # allowed_file() guarantees the name contains a dot.
        filename_without_ext, extension = filename.rsplit(".", 1)
        file_type = extension.lower()

        # Compact timestamp (yyyyMMddHHmmss) to make the key unique.
        timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        object_name = (
            f"{config['PREFIX']}/{filename_without_ext}_{timestamp}.{file_type}"
        )

        minio_client.put_object(
            config["MINIO_BUCKET"],
            object_name,
            io.BytesIO(file_content),
            file_size,
            content_type=_MIME_TYPES.get(file_type, f"application/{file_type}"),
        )

        return jsonify(
            success(
                {
                    "filename": file.filename,
                    "size": file_size,
                    "type": file_type,
                    "url": object_name,
                }
            )
        )
    except Exception as e:
        logger.exception("文件上传失败: %s", e)
        return jsonify(failed(str(e)))


# File display info
@bp.route("/resource/display", methods=["POST"])
def upload_file_display():
    """Return display information for a stored object.

    JSON body: ``url`` (object key, required). Returns the file name,
    extension, MIME type, size and a download URL.
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        object_name = request.json.get("url")
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        minio_client = get_minio_client()
        config = get_minio_config()

        # Only the size is needed, so ask for the object's metadata instead
        # of downloading its whole content.
        stat = minio_client.stat_object(
            bucket_name=config["MINIO_BUCKET"],
            object_name=object_name,
        )

        file_name = object_name.split("/")[-1]
        file_extension = file_name.split(".")[-1].lower()
        content_type = _MIME_TYPES.get(file_extension, "application/octet-stream")

        # Percent-encode the key so characters such as '&', '#', '+' or
        # spaces survive as part of the query-string value.
        download_url = (
            "/api/meta/resource/download?url="
            + urllib.parse.quote(object_name, safe="/")
        )

        return jsonify(
            success(
                {
                    "filename": file_name,
                    "type": file_extension,
                    "contentType": content_type,
                    "size": stat.size,
                    "url": download_url,
                }
            )
        )
    except S3Error as e:
        logger.exception("MinIO操作失败: %s", e)
        return jsonify(failed(f"文件访问失败: {str(e)}"))
    except Exception as e:
        logger.exception("文件显示信息获取失败: %s", e)
        return jsonify(failed(str(e)))


# File download
@bp.route("/resource/download", methods=["GET"])
def download_file():
    """Stream a stored object back as an attachment.

    Query parameters: ``url`` (object key, required).
    """
    response = None
    try:
        object_name = request.args.get("url")
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        # NOTE(review): request.args is already URL-decoded, so this second
        # decode only matters for clients that double-encode. It is kept for
        # compatibility, but it alters keys that contain a literal
        # '%XX' sequence - confirm whether it can be dropped.
        object_name = urllib.parse.unquote(object_name)

        logger.info(f"下载文件请求: {object_name}")

        minio_client = get_minio_client()
        config = get_minio_config()

        try:
            response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
            file_data = response.read()
        except S3Error as e:
            logger.exception("MinIO获取文件失败: %s", e)
            return jsonify(failed(f"文件获取失败: {str(e)}"))

        file_name = object_name.split("/")[-1]

        # Serve straight from memory; no temporary file is created.
        file_stream = io.BytesIO(file_data)
        return send_file(
            file_stream,
            as_attachment=True,
            download_name=file_name,
            mimetype="application/octet-stream",
        )
    except Exception as e:
        logger.exception("文件下载失败: %s", e)
        return jsonify(failed(str(e)))
    finally:
        # Always hand the HTTP connection back to the pool.
        if response:
            response.close()
            response.release_conn()


# Text resource translation
@bp.route("/resource/translate", methods=["POST"])
def text_resource_translate():
    """Run ``text_resource_solve`` for a name and keyword.

    JSON body: ``name_zh`` (required), ``keyword`` (optional).
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        name_zh = request.json.get("name_zh", "")
        keyword = request.json.get("keyword", "")

        if not name_zh:
            return jsonify(failed("名称不能为空"))

        result = text_resource_solve(None, name_zh, keyword)
        return jsonify(success(result))
    except Exception as e:
        logger.exception("文本资源翻译失败: %s", e)
        return jsonify(failed(str(e)))


# Create a text resource node
@bp.route("/resource/node", methods=["POST"])
def text_resource_node():
    """Create a DataMeta node for an uploaded document and tag it.

    JSON body: ``name_zh``, ``name_en`` and ``url`` (all required), plus
    optional parallel lists ``keywords`` / ``keywords_en``. Each non-empty
    keyword is merged as a ``Tag`` node and linked with ``HAS_TAG``.
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        name_zh = request.json.get("name_zh", "")
        name_en = request.json.get("name_en", "")
        # `or []` also covers an explicit JSON null.
        keywords = request.json.get("keywords") or []
        keywords_en = request.json.get("keywords_en") or []
        object_name = request.json.get("url", "")

        if not name_zh or not name_en or not object_name:
            return jsonify(failed("参数不完整"))

        with neo4j_driver.get_session() as session:
            cypher = """
            CREATE (n:DataMeta {
                name_zh: $name_zh,
                name_en: $name_en,
                keywords: $keywords,
                keywords_en: $keywords_en,
                url: $object_name,
                create_time: $create_time,
                updateTime: $update_time
            })
            RETURN n
            """
            create_time = update_time = get_formatted_time()
            result = session.run(
                cypher,
                name_zh=name_zh,
                name_en=name_en,
                keywords=keywords,
                keywords_en=keywords_en,
                object_name=object_name,
                create_time=create_time,
                update_time=update_time,
            )

            record = result.single()
            if not record:
                return jsonify(failed("创建节点失败"))
            node = record["n"]

            tag_cypher = """
            MERGE (t:Tag {name_zh: $name_zh})
            ON CREATE SET t.name_en = $name_en,
                         t.create_time = $create_time
            RETURN t
            """
            rel_cypher = """
            MATCH (n), (t)
            WHERE id(n) = $node_id AND id(t) = $tag_id
            CREATE (n)-[r:HAS_TAG]->(t)
            RETURN r
            """
            for i, keyword in enumerate(keywords):
                if not keyword:
                    continue
                tag_record = session.run(
                    tag_cypher,
                    name_zh=keyword,
                    # The English list may be shorter than the Chinese one.
                    name_en=keywords_en[i] if i < len(keywords_en) else "",
                    create_time=create_time,
                ).single()
                if not tag_record:
                    continue
                tag_node = tag_record["t"]
                session.run(rel_cypher, node_id=node.id, tag_id=tag_node.id)

            return jsonify(success(dict(node)))
    except Exception as e:
        logger.exception("创建文本资源节点失败: %s", e)
        return jsonify(failed(str(e)))


# Process unstructured data
@bp.route("/unstructured/process", methods=["POST"])
def processing_unstructured_data():
    """Run ``solve_unstructured_data`` for the given node.

    JSON body: ``id`` (required).
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        node_id = request.json.get("id")
        if _is_missing(node_id):
            return jsonify(failed("节点ID不能为空"))

        minio_client = get_minio_client()
        config = get_minio_config()
        prefix = config["PREFIX"]

        result = solve_unstructured_data(node_id, minio_client, prefix)

        if result:
            return jsonify(success({"message": "处理成功"}))
        return jsonify(failed("处理失败"))
    except Exception as e:
        logger.exception("处理非结构化数据失败: %s", e)
        return jsonify(failed(str(e)))


# Create text graph
@bp.route("/text/graph", methods=["POST"])
def create_text_graph():
    """Run ``handle_txt_graph`` for a node and an entity name pair.

    JSON body: ``id``, ``entity_zh`` and ``entity_en`` (all required).
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        node_id = request.json.get("id")
        entity_zh = request.json.get("entity_zh")
        entity_en = request.json.get("entity_en")

        if _is_missing(node_id) or not entity_zh or not entity_en:
            return jsonify(failed("参数不完整"))

        result = handle_txt_graph(node_id, entity_zh, entity_en)

        if result:
            return jsonify(success({"message": "图谱创建成功"}))
        return jsonify(failed("图谱创建失败"))
    except Exception as e:
        logger.exception("创建文本图谱失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route("/config", methods=["GET"])
@require_auth
def get_meta_config():
    """Return the storage configuration (bucket, prefix, allowed extensions)."""
    config = get_minio_config()
    return jsonify(
        {
            "bucket_name": config["MINIO_BUCKET"],
            "prefix": config["PREFIX"],
            "allowed_extensions": list(config["ALLOWED_EXTENSIONS"]),
        }
    )


# Update metadata
@bp.route("/node/update", methods=["POST"])
def meta_node_update():
    """Update a metadata node, after a redundancy check.

    The check runs only when ``name_zh`` or ``name_en`` changes and
    ``force_update`` is not set:
        - an exact match with another node returns an error;
        - a suspected duplicate (review record created) returns a failure
          message pointing to the review page;
        - otherwise the provided fields are written.

    When ``tag`` is present in the body the node's LABEL relationships are
    replaced by the given tags.
    """
    try:
        if not request.json:
            return jsonify(failed("请求数据不能为空"))

        node_id = request.json.get("id")
        if _is_missing(node_id):
            return jsonify(failed("节点ID不能为空"))

        try:
            node_id = int(node_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"节点ID必须为整数,当前值: {node_id}"))

        # Skip the redundancy check when explicitly requested.
        force_update = bool(request.json.get("force_update", False))

        with neo4j_driver.get_session() as session:
            # Make sure the node exists and capture its current values.
            check_cypher = """
            MATCH (n:DataMeta)
            WHERE id(n) = $node_id
            RETURN n
            """
            result = session.run(check_cypher, node_id=node_id)
            node = result.single()

            if not node or not node["n"]:
                return jsonify(failed("节点不存在"))

            current_node = dict(node["n"])

            # Candidate fields; a value of None means "not provided".
            fields_to_update = {
                "name_zh": request.json.get("name_zh"),
                "category": request.json.get("category"),
                "alias": request.json.get("alias"),
                "affiliation": request.json.get("affiliation"),
                "data_type": request.json.get("data_type"),
                "describe": request.json.get("describe"),
                "status": request.json.get("status"),
                "name_en": request.json.get("name_en"),
            }

            # Values the node would have after the update, for the check.
            updated_name_zh = (
                fields_to_update["name_zh"]
                if fields_to_update["name_zh"] is not None
                else current_node.get("name_zh", "")
            )
            updated_name_en = (
                fields_to_update["name_en"]
                if fields_to_update["name_en"] is not None
                else current_node.get("name_en", "")
            )
            updated_data_type = (
                fields_to_update["data_type"]
                if fields_to_update["data_type"] is not None
                else current_node.get("data_type", "varchar(255)")
            )

            tag = request.json.get("tag")
            tag_ids = normalize_tag_inputs(tag) if tag is not None else []

            # ========== Redundancy check (only when a name changes) ==========
            name_changed = (
                fields_to_update["name_zh"] is not None
                and fields_to_update["name_zh"] != current_node.get("name_zh")
            ) or (
                fields_to_update["name_en"] is not None
                and fields_to_update["name_en"] != current_node.get("name_en")
            )

            if name_changed and not force_update:
                redundancy_result = check_redundancy_for_update(
                    node_id=node_id,
                    name_zh=updated_name_zh,
                    name_en=updated_name_en,
                    data_type=updated_data_type,
                    tag_ids=tag_ids,
                )

                # Another node would become identical to this one.
                if redundancy_result["has_exact_match"]:
                    exact_id = redundancy_result["exact_match_id"]
                    logger.warning(
                        f"更新后元数据与其他节点完全匹配: "
                        f"node_id={node_id}, existing_id={exact_id}"
                    )
                    return jsonify(
                        failed(
                            f"更新后的元数据与已有节点(ID={exact_id})完全相同,"
                            f"请检查是否需要合并或修改名称。"
                        )
                    )

                # Suspected duplicates: a review record has been created.
                if redundancy_result["review_created"]:
                    candidates = redundancy_result["candidates"]
                    candidate_names = [c.get("name_zh", "") for c in candidates[:3]]
                    logger.info(
                        f"更新元数据发现疑似重复: node_id={node_id}, "
                        f"candidates={candidate_names}"
                    )
                    return jsonify(
                        failed(
                            f"发现疑似重复元数据,已创建审核记录。"
                            f"疑似重复: {', '.join(candidate_names)}。"
                            f"请前往元数据审核页面处理,或使用 force_update=true 强制更新。"
                        )
                    )

            # ========== Apply the update ==========
            # Property names come from the fixed dict above, never from the
            # request, so interpolating them into the statement is safe;
            # the values are passed as query parameters.
            update_cypher = """
            MATCH (n:DataMeta)
            WHERE id(n) = $node_id
            SET n.updateTime = $update_time
            """
            update_params = {"node_id": node_id, "update_time": get_formatted_time()}

            for field, new_value in fields_to_update.items():
                if new_value is not None:
                    update_cypher += f", n.{field} = ${field}\n"
                    update_params[field] = new_value

            update_cypher += "RETURN n"

            result = session.run(
                update_cypher,  # type: ignore[arg-type]
                **update_params,
            )
            updated_node = result.single()

            if updated_node and updated_node["n"]:
                node_data = dict(updated_node["n"])
                node_data["id"] = updated_node["n"].id

                # Replace the tag set when tags were supplied.
                if tag is not None:
                    delete_tag_cypher = """
                    MATCH (n:DataMeta)-[r:LABEL]->(t:DataLabel)
                    WHERE id(n) = $node_id
                    DELETE r
                    """
                    session.run(delete_tag_cypher, node_id=node_id)

                    create_tag_cypher = """
                    MATCH (n:DataMeta), (t:DataLabel)
                    WHERE id(n) = $node_id AND id(t) = $tag_id
                    MERGE (n)-[r:LABEL]->(t)
                    RETURN r
                    """
                    for tag_id in tag_ids:
                        session.run(create_tag_cypher, node_id=node_id, tag_id=tag_id)

                logger.info(f"成功更新元数据节点: ID={node_data['id']}")
                return jsonify(success(node_data))

            logger.error(f"更新元数据节点失败: ID={node_id}")
            return jsonify(failed("更新元数据节点失败"))
    except Exception as e:
        logger.exception("更新元数据失败: %s", e)
        return jsonify(failed(str(e)))


@bp.route("/review/list", methods=["POST"])
def metadata_review_list():
    """List review records (suspected redundancy / change).

    Body:
        - current: page number (default 1)
        - size: page size (default 10)
        - record_type: redundancy|change (optional)
        - status: pending|resolved|ignored (optional)
        - business_domain_id: business domain id (optional)
        - keyword: substring matched against new_meta.name_zh/name_en
          (optional)
    """
    try:
        payload = request.get_json() or {}
        if not isinstance(payload, dict):
            return jsonify(failed("请求数据格式错误,应为 JSON 对象"))

        # Clamp so a zero/negative page cannot produce a negative OFFSET.
        page = max(_to_int(payload.get("current", 1), 1), 1)
        page_size = max(_to_int(payload.get("size", 10), 10), 1)

        record_type = payload.get("record_type")
        status = payload.get("status")
        business_domain_id = payload.get("business_domain_id")
        keyword = (payload.get("keyword") or "").strip()

        query = MetadataReviewRecord.query

        if record_type:
            query = query.filter(MetadataReviewRecord.record_type == record_type)
        if status:
            query = query.filter(MetadataReviewRecord.status == status)
        if business_domain_id is not None and str(business_domain_id).strip() != "":
            bd_id_int = _to_int(business_domain_id, None)
            if bd_id_int is None:
                return jsonify(failed("business_domain_id 必须为整数"))
            query = query.filter(MetadataReviewRecord.business_domain_id == bd_id_int)

        if keyword:
            # Substring match on the text extracted from the JSON column.
            name_zh_col = MetadataReviewRecord.new_meta["name_zh"].astext
            name_en_col = MetadataReviewRecord.new_meta["name_en"].astext
            query = query.filter(
                or_(
                    name_zh_col.contains(keyword),
                    name_en_col.contains(keyword),
                )
            )

        total = query.count()
        records = (
            query.order_by(MetadataReviewRecord.created_at.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        return jsonify(
            success(
                {
                    "records": [r.to_dict() for r in records],
                    "total": total,
                    "size": page_size,
                    "current": page,
                }
            )
        )
    except Exception as e:
        logger.exception("审核记录列表查询失败: %s", e)
        return jsonify(failed("审核记录列表查询失败", error=str(e)))


@bp.route("/review/detail", methods=["GET"])
def metadata_review_detail():
    """Return one review record.

    Query:
        - id: record id

    For ``change`` records that carry ``old_meta.meta_id`` the response also
    includes ``impact_graph``; a failure to build it is logged and the field
    is left as ``None``.
    """
    try:
        record_id = request.args.get("id")
        if not record_id:
            return jsonify(failed("缺少id参数"))

        record = MetadataReviewRecord.query.get(int(record_id))
        if not record:
            return jsonify(failed("记录不存在"))

        data = record.to_dict()

        impact_graph = None
        if record.record_type == "change":
            old_meta = record.old_meta or {}
            meta_id = old_meta.get("meta_id")
            if meta_id is not None and str(meta_id).strip() != "":
                try:
                    impact_graph = meta_impact_graph(int(meta_id))
                except Exception as e:
                    # Best effort: the record is still returned without it.
                    logger.warning(f"获取影响图谱失败: {e}")

        data["impact_graph"] = impact_graph
        return jsonify(success(data))
    except Exception as e:
        logger.exception("审核记录详情查询失败: %s", e)
        return jsonify(failed("审核记录详情查询失败", error=str(e)))


def _resolve_alias(record, action, action_payload, resolved_by, notes):
    """Link the record's business domain to an existing DataMeta as an alias.

    Returns an error message, or ``None`` on success.
    """
    bd_id = record.business_domain_id
    new_meta = record.new_meta or {}
    candidate_meta_id = action_payload.get("candidate_meta_id")

    if _is_missing(bd_id):
        return "记录缺少 business_domain_id,无法执行 alias"
    if _is_missing(candidate_meta_id):
        return "payload.candidate_meta_id 不能为空"

    alias_name_zh = (new_meta.get("name_zh") or "").strip()
    alias_name_en = (new_meta.get("name_en") or "").strip()

    # Neo4j: create INCLUDES and record the alias names on the relationship.
    with neo4j_driver.get_session() as session:
        session.run(
            """
            MATCH (n:BusinessDomain), (m:DataMeta)
            WHERE id(n) = $domain_id AND id(m) = $meta_id
            MERGE (n)-[r:INCLUDES]->(m)
            SET r.alias_name_zh = $alias_name_zh,
                r.alias_name_en = $alias_name_en
            """,
            {
                "domain_id": int(bd_id),
                "meta_id": int(candidate_meta_id),
                "alias_name_zh": alias_name_zh,
                "alias_name_en": alias_name_en,
            },
        )

    update_review_record_resolution(
        record,
        action="alias",
        payload={"candidate_meta_id": int(candidate_meta_id)},
        resolved_by=resolved_by,
        notes=notes,
    )
    return None


def _resolve_create_new(record, action, action_payload, resolved_by, notes):
    """Create a new DataMeta under a distinct name and link it to the domain.

    Returns an error message, or ``None`` on success.
    """
    bd_id = record.business_domain_id
    new_meta = record.new_meta or {}
    new_name_zh = (action_payload.get("new_name_zh") or "").strip()

    if _is_missing(bd_id):
        return "记录缺少 business_domain_id,无法执行 create_new"
    if not new_name_zh:
        return "payload.new_name_zh 不能为空"

    with neo4j_driver.get_session() as session:
        # CREATE (not MERGE) so an existing node is never overwritten.
        result = session.run(
            """
            CREATE (m:DataMeta {
                name_zh: $name_zh,
                name_en: $name_en,
                data_type: $data_type,
                create_time: $create_time,
                status: true
            })
            RETURN m
            """,
            {
                "name_zh": new_name_zh,
                "name_en": (new_meta.get("name_en") or "").strip(),
                "data_type": (new_meta.get("data_type") or "varchar(255)"),
                "create_time": get_formatted_time(),
            },
        ).single()
        if not result or not result.get("m"):
            return "创建新元数据失败"
        new_meta_id = int(result["m"].id)

        session.run(
            """
            MATCH (n:BusinessDomain), (m:DataMeta)
            WHERE id(n) = $domain_id AND id(m) = $meta_id
            MERGE (n)-[:INCLUDES]->(m)
            """,
            {"domain_id": int(bd_id), "meta_id": new_meta_id},
        )

    update_review_record_resolution(
        record,
        action="create_new",
        payload={"new_name_zh": new_name_zh},
        resolved_by=resolved_by,
        notes=notes,
    )
    return None


def _resolve_accept_change(record, action, action_payload, resolved_by, notes):
    """Write ``new_meta`` back to the target DataMeta and add a history row.

    Returns an error message, or ``None`` on success.
    """
    old_meta = record.old_meta or {}
    new_meta = record.new_meta or {}

    # Explicit fallback instead of `or`, so a meta_id of 0 is honoured.
    meta_id = action_payload.get("meta_id")
    if _is_missing(meta_id):
        meta_id = old_meta.get("meta_id")
    if _is_missing(meta_id):
        return "无法确定需要更新的 meta_id"
    meta_id_int = int(meta_id)

    before_snapshot = old_meta.get("snapshot") or {}
    after_snapshot = new_meta

    # Neo4j: update the DataMeta properties and, when the new snapshot
    # carries tag ids, replace the node's tag set.
    with neo4j_driver.get_session() as session:
        name_zh_val = (
            after_snapshot.get("name_zh") or before_snapshot.get("name_zh") or ""
        ).strip()
        name_en_val = (after_snapshot.get("name_en") or "").strip()
        data_type_val = after_snapshot.get("data_type") or "varchar(255)"

        session.run(
            """
            MATCH (m:DataMeta)
            WHERE id(m) = $meta_id
            SET m.name_zh = $name_zh,
                m.name_en = $name_en,
                m.data_type = $data_type,
                m.updateTime = $update_time,
                m.status = true
            """,
            {
                "meta_id": meta_id_int,
                "name_zh": name_zh_val,
                "name_en": name_en_val,
                "data_type": data_type_val,
                "update_time": get_formatted_time(),
            },
        )

        tag_ids = after_snapshot.get("tag_ids") or []
        tag_ids = [int(t) for t in tag_ids if t is not None]
        if tag_ids:
            session.run(
                """
                MATCH (m:DataMeta)-[r:LABEL]->(:DataLabel)
                WHERE id(m) = $meta_id
                DELETE r
                """,
                {"meta_id": meta_id_int},
            )
            session.run(
                """
                MATCH (m:DataMeta)
                WHERE id(m) = $meta_id
                WITH m
                UNWIND $tag_ids AS tid
                MATCH (t:DataLabel) WHERE id(t) = tid
                MERGE (m)-[:LABEL]->(t)
                """,
                {"meta_id": meta_id_int, "tag_ids": tag_ids},
            )

    # Relational DB: version history row, committed by the caller.
    history = MetadataVersionHistory()
    history.meta_id = meta_id_int
    history.change_source = "ddl"
    history.before_snapshot = before_snapshot
    history.after_snapshot = after_snapshot
    history.created_by = resolved_by if resolved_by is not None else ""
    db.session.add(history)

    update_review_record_resolution(
        record,
        action="accept_change",
        payload={"meta_id": meta_id_int},
        resolved_by=resolved_by,
        notes=notes,
    )
    return None


def _resolve_status_only(record, action, action_payload, resolved_by, notes):
    """Record the resolution without touching Neo4j (reject_change / ignore)."""
    update_review_record_resolution(
        record,
        action=action,
        payload=action_payload,
        resolved_by=resolved_by,
        notes=notes,
    )
    return None


# Review action name -> handler. Each handler returns an error message or
# None; the route commits the session on success.
_REVIEW_ACTIONS = {
    "alias": _resolve_alias,
    "create_new": _resolve_create_new,
    "accept_change": _resolve_accept_change,
    "reject_change": _resolve_status_only,
    "ignore": _resolve_status_only,
}


@bp.route("/review/resolve", methods=["POST"])
def metadata_review_resolve():
    """Resolve a pending review record.

    Body:
        - id: record id
        - action: alias | create_new | accept_change | reject_change | ignore
        - payload: action parameters (optional)
        - resolved_by: who handled it (optional)
        - notes: remarks (optional)

    action=alias:
        payload: { candidate_meta_id: int }
        Creates INCLUDES from the business domain to candidate_meta_id and
        stores alias_name_zh/alias_name_en on the relationship.

    action=create_new:
        payload: { new_name_zh: str }
        Creates a new DataMeta (distinguished by its Chinese name) and links
        it to the business domain.

    action=accept_change:
        payload: { meta_id?: int }
        Writes new_meta back to the target DataMeta and adds a
        MetadataVersionHistory row.

    action=reject_change/ignore:
        Only updates the review record.
    """
    try:
        payload = request.get_json() or {}
        if not isinstance(payload, dict):
            return jsonify(failed("请求数据格式错误,应为 JSON 对象"))

        record_id = payload.get("id")
        action = payload.get("action")
        action_payload = payload.get("payload") or {}
        resolved_by = payload.get("resolved_by")
        notes = payload.get("notes")

        if not record_id:
            return jsonify(failed("id 不能为空"))
        if not action:
            return jsonify(failed("action 不能为空"))

        record = MetadataReviewRecord.query.get(int(record_id))
        if not record:
            return jsonify(failed("记录不存在"))

        if record.status != "pending":
            return jsonify(failed("记录已处理,无法重复处理"))

        # Only look up string actions; anything else is unsupported.
        handler = _REVIEW_ACTIONS.get(action) if isinstance(action, str) else None
        if handler is None:
            return jsonify(failed(f"不支持的action: {action}"))

        error_message = handler(record, action, action_payload, resolved_by, notes)
        if error_message is not None:
            return jsonify(failed(error_message))

        db.session.commit()
        return jsonify(success(record.to_dict()))
    except Exception as e:
        logger.exception("处理审核记录失败: %s", e)
        db.session.rollback()
        return jsonify(failed("处理审核记录失败", error=str(e)))