| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695 |
- import io
- import logging
- from flask import current_app, jsonify, request, send_file
- from minio import Minio
- from minio.error import S3Error
- from sqlalchemy import or_
- from app import db
- from app.api.meta_data import bp
- from app.core.meta_data import (
- check_redundancy_for_add,
- check_redundancy_for_update,
- convert_tag_ids_to_tags,
- get_file_content,
- get_formatted_time,
- handle_id_unstructured,
- handle_txt_graph,
- meta_impact_graph,
- meta_kinship_graph,
- meta_list,
- normalize_tag_inputs,
- parse_text,
- solve_unstructured_data,
- text_resource_solve,
- )
- from app.core.system.auth import require_auth
- from app.models.metadata_review import (
- MetadataReviewRecord,
- MetadataVersionHistory,
- update_review_record_resolution,
- )
- from app.models.result import failed, success
- from app.services.neo4j_driver import neo4j_driver
# Module-wide logger; uses the shared "app" logger name rather than __name__.
logger: logging.Logger = logging.getLogger("app")
def get_minio_client():
    """Build a MinIO client from the current Flask app's configuration.

    Reads ``MINIO_HOST``, ``MINIO_USER``, ``MINIO_PASSWORD`` and
    ``MINIO_SECURE`` from ``current_app.config``; a new client object is
    constructed on every call.
    """
    cfg = current_app.config
    endpoint = cfg["MINIO_HOST"]
    return Minio(
        endpoint,
        access_key=cfg["MINIO_USER"],
        secret_key=cfg["MINIO_PASSWORD"],
        secure=cfg["MINIO_SECURE"],
    )
def get_minio_config():
    """Return the MinIO-related subset of the Flask app configuration.

    Returns:
        dict with the keys ``MINIO_BUCKET``, ``PREFIX`` and
        ``ALLOWED_EXTENSIONS`` copied from ``current_app.config``
        (a missing key raises ``KeyError``).
    """
    wanted_keys = ("MINIO_BUCKET", "PREFIX", "ALLOWED_EXTENSIONS")
    app_config = current_app.config
    return {key: app_config[key] for key in wanted_keys}
def allowed_file(filename):
    """Tell whether *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last ``.``, compared lower-cased.
    Names that contain no ``.`` at all are rejected.
    """
    _, separator, extension = filename.rpartition(".")
    if not separator:
        return False
    return extension.lower() in get_minio_config()["ALLOWED_EXTENSIONS"]
# Metadata list (paged, filterable)
@bp.route("/node/list", methods=["POST"])
def meta_node_list():
    """Return one page of metadata nodes matching the optional filters.

    Body (JSON object, every key optional):
        current / size: page number and page size (defaults 1 / 10). Values
            that are not integers fall back to the defaults; values below 1
            are clamped to 1.
        name_en / name_zh / category / time: filters forwarded to
            ``meta_list``; empty values mean "no filter".
        tag: list of tag filters; ignored unless it is a list.

    Returns ``{"records", "total", "size", "current"}`` wrapped in ``success``.
    """
    try:
        payload = request.get_json() or {}
        if not isinstance(payload, dict):
            return jsonify(failed("请求数据格式错误,应为 JSON 对象"))

        def to_int(value, default):
            """Parse *value* as int, falling back to *default* on bad input."""
            try:
                return int(value)
            except (TypeError, ValueError):
                return default

        # Clamp to >= 1 so meta_list never receives a zero/negative page or
        # page size (presumably turned into a negative offset downstream).
        page = max(to_int(payload.get("current", 1), 1), 1)
        page_size = max(to_int(payload.get("size", 10), 10), 1)

        # Filters: falsy values ("" / None) mean "do not filter".
        name_en_filter = payload.get("name_en") or None
        name_zh_filter = payload.get("name_zh") or None
        category_filter = payload.get("category") or None
        time_filter = payload.get("time") or None
        logger.info(
            f"[node/list] 过滤参数: name_zh={name_zh_filter}, "
            f"name_en={name_en_filter}, category={category_filter}"
        )

        tag_filter = payload.get("tag")
        if tag_filter is not None and not isinstance(tag_filter, list):
            tag_filter = None

        # The third positional argument is passed as "" (unchanged from the
        # original call); its meaning is defined by meta_list.
        result, total_count = meta_list(
            page,
            page_size,
            "",
            name_en_filter,
            name_zh_filter,
            category_filter,
            time_filter,
            tag_filter,
        )

        return jsonify(
            success(
                {
                    "records": result,
                    "total": total_count,
                    "size": page_size,
                    "current": page,
                }
            )
        )
    except Exception as e:
        logger.error(f"获取元数据列表失败: {str(e)}")
        return jsonify(failed(str(e)))
# Metadata kinship graph
@bp.route("/node/graph", methods=["POST"])
def meta_node_graph():
    """Return one node together with its related nodes and relationships.

    Body: ``{"nodeId": <int-like>}``.

    Response payload:
        node: the graph entry whose ``id`` equals nodeId (``{}`` if absent);
        related_nodes: every other node returned by ``meta_kinship_graph``;
        relationships: the relationship list from ``meta_kinship_graph``.
    """
    try:
        body = request.json
        if not body:
            return jsonify(failed("请求数据不能为空"))

        raw_id = body.get("nodeId")
        if raw_id is None:
            return jsonify(failed("nodeId 不能为空"))
        try:
            target_id = int(raw_id)
        except (TypeError, ValueError):
            return jsonify(failed("nodeId 必须为整数"))

        graph = meta_kinship_graph(target_id)
        if isinstance(graph, dict):
            nodes = graph.get("nodes", [])
            relationships = graph.get("relationships", [])
        else:
            nodes, relationships = [], []

        # Single pass: the first entry matching target_id is the focus node;
        # entries with a different id are the related nodes.
        focus = None
        related_nodes = []
        for item in nodes:
            if item.get("id") == target_id:
                if focus is None:
                    focus = item
            else:
                related_nodes.append(item)

        return jsonify(
            success(
                {
                    "node": focus if focus is not None else {},
                    "related_nodes": related_nodes,
                    "relationships": relationships,
                }
            )
        )
    except Exception as e:
        logger.error(f"获取元数据图谱失败: {str(e)}")
        return jsonify(failed(str(e)))
