routes.py 60 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702
  1. import io
  2. import logging
  3. from flask import current_app, jsonify, request, send_file
  4. from minio import Minio
  5. from minio.error import S3Error
  6. from sqlalchemy import or_
  7. from app import db
  8. from app.api.meta_data import bp
  9. from app.core.meta_data import (
  10. check_redundancy_for_add,
  11. check_redundancy_for_update,
  12. convert_tag_ids_to_tags,
  13. get_file_content,
  14. get_formatted_time,
  15. handle_id_unstructured,
  16. handle_txt_graph,
  17. meta_impact_graph,
  18. meta_kinship_graph,
  19. meta_list,
  20. normalize_tag_inputs,
  21. parse_text,
  22. solve_unstructured_data,
  23. text_resource_solve,
  24. )
  25. from app.core.system.auth import require_auth
  26. from app.models.metadata_review import (
  27. MetadataReviewRecord,
  28. MetadataVersionHistory,
  29. update_review_record_resolution,
  30. )
  31. from app.models.result import failed, success
  32. from app.services.neo4j_driver import neo4j_driver
  33. logger = logging.getLogger("app")
  34. def get_minio_client():
  35. """获取 MinIO 客户端实例"""
  36. return Minio(
  37. current_app.config["MINIO_HOST"],
  38. access_key=current_app.config["MINIO_USER"],
  39. secret_key=current_app.config["MINIO_PASSWORD"],
  40. secure=current_app.config["MINIO_SECURE"],
  41. )
  42. def get_minio_config():
  43. """获取 MinIO 配置"""
  44. return {
  45. "MINIO_BUCKET": current_app.config["MINIO_BUCKET"],
  46. "PREFIX": current_app.config["PREFIX"],
  47. "ALLOWED_EXTENSIONS": current_app.config["ALLOWED_EXTENSIONS"],
  48. }
  49. def allowed_file(filename):
  50. """检查文件扩展名是否允许"""
  51. if "." not in filename:
  52. return False
  53. ext = filename.rsplit(".", 1)[1].lower()
  54. return ext in get_minio_config()["ALLOWED_EXTENSIONS"]
  55. # 元数据列表
  56. @bp.route("/node/list", methods=["POST"])
  57. def meta_node_list():
  58. try:
  59. payload = request.get_json() or {}
  60. if not isinstance(payload, dict):
  61. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  62. def to_int(value, default):
  63. try:
  64. return int(value)
  65. except (TypeError, ValueError):
  66. return default
  67. # 分页参数(size 未传则返回全部记录)
  68. raw_page_size = payload.get("size", None)
  69. if raw_page_size is None:
  70. page_size = None
  71. page = 1
  72. else:
  73. page_size = to_int(raw_page_size, 10)
  74. page = to_int(payload.get("current", 1), 1)
  75. # 过滤参数
  76. name_en_filter = payload.get("name_en") or None
  77. name_zh_filter = payload.get("name_zh") or None
  78. category_filter = payload.get("category") or None
  79. time_filter = payload.get("time") or None
  80. logger.info(
  81. f"[node/list] 过滤参数: name_zh={name_zh_filter}, "
  82. f"name_en={name_en_filter}, category={category_filter}"
  83. )
  84. tag_filter = payload.get("tag")
  85. if tag_filter is not None and not isinstance(tag_filter, list):
  86. tag_filter = None
  87. # 调用核心业务逻辑
  88. result, total_count = meta_list(
  89. page,
  90. page_size,
  91. "",
  92. name_en_filter,
  93. name_zh_filter,
  94. category_filter,
  95. time_filter,
  96. tag_filter,
  97. )
  98. # 返回结果
  99. response_size = total_count if page_size is None else page_size
  100. response_page = 1 if page_size is None else page
  101. return jsonify(
  102. success(
  103. {
  104. "records": result,
  105. "total": total_count,
  106. "size": response_size,
  107. "current": response_page,
  108. }
  109. )
  110. )
  111. except Exception as e:
  112. logger.error(f"获取元数据列表失败: {str(e)}")
  113. return jsonify(failed(str(e)))
  114. # 元数据图谱
  115. @bp.route("/node/graph", methods=["POST"])
  116. def meta_node_graph():
  117. try:
  118. if not request.json:
  119. return jsonify(failed("请求数据不能为空"))
  120. # 从请求中获取节点ID
  121. node_id = request.json.get("nodeId")
  122. if node_id is None:
  123. return jsonify(failed("nodeId 不能为空"))
  124. try:
  125. node_id_int = int(node_id)
  126. except (TypeError, ValueError):
  127. return jsonify(failed("nodeId 必须为整数"))
  128. # 调用核心业务逻辑
  129. graph = meta_kinship_graph(node_id_int)
  130. is_dict = isinstance(graph, dict)
  131. nodes = graph.get("nodes", []) if is_dict else []
  132. relationships = graph.get("relationships", []) if is_dict else []
  133. # 当前节点属性
  134. node_info = next(
  135. (n for n in nodes if n.get("id") == node_id_int),
  136. {},
  137. )
  138. # 关联节点(包含属性,便于前端展示名称等)
  139. related_nodes = [n for n in nodes if n.get("id") != node_id_int]
  140. payload = {
  141. "node": node_info,
  142. "related_nodes": related_nodes,
  143. "relationships": relationships,
  144. }
  145. return jsonify(success(payload))
  146. except Exception as e:
  147. logger.error(f"获取元数据图谱失败: {str(e)}")
  148. return jsonify(failed(str(e)))
  149. # 删除元数据
  150. @bp.route("/node/delete", methods=["POST"])
  151. def meta_node_delete():
  152. try:
  153. if not request.json:
  154. return jsonify(failed("请求数据不能为空"))
  155. # 从请求中获取节点ID
  156. node_id = request.json.get("id")
  157. # 删除节点逻辑
  158. with neo4j_driver.get_session() as session:
  159. cypher = "MATCH (n) WHERE id(n) = $node_id DETACH DELETE n"
  160. session.run(cypher, node_id=int(node_id))
  161. # 返回结果
  162. return jsonify(success({}))
  163. except Exception as e:
  164. logger.error(f"删除元数据失败: {str(e)}")
  165. return jsonify(failed(str(e)))
  166. # 编辑元数据
  167. @bp.route("/node/edit", methods=["POST"])
  168. def meta_node_edit():
  169. try:
  170. if not request.json:
  171. return jsonify(failed("请求数据不能为空"))
  172. # 从请求中获取节点ID
  173. node_id = request.json.get("id")
  174. if not node_id:
  175. return jsonify(failed("节点ID不能为空"))
  176. # 获取节点
  177. with neo4j_driver.get_session() as session:
  178. # 查询节点信息
  179. cypher = """
  180. MATCH (n:DataMeta)
  181. WHERE id(n) = $node_id
  182. RETURN n
  183. """
  184. result = session.run(cypher, node_id=int(node_id))
  185. node = result.single()
  186. if not node or not node["n"]:
  187. return jsonify(failed("节点不存在"))
  188. # 获取节点数据
  189. node_data = dict(node["n"])
  190. node_data["id"] = node["n"].id
  191. # 获取标签信息
  192. tag_cypher = """
  193. MATCH (n:DataMeta)-[:LABEL]->(t:DataLabel)
  194. WHERE id(n) = $node_id
  195. RETURN t
  196. """
  197. tag_result = session.run(tag_cypher, node_id=int(node_id))
  198. tags: list[dict] = []
  199. for record in tag_result:
  200. tag_node = record.get("t")
  201. if tag_node:
  202. tags.append(
  203. {
  204. "id": tag_node.id,
  205. "name_zh": tag_node.get("name_zh", ""),
  206. "name_en": tag_node.get("name_en", ""),
  207. }
  208. )
  209. # 获取主数据信息
  210. master_data_cypher = """
  211. MATCH (n:DataMeta)-[:master_data]->(m:master_data)
  212. WHERE id(n) = $node_id
  213. RETURN m
  214. """
  215. master_data_result = session.run(master_data_cypher, node_id=int(node_id))
  216. master_data = master_data_result.single()
  217. # 构建返回数据
  218. response_data = [
  219. {
  220. "master_data": (
  221. master_data["m"].id
  222. if master_data and master_data["m"]
  223. else None
  224. ),
  225. "name_zh": node_data.get("name_zh", ""),
  226. "name_en": node_data.get("name_en", ""),
  227. "create_time": node_data.get("create_time", ""),
  228. "update_time": node_data.get("update_time", ""),
  229. "status": bool(node_data.get("status", True)),
  230. "data_type": node_data.get("data_type", ""),
  231. "tag": tags,
  232. "affiliation": node_data.get("affiliation"),
  233. "category": node_data.get("category"),
  234. "alias": node_data.get("alias"),
  235. "describe": node_data.get("describe"),
  236. }
  237. ]
  238. logger.info(f"成功获取元数据节点: ID={node_data['id']}")
  239. return jsonify(success(response_data))
  240. except Exception as e:
  241. logger.error(f"获取元数据节点失败: {str(e)}")
  242. return jsonify(failed(str(e)))
  243. # 增加元数据
  244. @bp.route("/check", methods=["GET"])
  245. def meta_check():
  246. """
  247. 检查元数据中文名是否已存在
  248. 请求参数:
  249. - name_zh: 元数据中文名(URL参数)
  250. 返回:
  251. - exists: true/false 表示是否存在
  252. """
  253. try:
  254. name_zh = request.args.get("name_zh")
  255. if not name_zh:
  256. return jsonify(failed("缺少name_zh参数"))
  257. # 查询数据库检查是否存在
  258. with neo4j_driver.get_session() as session:
  259. cypher = """
  260. MATCH (n:DataMeta {name_zh: $name_zh})
  261. RETURN count(n) > 0 as exists
  262. """
  263. result = session.run(cypher, name_zh=name_zh)
  264. record = result.single()
  265. if record:
  266. exists = record["exists"]
  267. logger.info(f"检查元数据 '{name_zh}': {'存在' if exists else '不存在'}")
  268. return jsonify(
  269. success({"exists": exists, "name_zh": name_zh}, "查询成功")
  270. )
  271. else:
  272. return jsonify(
  273. success({"exists": False, "name_zh": name_zh}, "查询成功")
  274. )
  275. except Exception as e:
  276. logger.error(f"检查元数据失败: {str(e)}")
  277. return jsonify(failed(f"检查失败: {str(e)}"))
  278. @bp.route("/node/add", methods=["POST"])
  279. def meta_node_add():
  280. """
  281. 新增元数据节点
  282. 在创建前会进行冗余检测:
  283. - 如果存在完全匹配的元数据,返回已存在的节点信息
  284. - 如果存在疑似重复的元数据,创建审核记录并返回提示
  285. - 如果无重复,正常创建新节点
  286. """
  287. try:
  288. if not request.json:
  289. return jsonify(failed("请求数据不能为空"))
  290. # 从请求中获取节点信息
  291. node_name_zh = request.json.get("name_zh")
  292. node_type = request.json.get("data_type")
  293. node_category = request.json.get("category")
  294. node_alias = request.json.get("alias")
  295. node_affiliation = request.json.get("affiliation")
  296. node_tag = request.json.get("tag")
  297. node_desc = request.json.get("describe")
  298. node_status = bool(request.json.get("status", True))
  299. node_name_en = request.json.get("name_en")
  300. # 是否强制创建(跳过冗余检测)
  301. force_create = bool(request.json.get("force_create", False))
  302. if not node_name_zh:
  303. return jsonify(failed("节点名称不能为空"))
  304. if not node_type:
  305. return jsonify(failed("节点类型不能为空"))
  306. # 统一处理标签ID
  307. tag_ids = normalize_tag_inputs(node_tag)
  308. # ========== 冗余检测 ==========
  309. has_suspicious_duplicates = False
  310. suspicious_candidates = []
  311. if not force_create:
  312. redundancy_result = check_redundancy_for_add(
  313. name_zh=node_name_zh,
  314. name_en=node_name_en or "",
  315. data_type=node_type,
  316. tag_ids=tag_ids,
  317. )
  318. # 存在完全匹配的元数据,直接返回,不做任何操作
  319. if redundancy_result["has_exact_match"]:
  320. exact_id = redundancy_result["exact_match_id"]
  321. logger.info(
  322. f"元数据已存在(完全匹配): name_zh={node_name_zh}, "
  323. f"existing_id={exact_id}"
  324. )
  325. return jsonify(
  326. failed(
  327. f"元数据已存在(完全匹配),无需重复创建。"
  328. f"已存在的元数据ID: {exact_id}"
  329. )
  330. )
  331. # 存在疑似重复的元数据,标记状态,稍后创建节点后再写入审核记录
  332. if redundancy_result["has_candidates"]:
  333. has_suspicious_duplicates = True
  334. suspicious_candidates = redundancy_result["candidates"]
  335. logger.info(
  336. f"发现疑似重复元数据: name_zh={node_name_zh}, "
  337. f"候选数量={len(suspicious_candidates)}"
  338. )
  339. # ========== 创建节点 ==========
  340. with neo4j_driver.get_session() as session:
  341. cypher = """
  342. MERGE (n:DataMeta {name_zh: $name_zh})
  343. ON CREATE SET n.name_en = $name_en,
  344. n.data_type = $data_type,
  345. n.category = $category,
  346. n.alias = $alias,
  347. n.affiliation = $affiliation,
  348. n.describe = $describe,
  349. n.create_time = $create_time,
  350. n.updateTime = $update_time,
  351. n.status = $status,
  352. n.name_en = $name_en
  353. ON MATCH SET n.data_type = $data_type,
  354. n.category = $category,
  355. n.alias = $alias,
  356. n.affiliation = $affiliation,
  357. n.describe = $describe,
  358. n.updateTime = $update_time,
  359. n.status = $status,
  360. n.name_en = $name_en
  361. RETURN n
  362. """
  363. create_time = update_time = get_formatted_time()
  364. result = session.run(
  365. cypher,
  366. name_zh=node_name_zh,
  367. data_type=node_type,
  368. category=node_category,
  369. alias=node_alias,
  370. affiliation=node_affiliation,
  371. describe=node_desc,
  372. create_time=create_time,
  373. update_time=update_time,
  374. status=node_status,
  375. name_en=node_name_en,
  376. )
  377. node = result.single()
  378. if node and node["n"]:
  379. node_data = dict(node["n"])
  380. node_data["id"] = node["n"].id
  381. # 如果提供了标签列表,创建标签关系
  382. tag_nodes = []
  383. if tag_ids:
  384. for tag_id in tag_ids:
  385. # 获取标签节点信息
  386. tag_fetch = session.run(
  387. "MATCH (t:DataLabel) WHERE id(t) = $tag_id RETURN t",
  388. tag_id=tag_id,
  389. ).single()
  390. if not tag_fetch or not tag_fetch.get("t"):
  391. logger.warning(f"未找到标签节点: {tag_id}")
  392. continue
  393. tag_node = tag_fetch["t"]
  394. tag_nodes.append(
  395. {
  396. "id": tag_node.id,
  397. "name_zh": tag_node.get("name_zh", ""),
  398. "name_en": tag_node.get("name_en", ""),
  399. }
  400. )
  401. tag_cypher = """
  402. MATCH (n:DataMeta), (t:DataLabel)
  403. WHERE id(n) = $node_id AND id(t) = $tag_id
  404. MERGE (n)-[r:LABEL]->(t)
  405. RETURN r
  406. """
  407. session.run(tag_cypher, node_id=node["n"].id, tag_id=tag_id)
  408. node_data["tag"] = tag_nodes
  409. logger.info(
  410. f"成功创建或更新元数据节点: "
  411. f"ID={node_data['id']}, name={node_name_zh}"
  412. )
  413. # ========== 处理疑似重复情况 ==========
  414. # 如果存在疑似重复,创建审核记录
  415. if has_suspicious_duplicates and suspicious_candidates:
  416. from app.core.meta_data.redundancy_check import (
  417. write_redundancy_review_record_with_new_id,
  418. )
  419. # 构建新元数据快照(包含新创建的节点ID)
  420. new_meta_snapshot = {
  421. "id": node_data["id"],
  422. "name_zh": node_name_zh,
  423. "name_en": node_name_en or "",
  424. "data_type": node_type,
  425. "tag_ids": tag_ids,
  426. }
  427. # 写入审核记录
  428. write_redundancy_review_record_with_new_id(
  429. new_meta=new_meta_snapshot,
  430. candidates=suspicious_candidates,
  431. source="api",
  432. )
  433. # 返回成功创建,但提示疑似重复
  434. candidate_names = [
  435. c.get("name_zh", "") for c in suspicious_candidates[:3]
  436. ]
  437. return jsonify(
  438. success(
  439. node_data,
  440. message=(
  441. f"元数据创建成功,但发现疑似重复元数据。"
  442. f"疑似重复: {', '.join(candidate_names)}。"
  443. f"已创建审核记录,请前往元数据审核页面进行处理。"
  444. ),
  445. )
  446. )
  447. return jsonify(success(node_data))
  448. else:
  449. logger.error(f"创建元数据节点失败: {node_name_zh}")
  450. return jsonify(failed("创建元数据节点失败"))
  451. except Exception as e:
  452. logger.error(f"添加元数据失败: {str(e)}")
  453. return jsonify(failed(str(e)))
  454. # 搜索元数据
  455. @bp.route("/search", methods=["GET"])
  456. def search_metadata_route():
  457. try:
  458. keyword = request.args.get("keyword", "")
  459. if not keyword:
  460. return jsonify(success([]))
  461. cypher = """
  462. MATCH (n:DataMeta)
  463. WHERE n.name_zh CONTAINS $keyword
  464. RETURN n LIMIT 100
  465. """
  466. with neo4j_driver.get_session() as session:
  467. result = session.run(cypher, keyword=keyword)
  468. metadata_list = [dict(record["n"]) for record in result]
  469. return jsonify(success(metadata_list))
  470. except Exception as e:
  471. logger.error(f"搜索元数据失败: {str(e)}")
  472. return jsonify(failed(str(e)))
  473. # 全文检索查询
  474. @bp.route("/full/text/query", methods=["POST"])
  475. def full_text_query():
  476. try:
  477. if not request.json:
  478. return jsonify(failed("请求数据不能为空"))
  479. # 获取查询条件
  480. search_term = request.json.get("query", "")
  481. if not search_term:
  482. return jsonify(failed("查询条件不能为空"))
  483. # 执行Neo4j全文索引查询
  484. with neo4j_driver.get_session() as session:
  485. cypher = """
  486. CALL db.index.fulltext.queryNodes("DataMetaFulltext", $term)
  487. YIELD node, score
  488. RETURN node, score
  489. ORDER BY score DESC
  490. LIMIT 20
  491. """
  492. result = session.run(cypher, term=search_term)
  493. # 处理查询结果
  494. search_results = []
  495. for record in result:
  496. node_data = dict(record["node"])
  497. node_data["id"] = record["node"].id
  498. node_data["score"] = record["score"]
  499. search_results.append(node_data)
  500. return jsonify(success(search_results))
  501. except Exception as e:
  502. logger.error(f"全文检索查询失败: {str(e)}")
  503. return jsonify(failed(str(e)))
  504. # 非结构化文本查询
  505. @bp.route("/unstructure/text/query", methods=["POST"])
  506. def unstructure_text_query():
  507. try:
  508. if not request.json:
  509. return jsonify(failed("请求数据不能为空"))
  510. # 获取查询参数
  511. node_id = request.json.get("id")
  512. if not node_id:
  513. return jsonify(failed("节点ID不能为空"))
  514. # 获取节点信息
  515. node_data = handle_id_unstructured(node_id)
  516. if not node_data:
  517. return jsonify(failed("节点不存在"))
  518. # 获取对象路径
  519. object_name = node_data.get("url")
  520. if not object_name:
  521. return jsonify(failed("文档路径不存在"))
  522. # 获取 MinIO 配置
  523. minio_client = get_minio_client()
  524. config = get_minio_config()
  525. bucket_name = config["MINIO_BUCKET"]
  526. # 从MinIO获取文件内容
  527. file_content = get_file_content(minio_client, bucket_name, object_name)
  528. # 解析文本内容
  529. parsed_data = parse_text(file_content)
  530. # 返回结果
  531. result = {
  532. "node": node_data,
  533. "parsed": parsed_data,
  534. "content": (
  535. file_content[:1000] + "..."
  536. if len(file_content) > 1000
  537. else file_content
  538. ),
  539. }
  540. return jsonify(success(result))
  541. except Exception as e:
  542. logger.error(f"非结构化文本查询失败: {str(e)}")
  543. return jsonify(failed(str(e)))
  544. # 文件上传
  545. @bp.route("/resource/upload", methods=["POST"])
  546. def upload_file():
  547. try:
  548. # 检查请求中是否有文件
  549. if "file" not in request.files:
  550. return jsonify(failed("没有找到上传的文件"))
  551. file = request.files["file"]
  552. # 检查文件名
  553. if not file.filename:
  554. return jsonify(failed("未选择文件"))
  555. # 保存文件名到本地变量(确保类型安全)
  556. filename = file.filename
  557. # 检查文件类型
  558. if not allowed_file(filename):
  559. return jsonify(failed("不支持的文件类型"))
  560. # 获取 MinIO 配置
  561. minio_client = get_minio_client()
  562. config = get_minio_config()
  563. # 上传到MinIO
  564. file_content = file.read()
  565. file_size = len(file_content)
  566. file_type = filename.rsplit(".", 1)[1].lower()
  567. # 提取文件名(不包含扩展名)
  568. filename_without_ext = filename.rsplit(".", 1)[0]
  569. # 生成紧凑的时间戳 (yyyyMMddHHmmss)
  570. import time
  571. timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
  572. # 生成唯一文件名
  573. object_name = (
  574. f"{config['PREFIX']}/{filename_without_ext}_{timestamp}.{file_type}"
  575. )
  576. # 上传文件
  577. minio_client.put_object(
  578. config["MINIO_BUCKET"],
  579. object_name,
  580. io.BytesIO(file_content),
  581. file_size,
  582. content_type=f"application/{file_type}",
  583. )
  584. # 返回结果
  585. return jsonify(
  586. success(
  587. {
  588. "filename": file.filename,
  589. "size": file_size,
  590. "type": file_type,
  591. "url": object_name,
  592. }
  593. )
  594. )
  595. except Exception as e:
  596. logger.error(f"文件上传失败: {str(e)}")
  597. return jsonify(failed(str(e)))
  598. # 文件下载显示
  599. @bp.route("/resource/display", methods=["POST"])
  600. def upload_file_display():
  601. response = None
  602. try:
  603. if not request.json:
  604. return jsonify(failed("请求数据不能为空"))
  605. object_name = request.json.get("url")
  606. if not object_name:
  607. return jsonify(failed("文件路径不能为空"))
  608. # 获取 MinIO 配置
  609. minio_client = get_minio_client()
  610. config = get_minio_config()
  611. # 获取文件内容
  612. response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
  613. file_data = response.read()
  614. # 获取文件名
  615. file_name = object_name.split("/")[-1]
  616. # 确定文件类型
  617. file_extension = file_name.split(".")[-1].lower()
  618. # 为不同文件类型设置合适的MIME类型
  619. mime_types = {
  620. "pdf": "application/pdf",
  621. "doc": "application/msword",
  622. "docx": (
  623. "application/vnd.openxmlformats-"
  624. "officedocument.wordprocessingml.document"
  625. ),
  626. "xls": "application/vnd.ms-excel",
  627. "xlsx": (
  628. "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
  629. ),
  630. "txt": "text/plain",
  631. "csv": "text/csv",
  632. }
  633. content_type = mime_types.get(file_extension, "application/octet-stream")
  634. # 返回结果
  635. return jsonify(
  636. success(
  637. {
  638. "filename": file_name,
  639. "type": file_extension,
  640. "contentType": content_type,
  641. "size": len(file_data),
  642. "url": f"/api/meta/resource/download?url={object_name}",
  643. }
  644. )
  645. )
  646. except S3Error as e:
  647. logger.error(f"MinIO操作失败: {str(e)}")
  648. return jsonify(failed(f"文件访问失败: {str(e)}"))
  649. except Exception as e:
  650. logger.error(f"文件显示信息获取失败: {str(e)}")
  651. return jsonify(failed(str(e)))
  652. finally:
  653. if response:
  654. response.close()
  655. response.release_conn()
  656. # 文件下载接口
  657. @bp.route("/resource/download", methods=["GET"])
  658. def download_file():
  659. response = None
  660. try:
  661. object_name = request.args.get("url")
  662. if not object_name:
  663. return jsonify(failed("文件路径不能为空"))
  664. # URL解码,处理特殊字符
  665. import urllib.parse
  666. object_name = urllib.parse.unquote(object_name)
  667. # 记录下载请求信息,便于调试
  668. logger.info(f"下载文件请求: {object_name}")
  669. # 获取 MinIO 配置
  670. minio_client = get_minio_client()
  671. config = get_minio_config()
  672. # 获取文件
  673. try:
  674. response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
  675. file_data = response.read()
  676. except S3Error as e:
  677. logger.error(f"MinIO获取文件失败: {str(e)}")
  678. return jsonify(failed(f"文件获取失败: {str(e)}"))
  679. # 获取文件名,并处理特殊字符
  680. file_name = object_name.split("/")[-1]
  681. # 直接从内存返回文件,不创建临时文件
  682. file_stream = io.BytesIO(file_data)
  683. # 返回文件
  684. return send_file(
  685. file_stream,
  686. as_attachment=True,
  687. download_name=file_name,
  688. mimetype="application/octet-stream",
  689. )
  690. except Exception as e:
  691. logger.error(f"文件下载失败: {str(e)}")
  692. return jsonify(failed(str(e)))
  693. finally:
  694. if response:
  695. response.close()
  696. response.release_conn()
  697. # 文本资源翻译
  698. @bp.route("/resource/translate", methods=["POST"])
  699. def text_resource_translate():
  700. try:
  701. if not request.json:
  702. return jsonify(failed("请求数据不能为空"))
  703. # 获取参数
  704. name_zh = request.json.get("name_zh", "")
  705. keyword = request.json.get("keyword", "")
  706. if not name_zh:
  707. return jsonify(failed("名称不能为空"))
  708. # 调用资源处理逻辑
  709. result = text_resource_solve(None, name_zh, keyword)
  710. return jsonify(success(result))
  711. except Exception as e:
  712. logger.error(f"文本资源翻译失败: {str(e)}")
  713. return jsonify(failed(str(e)))
  714. # 创建文本资源节点
  715. @bp.route("/resource/node", methods=["POST"])
  716. def text_resource_node():
  717. try:
  718. if not request.json:
  719. return jsonify(failed("请求数据不能为空"))
  720. # 获取参数
  721. name_zh = request.json.get("name_zh", "")
  722. name_en = request.json.get("name_en", "")
  723. keywords = request.json.get("keywords", [])
  724. keywords_en = request.json.get("keywords_en", [])
  725. object_name = request.json.get("url", "")
  726. if not name_zh or not name_en or not object_name:
  727. return jsonify(failed("参数不完整"))
  728. # 创建节点
  729. with neo4j_driver.get_session() as session:
  730. # 创建资源节点
  731. cypher = """
  732. CREATE (n:DataMeta {
  733. name_zh: $name_zh,
  734. name_en: $name_en,
  735. keywords: $keywords,
  736. keywords_en: $keywords_en,
  737. url: $object_name,
  738. create_time: $create_time,
  739. updateTime: $update_time
  740. })
  741. RETURN n
  742. """
  743. create_time = update_time = get_formatted_time()
  744. result = session.run(
  745. cypher,
  746. name_zh=name_zh,
  747. name_en=name_en,
  748. keywords=keywords,
  749. keywords_en=keywords_en,
  750. object_name=object_name,
  751. create_time=create_time,
  752. update_time=update_time,
  753. )
  754. record = result.single()
  755. if not record:
  756. return jsonify(failed("创建节点失败"))
  757. node = record["n"]
  758. # 为每个关键词创建标签节点并关联
  759. for i, keyword in enumerate(keywords):
  760. if keyword:
  761. # 创建标签节点
  762. tag_cypher = """
  763. MERGE (t:Tag {name_zh: $name_zh})
  764. ON CREATE SET t.name_en = $name_en,
  765. t.create_time = $create_time
  766. RETURN t
  767. """
  768. tag_result = session.run(
  769. tag_cypher,
  770. name_zh=keyword,
  771. name_en=keywords_en[i] if i < len(keywords_en) else "",
  772. create_time=create_time,
  773. )
  774. tag_record = tag_result.single()
  775. if not tag_record:
  776. continue
  777. tag_node = tag_record["t"]
  778. # 创建关系
  779. rel_cypher = """
  780. MATCH (n), (t)
  781. WHERE id(n) = $node_id AND id(t) = $tag_id
  782. CREATE (n)-[r:HAS_TAG]->(t)
  783. RETURN r
  784. """
  785. session.run(rel_cypher, node_id=node.id, tag_id=tag_node.id)
  786. # 返回创建的节点
  787. return jsonify(success(dict(node)))
  788. except Exception as e:
  789. logger.error(f"创建文本资源节点失败: {str(e)}")
  790. return jsonify(failed(str(e)))
  791. # 处理非结构化数据
  792. @bp.route("/unstructured/process", methods=["POST"])
  793. def processing_unstructured_data():
  794. try:
  795. if not request.json:
  796. return jsonify(failed("请求数据不能为空"))
  797. # 获取参数
  798. node_id = request.json.get("id")
  799. if not node_id:
  800. return jsonify(failed("节点ID不能为空"))
  801. # 获取 MinIO 配置
  802. minio_client = get_minio_client()
  803. config = get_minio_config()
  804. prefix = config["PREFIX"]
  805. # 调用处理逻辑
  806. result = solve_unstructured_data(node_id, minio_client, prefix)
  807. if result:
  808. return jsonify(success({"message": "处理成功"}))
  809. else:
  810. return jsonify(failed("处理失败"))
  811. except Exception as e:
  812. logger.error(f"处理非结构化数据失败: {str(e)}")
  813. return jsonify(failed(str(e)))
  814. # 创建文本图谱
  815. @bp.route("/text/graph", methods=["POST"])
  816. def create_text_graph():
  817. try:
  818. if not request.json:
  819. return jsonify(failed("请求数据不能为空"))
  820. # 获取参数
  821. node_id = request.json.get("id")
  822. entity_zh = request.json.get("entity_zh")
  823. entity_en = request.json.get("entity_en")
  824. if not all([node_id, entity_zh, entity_en]):
  825. return jsonify(failed("参数不完整"))
  826. # 创建图谱
  827. result = handle_txt_graph(node_id, entity_zh, entity_en)
  828. if result:
  829. return jsonify(success({"message": "图谱创建成功"}))
  830. else:
  831. return jsonify(failed("图谱创建失败"))
  832. except Exception as e:
  833. logger.error(f"创建文本图谱失败: {str(e)}")
  834. return jsonify(failed(str(e)))
  835. @bp.route("/config", methods=["GET"])
  836. @require_auth
  837. def get_meta_config():
  838. """获取元数据配置信息"""
  839. config = get_minio_config()
  840. return jsonify(
  841. {
  842. "bucket_name": config["MINIO_BUCKET"],
  843. "prefix": config["PREFIX"],
  844. "allowed_extensions": list(config["ALLOWED_EXTENSIONS"]),
  845. }
  846. )
  847. # 更新元数据
  848. @bp.route("/node/update", methods=["POST"])
  849. def meta_node_update():
  850. """
  851. 更新元数据节点
  852. 在更新前会进行冗余检测(如果修改了 name_zh/name_en):
  853. - 如果更新后的名称与其他节点完全匹配,返回错误
  854. - 如果存在疑似重复的元数据,创建审核记录并返回提示
  855. - 如果无重复,正常更新节点
  856. """
  857. try:
  858. if not request.json:
  859. return jsonify(failed("请求数据不能为空"))
  860. # 从请求中获取节点ID和更新数据
  861. node_id = request.json.get("id")
  862. if not node_id:
  863. return jsonify(failed("节点ID不能为空"))
  864. # 验证并转换节点ID为整数
  865. try:
  866. node_id = int(node_id)
  867. except (ValueError, TypeError):
  868. return jsonify(failed(f"节点ID必须为整数,当前值: {node_id}"))
  869. # 是否强制更新(跳过冗余检测)
  870. force_update = bool(request.json.get("force_update", False))
  871. # 更新节点
  872. with neo4j_driver.get_session() as session:
  873. # 检查节点是否存在并获取当前值
  874. check_cypher = """
  875. MATCH (n:DataMeta)
  876. WHERE id(n) = $node_id
  877. RETURN n
  878. """
  879. result = session.run(check_cypher, node_id=node_id)
  880. node = result.single()
  881. if not node or not node["n"]:
  882. return jsonify(failed("节点不存在"))
  883. # 获取当前节点属性
  884. current_node = dict(node["n"])
  885. # 处理每个可能的更新字段
  886. fields_to_update = {
  887. "name_zh": request.json.get("name_zh"),
  888. "category": request.json.get("category"),
  889. "alias": request.json.get("alias"),
  890. "affiliation": request.json.get("affiliation"),
  891. "data_type": request.json.get("data_type"),
  892. "describe": request.json.get("describe"),
  893. "status": request.json.get("status"),
  894. "name_en": request.json.get("name_en"),
  895. }
  896. # 计算更新后的值(用于冗余检测)
  897. updated_name_zh = (
  898. fields_to_update["name_zh"]
  899. if fields_to_update["name_zh"] is not None
  900. else current_node.get("name_zh", "")
  901. )
  902. updated_name_en = (
  903. fields_to_update["name_en"]
  904. if fields_to_update["name_en"] is not None
  905. else current_node.get("name_en", "")
  906. )
  907. updated_data_type = (
  908. fields_to_update["data_type"]
  909. if fields_to_update["data_type"] is not None
  910. else current_node.get("data_type", "varchar(255)")
  911. )
  912. # 处理标签
  913. tag = request.json.get("tag")
  914. tag_ids = normalize_tag_inputs(tag) if tag is not None else []
  915. # ========== 冗余检测(仅当修改了 name_zh 或 name_en 时)==========
  916. name_changed = (
  917. fields_to_update["name_zh"] is not None
  918. and fields_to_update["name_zh"] != current_node.get("name_zh")
  919. ) or (
  920. fields_to_update["name_en"] is not None
  921. and fields_to_update["name_en"] != current_node.get("name_en")
  922. )
  923. if name_changed and not force_update:
  924. redundancy_result = check_redundancy_for_update(
  925. node_id=node_id,
  926. name_zh=updated_name_zh,
  927. name_en=updated_name_en,
  928. data_type=updated_data_type,
  929. tag_ids=tag_ids,
  930. )
  931. # 存在完全匹配的其他元数据
  932. if redundancy_result["has_exact_match"]:
  933. exact_id = redundancy_result["exact_match_id"]
  934. logger.warning(
  935. f"更新后元数据与其他节点完全匹配: "
  936. f"node_id={node_id}, existing_id={exact_id}"
  937. )
  938. return jsonify(
  939. failed(
  940. f"更新后的元数据与已有节点(ID={exact_id})完全相同,"
  941. f"请检查是否需要合并或修改名称。"
  942. )
  943. )
  944. # 存在疑似重复的元数据,已创建审核记录
  945. if redundancy_result["review_created"]:
  946. candidates = redundancy_result["candidates"]
  947. candidate_names = [c.get("name_zh", "") for c in candidates[:3]]
  948. logger.info(
  949. f"更新元数据发现疑似重复: node_id={node_id}, "
  950. f"candidates={candidate_names}"
  951. )
  952. return jsonify(
  953. failed(
  954. f"发现疑似重复元数据,已创建审核记录。"
  955. f"疑似重复: {', '.join(candidate_names)}。"
  956. f"请前往元数据审核页面处理,或使用 force_update=true 强制更新。"
  957. )
  958. )
  959. # ========== 执行更新 ==========
  960. # 构建更新语句,只更新提供的属性
  961. update_cypher = """
  962. MATCH (n:DataMeta)
  963. WHERE id(n) = $node_id
  964. SET n.updateTime = $update_time
  965. """
  966. # 准备更新参数
  967. update_params = {"node_id": node_id, "update_time": get_formatted_time()}
  968. # 只更新提供了新值的字段
  969. for field, new_value in fields_to_update.items():
  970. if new_value is not None:
  971. # 特殊处理 data_type 字段映射
  972. if field == "data_type":
  973. update_cypher += f", n.data_type = ${field}\n"
  974. else:
  975. update_cypher += f", n.{field} = ${field}\n"
  976. update_params[field] = new_value
  977. update_cypher += "RETURN n"
  978. result = session.run(
  979. update_cypher, # type: ignore[arg-type]
  980. **update_params,
  981. )
  982. updated_node = result.single()
  983. if updated_node and updated_node["n"]:
  984. node_data = dict(updated_node["n"])
  985. node_data["id"] = updated_node["n"].id
  986. # 如果更新了标签,处理标签关系(支持列表)
  987. if tag is not None:
  988. # 先删除现有标签关系
  989. delete_tag_cypher = """
  990. MATCH (n:DataMeta)-[r:LABEL]->(t:DataLabel)
  991. WHERE id(n) = $node_id
  992. DELETE r
  993. """
  994. session.run(delete_tag_cypher, node_id=node_id)
  995. for tag_id in tag_ids:
  996. create_tag_cypher = """
  997. MATCH (n:DataMeta), (t:DataLabel)
  998. WHERE id(n) = $node_id AND id(t) = $tag_id
  999. MERGE (n)-[r:LABEL]->(t)
  1000. RETURN r
  1001. """
  1002. session.run(create_tag_cypher, node_id=node_id, tag_id=tag_id)
  1003. logger.info(f"成功更新元数据节点: ID={node_data['id']}")
  1004. return jsonify(success(node_data))
  1005. else:
  1006. logger.error(f"更新元数据节点失败: ID={node_id}")
  1007. return jsonify(failed("更新元数据节点失败"))
  1008. except Exception as e:
  1009. logger.error(f"更新元数据失败: {str(e)}")
  1010. return jsonify(failed(str(e)))
  1011. @bp.route("/review/list", methods=["POST"])
  1012. def metadata_review_list():
  1013. """
  1014. 审核记录列表:疑似冗余/变动
  1015. Body:
  1016. - current: 页码(默认1)
  1017. - size: 每页数量(默认10)
  1018. - record_type: redundancy|change(可选)
  1019. - status: pending|resolved|ignored(可选)
  1020. - business_domain_id: 业务领域ID(可选)
  1021. - keyword: 关键字(可选,匹配 new_meta.name_zh/name_en)
  1022. """
  1023. try:
  1024. payload = request.get_json() or {}
  1025. if not isinstance(payload, dict):
  1026. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  1027. def to_int(value, default):
  1028. try:
  1029. return int(value)
  1030. except (TypeError, ValueError):
  1031. return default
  1032. page = to_int(payload.get("current", 1), 1)
  1033. page_size = to_int(payload.get("size", 10), 10)
  1034. record_type = payload.get("record_type")
  1035. status = payload.get("status")
  1036. business_domain_id = payload.get("business_domain_id")
  1037. keyword = (payload.get("keyword") or "").strip()
  1038. query = MetadataReviewRecord.query
  1039. if record_type:
  1040. query = query.filter(MetadataReviewRecord.record_type == record_type)
  1041. if status:
  1042. query = query.filter(MetadataReviewRecord.status == status)
  1043. if business_domain_id is not None and str(business_domain_id).strip() != "":
  1044. bd_id_int = int(business_domain_id)
  1045. query = query.filter(MetadataReviewRecord.business_domain_id == bd_id_int)
  1046. if keyword:
  1047. # 兼容:使用JSONB ->> 提取进行模糊匹配
  1048. name_zh_col = MetadataReviewRecord.new_meta["name_zh"].astext
  1049. name_en_col = MetadataReviewRecord.new_meta["name_en"].astext
  1050. query = query.filter(
  1051. or_(
  1052. name_zh_col.contains(keyword),
  1053. name_en_col.contains(keyword),
  1054. )
  1055. )
  1056. total = query.count()
  1057. records = (
  1058. query.order_by(MetadataReviewRecord.created_at.desc())
  1059. .offset((page - 1) * page_size)
  1060. .limit(page_size)
  1061. .all()
  1062. )
  1063. # 将 tag_ids 转换为 tags
  1064. records_data = [convert_tag_ids_to_tags(r.to_dict()) for r in records]
  1065. return jsonify(
  1066. success(
  1067. {
  1068. "records": records_data,
  1069. "total": total,
  1070. "size": page_size,
  1071. "current": page,
  1072. }
  1073. )
  1074. )
  1075. except Exception as e:
  1076. logger.error(f"审核记录列表查询失败: {str(e)}")
  1077. return jsonify(failed("审核记录列表查询失败", error=str(e)))
  1078. @bp.route("/review/create", methods=["POST"])
  1079. def metadata_review_create():
  1080. """
  1081. 创建元数据审核记录
  1082. Body:
  1083. - record_type: 审核记录类型(redundancy: 疑似重复 / change: 疑似变动 / merge: 合并请求)
  1084. - source: 触发来源(默认 "manual")
  1085. - meta1: 第一个元数据信息
  1086. - id: 节点ID
  1087. - name_zh: 中文名
  1088. - name_en: 英文名
  1089. - data_type: 数据类型
  1090. - status: 状态
  1091. - tag_ids: 标签ID列表(可选)
  1092. - meta2: 第二个元数据信息
  1093. - id: 节点ID
  1094. - name_zh: 中文名
  1095. - name_en: 英文名
  1096. - data_type: 数据类型
  1097. - status: 状态
  1098. - tag_ids: 标签ID列表(可选)
  1099. - diff_fields: 差异字段列表(可选,如 ["name_zh", "name_en"])
  1100. - notes: 备注(可选)
  1101. Returns:
  1102. 创建成功的审核记录信息
  1103. """
  1104. try:
  1105. payload = request.get_json() or {}
  1106. if not isinstance(payload, dict):
  1107. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  1108. record_type = payload.get("record_type")
  1109. source = payload.get("source", "manual")
  1110. meta1 = payload.get("meta1")
  1111. meta2 = payload.get("meta2")
  1112. diff_fields = payload.get("diff_fields", [])
  1113. notes = payload.get("notes")
  1114. # 参数校验
  1115. if not record_type:
  1116. return jsonify(failed("record_type 不能为空"))
  1117. if record_type not in ("redundancy", "change", "merge"):
  1118. return jsonify(
  1119. failed("record_type 必须是 redundancy、change 或 merge 之一")
  1120. )
  1121. if not meta1 or not isinstance(meta1, dict):
  1122. return jsonify(failed("meta1 不能为空且必须是对象"))
  1123. if not meta2 or not isinstance(meta2, dict):
  1124. return jsonify(failed("meta2 不能为空且必须是对象"))
  1125. if not isinstance(diff_fields, list):
  1126. return jsonify(failed("diff_fields 必须是数组"))
  1127. # 校验元数据必要字段
  1128. required_fields = ["id", "name_zh", "name_en", "data_type", "status"]
  1129. for field in required_fields:
  1130. if field not in meta1:
  1131. return jsonify(failed(f"meta1 缺少必要字段: {field}"))
  1132. if field not in meta2:
  1133. return jsonify(failed(f"meta2 缺少必要字段: {field}"))
  1134. # 构建 new_meta(主元数据信息)
  1135. new_meta = {
  1136. "id": meta1.get("id"),
  1137. "name_zh": meta1.get("name_zh"),
  1138. "name_en": meta1.get("name_en"),
  1139. "data_type": meta1.get("data_type"),
  1140. "status": meta1.get("status"),
  1141. "tag_ids": meta1.get("tag_ids", []),
  1142. }
  1143. # 构建 candidates(候选/对比元数据列表)
  1144. # 格式: [{"snapshot": {...}, "diff_fields": [...], "candidate_meta_id": id}]
  1145. candidates = [
  1146. {
  1147. "snapshot": {
  1148. "id": meta2.get("id"),
  1149. "name_zh": meta2.get("name_zh"),
  1150. "name_en": meta2.get("name_en"),
  1151. "data_type": meta2.get("data_type"),
  1152. "status": meta2.get("status"),
  1153. "tag_ids": meta2.get("tag_ids", []),
  1154. },
  1155. "diff_fields": diff_fields,
  1156. "candidate_meta_id": meta2.get("id"),
  1157. }
  1158. ]
  1159. # 创建审核记录
  1160. review_record = MetadataReviewRecord()
  1161. review_record.record_type = record_type
  1162. review_record.source = source
  1163. review_record.new_meta = new_meta
  1164. review_record.candidates = candidates
  1165. review_record.status = "pending"
  1166. review_record.notes = notes
  1167. db.session.add(review_record)
  1168. db.session.commit()
  1169. logger.info(
  1170. f"创建审核记录成功: id={review_record.id}, "
  1171. f"record_type={record_type}, "
  1172. f"meta1_name={meta1.get('name_zh')}, "
  1173. f"meta2_name={meta2.get('name_zh')}"
  1174. )
  1175. return jsonify(
  1176. success(
  1177. {
  1178. "record": review_record.to_dict(),
  1179. "message": "审核记录创建成功,请前往数据审核页面进行处理",
  1180. }
  1181. )
  1182. )
  1183. except Exception as e:
  1184. logger.error(f"创建审核记录失败: {str(e)}")
  1185. db.session.rollback()
  1186. return jsonify(failed("创建审核记录失败", error=str(e)))
  1187. @bp.route("/review/detail", methods=["GET"])
  1188. def metadata_review_detail():
  1189. """
  1190. 审核记录详情
  1191. Query:
  1192. - id: 记录ID
  1193. """
  1194. try:
  1195. record_id = request.args.get("id")
  1196. if not record_id:
  1197. return jsonify(failed("缺少id参数"))
  1198. record = MetadataReviewRecord.query.get(int(record_id))
  1199. if not record:
  1200. return jsonify(failed("记录不存在"))
  1201. # 将 tag_ids 转换为 tags
  1202. data = convert_tag_ids_to_tags(record.to_dict())
  1203. # change 场景:返回受影响元数据的影响关系图谱(若有 meta_id)
  1204. impact_graph = None
  1205. if record.record_type == "change":
  1206. old_meta = record.old_meta or {}
  1207. meta_id = old_meta.get("meta_id")
  1208. if meta_id is not None and str(meta_id).strip() != "":
  1209. try:
  1210. impact_graph = meta_impact_graph(int(meta_id))
  1211. except Exception as e:
  1212. logger.warning(f"获取影响图谱失败: {e}")
  1213. data["impact_graph"] = impact_graph
  1214. return jsonify(success(data))
  1215. except Exception as e:
  1216. logger.error(f"审核记录详情查询失败: {str(e)}")
  1217. return jsonify(failed("审核记录详情查询失败", error=str(e)))
  1218. @bp.route("/review/resolve", methods=["POST"])
  1219. def metadata_review_resolve():
  1220. """
  1221. 处理审核记录
  1222. Body:
  1223. - id: 记录ID
  1224. - action: alias | create_new | accept_change | reject_change | ignore
  1225. - payload: 动作参数(可选)
  1226. - resolved_by: 处理人(可选)
  1227. - notes: 备注(可选)
  1228. action=alias:
  1229. payload: { primary_meta_id: int, alias_meta_id: int }
  1230. 行为:在 DataMeta 节点之间重建 ALIAS 关系
  1231. - 创建 (alias_meta)-[:ALIAS]->(primary_meta) 关系
  1232. - 将所有指向 alias_meta 的 ALIAS 关系转移到 primary_meta
  1233. - primary_meta 已有的 ALIAS 关系保持不变
  1234. - BusinessDomain 的 INCLUDES 关系不受影响
  1235. action=create_new:
  1236. payload: { new_name_zh: str }
  1237. 行为:创建新的 DataMeta(中文名区分)并关联业务领域
  1238. action=accept_change:
  1239. payload: { meta_id?: int }
  1240. 行为:把 new_meta 写回目标 DataMeta,并写入 metadata_version_history(PG)
  1241. action=reject_change/ignore:
  1242. 行为:仅更新审核记录状态
  1243. """
  1244. try:
  1245. payload = request.get_json() or {}
  1246. if not isinstance(payload, dict):
  1247. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  1248. record_id = payload.get("id")
  1249. action = payload.get("action")
  1250. action_payload = payload.get("payload") or {}
  1251. resolved_by = payload.get("resolved_by")
  1252. notes = payload.get("notes")
  1253. if not record_id:
  1254. return jsonify(failed("id 不能为空"))
  1255. if not action:
  1256. return jsonify(failed("action 不能为空"))
  1257. record = MetadataReviewRecord.query.get(int(record_id))
  1258. if not record:
  1259. return jsonify(failed("记录不存在"))
  1260. if record.status != "pending":
  1261. return jsonify(failed("记录已处理,无法重复处理"))
  1262. # 需要业务领域上下文的动作
  1263. bd_id = record.business_domain_id
  1264. new_meta = record.new_meta or {}
  1265. if action == "alias":
  1266. primary_meta_id = action_payload.get("primary_meta_id")
  1267. alias_meta_id = action_payload.get("alias_meta_id")
  1268. if not primary_meta_id:
  1269. return jsonify(failed("payload.primary_meta_id 不能为空"))
  1270. if not alias_meta_id:
  1271. return jsonify(failed("payload.alias_meta_id 不能为空"))
  1272. if int(primary_meta_id) == int(alias_meta_id):
  1273. return jsonify(failed("primary_meta_id 和 alias_meta_id 不能相同"))
  1274. # 写入 Neo4j:重建 DataMeta 节点间的 ALIAS 关系
  1275. from app.services.neo4j_driver import neo4j_driver
  1276. with neo4j_driver.get_session() as session:
  1277. # Step 1: 将所有指向 alias_meta 的 ALIAS 关系转移到 primary_meta
  1278. # 查找所有以 alias_meta 为目标的 ALIAS 关系,创建新关系指向 primary_meta,然后删除旧关系
  1279. session.run(
  1280. """
  1281. MATCH (other:DataMeta)-[old_rel:ALIAS]->(alias_meta:DataMeta)
  1282. WHERE id(alias_meta) = $alias_meta_id
  1283. WITH other, old_rel
  1284. MATCH (primary_meta:DataMeta)
  1285. WHERE id(primary_meta) = $primary_meta_id
  1286. MERGE (other)-[:ALIAS]->(primary_meta)
  1287. DELETE old_rel
  1288. """,
  1289. {
  1290. "alias_meta_id": int(alias_meta_id),
  1291. "primary_meta_id": int(primary_meta_id),
  1292. },
  1293. )
  1294. # Step 2: 创建 alias_meta 指向 primary_meta 的 ALIAS 关系
  1295. session.run(
  1296. """
  1297. MATCH (alias_meta:DataMeta), (primary_meta:DataMeta)
  1298. WHERE id(alias_meta) = $alias_meta_id AND id(primary_meta) = $primary_meta_id
  1299. MERGE (alias_meta)-[:ALIAS]->(primary_meta)
  1300. """,
  1301. {
  1302. "alias_meta_id": int(alias_meta_id),
  1303. "primary_meta_id": int(primary_meta_id),
  1304. },
  1305. )
  1306. update_review_record_resolution(
  1307. record,
  1308. action="alias",
  1309. payload={
  1310. "primary_meta_id": int(primary_meta_id),
  1311. "alias_meta_id": int(alias_meta_id),
  1312. },
  1313. resolved_by=resolved_by,
  1314. notes=notes,
  1315. )
  1316. db.session.commit()
  1317. return jsonify(success(record.to_dict()))
  1318. if action == "create_new":
  1319. new_name_zh = (action_payload.get("new_name_zh") or "").strip()
  1320. if not bd_id:
  1321. return jsonify(
  1322. failed("记录缺少 business_domain_id,无法执行 create_new")
  1323. )
  1324. if not new_name_zh:
  1325. return jsonify(failed("payload.new_name_zh 不能为空"))
  1326. from app.core.meta_data import get_formatted_time
  1327. from app.services.neo4j_driver import neo4j_driver
  1328. with neo4j_driver.get_session() as session:
  1329. # 创建新 DataMeta(避免覆盖旧节点)
  1330. result = session.run(
  1331. """
  1332. CREATE (m:DataMeta {
  1333. name_zh: $name_zh,
  1334. name_en: $name_en,
  1335. data_type: $data_type,
  1336. create_time: $create_time,
  1337. status: true
  1338. })
  1339. RETURN m
  1340. """,
  1341. {
  1342. "name_zh": new_name_zh,
  1343. "name_en": (new_meta.get("name_en") or "").strip(),
  1344. "data_type": (new_meta.get("data_type") or "varchar(255)"),
  1345. "create_time": get_formatted_time(),
  1346. },
  1347. ).single()
  1348. if not result or not result.get("m"):
  1349. return jsonify(failed("创建新元数据失败"))
  1350. new_meta_id = int(result["m"].id)
  1351. session.run(
  1352. """
  1353. MATCH (n:BusinessDomain), (m:DataMeta)
  1354. WHERE id(n) = $domain_id AND id(m) = $meta_id
  1355. MERGE (n)-[:INCLUDES]->(m)
  1356. """,
  1357. {"domain_id": int(bd_id), "meta_id": new_meta_id},
  1358. )
  1359. update_review_record_resolution(
  1360. record,
  1361. action="create_new",
  1362. payload={"new_name_zh": new_name_zh},
  1363. resolved_by=resolved_by,
  1364. notes=notes,
  1365. )
  1366. db.session.commit()
  1367. return jsonify(success(record.to_dict()))
  1368. if action == "accept_change":
  1369. old_meta = record.old_meta or {}
  1370. meta_id = action_payload.get("meta_id") or old_meta.get("meta_id")
  1371. if not meta_id:
  1372. return jsonify(failed("无法确定需要更新的 meta_id"))
  1373. from app.core.meta_data import get_formatted_time
  1374. from app.services.neo4j_driver import neo4j_driver
  1375. before_snapshot = old_meta.get("snapshot") or {}
  1376. after_snapshot = new_meta
  1377. # 写入 Neo4j:更新 DataMeta 属性,并尝试同步标签集合
  1378. with neo4j_driver.get_session() as session:
  1379. name_zh_val = (
  1380. after_snapshot.get("name_zh")
  1381. or before_snapshot.get("name_zh")
  1382. or ""
  1383. ).strip()
  1384. name_en_val = (after_snapshot.get("name_en") or "").strip()
  1385. data_type_val = after_snapshot.get("data_type") or "varchar(255)"
  1386. session.run(
  1387. """
  1388. MATCH (m:DataMeta)
  1389. WHERE id(m) = $meta_id
  1390. SET m.name_zh = $name_zh,
  1391. m.name_en = $name_en,
  1392. m.data_type = $data_type,
  1393. m.updateTime = $update_time,
  1394. m.status = true
  1395. """,
  1396. {
  1397. "meta_id": int(meta_id),
  1398. "name_zh": name_zh_val,
  1399. "name_en": name_en_val,
  1400. "data_type": data_type_val,
  1401. "update_time": get_formatted_time(),
  1402. },
  1403. )
  1404. tag_ids = after_snapshot.get("tag_ids") or []
  1405. tag_ids = [int(t) for t in tag_ids if t is not None]
  1406. if tag_ids:
  1407. session.run(
  1408. """
  1409. MATCH (m:DataMeta)-[r:LABEL]->(:DataLabel)
  1410. WHERE id(m) = $meta_id
  1411. DELETE r
  1412. """,
  1413. {"meta_id": int(meta_id)},
  1414. )
  1415. session.run(
  1416. """
  1417. MATCH (m:DataMeta)
  1418. WHERE id(m) = $meta_id
  1419. WITH m
  1420. UNWIND $tag_ids AS tid
  1421. MATCH (t:DataLabel) WHERE id(t) = tid
  1422. MERGE (m)-[:LABEL]->(t)
  1423. """,
  1424. {"meta_id": int(meta_id), "tag_ids": tag_ids},
  1425. )
  1426. # 写入版本历史(PG)
  1427. history = MetadataVersionHistory()
  1428. history.meta_id = int(meta_id) if meta_id is not None else 0
  1429. history.change_source = "ddl"
  1430. history.before_snapshot = (
  1431. before_snapshot if before_snapshot is not None else {}
  1432. )
  1433. history.after_snapshot = (
  1434. after_snapshot if after_snapshot is not None else {}
  1435. )
  1436. history.created_by = resolved_by if resolved_by is not None else ""
  1437. db.session.add(history)
  1438. update_review_record_resolution(
  1439. record,
  1440. action="accept_change",
  1441. payload={"meta_id": int(meta_id)},
  1442. resolved_by=resolved_by,
  1443. notes=notes,
  1444. )
  1445. db.session.commit()
  1446. return jsonify(success(record.to_dict()))
  1447. if action in ("reject_change", "ignore"):
  1448. update_review_record_resolution(
  1449. record,
  1450. action=action,
  1451. payload=action_payload,
  1452. resolved_by=resolved_by,
  1453. notes=notes,
  1454. )
  1455. db.session.commit()
  1456. return jsonify(success(record.to_dict()))
  1457. return jsonify(failed(f"不支持的action: {action}"))
  1458. except Exception as e:
  1459. logger.error(f"处理审核记录失败: {str(e)}")
  1460. db.session.rollback()
  1461. return jsonify(failed("处理审核记录失败", error=str(e)))