routes.py 59 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695
  1. import io
  2. import logging
  3. from flask import current_app, jsonify, request, send_file
  4. from minio import Minio
  5. from minio.error import S3Error
  6. from sqlalchemy import or_
  7. from app import db
  8. from app.api.meta_data import bp
  9. from app.core.meta_data import (
  10. check_redundancy_for_add,
  11. check_redundancy_for_update,
  12. convert_tag_ids_to_tags,
  13. get_file_content,
  14. get_formatted_time,
  15. handle_id_unstructured,
  16. handle_txt_graph,
  17. meta_impact_graph,
  18. meta_kinship_graph,
  19. meta_list,
  20. normalize_tag_inputs,
  21. parse_text,
  22. solve_unstructured_data,
  23. text_resource_solve,
  24. )
  25. from app.core.system.auth import require_auth
  26. from app.models.metadata_review import (
  27. MetadataReviewRecord,
  28. MetadataVersionHistory,
  29. update_review_record_resolution,
  30. )
  31. from app.models.result import failed, success
  32. from app.services.neo4j_driver import neo4j_driver
  33. logger = logging.getLogger("app")
  34. def get_minio_client():
  35. """获取 MinIO 客户端实例"""
  36. return Minio(
  37. current_app.config["MINIO_HOST"],
  38. access_key=current_app.config["MINIO_USER"],
  39. secret_key=current_app.config["MINIO_PASSWORD"],
  40. secure=current_app.config["MINIO_SECURE"],
  41. )
  42. def get_minio_config():
  43. """获取 MinIO 配置"""
  44. return {
  45. "MINIO_BUCKET": current_app.config["MINIO_BUCKET"],
  46. "PREFIX": current_app.config["PREFIX"],
  47. "ALLOWED_EXTENSIONS": current_app.config["ALLOWED_EXTENSIONS"],
  48. }
  49. def allowed_file(filename):
  50. """检查文件扩展名是否允许"""
  51. if "." not in filename:
  52. return False
  53. ext = filename.rsplit(".", 1)[1].lower()
  54. return ext in get_minio_config()["ALLOWED_EXTENSIONS"]
  55. # 元数据列表
  56. @bp.route("/node/list", methods=["POST"])
  57. def meta_node_list():
  58. try:
  59. payload = request.get_json() or {}
  60. if not isinstance(payload, dict):
  61. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  62. def to_int(value, default):
  63. try:
  64. return int(value)
  65. except (TypeError, ValueError):
  66. return default
  67. # 分页参数
  68. page = to_int(payload.get("current", 1), 1)
  69. page_size = to_int(payload.get("size", 10), 10)
  70. # 过滤参数
  71. name_en_filter = payload.get("name_en") or None
  72. name_zh_filter = payload.get("name_zh") or None
  73. category_filter = payload.get("category") or None
  74. time_filter = payload.get("time") or None
  75. logger.info(
  76. f"[node/list] 过滤参数: name_zh={name_zh_filter}, "
  77. f"name_en={name_en_filter}, category={category_filter}"
  78. )
  79. tag_filter = payload.get("tag")
  80. if tag_filter is not None and not isinstance(tag_filter, list):
  81. tag_filter = None
  82. # 调用核心业务逻辑
  83. result, total_count = meta_list(
  84. page,
  85. page_size,
  86. "",
  87. name_en_filter,
  88. name_zh_filter,
  89. category_filter,
  90. time_filter,
  91. tag_filter,
  92. )
  93. # 返回结果
  94. return jsonify(
  95. success(
  96. {
  97. "records": result,
  98. "total": total_count,
  99. "size": page_size,
  100. "current": page,
  101. }
  102. )
  103. )
  104. except Exception as e:
  105. logger.error(f"获取元数据列表失败: {str(e)}")
  106. return jsonify(failed(str(e)))
  107. # 元数据图谱
  108. @bp.route("/node/graph", methods=["POST"])
  109. def meta_node_graph():
  110. try:
  111. if not request.json:
  112. return jsonify(failed("请求数据不能为空"))
  113. # 从请求中获取节点ID
  114. node_id = request.json.get("nodeId")
  115. if node_id is None:
  116. return jsonify(failed("nodeId 不能为空"))
  117. try:
  118. node_id_int = int(node_id)
  119. except (TypeError, ValueError):
  120. return jsonify(failed("nodeId 必须为整数"))
  121. # 调用核心业务逻辑
  122. graph = meta_kinship_graph(node_id_int)
  123. is_dict = isinstance(graph, dict)
  124. nodes = graph.get("nodes", []) if is_dict else []
  125. relationships = graph.get("relationships", []) if is_dict else []
  126. # 当前节点属性
  127. node_info = next(
  128. (n for n in nodes if n.get("id") == node_id_int),
  129. {},
  130. )
  131. # 关联节点(包含属性,便于前端展示名称等)
  132. related_nodes = [n for n in nodes if n.get("id") != node_id_int]
  133. payload = {
  134. "node": node_info,
  135. "related_nodes": related_nodes,
  136. "relationships": relationships,
  137. }
  138. return jsonify(success(payload))
  139. except Exception as e:
  140. logger.error(f"获取元数据图谱失败: {str(e)}")
  141. return jsonify(failed(str(e)))
  142. # 删除元数据
  143. @bp.route("/node/delete", methods=["POST"])
  144. def meta_node_delete():
  145. try:
  146. if not request.json:
  147. return jsonify(failed("请求数据不能为空"))
  148. # 从请求中获取节点ID
  149. node_id = request.json.get("id")
  150. # 删除节点逻辑
  151. with neo4j_driver.get_session() as session:
  152. cypher = "MATCH (n) WHERE id(n) = $node_id DETACH DELETE n"
  153. session.run(cypher, node_id=int(node_id))
  154. # 返回结果
  155. return jsonify(success({}))
  156. except Exception as e:
  157. logger.error(f"删除元数据失败: {str(e)}")
  158. return jsonify(failed(str(e)))
  159. # 编辑元数据
  160. @bp.route("/node/edit", methods=["POST"])
  161. def meta_node_edit():
  162. try:
  163. if not request.json:
  164. return jsonify(failed("请求数据不能为空"))
  165. # 从请求中获取节点ID
  166. node_id = request.json.get("id")
  167. if not node_id:
  168. return jsonify(failed("节点ID不能为空"))
  169. # 获取节点
  170. with neo4j_driver.get_session() as session:
  171. # 查询节点信息
  172. cypher = """
  173. MATCH (n:DataMeta)
  174. WHERE id(n) = $node_id
  175. RETURN n
  176. """
  177. result = session.run(cypher, node_id=int(node_id))
  178. node = result.single()
  179. if not node or not node["n"]:
  180. return jsonify(failed("节点不存在"))
  181. # 获取节点数据
  182. node_data = dict(node["n"])
  183. node_data["id"] = node["n"].id
  184. # 获取标签信息
  185. tag_cypher = """
  186. MATCH (n:DataMeta)-[:LABEL]->(t:DataLabel)
  187. WHERE id(n) = $node_id
  188. RETURN t
  189. """
  190. tag_result = session.run(tag_cypher, node_id=int(node_id))
  191. tags: list[dict] = []
  192. for record in tag_result:
  193. tag_node = record.get("t")
  194. if tag_node:
  195. tags.append(
  196. {
  197. "id": tag_node.id,
  198. "name_zh": tag_node.get("name_zh", ""),
  199. "name_en": tag_node.get("name_en", ""),
  200. }
  201. )
  202. # 获取主数据信息
  203. master_data_cypher = """
  204. MATCH (n:DataMeta)-[:master_data]->(m:master_data)
  205. WHERE id(n) = $node_id
  206. RETURN m
  207. """
  208. master_data_result = session.run(master_data_cypher, node_id=int(node_id))
  209. master_data = master_data_result.single()
  210. # 构建返回数据
  211. response_data = [
  212. {
  213. "master_data": (
  214. master_data["m"].id
  215. if master_data and master_data["m"]
  216. else None
  217. ),
  218. "name_zh": node_data.get("name_zh", ""),
  219. "name_en": node_data.get("name_en", ""),
  220. "create_time": node_data.get("create_time", ""),
  221. "update_time": node_data.get("update_time", ""),
  222. "status": bool(node_data.get("status", True)),
  223. "data_type": node_data.get("data_type", ""),
  224. "tag": tags,
  225. "affiliation": node_data.get("affiliation"),
  226. "category": node_data.get("category"),
  227. "alias": node_data.get("alias"),
  228. "describe": node_data.get("describe"),
  229. }
  230. ]
  231. logger.info(f"成功获取元数据节点: ID={node_data['id']}")
  232. return jsonify(success(response_data))
  233. except Exception as e:
  234. logger.error(f"获取元数据节点失败: {str(e)}")
  235. return jsonify(failed(str(e)))
  236. # 增加元数据
  237. @bp.route("/check", methods=["GET"])
  238. def meta_check():
  239. """
  240. 检查元数据中文名是否已存在
  241. 请求参数:
  242. - name_zh: 元数据中文名(URL参数)
  243. 返回:
  244. - exists: true/false 表示是否存在
  245. """
  246. try:
  247. name_zh = request.args.get("name_zh")
  248. if not name_zh:
  249. return jsonify(failed("缺少name_zh参数"))
  250. # 查询数据库检查是否存在
  251. with neo4j_driver.get_session() as session:
  252. cypher = """
  253. MATCH (n:DataMeta {name_zh: $name_zh})
  254. RETURN count(n) > 0 as exists
  255. """
  256. result = session.run(cypher, name_zh=name_zh)
  257. record = result.single()
  258. if record:
  259. exists = record["exists"]
  260. logger.info(f"检查元数据 '{name_zh}': {'存在' if exists else '不存在'}")
  261. return jsonify(
  262. success({"exists": exists, "name_zh": name_zh}, "查询成功")
  263. )
  264. else:
  265. return jsonify(
  266. success({"exists": False, "name_zh": name_zh}, "查询成功")
  267. )
  268. except Exception as e:
  269. logger.error(f"检查元数据失败: {str(e)}")
  270. return jsonify(failed(f"检查失败: {str(e)}"))
  271. @bp.route("/node/add", methods=["POST"])
  272. def meta_node_add():
  273. """
  274. 新增元数据节点
  275. 在创建前会进行冗余检测:
  276. - 如果存在完全匹配的元数据,返回已存在的节点信息
  277. - 如果存在疑似重复的元数据,创建审核记录并返回提示
  278. - 如果无重复,正常创建新节点
  279. """
  280. try:
  281. if not request.json:
  282. return jsonify(failed("请求数据不能为空"))
  283. # 从请求中获取节点信息
  284. node_name_zh = request.json.get("name_zh")
  285. node_type = request.json.get("data_type")
  286. node_category = request.json.get("category")
  287. node_alias = request.json.get("alias")
  288. node_affiliation = request.json.get("affiliation")
  289. node_tag = request.json.get("tag")
  290. node_desc = request.json.get("describe")
  291. node_status = bool(request.json.get("status", True))
  292. node_name_en = request.json.get("name_en")
  293. # 是否强制创建(跳过冗余检测)
  294. force_create = bool(request.json.get("force_create", False))
  295. if not node_name_zh:
  296. return jsonify(failed("节点名称不能为空"))
  297. if not node_type:
  298. return jsonify(failed("节点类型不能为空"))
  299. # 统一处理标签ID
  300. tag_ids = normalize_tag_inputs(node_tag)
  301. # ========== 冗余检测 ==========
  302. has_suspicious_duplicates = False
  303. suspicious_candidates = []
  304. if not force_create:
  305. redundancy_result = check_redundancy_for_add(
  306. name_zh=node_name_zh,
  307. name_en=node_name_en or "",
  308. data_type=node_type,
  309. tag_ids=tag_ids,
  310. )
  311. # 存在完全匹配的元数据,直接返回,不做任何操作
  312. if redundancy_result["has_exact_match"]:
  313. exact_id = redundancy_result["exact_match_id"]
  314. logger.info(
  315. f"元数据已存在(完全匹配): name_zh={node_name_zh}, "
  316. f"existing_id={exact_id}"
  317. )
  318. return jsonify(
  319. failed(
  320. f"元数据已存在(完全匹配),无需重复创建。"
  321. f"已存在的元数据ID: {exact_id}"
  322. )
  323. )
  324. # 存在疑似重复的元数据,标记状态,稍后创建节点后再写入审核记录
  325. if redundancy_result["has_candidates"]:
  326. has_suspicious_duplicates = True
  327. suspicious_candidates = redundancy_result["candidates"]
  328. logger.info(
  329. f"发现疑似重复元数据: name_zh={node_name_zh}, "
  330. f"候选数量={len(suspicious_candidates)}"
  331. )
  332. # ========== 创建节点 ==========
  333. with neo4j_driver.get_session() as session:
  334. cypher = """
  335. MERGE (n:DataMeta {name_zh: $name_zh})
  336. ON CREATE SET n.name_en = $name_en,
  337. n.data_type = $data_type,
  338. n.category = $category,
  339. n.alias = $alias,
  340. n.affiliation = $affiliation,
  341. n.describe = $describe,
  342. n.create_time = $create_time,
  343. n.updateTime = $update_time,
  344. n.status = $status,
  345. n.name_en = $name_en
  346. ON MATCH SET n.data_type = $data_type,
  347. n.category = $category,
  348. n.alias = $alias,
  349. n.affiliation = $affiliation,
  350. n.describe = $describe,
  351. n.updateTime = $update_time,
  352. n.status = $status,
  353. n.name_en = $name_en
  354. RETURN n
  355. """
  356. create_time = update_time = get_formatted_time()
  357. result = session.run(
  358. cypher,
  359. name_zh=node_name_zh,
  360. data_type=node_type,
  361. category=node_category,
  362. alias=node_alias,
  363. affiliation=node_affiliation,
  364. describe=node_desc,
  365. create_time=create_time,
  366. update_time=update_time,
  367. status=node_status,
  368. name_en=node_name_en,
  369. )
  370. node = result.single()
  371. if node and node["n"]:
  372. node_data = dict(node["n"])
  373. node_data["id"] = node["n"].id
  374. # 如果提供了标签列表,创建标签关系
  375. tag_nodes = []
  376. if tag_ids:
  377. for tag_id in tag_ids:
  378. # 获取标签节点信息
  379. tag_fetch = session.run(
  380. "MATCH (t:DataLabel) WHERE id(t) = $tag_id RETURN t",
  381. tag_id=tag_id,
  382. ).single()
  383. if not tag_fetch or not tag_fetch.get("t"):
  384. logger.warning(f"未找到标签节点: {tag_id}")
  385. continue
  386. tag_node = tag_fetch["t"]
  387. tag_nodes.append(
  388. {
  389. "id": tag_node.id,
  390. "name_zh": tag_node.get("name_zh", ""),
  391. "name_en": tag_node.get("name_en", ""),
  392. }
  393. )
  394. tag_cypher = """
  395. MATCH (n:DataMeta), (t:DataLabel)
  396. WHERE id(n) = $node_id AND id(t) = $tag_id
  397. MERGE (n)-[r:LABEL]->(t)
  398. RETURN r
  399. """
  400. session.run(tag_cypher, node_id=node["n"].id, tag_id=tag_id)
  401. node_data["tag"] = tag_nodes
  402. logger.info(
  403. f"成功创建或更新元数据节点: "
  404. f"ID={node_data['id']}, name={node_name_zh}"
  405. )
  406. # ========== 处理疑似重复情况 ==========
  407. # 如果存在疑似重复,创建审核记录
  408. if has_suspicious_duplicates and suspicious_candidates:
  409. from app.core.meta_data.redundancy_check import (
  410. write_redundancy_review_record_with_new_id,
  411. )
  412. # 构建新元数据快照(包含新创建的节点ID)
  413. new_meta_snapshot = {
  414. "id": node_data["id"],
  415. "name_zh": node_name_zh,
  416. "name_en": node_name_en or "",
  417. "data_type": node_type,
  418. "tag_ids": tag_ids,
  419. }
  420. # 写入审核记录
  421. write_redundancy_review_record_with_new_id(
  422. new_meta=new_meta_snapshot,
  423. candidates=suspicious_candidates,
  424. source="api",
  425. )
  426. # 返回成功创建,但提示疑似重复
  427. candidate_names = [
  428. c.get("name_zh", "") for c in suspicious_candidates[:3]
  429. ]
  430. return jsonify(
  431. success(
  432. node_data,
  433. message=(
  434. f"元数据创建成功,但发现疑似重复元数据。"
  435. f"疑似重复: {', '.join(candidate_names)}。"
  436. f"已创建审核记录,请前往元数据审核页面进行处理。"
  437. ),
  438. )
  439. )
  440. return jsonify(success(node_data))
  441. else:
  442. logger.error(f"创建元数据节点失败: {node_name_zh}")
  443. return jsonify(failed("创建元数据节点失败"))
  444. except Exception as e:
  445. logger.error(f"添加元数据失败: {str(e)}")
  446. return jsonify(failed(str(e)))
  447. # 搜索元数据
  448. @bp.route("/search", methods=["GET"])
  449. def search_metadata_route():
  450. try:
  451. keyword = request.args.get("keyword", "")
  452. if not keyword:
  453. return jsonify(success([]))
  454. cypher = """
  455. MATCH (n:DataMeta)
  456. WHERE n.name_zh CONTAINS $keyword
  457. RETURN n LIMIT 100
  458. """
  459. with neo4j_driver.get_session() as session:
  460. result = session.run(cypher, keyword=keyword)
  461. metadata_list = [dict(record["n"]) for record in result]
  462. return jsonify(success(metadata_list))
  463. except Exception as e:
  464. logger.error(f"搜索元数据失败: {str(e)}")
  465. return jsonify(failed(str(e)))
  466. # 全文检索查询
  467. @bp.route("/full/text/query", methods=["POST"])
  468. def full_text_query():
  469. try:
  470. if not request.json:
  471. return jsonify(failed("请求数据不能为空"))
  472. # 获取查询条件
  473. search_term = request.json.get("query", "")
  474. if not search_term:
  475. return jsonify(failed("查询条件不能为空"))
  476. # 执行Neo4j全文索引查询
  477. with neo4j_driver.get_session() as session:
  478. cypher = """
  479. CALL db.index.fulltext.queryNodes("DataMetaFulltext", $term)
  480. YIELD node, score
  481. RETURN node, score
  482. ORDER BY score DESC
  483. LIMIT 20
  484. """
  485. result = session.run(cypher, term=search_term)
  486. # 处理查询结果
  487. search_results = []
  488. for record in result:
  489. node_data = dict(record["node"])
  490. node_data["id"] = record["node"].id
  491. node_data["score"] = record["score"]
  492. search_results.append(node_data)
  493. return jsonify(success(search_results))
  494. except Exception as e:
  495. logger.error(f"全文检索查询失败: {str(e)}")
  496. return jsonify(failed(str(e)))
  497. # 非结构化文本查询
  498. @bp.route("/unstructure/text/query", methods=["POST"])
  499. def unstructure_text_query():
  500. try:
  501. if not request.json:
  502. return jsonify(failed("请求数据不能为空"))
  503. # 获取查询参数
  504. node_id = request.json.get("id")
  505. if not node_id:
  506. return jsonify(failed("节点ID不能为空"))
  507. # 获取节点信息
  508. node_data = handle_id_unstructured(node_id)
  509. if not node_data:
  510. return jsonify(failed("节点不存在"))
  511. # 获取对象路径
  512. object_name = node_data.get("url")
  513. if not object_name:
  514. return jsonify(failed("文档路径不存在"))
  515. # 获取 MinIO 配置
  516. minio_client = get_minio_client()
  517. config = get_minio_config()
  518. bucket_name = config["MINIO_BUCKET"]
  519. # 从MinIO获取文件内容
  520. file_content = get_file_content(minio_client, bucket_name, object_name)
  521. # 解析文本内容
  522. parsed_data = parse_text(file_content)
  523. # 返回结果
  524. result = {
  525. "node": node_data,
  526. "parsed": parsed_data,
  527. "content": (
  528. file_content[:1000] + "..."
  529. if len(file_content) > 1000
  530. else file_content
  531. ),
  532. }
  533. return jsonify(success(result))
  534. except Exception as e:
  535. logger.error(f"非结构化文本查询失败: {str(e)}")
  536. return jsonify(failed(str(e)))
  537. # 文件上传
  538. @bp.route("/resource/upload", methods=["POST"])
  539. def upload_file():
  540. try:
  541. # 检查请求中是否有文件
  542. if "file" not in request.files:
  543. return jsonify(failed("没有找到上传的文件"))
  544. file = request.files["file"]
  545. # 检查文件名
  546. if not file.filename:
  547. return jsonify(failed("未选择文件"))
  548. # 保存文件名到本地变量(确保类型安全)
  549. filename = file.filename
  550. # 检查文件类型
  551. if not allowed_file(filename):
  552. return jsonify(failed("不支持的文件类型"))
  553. # 获取 MinIO 配置
  554. minio_client = get_minio_client()
  555. config = get_minio_config()
  556. # 上传到MinIO
  557. file_content = file.read()
  558. file_size = len(file_content)
  559. file_type = filename.rsplit(".", 1)[1].lower()
  560. # 提取文件名(不包含扩展名)
  561. filename_without_ext = filename.rsplit(".", 1)[0]
  562. # 生成紧凑的时间戳 (yyyyMMddHHmmss)
  563. import time
  564. timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
  565. # 生成唯一文件名
  566. object_name = (
  567. f"{config['PREFIX']}/{filename_without_ext}_{timestamp}.{file_type}"
  568. )
  569. # 上传文件
  570. minio_client.put_object(
  571. config["MINIO_BUCKET"],
  572. object_name,
  573. io.BytesIO(file_content),
  574. file_size,
  575. content_type=f"application/{file_type}",
  576. )
  577. # 返回结果
  578. return jsonify(
  579. success(
  580. {
  581. "filename": file.filename,
  582. "size": file_size,
  583. "type": file_type,
  584. "url": object_name,
  585. }
  586. )
  587. )
  588. except Exception as e:
  589. logger.error(f"文件上传失败: {str(e)}")
  590. return jsonify(failed(str(e)))
  591. # 文件下载显示
  592. @bp.route("/resource/display", methods=["POST"])
  593. def upload_file_display():
  594. response = None
  595. try:
  596. if not request.json:
  597. return jsonify(failed("请求数据不能为空"))
  598. object_name = request.json.get("url")
  599. if not object_name:
  600. return jsonify(failed("文件路径不能为空"))
  601. # 获取 MinIO 配置
  602. minio_client = get_minio_client()
  603. config = get_minio_config()
  604. # 获取文件内容
  605. response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
  606. file_data = response.read()
  607. # 获取文件名
  608. file_name = object_name.split("/")[-1]
  609. # 确定文件类型
  610. file_extension = file_name.split(".")[-1].lower()
  611. # 为不同文件类型设置合适的MIME类型
  612. mime_types = {
  613. "pdf": "application/pdf",
  614. "doc": "application/msword",
  615. "docx": (
  616. "application/vnd.openxmlformats-"
  617. "officedocument.wordprocessingml.document"
  618. ),
  619. "xls": "application/vnd.ms-excel",
  620. "xlsx": (
  621. "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
  622. ),
  623. "txt": "text/plain",
  624. "csv": "text/csv",
  625. }
  626. content_type = mime_types.get(file_extension, "application/octet-stream")
  627. # 返回结果
  628. return jsonify(
  629. success(
  630. {
  631. "filename": file_name,
  632. "type": file_extension,
  633. "contentType": content_type,
  634. "size": len(file_data),
  635. "url": f"/api/meta/resource/download?url={object_name}",
  636. }
  637. )
  638. )
  639. except S3Error as e:
  640. logger.error(f"MinIO操作失败: {str(e)}")
  641. return jsonify(failed(f"文件访问失败: {str(e)}"))
  642. except Exception as e:
  643. logger.error(f"文件显示信息获取失败: {str(e)}")
  644. return jsonify(failed(str(e)))
  645. finally:
  646. if response:
  647. response.close()
  648. response.release_conn()
  649. # 文件下载接口
  650. @bp.route("/resource/download", methods=["GET"])
  651. def download_file():
  652. response = None
  653. try:
  654. object_name = request.args.get("url")
  655. if not object_name:
  656. return jsonify(failed("文件路径不能为空"))
  657. # URL解码,处理特殊字符
  658. import urllib.parse
  659. object_name = urllib.parse.unquote(object_name)
  660. # 记录下载请求信息,便于调试
  661. logger.info(f"下载文件请求: {object_name}")
  662. # 获取 MinIO 配置
  663. minio_client = get_minio_client()
  664. config = get_minio_config()
  665. # 获取文件
  666. try:
  667. response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
  668. file_data = response.read()
  669. except S3Error as e:
  670. logger.error(f"MinIO获取文件失败: {str(e)}")
  671. return jsonify(failed(f"文件获取失败: {str(e)}"))
  672. # 获取文件名,并处理特殊字符
  673. file_name = object_name.split("/")[-1]
  674. # 直接从内存返回文件,不创建临时文件
  675. file_stream = io.BytesIO(file_data)
  676. # 返回文件
  677. return send_file(
  678. file_stream,
  679. as_attachment=True,
  680. download_name=file_name,
  681. mimetype="application/octet-stream",
  682. )
  683. except Exception as e:
  684. logger.error(f"文件下载失败: {str(e)}")
  685. return jsonify(failed(str(e)))
  686. finally:
  687. if response:
  688. response.close()
  689. response.release_conn()
  690. # 文本资源翻译
  691. @bp.route("/resource/translate", methods=["POST"])
  692. def text_resource_translate():
  693. try:
  694. if not request.json:
  695. return jsonify(failed("请求数据不能为空"))
  696. # 获取参数
  697. name_zh = request.json.get("name_zh", "")
  698. keyword = request.json.get("keyword", "")
  699. if not name_zh:
  700. return jsonify(failed("名称不能为空"))
  701. # 调用资源处理逻辑
  702. result = text_resource_solve(None, name_zh, keyword)
  703. return jsonify(success(result))
  704. except Exception as e:
  705. logger.error(f"文本资源翻译失败: {str(e)}")
  706. return jsonify(failed(str(e)))
  707. # 创建文本资源节点
  708. @bp.route("/resource/node", methods=["POST"])
  709. def text_resource_node():
  710. try:
  711. if not request.json:
  712. return jsonify(failed("请求数据不能为空"))
  713. # 获取参数
  714. name_zh = request.json.get("name_zh", "")
  715. name_en = request.json.get("name_en", "")
  716. keywords = request.json.get("keywords", [])
  717. keywords_en = request.json.get("keywords_en", [])
  718. object_name = request.json.get("url", "")
  719. if not name_zh or not name_en or not object_name:
  720. return jsonify(failed("参数不完整"))
  721. # 创建节点
  722. with neo4j_driver.get_session() as session:
  723. # 创建资源节点
  724. cypher = """
  725. CREATE (n:DataMeta {
  726. name_zh: $name_zh,
  727. name_en: $name_en,
  728. keywords: $keywords,
  729. keywords_en: $keywords_en,
  730. url: $object_name,
  731. create_time: $create_time,
  732. updateTime: $update_time
  733. })
  734. RETURN n
  735. """
  736. create_time = update_time = get_formatted_time()
  737. result = session.run(
  738. cypher,
  739. name_zh=name_zh,
  740. name_en=name_en,
  741. keywords=keywords,
  742. keywords_en=keywords_en,
  743. object_name=object_name,
  744. create_time=create_time,
  745. update_time=update_time,
  746. )
  747. record = result.single()
  748. if not record:
  749. return jsonify(failed("创建节点失败"))
  750. node = record["n"]
  751. # 为每个关键词创建标签节点并关联
  752. for i, keyword in enumerate(keywords):
  753. if keyword:
  754. # 创建标签节点
  755. tag_cypher = """
  756. MERGE (t:Tag {name_zh: $name_zh})
  757. ON CREATE SET t.name_en = $name_en,
  758. t.create_time = $create_time
  759. RETURN t
  760. """
  761. tag_result = session.run(
  762. tag_cypher,
  763. name_zh=keyword,
  764. name_en=keywords_en[i] if i < len(keywords_en) else "",
  765. create_time=create_time,
  766. )
  767. tag_record = tag_result.single()
  768. if not tag_record:
  769. continue
  770. tag_node = tag_record["t"]
  771. # 创建关系
  772. rel_cypher = """
  773. MATCH (n), (t)
  774. WHERE id(n) = $node_id AND id(t) = $tag_id
  775. CREATE (n)-[r:HAS_TAG]->(t)
  776. RETURN r
  777. """
  778. session.run(rel_cypher, node_id=node.id, tag_id=tag_node.id)
  779. # 返回创建的节点
  780. return jsonify(success(dict(node)))
  781. except Exception as e:
  782. logger.error(f"创建文本资源节点失败: {str(e)}")
  783. return jsonify(failed(str(e)))
  784. # 处理非结构化数据
  785. @bp.route("/unstructured/process", methods=["POST"])
  786. def processing_unstructured_data():
  787. try:
  788. if not request.json:
  789. return jsonify(failed("请求数据不能为空"))
  790. # 获取参数
  791. node_id = request.json.get("id")
  792. if not node_id:
  793. return jsonify(failed("节点ID不能为空"))
  794. # 获取 MinIO 配置
  795. minio_client = get_minio_client()
  796. config = get_minio_config()
  797. prefix = config["PREFIX"]
  798. # 调用处理逻辑
  799. result = solve_unstructured_data(node_id, minio_client, prefix)
  800. if result:
  801. return jsonify(success({"message": "处理成功"}))
  802. else:
  803. return jsonify(failed("处理失败"))
  804. except Exception as e:
  805. logger.error(f"处理非结构化数据失败: {str(e)}")
  806. return jsonify(failed(str(e)))
  807. # 创建文本图谱
  808. @bp.route("/text/graph", methods=["POST"])
  809. def create_text_graph():
  810. try:
  811. if not request.json:
  812. return jsonify(failed("请求数据不能为空"))
  813. # 获取参数
  814. node_id = request.json.get("id")
  815. entity_zh = request.json.get("entity_zh")
  816. entity_en = request.json.get("entity_en")
  817. if not all([node_id, entity_zh, entity_en]):
  818. return jsonify(failed("参数不完整"))
  819. # 创建图谱
  820. result = handle_txt_graph(node_id, entity_zh, entity_en)
  821. if result:
  822. return jsonify(success({"message": "图谱创建成功"}))
  823. else:
  824. return jsonify(failed("图谱创建失败"))
  825. except Exception as e:
  826. logger.error(f"创建文本图谱失败: {str(e)}")
  827. return jsonify(failed(str(e)))
  828. @bp.route("/config", methods=["GET"])
  829. @require_auth
  830. def get_meta_config():
  831. """获取元数据配置信息"""
  832. config = get_minio_config()
  833. return jsonify(
  834. {
  835. "bucket_name": config["MINIO_BUCKET"],
  836. "prefix": config["PREFIX"],
  837. "allowed_extensions": list(config["ALLOWED_EXTENSIONS"]),
  838. }
  839. )
  840. # 更新元数据
  841. @bp.route("/node/update", methods=["POST"])
  842. def meta_node_update():
  843. """
  844. 更新元数据节点
  845. 在更新前会进行冗余检测(如果修改了 name_zh/name_en):
  846. - 如果更新后的名称与其他节点完全匹配,返回错误
  847. - 如果存在疑似重复的元数据,创建审核记录并返回提示
  848. - 如果无重复,正常更新节点
  849. """
  850. try:
  851. if not request.json:
  852. return jsonify(failed("请求数据不能为空"))
  853. # 从请求中获取节点ID和更新数据
  854. node_id = request.json.get("id")
  855. if not node_id:
  856. return jsonify(failed("节点ID不能为空"))
  857. # 验证并转换节点ID为整数
  858. try:
  859. node_id = int(node_id)
  860. except (ValueError, TypeError):
  861. return jsonify(failed(f"节点ID必须为整数,当前值: {node_id}"))
  862. # 是否强制更新(跳过冗余检测)
  863. force_update = bool(request.json.get("force_update", False))
  864. # 更新节点
  865. with neo4j_driver.get_session() as session:
  866. # 检查节点是否存在并获取当前值
  867. check_cypher = """
  868. MATCH (n:DataMeta)
  869. WHERE id(n) = $node_id
  870. RETURN n
  871. """
  872. result = session.run(check_cypher, node_id=node_id)
  873. node = result.single()
  874. if not node or not node["n"]:
  875. return jsonify(failed("节点不存在"))
  876. # 获取当前节点属性
  877. current_node = dict(node["n"])
  878. # 处理每个可能的更新字段
  879. fields_to_update = {
  880. "name_zh": request.json.get("name_zh"),
  881. "category": request.json.get("category"),
  882. "alias": request.json.get("alias"),
  883. "affiliation": request.json.get("affiliation"),
  884. "data_type": request.json.get("data_type"),
  885. "describe": request.json.get("describe"),
  886. "status": request.json.get("status"),
  887. "name_en": request.json.get("name_en"),
  888. }
  889. # 计算更新后的值(用于冗余检测)
  890. updated_name_zh = (
  891. fields_to_update["name_zh"]
  892. if fields_to_update["name_zh"] is not None
  893. else current_node.get("name_zh", "")
  894. )
  895. updated_name_en = (
  896. fields_to_update["name_en"]
  897. if fields_to_update["name_en"] is not None
  898. else current_node.get("name_en", "")
  899. )
  900. updated_data_type = (
  901. fields_to_update["data_type"]
  902. if fields_to_update["data_type"] is not None
  903. else current_node.get("data_type", "varchar(255)")
  904. )
  905. # 处理标签
  906. tag = request.json.get("tag")
  907. tag_ids = normalize_tag_inputs(tag) if tag is not None else []
  908. # ========== 冗余检测(仅当修改了 name_zh 或 name_en 时)==========
  909. name_changed = (
  910. fields_to_update["name_zh"] is not None
  911. and fields_to_update["name_zh"] != current_node.get("name_zh")
  912. ) or (
  913. fields_to_update["name_en"] is not None
  914. and fields_to_update["name_en"] != current_node.get("name_en")
  915. )
  916. if name_changed and not force_update:
  917. redundancy_result = check_redundancy_for_update(
  918. node_id=node_id,
  919. name_zh=updated_name_zh,
  920. name_en=updated_name_en,
  921. data_type=updated_data_type,
  922. tag_ids=tag_ids,
  923. )
  924. # 存在完全匹配的其他元数据
  925. if redundancy_result["has_exact_match"]:
  926. exact_id = redundancy_result["exact_match_id"]
  927. logger.warning(
  928. f"更新后元数据与其他节点完全匹配: "
  929. f"node_id={node_id}, existing_id={exact_id}"
  930. )
  931. return jsonify(
  932. failed(
  933. f"更新后的元数据与已有节点(ID={exact_id})完全相同,"
  934. f"请检查是否需要合并或修改名称。"
  935. )
  936. )
  937. # 存在疑似重复的元数据,已创建审核记录
  938. if redundancy_result["review_created"]:
  939. candidates = redundancy_result["candidates"]
  940. candidate_names = [c.get("name_zh", "") for c in candidates[:3]]
  941. logger.info(
  942. f"更新元数据发现疑似重复: node_id={node_id}, "
  943. f"candidates={candidate_names}"
  944. )
  945. return jsonify(
  946. failed(
  947. f"发现疑似重复元数据,已创建审核记录。"
  948. f"疑似重复: {', '.join(candidate_names)}。"
  949. f"请前往元数据审核页面处理,或使用 force_update=true 强制更新。"
  950. )
  951. )
  952. # ========== 执行更新 ==========
  953. # 构建更新语句,只更新提供的属性
  954. update_cypher = """
  955. MATCH (n:DataMeta)
  956. WHERE id(n) = $node_id
  957. SET n.updateTime = $update_time
  958. """
  959. # 准备更新参数
  960. update_params = {"node_id": node_id, "update_time": get_formatted_time()}
  961. # 只更新提供了新值的字段
  962. for field, new_value in fields_to_update.items():
  963. if new_value is not None:
  964. # 特殊处理 data_type 字段映射
  965. if field == "data_type":
  966. update_cypher += f", n.data_type = ${field}\n"
  967. else:
  968. update_cypher += f", n.{field} = ${field}\n"
  969. update_params[field] = new_value
  970. update_cypher += "RETURN n"
  971. result = session.run(
  972. update_cypher, # type: ignore[arg-type]
  973. **update_params,
  974. )
  975. updated_node = result.single()
  976. if updated_node and updated_node["n"]:
  977. node_data = dict(updated_node["n"])
  978. node_data["id"] = updated_node["n"].id
  979. # 如果更新了标签,处理标签关系(支持列表)
  980. if tag is not None:
  981. # 先删除现有标签关系
  982. delete_tag_cypher = """
  983. MATCH (n:DataMeta)-[r:LABEL]->(t:DataLabel)
  984. WHERE id(n) = $node_id
  985. DELETE r
  986. """
  987. session.run(delete_tag_cypher, node_id=node_id)
  988. for tag_id in tag_ids:
  989. create_tag_cypher = """
  990. MATCH (n:DataMeta), (t:DataLabel)
  991. WHERE id(n) = $node_id AND id(t) = $tag_id
  992. MERGE (n)-[r:LABEL]->(t)
  993. RETURN r
  994. """
  995. session.run(create_tag_cypher, node_id=node_id, tag_id=tag_id)
  996. logger.info(f"成功更新元数据节点: ID={node_data['id']}")
  997. return jsonify(success(node_data))
  998. else:
  999. logger.error(f"更新元数据节点失败: ID={node_id}")
  1000. return jsonify(failed("更新元数据节点失败"))
  1001. except Exception as e:
  1002. logger.error(f"更新元数据失败: {str(e)}")
  1003. return jsonify(failed(str(e)))
  1004. @bp.route("/review/list", methods=["POST"])
  1005. def metadata_review_list():
  1006. """
  1007. 审核记录列表:疑似冗余/变动
  1008. Body:
  1009. - current: 页码(默认1)
  1010. - size: 每页数量(默认10)
  1011. - record_type: redundancy|change(可选)
  1012. - status: pending|resolved|ignored(可选)
  1013. - business_domain_id: 业务领域ID(可选)
  1014. - keyword: 关键字(可选,匹配 new_meta.name_zh/name_en)
  1015. """
  1016. try:
  1017. payload = request.get_json() or {}
  1018. if not isinstance(payload, dict):
  1019. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  1020. def to_int(value, default):
  1021. try:
  1022. return int(value)
  1023. except (TypeError, ValueError):
  1024. return default
  1025. page = to_int(payload.get("current", 1), 1)
  1026. page_size = to_int(payload.get("size", 10), 10)
  1027. record_type = payload.get("record_type")
  1028. status = payload.get("status")
  1029. business_domain_id = payload.get("business_domain_id")
  1030. keyword = (payload.get("keyword") or "").strip()
  1031. query = MetadataReviewRecord.query
  1032. if record_type:
  1033. query = query.filter(MetadataReviewRecord.record_type == record_type)
  1034. if status:
  1035. query = query.filter(MetadataReviewRecord.status == status)
  1036. if business_domain_id is not None and str(business_domain_id).strip() != "":
  1037. bd_id_int = int(business_domain_id)
  1038. query = query.filter(MetadataReviewRecord.business_domain_id == bd_id_int)
  1039. if keyword:
  1040. # 兼容:使用JSONB ->> 提取进行模糊匹配
  1041. name_zh_col = MetadataReviewRecord.new_meta["name_zh"].astext
  1042. name_en_col = MetadataReviewRecord.new_meta["name_en"].astext
  1043. query = query.filter(
  1044. or_(
  1045. name_zh_col.contains(keyword),
  1046. name_en_col.contains(keyword),
  1047. )
  1048. )
  1049. total = query.count()
  1050. records = (
  1051. query.order_by(MetadataReviewRecord.created_at.desc())
  1052. .offset((page - 1) * page_size)
  1053. .limit(page_size)
  1054. .all()
  1055. )
  1056. # 将 tag_ids 转换为 tags
  1057. records_data = [convert_tag_ids_to_tags(r.to_dict()) for r in records]
  1058. return jsonify(
  1059. success(
  1060. {
  1061. "records": records_data,
  1062. "total": total,
  1063. "size": page_size,
  1064. "current": page,
  1065. }
  1066. )
  1067. )
  1068. except Exception as e:
  1069. logger.error(f"审核记录列表查询失败: {str(e)}")
  1070. return jsonify(failed("审核记录列表查询失败", error=str(e)))
  1071. @bp.route("/review/create", methods=["POST"])
  1072. def metadata_review_create():
  1073. """
  1074. 创建元数据审核记录
  1075. Body:
  1076. - record_type: 审核记录类型(redundancy: 疑似重复 / change: 疑似变动 / merge: 合并请求)
  1077. - source: 触发来源(默认 "manual")
  1078. - meta1: 第一个元数据信息
  1079. - id: 节点ID
  1080. - name_zh: 中文名
  1081. - name_en: 英文名
  1082. - data_type: 数据类型
  1083. - status: 状态
  1084. - tag_ids: 标签ID列表(可选)
  1085. - meta2: 第二个元数据信息
  1086. - id: 节点ID
  1087. - name_zh: 中文名
  1088. - name_en: 英文名
  1089. - data_type: 数据类型
  1090. - status: 状态
  1091. - tag_ids: 标签ID列表(可选)
  1092. - diff_fields: 差异字段列表(可选,如 ["name_zh", "name_en"])
  1093. - notes: 备注(可选)
  1094. Returns:
  1095. 创建成功的审核记录信息
  1096. """
  1097. try:
  1098. payload = request.get_json() or {}
  1099. if not isinstance(payload, dict):
  1100. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  1101. record_type = payload.get("record_type")
  1102. source = payload.get("source", "manual")
  1103. meta1 = payload.get("meta1")
  1104. meta2 = payload.get("meta2")
  1105. diff_fields = payload.get("diff_fields", [])
  1106. notes = payload.get("notes")
  1107. # 参数校验
  1108. if not record_type:
  1109. return jsonify(failed("record_type 不能为空"))
  1110. if record_type not in ("redundancy", "change", "merge"):
  1111. return jsonify(
  1112. failed("record_type 必须是 redundancy、change 或 merge 之一")
  1113. )
  1114. if not meta1 or not isinstance(meta1, dict):
  1115. return jsonify(failed("meta1 不能为空且必须是对象"))
  1116. if not meta2 or not isinstance(meta2, dict):
  1117. return jsonify(failed("meta2 不能为空且必须是对象"))
  1118. if not isinstance(diff_fields, list):
  1119. return jsonify(failed("diff_fields 必须是数组"))
  1120. # 校验元数据必要字段
  1121. required_fields = ["id", "name_zh", "name_en", "data_type", "status"]
  1122. for field in required_fields:
  1123. if field not in meta1:
  1124. return jsonify(failed(f"meta1 缺少必要字段: {field}"))
  1125. if field not in meta2:
  1126. return jsonify(failed(f"meta2 缺少必要字段: {field}"))
  1127. # 构建 new_meta(主元数据信息)
  1128. new_meta = {
  1129. "id": meta1.get("id"),
  1130. "name_zh": meta1.get("name_zh"),
  1131. "name_en": meta1.get("name_en"),
  1132. "data_type": meta1.get("data_type"),
  1133. "status": meta1.get("status"),
  1134. "tag_ids": meta1.get("tag_ids", []),
  1135. }
  1136. # 构建 candidates(候选/对比元数据列表)
  1137. # 格式: [{"snapshot": {...}, "diff_fields": [...], "candidate_meta_id": id}]
  1138. candidates = [
  1139. {
  1140. "snapshot": {
  1141. "id": meta2.get("id"),
  1142. "name_zh": meta2.get("name_zh"),
  1143. "name_en": meta2.get("name_en"),
  1144. "data_type": meta2.get("data_type"),
  1145. "status": meta2.get("status"),
  1146. "tag_ids": meta2.get("tag_ids", []),
  1147. },
  1148. "diff_fields": diff_fields,
  1149. "candidate_meta_id": meta2.get("id"),
  1150. }
  1151. ]
  1152. # 创建审核记录
  1153. review_record = MetadataReviewRecord()
  1154. review_record.record_type = record_type
  1155. review_record.source = source
  1156. review_record.new_meta = new_meta
  1157. review_record.candidates = candidates
  1158. review_record.status = "pending"
  1159. review_record.notes = notes
  1160. db.session.add(review_record)
  1161. db.session.commit()
  1162. logger.info(
  1163. f"创建审核记录成功: id={review_record.id}, "
  1164. f"record_type={record_type}, "
  1165. f"meta1_name={meta1.get('name_zh')}, "
  1166. f"meta2_name={meta2.get('name_zh')}"
  1167. )
  1168. return jsonify(
  1169. success(
  1170. {
  1171. "record": review_record.to_dict(),
  1172. "message": "审核记录创建成功,请前往数据审核页面进行处理",
  1173. }
  1174. )
  1175. )
  1176. except Exception as e:
  1177. logger.error(f"创建审核记录失败: {str(e)}")
  1178. db.session.rollback()
  1179. return jsonify(failed("创建审核记录失败", error=str(e)))
  1180. @bp.route("/review/detail", methods=["GET"])
  1181. def metadata_review_detail():
  1182. """
  1183. 审核记录详情
  1184. Query:
  1185. - id: 记录ID
  1186. """
  1187. try:
  1188. record_id = request.args.get("id")
  1189. if not record_id:
  1190. return jsonify(failed("缺少id参数"))
  1191. record = MetadataReviewRecord.query.get(int(record_id))
  1192. if not record:
  1193. return jsonify(failed("记录不存在"))
  1194. # 将 tag_ids 转换为 tags
  1195. data = convert_tag_ids_to_tags(record.to_dict())
  1196. # change 场景:返回受影响元数据的影响关系图谱(若有 meta_id)
  1197. impact_graph = None
  1198. if record.record_type == "change":
  1199. old_meta = record.old_meta or {}
  1200. meta_id = old_meta.get("meta_id")
  1201. if meta_id is not None and str(meta_id).strip() != "":
  1202. try:
  1203. impact_graph = meta_impact_graph(int(meta_id))
  1204. except Exception as e:
  1205. logger.warning(f"获取影响图谱失败: {e}")
  1206. data["impact_graph"] = impact_graph
  1207. return jsonify(success(data))
  1208. except Exception as e:
  1209. logger.error(f"审核记录详情查询失败: {str(e)}")
  1210. return jsonify(failed("审核记录详情查询失败", error=str(e)))
  1211. @bp.route("/review/resolve", methods=["POST"])
  1212. def metadata_review_resolve():
  1213. """
  1214. 处理审核记录
  1215. Body:
  1216. - id: 记录ID
  1217. - action: alias | create_new | accept_change | reject_change | ignore
  1218. - payload: 动作参数(可选)
  1219. - resolved_by: 处理人(可选)
  1220. - notes: 备注(可选)
  1221. action=alias:
  1222. payload: { primary_meta_id: int, alias_meta_id: int }
  1223. 行为:在 DataMeta 节点之间重建 ALIAS 关系
  1224. - 创建 (alias_meta)-[:ALIAS]->(primary_meta) 关系
  1225. - 将所有指向 alias_meta 的 ALIAS 关系转移到 primary_meta
  1226. - primary_meta 已有的 ALIAS 关系保持不变
  1227. - BusinessDomain 的 INCLUDES 关系不受影响
  1228. action=create_new:
  1229. payload: { new_name_zh: str }
  1230. 行为:创建新的 DataMeta(中文名区分)并关联业务领域
  1231. action=accept_change:
  1232. payload: { meta_id?: int }
  1233. 行为:把 new_meta 写回目标 DataMeta,并写入 metadata_version_history(PG)
  1234. action=reject_change/ignore:
  1235. 行为:仅更新审核记录状态
  1236. """
  1237. try:
  1238. payload = request.get_json() or {}
  1239. if not isinstance(payload, dict):
  1240. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  1241. record_id = payload.get("id")
  1242. action = payload.get("action")
  1243. action_payload = payload.get("payload") or {}
  1244. resolved_by = payload.get("resolved_by")
  1245. notes = payload.get("notes")
  1246. if not record_id:
  1247. return jsonify(failed("id 不能为空"))
  1248. if not action:
  1249. return jsonify(failed("action 不能为空"))
  1250. record = MetadataReviewRecord.query.get(int(record_id))
  1251. if not record:
  1252. return jsonify(failed("记录不存在"))
  1253. if record.status != "pending":
  1254. return jsonify(failed("记录已处理,无法重复处理"))
  1255. # 需要业务领域上下文的动作
  1256. bd_id = record.business_domain_id
  1257. new_meta = record.new_meta or {}
  1258. if action == "alias":
  1259. primary_meta_id = action_payload.get("primary_meta_id")
  1260. alias_meta_id = action_payload.get("alias_meta_id")
  1261. if not primary_meta_id:
  1262. return jsonify(failed("payload.primary_meta_id 不能为空"))
  1263. if not alias_meta_id:
  1264. return jsonify(failed("payload.alias_meta_id 不能为空"))
  1265. if int(primary_meta_id) == int(alias_meta_id):
  1266. return jsonify(failed("primary_meta_id 和 alias_meta_id 不能相同"))
  1267. # 写入 Neo4j:重建 DataMeta 节点间的 ALIAS 关系
  1268. from app.services.neo4j_driver import neo4j_driver
  1269. with neo4j_driver.get_session() as session:
  1270. # Step 1: 将所有指向 alias_meta 的 ALIAS 关系转移到 primary_meta
  1271. # 查找所有以 alias_meta 为目标的 ALIAS 关系,创建新关系指向 primary_meta,然后删除旧关系
  1272. session.run(
  1273. """
  1274. MATCH (other:DataMeta)-[old_rel:ALIAS]->(alias_meta:DataMeta)
  1275. WHERE id(alias_meta) = $alias_meta_id
  1276. WITH other, old_rel
  1277. MATCH (primary_meta:DataMeta)
  1278. WHERE id(primary_meta) = $primary_meta_id
  1279. MERGE (other)-[:ALIAS]->(primary_meta)
  1280. DELETE old_rel
  1281. """,
  1282. {
  1283. "alias_meta_id": int(alias_meta_id),
  1284. "primary_meta_id": int(primary_meta_id),
  1285. },
  1286. )
  1287. # Step 2: 创建 alias_meta 指向 primary_meta 的 ALIAS 关系
  1288. session.run(
  1289. """
  1290. MATCH (alias_meta:DataMeta), (primary_meta:DataMeta)
  1291. WHERE id(alias_meta) = $alias_meta_id AND id(primary_meta) = $primary_meta_id
  1292. MERGE (alias_meta)-[:ALIAS]->(primary_meta)
  1293. """,
  1294. {
  1295. "alias_meta_id": int(alias_meta_id),
  1296. "primary_meta_id": int(primary_meta_id),
  1297. },
  1298. )
  1299. update_review_record_resolution(
  1300. record,
  1301. action="alias",
  1302. payload={
  1303. "primary_meta_id": int(primary_meta_id),
  1304. "alias_meta_id": int(alias_meta_id),
  1305. },
  1306. resolved_by=resolved_by,
  1307. notes=notes,
  1308. )
  1309. db.session.commit()
  1310. return jsonify(success(record.to_dict()))
  1311. if action == "create_new":
  1312. new_name_zh = (action_payload.get("new_name_zh") or "").strip()
  1313. if not bd_id:
  1314. return jsonify(
  1315. failed("记录缺少 business_domain_id,无法执行 create_new")
  1316. )
  1317. if not new_name_zh:
  1318. return jsonify(failed("payload.new_name_zh 不能为空"))
  1319. from app.core.meta_data import get_formatted_time
  1320. from app.services.neo4j_driver import neo4j_driver
  1321. with neo4j_driver.get_session() as session:
  1322. # 创建新 DataMeta(避免覆盖旧节点)
  1323. result = session.run(
  1324. """
  1325. CREATE (m:DataMeta {
  1326. name_zh: $name_zh,
  1327. name_en: $name_en,
  1328. data_type: $data_type,
  1329. create_time: $create_time,
  1330. status: true
  1331. })
  1332. RETURN m
  1333. """,
  1334. {
  1335. "name_zh": new_name_zh,
  1336. "name_en": (new_meta.get("name_en") or "").strip(),
  1337. "data_type": (new_meta.get("data_type") or "varchar(255)"),
  1338. "create_time": get_formatted_time(),
  1339. },
  1340. ).single()
  1341. if not result or not result.get("m"):
  1342. return jsonify(failed("创建新元数据失败"))
  1343. new_meta_id = int(result["m"].id)
  1344. session.run(
  1345. """
  1346. MATCH (n:BusinessDomain), (m:DataMeta)
  1347. WHERE id(n) = $domain_id AND id(m) = $meta_id
  1348. MERGE (n)-[:INCLUDES]->(m)
  1349. """,
  1350. {"domain_id": int(bd_id), "meta_id": new_meta_id},
  1351. )
  1352. update_review_record_resolution(
  1353. record,
  1354. action="create_new",
  1355. payload={"new_name_zh": new_name_zh},
  1356. resolved_by=resolved_by,
  1357. notes=notes,
  1358. )
  1359. db.session.commit()
  1360. return jsonify(success(record.to_dict()))
  1361. if action == "accept_change":
  1362. old_meta = record.old_meta or {}
  1363. meta_id = action_payload.get("meta_id") or old_meta.get("meta_id")
  1364. if not meta_id:
  1365. return jsonify(failed("无法确定需要更新的 meta_id"))
  1366. from app.core.meta_data import get_formatted_time
  1367. from app.services.neo4j_driver import neo4j_driver
  1368. before_snapshot = old_meta.get("snapshot") or {}
  1369. after_snapshot = new_meta
  1370. # 写入 Neo4j:更新 DataMeta 属性,并尝试同步标签集合
  1371. with neo4j_driver.get_session() as session:
  1372. name_zh_val = (
  1373. after_snapshot.get("name_zh")
  1374. or before_snapshot.get("name_zh")
  1375. or ""
  1376. ).strip()
  1377. name_en_val = (after_snapshot.get("name_en") or "").strip()
  1378. data_type_val = after_snapshot.get("data_type") or "varchar(255)"
  1379. session.run(
  1380. """
  1381. MATCH (m:DataMeta)
  1382. WHERE id(m) = $meta_id
  1383. SET m.name_zh = $name_zh,
  1384. m.name_en = $name_en,
  1385. m.data_type = $data_type,
  1386. m.updateTime = $update_time,
  1387. m.status = true
  1388. """,
  1389. {
  1390. "meta_id": int(meta_id),
  1391. "name_zh": name_zh_val,
  1392. "name_en": name_en_val,
  1393. "data_type": data_type_val,
  1394. "update_time": get_formatted_time(),
  1395. },
  1396. )
  1397. tag_ids = after_snapshot.get("tag_ids") or []
  1398. tag_ids = [int(t) for t in tag_ids if t is not None]
  1399. if tag_ids:
  1400. session.run(
  1401. """
  1402. MATCH (m:DataMeta)-[r:LABEL]->(:DataLabel)
  1403. WHERE id(m) = $meta_id
  1404. DELETE r
  1405. """,
  1406. {"meta_id": int(meta_id)},
  1407. )
  1408. session.run(
  1409. """
  1410. MATCH (m:DataMeta)
  1411. WHERE id(m) = $meta_id
  1412. WITH m
  1413. UNWIND $tag_ids AS tid
  1414. MATCH (t:DataLabel) WHERE id(t) = tid
  1415. MERGE (m)-[:LABEL]->(t)
  1416. """,
  1417. {"meta_id": int(meta_id), "tag_ids": tag_ids},
  1418. )
  1419. # 写入版本历史(PG)
  1420. history = MetadataVersionHistory()
  1421. history.meta_id = int(meta_id) if meta_id is not None else 0
  1422. history.change_source = "ddl"
  1423. history.before_snapshot = (
  1424. before_snapshot if before_snapshot is not None else {}
  1425. )
  1426. history.after_snapshot = (
  1427. after_snapshot if after_snapshot is not None else {}
  1428. )
  1429. history.created_by = resolved_by if resolved_by is not None else ""
  1430. db.session.add(history)
  1431. update_review_record_resolution(
  1432. record,
  1433. action="accept_change",
  1434. payload={"meta_id": int(meta_id)},
  1435. resolved_by=resolved_by,
  1436. notes=notes,
  1437. )
  1438. db.session.commit()
  1439. return jsonify(success(record.to_dict()))
  1440. if action in ("reject_change", "ignore"):
  1441. update_review_record_resolution(
  1442. record,
  1443. action=action,
  1444. payload=action_payload,
  1445. resolved_by=resolved_by,
  1446. notes=notes,
  1447. )
  1448. db.session.commit()
  1449. return jsonify(success(record.to_dict()))
  1450. return jsonify(failed(f"不支持的action: {action}"))
  1451. except Exception as e:
  1452. logger.error(f"处理审核记录失败: {str(e)}")
  1453. db.session.rollback()
  1454. return jsonify(failed("处理审核记录失败", error=str(e)))