# Delete a metadata node
@bp.route("/node/delete", methods=["POST"])
def meta_node_delete():
    """Delete one DataMeta node (with all its relationships) by internal id.

    Body: ``{"id": <int-like>}``; internal id 0 is accepted.

    Returns ``success({})`` when a node was deleted, and a failure when the
    id is missing, is not an integer, or matches no DataMeta node.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        node_id = payload.get("id")
        # Explicit None/"" test: Neo4j internal id 0 is valid.
        if node_id is None or node_id == "":
            return jsonify(failed("节点ID不能为空"))
        try:
            node_id = int(node_id)
        except (TypeError, ValueError):
            return jsonify(failed(f"节点ID必须为整数,当前值: {node_id}"))

        with neo4j_driver.get_session() as session:
            # Restricted to :DataMeta (as in /node/edit and /node/update) so
            # this endpoint cannot delete arbitrary graph nodes such as labels.
            cypher = """
            MATCH (n:DataMeta)
            WHERE id(n) = $node_id
            DETACH DELETE n
            RETURN count(*) AS deleted
            """
            record = session.run(cypher, node_id=node_id).single()

        if not record or not record["deleted"]:
            return jsonify(failed("节点不存在"))
        return jsonify(success({}))
    except Exception as e:
        logger.error(f"删除元数据失败: {str(e)}")
        return jsonify(failed(str(e)))
# Load a metadata node for the edit form
@bp.route("/node/edit", methods=["POST"])
def meta_node_edit():
    """Return the editable view of one DataMeta node.

    Body: ``{"id": <int-like>}``; internal id 0 is accepted.

    The response is a one-element list with the node's properties, its
    DataLabel tags (``[{"id", "name_zh", "name_en"}]``) and the id of a
    linked ``master_data`` node (or None).
    """
    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        node_id = payload.get("id")
        # Explicit None/"" test: `not node_id` would reject internal id 0.
        if node_id is None or node_id == "":
            return jsonify(failed("节点ID不能为空"))
        try:
            node_id = int(node_id)
        except (TypeError, ValueError):
            return jsonify(failed(f"节点ID必须为整数,当前值: {node_id}"))

        with neo4j_driver.get_session() as session:
            cypher = """
            MATCH (n:DataMeta)
            WHERE id(n) = $node_id
            RETURN n
            """
            node = session.run(cypher, node_id=node_id).single()
            if not node or not node["n"]:
                return jsonify(failed("节点不存在"))

            node_data = dict(node["n"])
            node_data["id"] = node["n"].id

            # Tags attached through LABEL relationships.
            tag_cypher = """
            MATCH (n:DataMeta)-[:LABEL]->(t:DataLabel)
            WHERE id(n) = $node_id
            RETURN t
            """
            tags: list[dict] = []
            for record in session.run(tag_cypher, node_id=node_id):
                tag_node = record.get("t")
                if tag_node:
                    tags.append(
                        {
                            "id": tag_node.id,
                            "name_zh": tag_node.get("name_zh", ""),
                            "name_en": tag_node.get("name_en", ""),
                        }
                    )

            # Linked master data node, if any.
            master_data_cypher = """
            MATCH (n:DataMeta)-[:master_data]->(m:master_data)
            WHERE id(n) = $node_id
            RETURN m
            """
            master_data = session.run(master_data_cypher, node_id=node_id).single()

            response_data = [
                {
                    "master_data": (
                        master_data["m"].id
                        if master_data and master_data["m"]
                        else None
                    ),
                    "name_zh": node_data.get("name_zh", ""),
                    "name_en": node_data.get("name_en", ""),
                    "create_time": node_data.get("create_time", ""),
                    # The create/update endpoints in this module store the
                    # timestamp as "updateTime"; fall back to it when
                    # "update_time" is absent.
                    "update_time": (
                        node_data.get("update_time")
                        or node_data.get("updateTime", "")
                    ),
                    "status": bool(node_data.get("status", True)),
                    "data_type": node_data.get("data_type", ""),
                    "tag": tags,
                    "affiliation": node_data.get("affiliation"),
                    "category": node_data.get("category"),
                    "alias": node_data.get("alias"),
                    "describe": node_data.get("describe"),
                }
            ]
            logger.info(f"成功获取元数据节点: ID={node_data['id']}")
            return jsonify(success(response_data))
    except Exception as e:
        logger.error(f"获取元数据节点失败: {str(e)}")
        return jsonify(failed(str(e)))
# Check whether a metadata Chinese name already exists
@bp.route("/check", methods=["GET"])
def meta_check():
    """Report whether a DataMeta node with the given ``name_zh`` exists.

    Query parameters:
        name_zh: Chinese name to look up (required).

    Returns:
        ``success({"exists": bool, "name_zh": ...}, "查询成功")``, or a
        failure when the parameter is missing or the query raises.
    """
    try:
        name_zh = request.args.get("name_zh")
        if not name_zh:
            return jsonify(failed("缺少name_zh参数"))

        exists_cypher = """
        MATCH (n:DataMeta {name_zh: $name_zh})
        RETURN count(n) > 0 as exists
        """
        with neo4j_driver.get_session() as session:
            row = session.run(exists_cypher, name_zh=name_zh).single()
            if not row:
                return jsonify(
                    success({"exists": False, "name_zh": name_zh}, "查询成功")
                )
            found = row["exists"]
            logger.info(f"检查元数据 '{name_zh}': {'存在' if found else '不存在'}")
            return jsonify(success({"exists": found, "name_zh": name_zh}, "查询成功"))
    except Exception as e:
        logger.error(f"检查元数据失败: {str(e)}")
        return jsonify(failed(f"检查失败: {str(e)}"))
def _link_meta_tags(session, node_id, tag_ids):
    """Attach DataLabel tags to a DataMeta node and return the linked tags.

    One query per tag both looks the label up and MERGEs the LABEL
    relationship. Tag ids that match no DataLabel node are logged and
    skipped.

    Returns:
        list of ``{"id", "name_zh", "name_en"}`` dicts, in input order.
    """
    link_cypher = """
    MATCH (n:DataMeta) WHERE id(n) = $node_id
    MATCH (t:DataLabel) WHERE id(t) = $tag_id
    MERGE (n)-[:LABEL]->(t)
    RETURN t
    """
    linked = []
    for tag_id in tag_ids:
        record = session.run(link_cypher, node_id=node_id, tag_id=tag_id).single()
        if not record or not record.get("t"):
            logger.warning(f"未找到标签节点: {tag_id}")
            continue
        tag_node = record["t"]
        linked.append(
            {
                "id": tag_node.id,
                "name_zh": tag_node.get("name_zh", ""),
                "name_en": tag_node.get("name_en", ""),
            }
        )
    return linked


# Add a metadata node
@bp.route("/node/add", methods=["POST"])
def meta_node_add():
    """Create a DataMeta node, guarded by a redundancy check.

    Body: name_zh and data_type (required); name_en, category, alias,
    affiliation, describe, status (default True), tag (tag id input passed to
    ``normalize_tag_inputs``), force_create (skip the redundancy check).

    Unless ``force_create`` is set, ``check_redundancy_for_add`` runs first:
      - exact match  -> nothing is written; a failure naming the existing id
        is returned;
      - candidates   -> the node is written, then a review record is stored
        and the success message lists up to three suspected duplicates;
      - no match     -> the node is written normally.

    NOTE(review): the write is a MERGE keyed on ``name_zh`` only, so an
    existing node sharing ``name_zh`` (a non-exact candidate, or any node
    when force_create is set) is updated in place instead of a new node
    being created. Confirm this is intended.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        node_name_zh = payload.get("name_zh")
        node_type = payload.get("data_type")
        node_category = payload.get("category")
        node_alias = payload.get("alias")
        node_affiliation = payload.get("affiliation")
        node_tag = payload.get("tag")
        node_desc = payload.get("describe")
        node_status = bool(payload.get("status", True))
        node_name_en = payload.get("name_en")
        force_create = bool(payload.get("force_create", False))

        if not node_name_zh:
            return jsonify(failed("节点名称不能为空"))
        if not node_type:
            return jsonify(failed("节点类型不能为空"))

        tag_ids = normalize_tag_inputs(node_tag)

        # ---------- redundancy check ----------
        suspicious_candidates = []
        if not force_create:
            redundancy_result = check_redundancy_for_add(
                name_zh=node_name_zh,
                name_en=node_name_en or "",
                data_type=node_type,
                tag_ids=tag_ids,
            )
            # An exact match already exists: stop without writing anything.
            if redundancy_result["has_exact_match"]:
                exact_id = redundancy_result["exact_match_id"]
                logger.info(
                    f"元数据已存在(完全匹配): name_zh={node_name_zh}, "
                    f"existing_id={exact_id}"
                )
                return jsonify(
                    failed(
                        f"元数据已存在(完全匹配),无需重复创建。"
                        f"已存在的元数据ID: {exact_id}"
                    )
                )
            # Suspected duplicates: remember them; the review record is
            # written after the node exists so it can reference its id.
            if redundancy_result["has_candidates"]:
                suspicious_candidates = redundancy_result["candidates"]
                logger.info(
                    f"发现疑似重复元数据: name_zh={node_name_zh}, "
                    f"候选数量={len(suspicious_candidates)}"
                )

        # ---------- write the node ----------
        with neo4j_driver.get_session() as session:
            cypher = """
            MERGE (n:DataMeta {name_zh: $name_zh})
            ON CREATE SET n.name_en = $name_en,
                          n.data_type = $data_type,
                          n.category = $category,
                          n.alias = $alias,
                          n.affiliation = $affiliation,
                          n.describe = $describe,
                          n.create_time = $create_time,
                          n.updateTime = $update_time,
                          n.status = $status
            ON MATCH SET n.data_type = $data_type,
                         n.category = $category,
                         n.alias = $alias,
                         n.affiliation = $affiliation,
                         n.describe = $describe,
                         n.updateTime = $update_time,
                         n.status = $status,
                         n.name_en = $name_en
            RETURN n
            """
            create_time = update_time = get_formatted_time()
            node = session.run(
                cypher,
                name_zh=node_name_zh,
                data_type=node_type,
                category=node_category,
                alias=node_alias,
                affiliation=node_affiliation,
                describe=node_desc,
                create_time=create_time,
                update_time=update_time,
                status=node_status,
                name_en=node_name_en,
            ).single()

            if not node or not node["n"]:
                logger.error(f"创建元数据节点失败: {node_name_zh}")
                return jsonify(failed("创建元数据节点失败"))

            node_data = dict(node["n"])
            node_data["id"] = node["n"].id
            node_data["tag"] = (
                _link_meta_tags(session, node["n"].id, tag_ids) if tag_ids else []
            )
            logger.info(
                f"成功创建或更新元数据节点: "
                f"ID={node_data['id']}, name={node_name_zh}"
            )

            if not suspicious_candidates:
                return jsonify(success(node_data))

            # ---------- suspected duplicates: record a review ----------
            # Imported lazily, as in the original code.
            from app.core.meta_data.redundancy_check import (
                write_redundancy_review_record_with_new_id,
            )

            new_meta_snapshot = {
                "id": node_data["id"],
                "name_zh": node_name_zh,
                "name_en": node_name_en or "",
                "data_type": node_type,
                "tag_ids": tag_ids,
            }
            write_redundancy_review_record_with_new_id(
                new_meta=new_meta_snapshot,
                candidates=suspicious_candidates,
                source="api",
            )
            candidate_names = [
                c.get("name_zh", "") for c in suspicious_candidates[:3]
            ]
            return jsonify(
                success(
                    node_data,
                    message=(
                        f"元数据创建成功,但发现疑似重复元数据。"
                        f"疑似重复: {', '.join(candidate_names)}。"
                        f"已创建审核记录,请前往元数据审核页面进行处理。"
                    ),
                )
            )
    except Exception as e:
        logger.error(f"添加元数据失败: {str(e)}")
        return jsonify(failed(str(e)))
# Search metadata by Chinese name
@bp.route("/search", methods=["GET"])
def search_metadata_route():
    """Return up to 100 DataMeta nodes whose ``name_zh`` contains ``keyword``.

    Query parameters:
        keyword: substring to search for; an empty keyword returns ``[]``.

    Each result holds the node's properties plus its internal ``id`` (as
    ``/full/text/query`` does), so hits can be passed to other endpoints.
    """
    try:
        keyword = request.args.get("keyword", "")
        if not keyword:
            return jsonify(success([]))

        cypher = """
        MATCH (n:DataMeta)
        WHERE n.name_zh CONTAINS $keyword
        RETURN n LIMIT 100
        """
        with neo4j_driver.get_session() as session:
            metadata_list = []
            for record in session.run(cypher, keyword=keyword):
                node = record["n"]
                item = dict(node)
                item["id"] = node.id
                metadata_list.append(item)
        return jsonify(success(metadata_list))
    except Exception as e:
        logger.error(f"搜索元数据失败: {str(e)}")
        return jsonify(failed(str(e)))
# Full-text search over metadata
@bp.route("/full/text/query", methods=["POST"])
def full_text_query():
    """Run the ``DataMetaFulltext`` full-text index query.

    Body: ``{"query": <search term>}`` (required, non-empty).

    Returns the top 20 hits by descending score; each hit holds the node's
    properties plus ``id`` (internal node id) and ``score``.
    """
    try:
        body = request.json
        if not body:
            return jsonify(failed("请求数据不能为空"))

        term = body.get("query", "")
        if not term:
            return jsonify(failed("查询条件不能为空"))

        fulltext_cypher = """
        CALL db.index.fulltext.queryNodes("DataMetaFulltext", $term)
        YIELD node, score
        RETURN node, score
        ORDER BY score DESC
        LIMIT 20
        """
        with neo4j_driver.get_session() as session:
            hits = session.run(fulltext_cypher, term=term)
            search_results = [
                {**dict(hit["node"]), "id": hit["node"].id, "score": hit["score"]}
                for hit in hits
            ]
            return jsonify(success(search_results))
    except Exception as e:
        logger.error(f"全文检索查询失败: {str(e)}")
        return jsonify(failed(str(e)))
# Query an unstructured text document
@bp.route("/unstructure/text/query", methods=["POST"])
def unstructure_text_query():
    """Load a node's stored document from MinIO and return it parsed.

    Body: ``{"id": <node id>}``.

    The node is resolved with ``handle_id_unstructured``; its ``url``
    property is the MinIO object key. The response contains the node, the
    ``parse_text`` output and the content truncated to 1000 characters
    (with "..." appended when truncated).
    """
    try:
        body = request.json
        if not body:
            return jsonify(failed("请求数据不能为空"))

        node_id = body.get("id")
        if not node_id:
            return jsonify(failed("节点ID不能为空"))

        node_data = handle_id_unstructured(node_id)
        if not node_data:
            return jsonify(failed("节点不存在"))

        object_name = node_data.get("url")
        if not object_name:
            return jsonify(failed("文档路径不存在"))

        client = get_minio_client()
        bucket = get_minio_config()["MINIO_BUCKET"]
        file_content = get_file_content(client, bucket, object_name)
        parsed = parse_text(file_content)

        preview = file_content
        if len(file_content) > 1000:
            preview = file_content[:1000] + "..."

        return jsonify(
            success({"node": node_data, "parsed": parsed, "content": preview})
        )
    except Exception as e:
        logger.error(f"非结构化文本查询失败: {str(e)}")
        return jsonify(failed(str(e)))
# Upload a file to MinIO
@bp.route("/resource/upload", methods=["POST"])
def upload_file():
    """Store the uploaded ``file`` form field in MinIO.

    The object key is ``<PREFIX>/<stem>_<yyyyMMddHHmmss>.<ext>`` (local
    time). Only extensions accepted by ``allowed_file`` are stored.

    Returns ``{"filename", "size", "type", "url"}``, where ``url`` is the
    object key.
    """
    import mimetypes
    import time

    try:
        if "file" not in request.files:
            return jsonify(failed("没有找到上传的文件"))
        file = request.files["file"]
        if not file.filename:
            return jsonify(failed("未选择文件"))

        # Keep only the last path component: a client-supplied name such as
        # "../x.pdf" or "C:\\dir\\x.pdf" must not add directories to the key.
        filename = file.filename.replace("\\", "/").rsplit("/", 1)[-1]
        if not allowed_file(filename):
            return jsonify(failed("不支持的文件类型"))

        minio_client = get_minio_client()
        config = get_minio_config()

        file_content = file.read()
        file_size = len(file_content)
        # allowed_file() guarantees the name contains a ".".
        stem, _, file_type = filename.rpartition(".")
        file_type = file_type.lower()

        timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
        object_name = f"{config['PREFIX']}/{stem}_{timestamp}.{file_type}"

        # Use a real MIME type (e.g. text/plain for .txt) instead of the
        # synthetic "application/<ext>", which stays only as the fallback.
        content_type = (
            mimetypes.guess_type(f"file.{file_type}")[0]
            or f"application/{file_type}"
        )
        minio_client.put_object(
            config["MINIO_BUCKET"],
            object_name,
            io.BytesIO(file_content),
            file_size,
            content_type=content_type,
        )

        return jsonify(
            success(
                {
                    "filename": file.filename,
                    "size": file_size,
                    "type": file_type,
                    "url": object_name,
                }
            )
        )
    except Exception as e:
        logger.error(f"文件上传失败: {str(e)}")
        return jsonify(failed(str(e)))
# Describe a stored file for display
@bp.route("/resource/display", methods=["POST"])
def upload_file_display():
    """Return display metadata for a stored MinIO object.

    Body: ``{"url": <object key>}``.

    Returns the file name, extension, MIME type (from a fixed table, with
    ``application/octet-stream`` as default), size in bytes, and a link to
    ``/api/meta/resource/download`` for the object.
    """
    import urllib.parse

    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))
        object_name = payload.get("url")
        if not object_name:
            return jsonify(failed("文件路径不能为空"))

        minio_client = get_minio_client()
        config = get_minio_config()
        # stat_object fetches only the object's metadata; the previous code
        # downloaded the whole file just to measure its length.
        stat = minio_client.stat_object(config["MINIO_BUCKET"], object_name)

        file_name = object_name.split("/")[-1]
        file_extension = file_name.split(".")[-1].lower()
        mime_types = {
            "pdf": "application/pdf",
            "doc": "application/msword",
            "docx": (
                "application/vnd.openxmlformats-"
                "officedocument.wordprocessingml.document"
            ),
            "xls": "application/vnd.ms-excel",
            "xlsx": (
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
            ),
            "txt": "text/plain",
            "csv": "text/csv",
        }
        content_type = mime_types.get(file_extension, "application/octet-stream")

        # Percent-encode the key so '&', '#', '+', spaces, etc. survive the
        # query string of the download link.
        download_url = "/api/meta/resource/download?url=" + urllib.parse.quote(
            object_name, safe="/"
        )

        return jsonify(
            success(
                {
                    "filename": file_name,
                    "type": file_extension,
                    "contentType": content_type,
                    "size": stat.size,
                    "url": download_url,
                }
            )
        )
    except S3Error as e:
        logger.error(f"MinIO操作失败: {str(e)}")
        return jsonify(failed(f"文件访问失败: {str(e)}"))
    except Exception as e:
        logger.error(f"文件显示信息获取失败: {str(e)}")
        return jsonify(failed(str(e)))
# File download endpoint
@bp.route("/resource/download", methods=["GET"])
def download_file():
    """Stream a MinIO object back to the client as an attachment.

    Query parameters:
        url: object key (required).

    The object is read fully into memory and sent with MIME type
    ``application/octet-stream``; the download name is the key's last path
    segment. The MinIO response is always closed and its connection
    released.
    """
    response = None
    try:
        raw_name = request.args.get("url")
        if not raw_name:
            return jsonify(failed("文件路径不能为空"))

        import urllib.parse

        # NOTE(review): request.args is already percent-decoded by Flask, so
        # this decodes a second time (presumably for clients that
        # double-encode). Keep it unless every caller has been verified.
        object_name = urllib.parse.unquote(raw_name)
        logger.info(f"下载文件请求: {object_name}")

        minio_client = get_minio_client()
        bucket = get_minio_config()["MINIO_BUCKET"]

        try:
            response = minio_client.get_object(bucket, object_name)
            file_data = response.read()
        except S3Error as e:
            logger.error(f"MinIO获取文件失败: {str(e)}")
            return jsonify(failed(f"文件获取失败: {str(e)}"))

        return send_file(
            io.BytesIO(file_data),
            as_attachment=True,
            download_name=object_name.split("/")[-1],
            mimetype="application/octet-stream",
        )
    except Exception as e:
        logger.error(f"文件下载失败: {str(e)}")
        return jsonify(failed(str(e)))
    finally:
        if response:
            response.close()
            response.release_conn()
# Translate a text resource name
@bp.route("/resource/translate", methods=["POST"])
def text_resource_translate():
    """Pass ``name_zh`` and ``keyword`` to ``text_resource_solve``.

    Body: ``{"name_zh": ..., "keyword": ...}``; ``name_zh`` is required.
    The first argument to ``text_resource_solve`` is always ``None``.
    """
    try:
        body = request.json
        if not body:
            return jsonify(failed("请求数据不能为空"))

        name_zh = body.get("name_zh", "")
        keyword = body.get("keyword", "")
        if not name_zh:
            return jsonify(failed("名称不能为空"))

        return jsonify(success(text_resource_solve(None, name_zh, keyword)))
    except Exception as e:
        logger.error(f"文本资源翻译失败: {str(e)}")
        return jsonify(failed(str(e)))
# Create a text-resource node
@bp.route("/resource/node", methods=["POST"])
def text_resource_node():
    """Create a DataMeta node for a stored text resource and tag it.

    Body:
        name_zh, name_en, url: required (url is the MinIO object key).
        keywords / keywords_en: parallel lists (a single string is treated
            as a one-element list; null as empty). Each non-empty
            ``keywords[i]`` is MERGEd as a ``:Tag`` node by name_zh, with
            name_en ``keywords_en[i]`` (or "" when missing) set on creation,
            and linked to the new node through ``HAS_TAG``.

    Returns the created node's properties plus its internal ``id``.
    """

    def as_list(value):
        """Normalise a keywords field; return None if it cannot be a list."""
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        if isinstance(value, (list, tuple)):
            return list(value)
        return None

    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        name_zh = payload.get("name_zh", "")
        name_en = payload.get("name_en", "")
        # A bare string used to be iterated character by character (one tag
        # per character), and null crashed after the node was created.
        keywords = as_list(payload.get("keywords", []))
        keywords_en = as_list(payload.get("keywords_en", []))
        object_name = payload.get("url", "")

        if not name_zh or not name_en or not object_name:
            return jsonify(failed("参数不完整"))
        if keywords is None or keywords_en is None:
            return jsonify(failed("keywords 必须为列表"))

        with neo4j_driver.get_session() as session:
            cypher = """
            CREATE (n:DataMeta {
                name_zh: $name_zh,
                name_en: $name_en,
                keywords: $keywords,
                keywords_en: $keywords_en,
                url: $object_name,
                create_time: $create_time,
                updateTime: $update_time
            })
            RETURN n
            """
            create_time = update_time = get_formatted_time()
            record = session.run(
                cypher,
                name_zh=name_zh,
                name_en=name_en,
                keywords=keywords,
                keywords_en=keywords_en,
                object_name=object_name,
                create_time=create_time,
                update_time=update_time,
            ).single()
            if not record:
                return jsonify(failed("创建节点失败"))
            node = record["n"]

            tag_cypher = """
            MERGE (t:Tag {name_zh: $name_zh})
            ON CREATE SET t.name_en = $name_en,
                          t.create_time = $create_time
            RETURN t
            """
            # MERGE (not CREATE) so a keyword repeated in the list does not
            # produce duplicate HAS_TAG relationships.
            rel_cypher = """
            MATCH (n:DataMeta), (t:Tag)
            WHERE id(n) = $node_id AND id(t) = $tag_id
            MERGE (n)-[r:HAS_TAG]->(t)
            RETURN r
            """
            for i, keyword in enumerate(keywords):
                if not keyword:
                    continue
                keyword_en = keywords_en[i] if i < len(keywords_en) else ""
                tag_record = session.run(
                    tag_cypher,
                    name_zh=keyword,
                    name_en=keyword_en,
                    create_time=create_time,
                ).single()
                if not tag_record:
                    continue
                session.run(rel_cypher, node_id=node.id, tag_id=tag_record["t"].id)

            return jsonify(success({**dict(node), "id": node.id}))
    except Exception as e:
        logger.error(f"创建文本资源节点失败: {str(e)}")
        return jsonify(failed(str(e)))
# Process unstructured data
@bp.route("/unstructured/process", methods=["POST"])
def processing_unstructured_data():
    """Run ``solve_unstructured_data`` for one node.

    Body: ``{"id": <node id>}``. The MinIO client and the configured key
    ``PREFIX`` are passed along; a truthy result is reported as success.
    """
    try:
        body = request.json
        if not body:
            return jsonify(failed("请求数据不能为空"))

        node_id = body.get("id")
        if not node_id:
            return jsonify(failed("节点ID不能为空"))

        client = get_minio_client()
        key_prefix = get_minio_config()["PREFIX"]

        if not solve_unstructured_data(node_id, client, key_prefix):
            return jsonify(failed("处理失败"))
        return jsonify(success({"message": "处理成功"}))
    except Exception as e:
        logger.error(f"处理非结构化数据失败: {str(e)}")
        return jsonify(failed(str(e)))
# Build a text graph
@bp.route("/text/graph", methods=["POST"])
def create_text_graph():
    """Build a text graph via ``handle_txt_graph``.

    Body: ``{"id", "entity_zh", "entity_en"}``; all three must be truthy.
    A truthy result from ``handle_txt_graph`` is reported as success.
    """
    try:
        body = request.json
        if not body:
            return jsonify(failed("请求数据不能为空"))

        node_id, entity_zh, entity_en = (
            body.get(key) for key in ("id", "entity_zh", "entity_en")
        )
        if not (node_id and entity_zh and entity_en):
            return jsonify(failed("参数不完整"))

        if not handle_txt_graph(node_id, entity_zh, entity_en):
            return jsonify(failed("图谱创建失败"))
        return jsonify(success({"message": "图谱创建成功"}))
    except Exception as e:
        logger.error(f"创建文本图谱失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route("/config", methods=["GET"])
@require_auth
def get_meta_config():
    """Expose the MinIO bucket, key prefix and allowed upload extensions.

    Requires authentication. The response is a bare JSON object (not
    wrapped in ``success``).
    """
    cfg = get_minio_config()
    body = {
        "bucket_name": cfg["MINIO_BUCKET"],
        "prefix": cfg["PREFIX"],
        "allowed_extensions": list(cfg["ALLOWED_EXTENSIONS"]),
    }
    return jsonify(body)
# Update a metadata node
@bp.route("/node/update", methods=["POST"])
def meta_node_update():
    """Update selected properties (and optionally tags) of a DataMeta node.

    Body:
        id: internal node id (required; 0 is valid).
        name_zh, name_en, category, alias, affiliation, data_type, describe,
        status: only keys with a non-null value are written.
        tag: when present (even empty), replaces all LABEL relationships.
        force_update: skip the redundancy check.

    When name_zh or name_en actually changes and force_update is not set,
    ``check_redundancy_for_update`` runs first:
      - exact match with another node -> failure, nothing written;
      - review record created for suspected duplicates -> failure with a
        hint, nothing written;
      - otherwise the update proceeds.
    """
    # Updatable properties. These names are interpolated into the Cypher
    # below, so this fixed tuple is the whitelist that keeps that safe.
    updatable_fields = (
        "name_zh",
        "category",
        "alias",
        "affiliation",
        "data_type",
        "describe",
        "status",
        "name_en",
    )
    try:
        payload = request.json
        if not payload:
            return jsonify(failed("请求数据不能为空"))

        node_id = payload.get("id")
        # Explicit None/"" test: `not node_id` would reject internal id 0.
        if node_id is None or node_id == "":
            return jsonify(failed("节点ID不能为空"))
        try:
            node_id = int(node_id)
        except (ValueError, TypeError):
            return jsonify(failed(f"节点ID必须为整数,当前值: {node_id}"))

        force_update = bool(payload.get("force_update", False))

        with neo4j_driver.get_session() as session:
            check_cypher = """
            MATCH (n:DataMeta)
            WHERE id(n) = $node_id
            RETURN n
            """
            node = session.run(check_cypher, node_id=node_id).single()
            if not node or not node["n"]:
                return jsonify(failed("节点不存在"))
            current_node = dict(node["n"])

            fields_to_update = {field: payload.get(field) for field in updatable_fields}

            def effective(field, default):
                """Value *field* will have after this update is applied."""
                value = fields_to_update[field]
                return value if value is not None else current_node.get(field, default)

            updated_name_zh = effective("name_zh", "")
            updated_name_en = effective("name_en", "")
            updated_data_type = effective("data_type", "varchar(255)")

            tag = payload.get("tag")
            tag_ids = normalize_tag_inputs(tag) if tag is not None else []

            # ---------- redundancy check (only if a name really changes) ----------
            name_changed = any(
                fields_to_update[field] is not None
                and fields_to_update[field] != current_node.get(field)
                for field in ("name_zh", "name_en")
            )
            if name_changed and not force_update:
                if tag is None:
                    # The request leaves tags untouched, so compare against
                    # the node's current tags rather than an empty list.
                    current_tag_cypher = """
                    MATCH (n:DataMeta)-[:LABEL]->(t:DataLabel)
                    WHERE id(n) = $node_id
                    RETURN id(t) AS tag_id
                    """
                    check_tag_ids = [
                        row["tag_id"]
                        for row in session.run(current_tag_cypher, node_id=node_id)
                    ]
                else:
                    check_tag_ids = tag_ids

                redundancy_result = check_redundancy_for_update(
                    node_id=node_id,
                    name_zh=updated_name_zh,
                    name_en=updated_name_en,
                    data_type=updated_data_type,
                    tag_ids=check_tag_ids,
                )

                if redundancy_result["has_exact_match"]:
                    exact_id = redundancy_result["exact_match_id"]
                    logger.warning(
                        f"更新后元数据与其他节点完全匹配: "
                        f"node_id={node_id}, existing_id={exact_id}"
                    )
                    return jsonify(
                        failed(
                            f"更新后的元数据与已有节点(ID={exact_id})完全相同,"
                            f"请检查是否需要合并或修改名称。"
                        )
                    )

                if redundancy_result["review_created"]:
                    candidates = redundancy_result["candidates"]
                    candidate_names = [c.get("name_zh", "") for c in candidates[:3]]
                    logger.info(
                        f"更新元数据发现疑似重复: node_id={node_id}, "
                        f"candidates={candidate_names}"
                    )
                    return jsonify(
                        failed(
                            f"发现疑似重复元数据,已创建审核记录。"
                            f"疑似重复: {', '.join(candidate_names)}。"
                            f"请前往元数据审核页面处理,或使用 force_update=true 强制更新。"
                        )
                    )

            # ---------- apply the update ----------
            set_clauses = ["n.updateTime = $update_time"]
            update_params = {"node_id": node_id, "update_time": get_formatted_time()}
            for field, new_value in fields_to_update.items():
                if new_value is not None:
                    set_clauses.append(f"n.{field} = ${field}")
                    update_params[field] = new_value
            update_cypher = (
                "MATCH (n:DataMeta)\n"
                "WHERE id(n) = $node_id\n"
                f"SET {', '.join(set_clauses)}\n"
                "RETURN n"
            )
            updated_node = session.run(
                update_cypher,  # type: ignore[arg-type]
                **update_params,
            ).single()

            if not updated_node or not updated_node["n"]:
                logger.error(f"更新元数据节点失败: ID={node_id}")
                return jsonify(failed("更新元数据节点失败"))

            node_data = dict(updated_node["n"])
            node_data["id"] = updated_node["n"].id

            # A provided tag list (even empty) replaces all LABEL links.
            if tag is not None:
                delete_tag_cypher = """
                MATCH (n:DataMeta)-[r:LABEL]->(t:DataLabel)
                WHERE id(n) = $node_id
                DELETE r
                """
                session.run(delete_tag_cypher, node_id=node_id)
                create_tag_cypher = """
                MATCH (n:DataMeta), (t:DataLabel)
                WHERE id(n) = $node_id AND id(t) = $tag_id
                MERGE (n)-[r:LABEL]->(t)
                RETURN r
                """
                for tag_id in tag_ids:
                    session.run(create_tag_cypher, node_id=node_id, tag_id=tag_id)

            logger.info(f"成功更新元数据节点: ID={node_data['id']}")
            return jsonify(success(node_data))
    except Exception as e:
        logger.error(f"更新元数据失败: {str(e)}")
        return jsonify(failed(str(e)))
@bp.route("/review/list", methods=["POST"])
def metadata_review_list():
    """List metadata review records (suspected redundancy / change), paginated.

    Body (JSON object):
        current: 1-based page number (default 1; values below 1 become 1).
        size: page size (default 10; values below 1 fall back to 10).
        record_type: optional filter, "redundancy" | "change".
        status: optional filter, "pending" | "resolved" | "ignored".
        business_domain_id: optional business-domain ID filter (must be an
            integer).
        keyword: optional substring matched literally against
            new_meta.name_zh / new_meta.name_en.

    Returns:
        success({"records", "total", "size", "current"}), newest records
        first; tag_ids in each record are expanded via
        convert_tag_ids_to_tags. failed(...) on invalid input or errors.
    """
    try:
        payload = request.get_json() or {}
        if not isinstance(payload, dict):
            return jsonify(failed("请求数据格式错误,应为 JSON 对象"))

        def to_int(value, default):
            try:
                return int(value)
            except (TypeError, ValueError):
                return default

        page = to_int(payload.get("current", 1), 1)
        page_size = to_int(payload.get("size", 10), 10)
        # Non-positive values would yield a negative OFFSET (a SQL error)
        # or LIMIT 0 (an always-empty page), so clamp them.
        if page < 1:
            page = 1
        if page_size < 1:
            page_size = 10
        record_type = payload.get("record_type")
        status = payload.get("status")
        business_domain_id = payload.get("business_domain_id")
        keyword = str(payload.get("keyword") or "").strip()

        query = MetadataReviewRecord.query
        if record_type:
            query = query.filter(MetadataReviewRecord.record_type == record_type)
        if status:
            query = query.filter(MetadataReviewRecord.status == status)
        if business_domain_id is not None and str(business_domain_id).strip() != "":
            try:
                bd_id_int = int(business_domain_id)
            except (TypeError, ValueError):
                return jsonify(failed("business_domain_id 必须是整数"))
            query = query.filter(MetadataReviewRecord.business_domain_id == bd_id_int)
        if keyword:
            # Extract JSONB fields as text (->>) for the substring match.
            name_zh_col = MetadataReviewRecord.new_meta["name_zh"].astext
            name_en_col = MetadataReviewRecord.new_meta["name_en"].astext
            # autoescape=True makes '%' and '_' in user input match literally
            # instead of acting as LIKE wildcards.
            query = query.filter(
                or_(
                    name_zh_col.contains(keyword, autoescape=True),
                    name_en_col.contains(keyword, autoescape=True),
                )
            )

        total = query.count()
        records = (
            query.order_by(MetadataReviewRecord.created_at.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )
        # Expand tag_ids into tag objects for the client.
        records_data = [convert_tag_ids_to_tags(r.to_dict()) for r in records]
        return jsonify(
            success(
                {
                    "records": records_data,
                    "total": total,
                    "size": page_size,
                    "current": page,
                }
            )
        )
    except Exception as e:
        logger.error(f"审核记录列表查询失败: {str(e)}")
        return jsonify(failed("审核记录列表查询失败", error=str(e)))
@bp.route("/review/create", methods=["POST"])
def metadata_review_create():
    """Manually create a metadata review record comparing two metadata items.

    Body (JSON object):
        record_type: required; "redundancy" (suspected duplicate),
            "change" (suspected change) or "merge" (merge request).
        source: trigger source (default "manual"; null also means default).
        meta1: primary metadata object; required keys id, name_zh, name_en,
            data_type, status; optional tag_ids (list).
        meta2: metadata object to compare against; same keys as meta1.
        diff_fields: optional list of differing field names,
            e.g. ["name_zh", "name_en"]; null is treated as [].
        notes: optional remark.

    meta1 is stored as the record's new_meta. meta2 becomes the single entry
    of candidates in the form
    {"snapshot": {...}, "diff_fields": [...], "candidate_meta_id": id}.
    The record is created with status "pending".

    Returns:
        success({"record": ..., "message": ...}) on success; failed(...) on
        validation errors or exceptions (the DB session is rolled back).
    """
    try:
        payload = request.get_json() or {}
        if not isinstance(payload, dict):
            return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
        record_type = payload.get("record_type")
        source = payload.get("source")
        if source is None:
            source = "manual"
        meta1 = payload.get("meta1")
        meta2 = payload.get("meta2")
        # diff_fields is optional: an explicit null means "no diff fields".
        diff_fields = payload.get("diff_fields")
        if diff_fields is None:
            diff_fields = []
        notes = payload.get("notes")

        # Parameter validation
        if not record_type:
            return jsonify(failed("record_type 不能为空"))
        if record_type not in ("redundancy", "change", "merge"):
            return jsonify(
                failed("record_type 必须是 redundancy、change 或 merge 之一")
            )
        if not meta1 or not isinstance(meta1, dict):
            return jsonify(failed("meta1 不能为空且必须是对象"))
        if not meta2 or not isinstance(meta2, dict):
            return jsonify(failed("meta2 不能为空且必须是对象"))
        if not isinstance(diff_fields, list):
            return jsonify(failed("diff_fields 必须是数组"))

        required_fields = ["id", "name_zh", "name_en", "data_type", "status"]
        for field in required_fields:
            if field not in meta1:
                return jsonify(failed(f"meta1 缺少必要字段: {field}"))
            if field not in meta2:
                return jsonify(failed(f"meta2 缺少必要字段: {field}"))
        for label, meta in (("meta1", meta1), ("meta2", meta2)):
            tag_ids = meta.get("tag_ids")
            if tag_ids is not None and not isinstance(tag_ids, list):
                return jsonify(failed(f"{label}.tag_ids 必须是数组"))

        def build_snapshot(meta):
            """Copy the review-relevant fields of a metadata dict; null tag_ids -> []."""
            tag_ids = meta.get("tag_ids")
            return {
                "id": meta.get("id"),
                "name_zh": meta.get("name_zh"),
                "name_en": meta.get("name_en"),
                "data_type": meta.get("data_type"),
                "status": meta.get("status"),
                "tag_ids": tag_ids if tag_ids is not None else [],
            }

        # new_meta: the primary metadata under review.
        new_meta = build_snapshot(meta1)
        # candidates: the metadata being compared against.
        candidates = [
            {
                "snapshot": build_snapshot(meta2),
                "diff_fields": diff_fields,
                "candidate_meta_id": meta2.get("id"),
            }
        ]

        review_record = MetadataReviewRecord()
        review_record.record_type = record_type
        review_record.source = source
        review_record.new_meta = new_meta
        review_record.candidates = candidates
        review_record.status = "pending"
        review_record.notes = notes
        db.session.add(review_record)
        db.session.commit()

        logger.info(
            f"创建审核记录成功: id={review_record.id}, "
            f"record_type={record_type}, "
            f"meta1_name={meta1.get('name_zh')}, "
            f"meta2_name={meta2.get('name_zh')}"
        )
        return jsonify(
            success(
                {
                    "record": review_record.to_dict(),
                    "message": "审核记录创建成功,请前往数据审核页面进行处理",
                }
            )
        )
    except Exception as e:
        logger.error(f"创建审核记录失败: {str(e)}")
        db.session.rollback()
        return jsonify(failed("创建审核记录失败", error=str(e)))
@bp.route("/review/detail", methods=["GET"])
def metadata_review_detail():
    """Return a single metadata review record.

    Query parameters:
        id: primary key of the review record.

    The stored tag_ids are expanded into tags. For records of type "change"
    whose old_meta carries a non-blank meta_id, the impact graph of that
    metadata node is attached under "impact_graph"; in every other case (or
    if building the graph fails, which is only logged) "impact_graph" is
    None.
    """
    try:
        raw_id = request.args.get("id")
        if not raw_id:
            return jsonify(failed("缺少id参数"))

        review = MetadataReviewRecord.query.get(int(raw_id))
        if not review:
            return jsonify(failed("记录不存在"))

        detail = convert_tag_ids_to_tags(review.to_dict())

        graph = None
        if review.record_type == "change":
            affected_id = (review.old_meta or {}).get("meta_id")
            has_affected_id = (
                affected_id is not None and str(affected_id).strip() != ""
            )
            if has_affected_id:
                # Best effort: a graph failure must not fail the detail view.
                try:
                    graph = meta_impact_graph(int(affected_id))
                except Exception as exc:
                    logger.warning(f"获取影响图谱失败: {exc}")
        detail["impact_graph"] = graph

        return jsonify(success(detail))
    except Exception as e:
        logger.error(f"审核记录详情查询失败: {str(e)}")
        return jsonify(failed("审核记录详情查询失败", error=str(e)))
def _review_id_missing(value):
    """Return True when an ID value is absent (None or a blank string).

    Neo4j internal node ids start at 0, so a plain truthiness test would
    wrongly treat the valid id 0 as missing.
    """
    return value is None or str(value).strip() == ""


def _review_resolve_alias(record, action_payload, resolved_by, notes):
    """Handle action=alias: make alias_meta an ALIAS of primary_meta.

    ALIAS edges between DataMeta nodes are rebuilt as follows:
    - every (other)-[:ALIAS]->(alias_meta) edge is re-pointed to primary_meta;
    - an existing (primary_meta)-[:ALIAS]->(alias_meta) edge is deleted
      rather than re-pointed (re-pointing it would create a self-loop);
    - (alias_meta)-[:ALIAS]->(primary_meta) is created.
    The other ALIAS edges of primary_meta and BusinessDomain INCLUDES edges
    are not touched. Fails without resolving the record if either node does
    not exist.
    """
    primary_meta_id = action_payload.get("primary_meta_id")
    alias_meta_id = action_payload.get("alias_meta_id")
    if _review_id_missing(primary_meta_id):
        return jsonify(failed("payload.primary_meta_id 不能为空"))
    if _review_id_missing(alias_meta_id):
        return jsonify(failed("payload.alias_meta_id 不能为空"))
    primary_meta_id = int(primary_meta_id)
    alias_meta_id = int(alias_meta_id)
    if primary_meta_id == alias_meta_id:
        return jsonify(failed("primary_meta_id 和 alias_meta_id 不能相同"))

    from app.services.neo4j_driver import neo4j_driver

    params = {
        "alias_meta_id": alias_meta_id,
        "primary_meta_id": primary_meta_id,
    }
    with neo4j_driver.get_session() as session:
        # If either node is missing, every MATCH below is a silent no-op and
        # the record would be marked resolved without any change.
        found = session.run(
            """
            MATCH (alias_meta:DataMeta), (primary_meta:DataMeta)
            WHERE id(alias_meta) = $alias_meta_id
              AND id(primary_meta) = $primary_meta_id
            RETURN count(*) AS cnt
            """,
            params,
        ).single()
        if not found or not found["cnt"]:
            return jsonify(
                failed("primary_meta_id 或 alias_meta_id 对应的元数据不存在")
            )

        # Step 1a: drop primary -> alias. Re-pointing it would produce a
        # primary -> primary self-loop, and keeping it would form a 2-cycle
        # with the edge created in step 2.
        session.run(
            """
            MATCH (primary_meta:DataMeta)-[r:ALIAS]->(alias_meta:DataMeta)
            WHERE id(primary_meta) = $primary_meta_id
              AND id(alias_meta) = $alias_meta_id
            DELETE r
            """,
            params,
        )
        # Step 1b: re-point every other ALIAS edge from alias_meta to primary_meta.
        session.run(
            """
            MATCH (other:DataMeta)-[old_rel:ALIAS]->(alias_meta:DataMeta)
            WHERE id(alias_meta) = $alias_meta_id
              AND id(other) <> $primary_meta_id
            WITH other, old_rel
            MATCH (primary_meta:DataMeta)
            WHERE id(primary_meta) = $primary_meta_id
            MERGE (other)-[:ALIAS]->(primary_meta)
            DELETE old_rel
            """,
            params,
        )
        # Step 2: alias_meta -> primary_meta.
        session.run(
            """
            MATCH (alias_meta:DataMeta), (primary_meta:DataMeta)
            WHERE id(alias_meta) = $alias_meta_id AND id(primary_meta) = $primary_meta_id
            MERGE (alias_meta)-[:ALIAS]->(primary_meta)
            """,
            params,
        )

    update_review_record_resolution(
        record,
        action="alias",
        payload={
            "primary_meta_id": primary_meta_id,
            "alias_meta_id": alias_meta_id,
        },
        resolved_by=resolved_by,
        notes=notes,
    )
    db.session.commit()
    return jsonify(success(record.to_dict()))


def _review_resolve_create_new(record, action_payload, resolved_by, notes):
    """Handle action=create_new: create a separate DataMeta in the record's domain.

    The new node takes payload.new_name_zh as its Chinese name, plus name_en
    and data_type (default "varchar(255)") from record.new_meta. It is
    created and linked with (BusinessDomain)-[:INCLUDES]->(DataMeta) in a
    single query, so no orphan node is left if the domain does not exist.
    """
    new_meta = record.new_meta or {}
    bd_id = record.business_domain_id
    new_name_zh = (action_payload.get("new_name_zh") or "").strip()
    if _review_id_missing(bd_id):
        return jsonify(failed("记录缺少 business_domain_id,无法执行 create_new"))
    if not new_name_zh:
        return jsonify(failed("payload.new_name_zh 不能为空"))

    from app.core.meta_data import get_formatted_time
    from app.services.neo4j_driver import neo4j_driver

    with neo4j_driver.get_session() as session:
        # Create a new DataMeta (never overwrite the existing node) and
        # attach it to the domain. Yields no row when the domain is missing.
        result = session.run(
            """
            MATCH (n:BusinessDomain)
            WHERE id(n) = $domain_id
            CREATE (m:DataMeta {
                name_zh: $name_zh,
                name_en: $name_en,
                data_type: $data_type,
                create_time: $create_time,
                status: true
            })
            CREATE (n)-[:INCLUDES]->(m)
            RETURN m
            """,
            {
                "domain_id": int(bd_id),
                "name_zh": new_name_zh,
                "name_en": (new_meta.get("name_en") or "").strip(),
                "data_type": (new_meta.get("data_type") or "varchar(255)"),
                "create_time": get_formatted_time(),
            },
        ).single()
        if not result or not result.get("m"):
            return jsonify(failed("创建新元数据失败"))

    update_review_record_resolution(
        record,
        action="create_new",
        payload={"new_name_zh": new_name_zh},
        resolved_by=resolved_by,
        notes=notes,
    )
    db.session.commit()
    return jsonify(success(record.to_dict()))


def _review_resolve_accept_change(record, action_payload, resolved_by, notes):
    """Handle action=accept_change: write record.new_meta back to the DataMeta node.

    The target id is payload.meta_id, falling back to old_meta["meta_id"].
    name_zh, name_en and data_type missing from new_meta fall back to the
    previous snapshot (old_meta["snapshot"]) instead of being blanked;
    data_type finally defaults to "varchar(255)". When new_meta has a
    non-empty tag_ids list, the node's LABEL edges are replaced with it. A
    MetadataVersionHistory row (change_source "ddl") is committed together
    with the record resolution. Fails without resolving if the node is
    missing.
    """
    old_meta = record.old_meta or {}
    meta_id = action_payload.get("meta_id")
    if _review_id_missing(meta_id):
        meta_id = old_meta.get("meta_id")
    if _review_id_missing(meta_id):
        return jsonify(failed("无法确定需要更新的 meta_id"))
    meta_id = int(meta_id)

    before_snapshot = old_meta.get("snapshot") or {}
    after_snapshot = record.new_meta or {}

    name_zh_val = (
        after_snapshot.get("name_zh") or before_snapshot.get("name_zh") or ""
    ).strip()
    name_en_val = (
        after_snapshot.get("name_en") or before_snapshot.get("name_en") or ""
    ).strip()
    data_type_val = (
        after_snapshot.get("data_type")
        or before_snapshot.get("data_type")
        or "varchar(255)"
    )
    # Parse tag ids before any write so a malformed id cannot leave the
    # Neo4j node updated while the PG side is rolled back.
    tag_ids = [int(t) for t in (after_snapshot.get("tag_ids") or []) if t is not None]

    from app.core.meta_data import get_formatted_time
    from app.services.neo4j_driver import neo4j_driver

    with neo4j_driver.get_session() as session:
        updated = session.run(
            """
            MATCH (m:DataMeta)
            WHERE id(m) = $meta_id
            SET m.name_zh = $name_zh,
                m.name_en = $name_en,
                m.data_type = $data_type,
                m.updateTime = $update_time,
                m.status = true
            RETURN id(m) AS id
            """,
            {
                "meta_id": meta_id,
                "name_zh": name_zh_val,
                "name_en": name_en_val,
                "data_type": data_type_val,
                "update_time": get_formatted_time(),
            },
        ).single()
        if not updated:
            return jsonify(failed(f"目标元数据不存在: meta_id={meta_id}"))

        if tag_ids:
            # Replace the whole LABEL set with the new tag ids.
            session.run(
                """
                MATCH (m:DataMeta)-[r:LABEL]->(:DataLabel)
                WHERE id(m) = $meta_id
                DELETE r
                """,
                {"meta_id": meta_id},
            )
            session.run(
                """
                MATCH (m:DataMeta)
                WHERE id(m) = $meta_id
                WITH m
                UNWIND $tag_ids AS tid
                MATCH (t:DataLabel) WHERE id(t) = tid
                MERGE (m)-[:LABEL]->(t)
                """,
                {"meta_id": meta_id, "tag_ids": tag_ids},
            )

    # Version history (PG), committed together with the record resolution.
    history = MetadataVersionHistory()
    history.meta_id = meta_id
    history.change_source = "ddl"
    history.before_snapshot = before_snapshot
    history.after_snapshot = after_snapshot
    history.created_by = resolved_by if resolved_by is not None else ""
    db.session.add(history)

    update_review_record_resolution(
        record,
        action="accept_change",
        payload={"meta_id": meta_id},
        resolved_by=resolved_by,
        notes=notes,
    )
    db.session.commit()
    return jsonify(success(record.to_dict()))


@bp.route("/review/resolve", methods=["POST"])
def metadata_review_resolve():
    """Resolve a pending metadata review record.

    Body (JSON object):
        id: review record ID.
        action: alias | create_new | accept_change | reject_change | ignore
        payload: optional action parameters (object).
        resolved_by: optional resolver.
        notes: optional remark.

    action=alias, payload {primary_meta_id: int, alias_meta_id: int}:
        rebuild ALIAS edges so alias_meta (and everything that aliased it)
        points to primary_meta; BusinessDomain INCLUDES edges are untouched.
    action=create_new, payload {new_name_zh: str}:
        create a new DataMeta with that Chinese name in the record's
        business domain.
    action=accept_change, payload {meta_id?: int}:
        write new_meta back onto the target DataMeta and add a
        metadata_version_history row (PG).
    action=reject_change / ignore:
        only update the review record's status.

    Only records whose status is "pending" can be resolved. Any exception
    rolls back the SQLAlchemy session and returns failed(...).
    """
    try:
        payload = request.get_json() or {}
        if not isinstance(payload, dict):
            return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
        record_id = payload.get("id")
        action = payload.get("action")
        action_payload = payload.get("payload") or {}
        resolved_by = payload.get("resolved_by")
        notes = payload.get("notes")
        if not record_id:
            return jsonify(failed("id 不能为空"))
        if not action:
            return jsonify(failed("action 不能为空"))
        if not isinstance(action_payload, dict):
            return jsonify(failed("payload 必须是对象"))

        record = MetadataReviewRecord.query.get(int(record_id))
        if not record:
            return jsonify(failed("记录不存在"))
        if record.status != "pending":
            return jsonify(failed("记录已处理,无法重复处理"))

        if action == "alias":
            return _review_resolve_alias(record, action_payload, resolved_by, notes)
        if action == "create_new":
            return _review_resolve_create_new(
                record, action_payload, resolved_by, notes
            )
        if action == "accept_change":
            return _review_resolve_accept_change(
                record, action_payload, resolved_by, notes
            )
        if action in ("reject_change", "ignore"):
            update_review_record_resolution(
                record,
                action=action,
                payload=action_payload,
                resolved_by=resolved_by,
                notes=notes,
            )
            db.session.commit()
            return jsonify(success(record.to_dict()))
        return jsonify(failed(f"不支持的action: {action}"))
    except Exception as e:
        logger.error(f"处理审核记录失败: {str(e)}")
        db.session.rollback()
        return jsonify(failed("处理审核记录失败", error=str(e)))
|