# routes.py - metadata API routes registered on the `bp` blueprint.
  1. import io
  2. import logging
  3. from flask import current_app, jsonify, request, send_file
  4. from minio import Minio
  5. from minio.error import S3Error
  6. from sqlalchemy import or_
  7. from app import db
  8. from app.api.meta_data import bp
  9. from app.core.meta_data import (
  10. check_redundancy_for_add,
  11. check_redundancy_for_update,
  12. get_file_content,
  13. get_formatted_time,
  14. handle_id_unstructured,
  15. handle_txt_graph,
  16. meta_impact_graph,
  17. meta_kinship_graph,
  18. meta_list,
  19. normalize_tag_inputs,
  20. parse_text,
  21. solve_unstructured_data,
  22. text_resource_solve,
  23. )
  24. from app.core.system.auth import require_auth
  25. from app.models.metadata_review import (
  26. MetadataReviewRecord,
  27. MetadataVersionHistory,
  28. update_review_record_resolution,
  29. )
  30. from app.models.result import failed, success
  31. from app.services.neo4j_driver import neo4j_driver
  32. logger = logging.getLogger("app")
  33. def get_minio_client():
  34. """获取 MinIO 客户端实例"""
  35. return Minio(
  36. current_app.config["MINIO_HOST"],
  37. access_key=current_app.config["MINIO_USER"],
  38. secret_key=current_app.config["MINIO_PASSWORD"],
  39. secure=current_app.config["MINIO_SECURE"],
  40. )
  41. def get_minio_config():
  42. """获取 MinIO 配置"""
  43. return {
  44. "MINIO_BUCKET": current_app.config["MINIO_BUCKET"],
  45. "PREFIX": current_app.config["PREFIX"],
  46. "ALLOWED_EXTENSIONS": current_app.config["ALLOWED_EXTENSIONS"],
  47. }
  48. def allowed_file(filename):
  49. """检查文件扩展名是否允许"""
  50. if "." not in filename:
  51. return False
  52. ext = filename.rsplit(".", 1)[1].lower()
  53. return ext in get_minio_config()["ALLOWED_EXTENSIONS"]
  54. # 元数据列表
  55. @bp.route("/node/list", methods=["POST"])
  56. def meta_node_list():
  57. try:
  58. payload = request.get_json() or {}
  59. if not isinstance(payload, dict):
  60. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  61. def to_int(value, default):
  62. try:
  63. return int(value)
  64. except (TypeError, ValueError):
  65. return default
  66. # 分页参数
  67. page = to_int(payload.get("current", 1), 1)
  68. page_size = to_int(payload.get("size", 10), 10)
  69. # 过滤参数
  70. name_en_filter = payload.get("name_en") or None
  71. name_zh_filter = payload.get("name_zh") or None
  72. category_filter = payload.get("category") or None
  73. time_filter = payload.get("time") or None
  74. logger.info(
  75. f"[node/list] 过滤参数: name_zh={name_zh_filter}, "
  76. f"name_en={name_en_filter}, category={category_filter}"
  77. )
  78. tag_filter = payload.get("tag")
  79. if tag_filter is not None and not isinstance(tag_filter, list):
  80. tag_filter = None
  81. # 调用核心业务逻辑
  82. result, total_count = meta_list(
  83. page,
  84. page_size,
  85. "",
  86. name_en_filter,
  87. name_zh_filter,
  88. category_filter,
  89. time_filter,
  90. tag_filter,
  91. )
  92. # 返回结果
  93. return jsonify(
  94. success(
  95. {
  96. "records": result,
  97. "total": total_count,
  98. "size": page_size,
  99. "current": page,
  100. }
  101. )
  102. )
  103. except Exception as e:
  104. logger.error(f"获取元数据列表失败: {str(e)}")
  105. return jsonify(failed(str(e)))
  106. # 元数据图谱
  107. @bp.route("/node/graph", methods=["POST"])
  108. def meta_node_graph():
  109. try:
  110. if not request.json:
  111. return jsonify(failed("请求数据不能为空"))
  112. # 从请求中获取节点ID
  113. node_id = request.json.get("nodeId")
  114. if node_id is None:
  115. return jsonify(failed("nodeId 不能为空"))
  116. try:
  117. node_id_int = int(node_id)
  118. except (TypeError, ValueError):
  119. return jsonify(failed("nodeId 必须为整数"))
  120. # 调用核心业务逻辑
  121. graph = meta_kinship_graph(node_id_int)
  122. is_dict = isinstance(graph, dict)
  123. nodes = graph.get("nodes", []) if is_dict else []
  124. relationships = graph.get("relationships", []) if is_dict else []
  125. # 当前节点属性
  126. node_info = next(
  127. (n for n in nodes if n.get("id") == node_id_int),
  128. {},
  129. )
  130. # 关联节点(包含属性,便于前端展示名称等)
  131. related_nodes = [n for n in nodes if n.get("id") != node_id_int]
  132. payload = {
  133. "node": node_info,
  134. "related_nodes": related_nodes,
  135. "relationships": relationships,
  136. }
  137. return jsonify(success(payload))
  138. except Exception as e:
  139. logger.error(f"获取元数据图谱失败: {str(e)}")
  140. return jsonify(failed(str(e)))
  141. # 删除元数据
  142. @bp.route("/node/delete", methods=["POST"])
  143. def meta_node_delete():
  144. try:
  145. if not request.json:
  146. return jsonify(failed("请求数据不能为空"))
  147. # 从请求中获取节点ID
  148. node_id = request.json.get("id")
  149. # 删除节点逻辑
  150. with neo4j_driver.get_session() as session:
  151. cypher = "MATCH (n) WHERE id(n) = $node_id DETACH DELETE n"
  152. session.run(cypher, node_id=int(node_id))
  153. # 返回结果
  154. return jsonify(success({}))
  155. except Exception as e:
  156. logger.error(f"删除元数据失败: {str(e)}")
  157. return jsonify(failed(str(e)))
  158. # 编辑元数据
  159. @bp.route("/node/edit", methods=["POST"])
  160. def meta_node_edit():
  161. try:
  162. if not request.json:
  163. return jsonify(failed("请求数据不能为空"))
  164. # 从请求中获取节点ID
  165. node_id = request.json.get("id")
  166. if not node_id:
  167. return jsonify(failed("节点ID不能为空"))
  168. # 获取节点
  169. with neo4j_driver.get_session() as session:
  170. # 查询节点信息
  171. cypher = """
  172. MATCH (n:DataMeta)
  173. WHERE id(n) = $node_id
  174. RETURN n
  175. """
  176. result = session.run(cypher, node_id=int(node_id))
  177. node = result.single()
  178. if not node or not node["n"]:
  179. return jsonify(failed("节点不存在"))
  180. # 获取节点数据
  181. node_data = dict(node["n"])
  182. node_data["id"] = node["n"].id
  183. # 获取标签信息
  184. tag_cypher = """
  185. MATCH (n:DataMeta)-[:LABEL]->(t:DataLabel)
  186. WHERE id(n) = $node_id
  187. RETURN t
  188. """
  189. tag_result = session.run(tag_cypher, node_id=int(node_id))
  190. tags: list[dict] = []
  191. for record in tag_result:
  192. tag_node = record.get("t")
  193. if tag_node:
  194. tags.append(
  195. {
  196. "id": tag_node.id,
  197. "name_zh": tag_node.get("name_zh", ""),
  198. "name_en": tag_node.get("name_en", ""),
  199. }
  200. )
  201. # 获取主数据信息
  202. master_data_cypher = """
  203. MATCH (n:DataMeta)-[:master_data]->(m:master_data)
  204. WHERE id(n) = $node_id
  205. RETURN m
  206. """
  207. master_data_result = session.run(master_data_cypher, node_id=int(node_id))
  208. master_data = master_data_result.single()
  209. # 构建返回数据
  210. response_data = [
  211. {
  212. "master_data": (
  213. master_data["m"].id
  214. if master_data and master_data["m"]
  215. else None
  216. ),
  217. "name_zh": node_data.get("name_zh", ""),
  218. "name_en": node_data.get("name_en", ""),
  219. "create_time": node_data.get("create_time", ""),
  220. "update_time": node_data.get("update_time", ""),
  221. "status": bool(node_data.get("status", True)),
  222. "data_type": node_data.get("data_type", ""),
  223. "tag": tags,
  224. "affiliation": node_data.get("affiliation"),
  225. "category": node_data.get("category"),
  226. "alias": node_data.get("alias"),
  227. "describe": node_data.get("describe"),
  228. }
  229. ]
  230. logger.info(f"成功获取元数据节点: ID={node_data['id']}")
  231. return jsonify(success(response_data))
  232. except Exception as e:
  233. logger.error(f"获取元数据节点失败: {str(e)}")
  234. return jsonify(failed(str(e)))
  235. # 增加元数据
  236. @bp.route("/check", methods=["GET"])
  237. def meta_check():
  238. """
  239. 检查元数据中文名是否已存在
  240. 请求参数:
  241. - name_zh: 元数据中文名(URL参数)
  242. 返回:
  243. - exists: true/false 表示是否存在
  244. """
  245. try:
  246. name_zh = request.args.get("name_zh")
  247. if not name_zh:
  248. return jsonify(failed("缺少name_zh参数"))
  249. # 查询数据库检查是否存在
  250. with neo4j_driver.get_session() as session:
  251. cypher = """
  252. MATCH (n:DataMeta {name_zh: $name_zh})
  253. RETURN count(n) > 0 as exists
  254. """
  255. result = session.run(cypher, name_zh=name_zh)
  256. record = result.single()
  257. if record:
  258. exists = record["exists"]
  259. logger.info(f"检查元数据 '{name_zh}': {'存在' if exists else '不存在'}")
  260. return jsonify(
  261. success({"exists": exists, "name_zh": name_zh}, "查询成功")
  262. )
  263. else:
  264. return jsonify(
  265. success({"exists": False, "name_zh": name_zh}, "查询成功")
  266. )
  267. except Exception as e:
  268. logger.error(f"检查元数据失败: {str(e)}")
  269. return jsonify(failed(f"检查失败: {str(e)}"))
  270. @bp.route("/node/add", methods=["POST"])
  271. def meta_node_add():
  272. """
  273. 新增元数据节点
  274. 在创建前会进行冗余检测:
  275. - 如果存在完全匹配的元数据,返回已存在的节点信息
  276. - 如果存在疑似重复的元数据,创建审核记录并返回提示
  277. - 如果无重复,正常创建新节点
  278. """
  279. try:
  280. if not request.json:
  281. return jsonify(failed("请求数据不能为空"))
  282. # 从请求中获取节点信息
  283. node_name_zh = request.json.get("name_zh")
  284. node_type = request.json.get("data_type")
  285. node_category = request.json.get("category")
  286. node_alias = request.json.get("alias")
  287. node_affiliation = request.json.get("affiliation")
  288. node_tag = request.json.get("tag")
  289. node_desc = request.json.get("describe")
  290. node_status = bool(request.json.get("status", True))
  291. node_name_en = request.json.get("name_en")
  292. # 是否强制创建(跳过冗余检测)
  293. force_create = bool(request.json.get("force_create", False))
  294. if not node_name_zh:
  295. return jsonify(failed("节点名称不能为空"))
  296. if not node_type:
  297. return jsonify(failed("节点类型不能为空"))
  298. # 统一处理标签ID
  299. tag_ids = normalize_tag_inputs(node_tag)
  300. # ========== 冗余检测 ==========
  301. if not force_create:
  302. redundancy_result = check_redundancy_for_add(
  303. name_zh=node_name_zh,
  304. name_en=node_name_en or "",
  305. data_type=node_type,
  306. tag_ids=tag_ids,
  307. )
  308. # 存在完全匹配的元数据
  309. if redundancy_result["has_exact_match"]:
  310. exact_id = redundancy_result["exact_match_id"]
  311. logger.info(
  312. f"元数据已存在(完全匹配): name_zh={node_name_zh}, "
  313. f"existing_id={exact_id}"
  314. )
  315. # 返回已存在的节点信息
  316. with neo4j_driver.get_session() as session:
  317. existing = session.run(
  318. "MATCH (n:DataMeta) WHERE id(n) = $id RETURN n",
  319. {"id": exact_id},
  320. ).single()
  321. if existing and existing["n"]:
  322. existing_data = dict(existing["n"])
  323. existing_data["id"] = existing["n"].id
  324. return jsonify(
  325. success(existing_data, message="元数据已存在,返回已有节点")
  326. )
  327. return jsonify(failed(f"元数据已存在(ID={exact_id}),请勿重复创建"))
  328. # 存在疑似重复的元数据,已创建审核记录
  329. if redundancy_result["review_created"]:
  330. candidates = redundancy_result["candidates"]
  331. candidate_names = [c.get("name_zh", "") for c in candidates[:3]]
  332. logger.info(
  333. f"发现疑似重复元数据: name_zh={node_name_zh}, "
  334. f"candidates={candidate_names}"
  335. )
  336. return jsonify(
  337. failed(
  338. f"发现疑似重复元数据,已创建审核记录。"
  339. f"疑似重复: {', '.join(candidate_names)}。"
  340. f"请前往元数据审核页面处理,或使用 force_create=true 强制创建。"
  341. )
  342. )
  343. # ========== 创建节点 ==========
  344. with neo4j_driver.get_session() as session:
  345. cypher = """
  346. MERGE (n:DataMeta {name_zh: $name_zh})
  347. ON CREATE SET n.name_en = $name_en,
  348. n.data_type = $data_type,
  349. n.category = $category,
  350. n.alias = $alias,
  351. n.affiliation = $affiliation,
  352. n.describe = $describe,
  353. n.create_time = $create_time,
  354. n.updateTime = $update_time,
  355. n.status = $status,
  356. n.name_en = $name_en
  357. ON MATCH SET n.data_type = $data_type,
  358. n.category = $category,
  359. n.alias = $alias,
  360. n.affiliation = $affiliation,
  361. n.describe = $describe,
  362. n.updateTime = $update_time,
  363. n.status = $status,
  364. n.name_en = $name_en
  365. RETURN n
  366. """
  367. create_time = update_time = get_formatted_time()
  368. result = session.run(
  369. cypher,
  370. name_zh=node_name_zh,
  371. data_type=node_type,
  372. category=node_category,
  373. alias=node_alias,
  374. affiliation=node_affiliation,
  375. describe=node_desc,
  376. create_time=create_time,
  377. update_time=update_time,
  378. status=node_status,
  379. name_en=node_name_en,
  380. )
  381. node = result.single()
  382. if node and node["n"]:
  383. node_data = dict(node["n"])
  384. node_data["id"] = node["n"].id
  385. # 如果提供了标签列表,创建标签关系
  386. tag_nodes = []
  387. if tag_ids:
  388. for tag_id in tag_ids:
  389. # 获取标签节点信息
  390. tag_fetch = session.run(
  391. "MATCH (t:DataLabel) WHERE id(t) = $tag_id RETURN t",
  392. tag_id=tag_id,
  393. ).single()
  394. if not tag_fetch or not tag_fetch.get("t"):
  395. logger.warning(f"未找到标签节点: {tag_id}")
  396. continue
  397. tag_node = tag_fetch["t"]
  398. tag_nodes.append(
  399. {
  400. "id": tag_node.id,
  401. "name_zh": tag_node.get("name_zh", ""),
  402. "name_en": tag_node.get("name_en", ""),
  403. }
  404. )
  405. tag_cypher = """
  406. MATCH (n:DataMeta), (t:DataLabel)
  407. WHERE id(n) = $node_id AND id(t) = $tag_id
  408. MERGE (n)-[r:LABEL]->(t)
  409. RETURN r
  410. """
  411. session.run(tag_cypher, node_id=node["n"].id, tag_id=tag_id)
  412. node_data["tag"] = tag_nodes
  413. logger.info(
  414. f"成功创建或更新元数据节点: "
  415. f"ID={node_data['id']}, name={node_name_zh}"
  416. )
  417. return jsonify(success(node_data))
  418. else:
  419. logger.error(f"创建元数据节点失败: {node_name_zh}")
  420. return jsonify(failed("创建元数据节点失败"))
  421. except Exception as e:
  422. logger.error(f"添加元数据失败: {str(e)}")
  423. return jsonify(failed(str(e)))
  424. # 搜索元数据
  425. @bp.route("/search", methods=["GET"])
  426. def search_metadata_route():
  427. try:
  428. keyword = request.args.get("keyword", "")
  429. if not keyword:
  430. return jsonify(success([]))
  431. cypher = """
  432. MATCH (n:DataMeta)
  433. WHERE n.name_zh CONTAINS $keyword
  434. RETURN n LIMIT 100
  435. """
  436. with neo4j_driver.get_session() as session:
  437. result = session.run(cypher, keyword=keyword)
  438. metadata_list = [dict(record["n"]) for record in result]
  439. return jsonify(success(metadata_list))
  440. except Exception as e:
  441. logger.error(f"搜索元数据失败: {str(e)}")
  442. return jsonify(failed(str(e)))
  443. # 全文检索查询
  444. @bp.route("/full/text/query", methods=["POST"])
  445. def full_text_query():
  446. try:
  447. if not request.json:
  448. return jsonify(failed("请求数据不能为空"))
  449. # 获取查询条件
  450. search_term = request.json.get("query", "")
  451. if not search_term:
  452. return jsonify(failed("查询条件不能为空"))
  453. # 执行Neo4j全文索引查询
  454. with neo4j_driver.get_session() as session:
  455. cypher = """
  456. CALL db.index.fulltext.queryNodes("DataMetaFulltext", $term)
  457. YIELD node, score
  458. RETURN node, score
  459. ORDER BY score DESC
  460. LIMIT 20
  461. """
  462. result = session.run(cypher, term=search_term)
  463. # 处理查询结果
  464. search_results = []
  465. for record in result:
  466. node_data = dict(record["node"])
  467. node_data["id"] = record["node"].id
  468. node_data["score"] = record["score"]
  469. search_results.append(node_data)
  470. return jsonify(success(search_results))
  471. except Exception as e:
  472. logger.error(f"全文检索查询失败: {str(e)}")
  473. return jsonify(failed(str(e)))
  474. # 非结构化文本查询
  475. @bp.route("/unstructure/text/query", methods=["POST"])
  476. def unstructure_text_query():
  477. try:
  478. if not request.json:
  479. return jsonify(failed("请求数据不能为空"))
  480. # 获取查询参数
  481. node_id = request.json.get("id")
  482. if not node_id:
  483. return jsonify(failed("节点ID不能为空"))
  484. # 获取节点信息
  485. node_data = handle_id_unstructured(node_id)
  486. if not node_data:
  487. return jsonify(failed("节点不存在"))
  488. # 获取对象路径
  489. object_name = node_data.get("url")
  490. if not object_name:
  491. return jsonify(failed("文档路径不存在"))
  492. # 获取 MinIO 配置
  493. minio_client = get_minio_client()
  494. config = get_minio_config()
  495. bucket_name = config["MINIO_BUCKET"]
  496. # 从MinIO获取文件内容
  497. file_content = get_file_content(minio_client, bucket_name, object_name)
  498. # 解析文本内容
  499. parsed_data = parse_text(file_content)
  500. # 返回结果
  501. result = {
  502. "node": node_data,
  503. "parsed": parsed_data,
  504. "content": (
  505. file_content[:1000] + "..."
  506. if len(file_content) > 1000
  507. else file_content
  508. ),
  509. }
  510. return jsonify(success(result))
  511. except Exception as e:
  512. logger.error(f"非结构化文本查询失败: {str(e)}")
  513. return jsonify(failed(str(e)))
  514. # 文件上传
  515. @bp.route("/resource/upload", methods=["POST"])
  516. def upload_file():
  517. try:
  518. # 检查请求中是否有文件
  519. if "file" not in request.files:
  520. return jsonify(failed("没有找到上传的文件"))
  521. file = request.files["file"]
  522. # 检查文件名
  523. if not file.filename:
  524. return jsonify(failed("未选择文件"))
  525. # 保存文件名到本地变量(确保类型安全)
  526. filename = file.filename
  527. # 检查文件类型
  528. if not allowed_file(filename):
  529. return jsonify(failed("不支持的文件类型"))
  530. # 获取 MinIO 配置
  531. minio_client = get_minio_client()
  532. config = get_minio_config()
  533. # 上传到MinIO
  534. file_content = file.read()
  535. file_size = len(file_content)
  536. file_type = filename.rsplit(".", 1)[1].lower()
  537. # 提取文件名(不包含扩展名)
  538. filename_without_ext = filename.rsplit(".", 1)[0]
  539. # 生成紧凑的时间戳 (yyyyMMddHHmmss)
  540. import time
  541. timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
  542. # 生成唯一文件名
  543. object_name = (
  544. f"{config['PREFIX']}/{filename_without_ext}_{timestamp}.{file_type}"
  545. )
  546. # 上传文件
  547. minio_client.put_object(
  548. config["MINIO_BUCKET"],
  549. object_name,
  550. io.BytesIO(file_content),
  551. file_size,
  552. content_type=f"application/{file_type}",
  553. )
  554. # 返回结果
  555. return jsonify(
  556. success(
  557. {
  558. "filename": file.filename,
  559. "size": file_size,
  560. "type": file_type,
  561. "url": object_name,
  562. }
  563. )
  564. )
  565. except Exception as e:
  566. logger.error(f"文件上传失败: {str(e)}")
  567. return jsonify(failed(str(e)))
  568. # 文件下载显示
  569. @bp.route("/resource/display", methods=["POST"])
  570. def upload_file_display():
  571. response = None
  572. try:
  573. if not request.json:
  574. return jsonify(failed("请求数据不能为空"))
  575. object_name = request.json.get("url")
  576. if not object_name:
  577. return jsonify(failed("文件路径不能为空"))
  578. # 获取 MinIO 配置
  579. minio_client = get_minio_client()
  580. config = get_minio_config()
  581. # 获取文件内容
  582. response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
  583. file_data = response.read()
  584. # 获取文件名
  585. file_name = object_name.split("/")[-1]
  586. # 确定文件类型
  587. file_extension = file_name.split(".")[-1].lower()
  588. # 为不同文件类型设置合适的MIME类型
  589. mime_types = {
  590. "pdf": "application/pdf",
  591. "doc": "application/msword",
  592. "docx": (
  593. "application/vnd.openxmlformats-"
  594. "officedocument.wordprocessingml.document"
  595. ),
  596. "xls": "application/vnd.ms-excel",
  597. "xlsx": (
  598. "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
  599. ),
  600. "txt": "text/plain",
  601. "csv": "text/csv",
  602. }
  603. content_type = mime_types.get(file_extension, "application/octet-stream")
  604. # 返回结果
  605. return jsonify(
  606. success(
  607. {
  608. "filename": file_name,
  609. "type": file_extension,
  610. "contentType": content_type,
  611. "size": len(file_data),
  612. "url": f"/api/meta/resource/download?url={object_name}",
  613. }
  614. )
  615. )
  616. except S3Error as e:
  617. logger.error(f"MinIO操作失败: {str(e)}")
  618. return jsonify(failed(f"文件访问失败: {str(e)}"))
  619. except Exception as e:
  620. logger.error(f"文件显示信息获取失败: {str(e)}")
  621. return jsonify(failed(str(e)))
  622. finally:
  623. if response:
  624. response.close()
  625. response.release_conn()
  626. # 文件下载接口
  627. @bp.route("/resource/download", methods=["GET"])
  628. def download_file():
  629. response = None
  630. try:
  631. object_name = request.args.get("url")
  632. if not object_name:
  633. return jsonify(failed("文件路径不能为空"))
  634. # URL解码,处理特殊字符
  635. import urllib.parse
  636. object_name = urllib.parse.unquote(object_name)
  637. # 记录下载请求信息,便于调试
  638. logger.info(f"下载文件请求: {object_name}")
  639. # 获取 MinIO 配置
  640. minio_client = get_minio_client()
  641. config = get_minio_config()
  642. # 获取文件
  643. try:
  644. response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
  645. file_data = response.read()
  646. except S3Error as e:
  647. logger.error(f"MinIO获取文件失败: {str(e)}")
  648. return jsonify(failed(f"文件获取失败: {str(e)}"))
  649. # 获取文件名,并处理特殊字符
  650. file_name = object_name.split("/")[-1]
  651. # 直接从内存返回文件,不创建临时文件
  652. file_stream = io.BytesIO(file_data)
  653. # 返回文件
  654. return send_file(
  655. file_stream,
  656. as_attachment=True,
  657. download_name=file_name,
  658. mimetype="application/octet-stream",
  659. )
  660. except Exception as e:
  661. logger.error(f"文件下载失败: {str(e)}")
  662. return jsonify(failed(str(e)))
  663. finally:
  664. if response:
  665. response.close()
  666. response.release_conn()
  667. # 文本资源翻译
  668. @bp.route("/resource/translate", methods=["POST"])
  669. def text_resource_translate():
  670. try:
  671. if not request.json:
  672. return jsonify(failed("请求数据不能为空"))
  673. # 获取参数
  674. name_zh = request.json.get("name_zh", "")
  675. keyword = request.json.get("keyword", "")
  676. if not name_zh:
  677. return jsonify(failed("名称不能为空"))
  678. # 调用资源处理逻辑
  679. result = text_resource_solve(None, name_zh, keyword)
  680. return jsonify(success(result))
  681. except Exception as e:
  682. logger.error(f"文本资源翻译失败: {str(e)}")
  683. return jsonify(failed(str(e)))
  684. # 创建文本资源节点
  685. @bp.route("/resource/node", methods=["POST"])
  686. def text_resource_node():
  687. try:
  688. if not request.json:
  689. return jsonify(failed("请求数据不能为空"))
  690. # 获取参数
  691. name_zh = request.json.get("name_zh", "")
  692. name_en = request.json.get("name_en", "")
  693. keywords = request.json.get("keywords", [])
  694. keywords_en = request.json.get("keywords_en", [])
  695. object_name = request.json.get("url", "")
  696. if not name_zh or not name_en or not object_name:
  697. return jsonify(failed("参数不完整"))
  698. # 创建节点
  699. with neo4j_driver.get_session() as session:
  700. # 创建资源节点
  701. cypher = """
  702. CREATE (n:DataMeta {
  703. name_zh: $name_zh,
  704. name_en: $name_en,
  705. keywords: $keywords,
  706. keywords_en: $keywords_en,
  707. url: $object_name,
  708. create_time: $create_time,
  709. updateTime: $update_time
  710. })
  711. RETURN n
  712. """
  713. create_time = update_time = get_formatted_time()
  714. result = session.run(
  715. cypher,
  716. name_zh=name_zh,
  717. name_en=name_en,
  718. keywords=keywords,
  719. keywords_en=keywords_en,
  720. object_name=object_name,
  721. create_time=create_time,
  722. update_time=update_time,
  723. )
  724. record = result.single()
  725. if not record:
  726. return jsonify(failed("创建节点失败"))
  727. node = record["n"]
  728. # 为每个关键词创建标签节点并关联
  729. for i, keyword in enumerate(keywords):
  730. if keyword:
  731. # 创建标签节点
  732. tag_cypher = """
  733. MERGE (t:Tag {name_zh: $name_zh})
  734. ON CREATE SET t.name_en = $name_en,
  735. t.create_time = $create_time
  736. RETURN t
  737. """
  738. tag_result = session.run(
  739. tag_cypher,
  740. name_zh=keyword,
  741. name_en=keywords_en[i] if i < len(keywords_en) else "",
  742. create_time=create_time,
  743. )
  744. tag_record = tag_result.single()
  745. if not tag_record:
  746. continue
  747. tag_node = tag_record["t"]
  748. # 创建关系
  749. rel_cypher = """
  750. MATCH (n), (t)
  751. WHERE id(n) = $node_id AND id(t) = $tag_id
  752. CREATE (n)-[r:HAS_TAG]->(t)
  753. RETURN r
  754. """
  755. session.run(rel_cypher, node_id=node.id, tag_id=tag_node.id)
  756. # 返回创建的节点
  757. return jsonify(success(dict(node)))
  758. except Exception as e:
  759. logger.error(f"创建文本资源节点失败: {str(e)}")
  760. return jsonify(failed(str(e)))
  761. # 处理非结构化数据
  762. @bp.route("/unstructured/process", methods=["POST"])
  763. def processing_unstructured_data():
  764. try:
  765. if not request.json:
  766. return jsonify(failed("请求数据不能为空"))
  767. # 获取参数
  768. node_id = request.json.get("id")
  769. if not node_id:
  770. return jsonify(failed("节点ID不能为空"))
  771. # 获取 MinIO 配置
  772. minio_client = get_minio_client()
  773. config = get_minio_config()
  774. prefix = config["PREFIX"]
  775. # 调用处理逻辑
  776. result = solve_unstructured_data(node_id, minio_client, prefix)
  777. if result:
  778. return jsonify(success({"message": "处理成功"}))
  779. else:
  780. return jsonify(failed("处理失败"))
  781. except Exception as e:
  782. logger.error(f"处理非结构化数据失败: {str(e)}")
  783. return jsonify(failed(str(e)))
  784. # 创建文本图谱
  785. @bp.route("/text/graph", methods=["POST"])
  786. def create_text_graph():
  787. try:
  788. if not request.json:
  789. return jsonify(failed("请求数据不能为空"))
  790. # 获取参数
  791. node_id = request.json.get("id")
  792. entity_zh = request.json.get("entity_zh")
  793. entity_en = request.json.get("entity_en")
  794. if not all([node_id, entity_zh, entity_en]):
  795. return jsonify(failed("参数不完整"))
  796. # 创建图谱
  797. result = handle_txt_graph(node_id, entity_zh, entity_en)
  798. if result:
  799. return jsonify(success({"message": "图谱创建成功"}))
  800. else:
  801. return jsonify(failed("图谱创建失败"))
  802. except Exception as e:
  803. logger.error(f"创建文本图谱失败: {str(e)}")
  804. return jsonify(failed(str(e)))
  805. @bp.route("/config", methods=["GET"])
  806. @require_auth
  807. def get_meta_config():
  808. """获取元数据配置信息"""
  809. config = get_minio_config()
  810. return jsonify(
  811. {
  812. "bucket_name": config["MINIO_BUCKET"],
  813. "prefix": config["PREFIX"],
  814. "allowed_extensions": list(config["ALLOWED_EXTENSIONS"]),
  815. }
  816. )
  817. # 更新元数据
  @bp.route("/node/update", methods=["POST"])
  def meta_node_update():
      """
      Update the properties (and optionally the tags) of a DataMeta node.

      Request body (JSON object):
          - id: internal id of the node to update (required, 0 is accepted)
          - force_update: truthy value skips the redundancy check
          - name_zh, name_en, category, alias, affiliation, data_type,
            describe, status: optional new values; a key that is absent or
            null leaves the stored property untouched
          - tag: optional tag input; when present, the node's LABEL
            relationships are replaced by the normalized tag ids

      Redundancy check (runs only when name_zh or name_en really changes and
      force_update is not set):
          - the updated values exactly match another node -> error response
          - suspected duplicates were found and a review record was created
            -> error response pointing the user to the review page
          - otherwise the node is updated normally

      Returns:
          A JSON response built from ``success(node_data)`` on success or
          ``failed(message)`` otherwise. Unexpected exceptions are logged and
          reported through ``failed(str(e))``.
      """
      try:
          payload = request.json
          if not payload:
              return jsonify(failed("请求数据不能为空"))
          if not isinstance(payload, dict):
              # A JSON array/scalar has no .get(); reject it with a clear message
              # instead of leaking an AttributeError text to the client.
              return jsonify(failed("请求数据格式错误,应为 JSON 对象"))

          node_id = payload.get("id")
          # Only an absent/blank value counts as "missing". A plain falsiness
          # test would also reject the integer 0, which can be a valid Neo4j
          # internal id (the string "0" was already accepted before).
          if node_id is None or node_id is False or node_id == "":
              return jsonify(failed("节点ID不能为空"))

          # Validate and convert the node id to an integer.
          try:
              node_id = int(node_id)
          except (ValueError, TypeError):
              return jsonify(failed(f"节点ID必须为整数,当前值: {node_id}"))

          # Whether to force the update (skip the redundancy check).
          force_update = bool(payload.get("force_update", False))

          with neo4j_driver.get_session() as session:
              # Make sure the node exists and fetch its current properties.
              check_cypher = """
              MATCH (n:DataMeta)
              WHERE id(n) = $node_id
              RETURN n
              """
              node = session.run(check_cypher, node_id=node_id).single()
              if not node or not node["n"]:
                  return jsonify(failed("节点不存在"))

              current_node = dict(node["n"])

              # Candidate properties; None means "not supplied, keep as is".
              updatable_fields = (
                  "name_zh",
                  "category",
                  "alias",
                  "affiliation",
                  "data_type",
                  "describe",
                  "status",
                  "name_en",
              )
              fields_to_update = {
                  field: payload.get(field) for field in updatable_fields
              }

              def value_after_update(field, default):
                  """Return the value the node will hold once the update is applied."""
                  supplied = fields_to_update[field]
                  if supplied is not None:
                      return supplied
                  return current_node.get(field, default)

              # Post-update values, used only by the redundancy check.
              updated_name_zh = value_after_update("name_zh", "")
              updated_name_en = value_after_update("name_en", "")
              updated_data_type = value_after_update("data_type", "varchar(255)")

              # Tags: None means "do not touch the existing tag relationships".
              tag = payload.get("tag")
              tag_ids = normalize_tag_inputs(tag) if tag is not None else []

              # ===== Redundancy check (only when name_zh or name_en changes) =====
              name_changed = any(
                  fields_to_update[key] is not None
                  and fields_to_update[key] != current_node.get(key)
                  for key in ("name_zh", "name_en")
              )

              if name_changed and not force_update:
                  redundancy_result = check_redundancy_for_update(
                      node_id=node_id,
                      name_zh=updated_name_zh,
                      name_en=updated_name_en,
                      data_type=updated_data_type,
                      tag_ids=tag_ids,
                  )

                  # Another metadata node matches the updated values exactly.
                  if redundancy_result["has_exact_match"]:
                      exact_id = redundancy_result["exact_match_id"]
                      logger.warning(
                          f"更新后元数据与其他节点完全匹配: "
                          f"node_id={node_id}, existing_id={exact_id}"
                      )
                      return jsonify(
                          failed(
                              f"更新后的元数据与已有节点(ID={exact_id})完全相同,"
                              f"请检查是否需要合并或修改名称。"
                          )
                      )

                  # Suspected duplicates exist and a review record was created.
                  if redundancy_result["review_created"]:
                      candidates = redundancy_result["candidates"]
                      candidate_names = [c.get("name_zh", "") for c in candidates[:3]]
                      logger.info(
                          f"更新元数据发现疑似重复: node_id={node_id}, "
                          f"candidates={candidate_names}"
                      )
                      return jsonify(
                          failed(
                              f"发现疑似重复元数据,已创建审核记录。"
                              f"疑似重复: {', '.join(candidate_names)}。"
                              f"请前往元数据审核页面处理,或使用 force_update=true 强制更新。"
                          )
                      )

              # ===== Apply the update =====
              # Only supplied fields are written; updateTime is always refreshed.
              # Property names come from the fixed tuple above, never from the
              # request, so interpolating them into the query text is safe.
              supplied_fields = {
                  field: value
                  for field, value in fields_to_update.items()
                  if value is not None
              }
              assignments = ["n.updateTime = $update_time"]
              assignments.extend(f"n.{field} = ${field}" for field in supplied_fields)
              set_clause = ", ".join(assignments)
              update_cypher = f"""
              MATCH (n:DataMeta)
              WHERE id(n) = $node_id
              SET {set_clause}
              RETURN n
              """
              update_params = {
                  "node_id": node_id,
                  "update_time": get_formatted_time(),
                  **supplied_fields,
              }

              result = session.run(
                  update_cypher,  # type: ignore[arg-type]
                  **update_params,
              )
              updated_node = result.single()

              if not (updated_node and updated_node["n"]):
                  logger.error(f"更新元数据节点失败: ID={node_id}")
                  return jsonify(failed("更新元数据节点失败"))

              node_data = dict(updated_node["n"])
              node_data["id"] = updated_node["n"].id

              # If tags were supplied, replace the tag relationships (list supported).
              if tag is not None:
                  # Drop the existing tag relationships first.
                  delete_tag_cypher = """
                  MATCH (n:DataMeta)-[r:LABEL]->(t:DataLabel)
                  WHERE id(n) = $node_id
                  DELETE r
                  """
                  session.run(delete_tag_cypher, node_id=node_id)

                  # The query text does not depend on the tag, so build it once.
                  create_tag_cypher = """
                  MATCH (n:DataMeta), (t:DataLabel)
                  WHERE id(n) = $node_id AND id(t) = $tag_id
                  MERGE (n)-[r:LABEL]->(t)
                  RETURN r
                  """
                  for tag_id in tag_ids:
                      session.run(create_tag_cypher, node_id=node_id, tag_id=tag_id)

              logger.info(f"成功更新元数据节点: ID={node_data['id']}")
              return jsonify(success(node_data))

      except Exception as e:
          logger.error(f"更新元数据失败: {str(e)}")
          return jsonify(failed(str(e)))
  @bp.route("/review/list", methods=["POST"])
  def metadata_review_list():
      """
      List metadata review records (suspected redundancy / change), paginated.

      Body (JSON object, every key optional):
          - current: page number (default 1; values below 1 are treated as 1)
          - size: page size (default 10; values below 1 fall back to 10)
          - record_type: redundancy|change
          - status: pending|resolved|ignored
          - business_domain_id: business domain id
          - keyword: substring matched literally against
            new_meta.name_zh / new_meta.name_en

      Returns:
          A JSON response built from ``success({...})`` with the keys
          ``records``, ``total``, ``size`` and ``current``, or from
          ``failed(...)`` when the body is not a JSON object or the query
          raises.
      """
      try:
          payload = request.get_json() or {}
          if not isinstance(payload, dict):
              return jsonify(failed("请求数据格式错误,应为 JSON 对象"))

          def to_int(value, default):
              """Convert *value* to int, returning *default* when it is not numeric."""
              try:
                  return int(value)
              except (TypeError, ValueError):
                  return default

          # Clamp pagination: a page or size below 1 would yield a negative
          # OFFSET/LIMIT and turn a harmless request into a database error.
          page = max(to_int(payload.get("current", 1), 1), 1)
          page_size = to_int(payload.get("size", 10), 10)
          if page_size < 1:
              page_size = 10

          record_type = payload.get("record_type")
          status = payload.get("status")
          business_domain_id = payload.get("business_domain_id")
          keyword = (payload.get("keyword") or "").strip()

          query = MetadataReviewRecord.query
          if record_type:
              query = query.filter(MetadataReviewRecord.record_type == record_type)
          if status:
              query = query.filter(MetadataReviewRecord.status == status)
          if business_domain_id is not None and str(business_domain_id).strip() != "":
              # A non-numeric id raises here and is reported by the outer handler.
              bd_id_int = int(business_domain_id)
              query = query.filter(MetadataReviewRecord.business_domain_id == bd_id_int)
          if keyword:
              # Fuzzy match on the text extracted from the JSON column (->>).
              # autoescape makes "%" and "_" in the keyword match literally
              # rather than acting as LIKE wildcards.
              name_zh_col = MetadataReviewRecord.new_meta["name_zh"].astext
              name_en_col = MetadataReviewRecord.new_meta["name_en"].astext
              query = query.filter(
                  or_(
                      name_zh_col.contains(keyword, autoescape=True),
                      name_en_col.contains(keyword, autoescape=True),
                  )
              )

          total = query.count()
          records = (
              query.order_by(MetadataReviewRecord.created_at.desc())
              .offset((page - 1) * page_size)
              .limit(page_size)
              .all()
          )

          return jsonify(
              success(
                  {
                      "records": [r.to_dict() for r in records],
                      "total": total,
                      "size": page_size,
                      "current": page,
                  }
              )
          )
      except Exception as e:
          logger.error(f"审核记录列表查询失败: {str(e)}")
          return jsonify(failed("审核记录列表查询失败", error=str(e)))
  @bp.route("/review/detail", methods=["GET"])
  def metadata_review_detail():
      """
      Return the details of a single review record.

      Query string:
          - id: review record id (required)

      The response data is ``record.to_dict()`` plus an ``impact_graph`` key.
      That key is ``None`` unless the record type is ``change``, the old
      snapshot carries a non-blank ``meta_id`` and ``meta_impact_graph``
      succeeds; a failure while building the graph is only logged as a warning.
      """
      try:
          raw_id = request.args.get("id")
          if not raw_id:
              return jsonify(failed("缺少id参数"))

          review = MetadataReviewRecord.query.get(int(raw_id))
          if not review:
              return jsonify(failed("记录不存在"))

          detail = review.to_dict()

          # "change" records: attach the impact graph of the affected metadata
          # when the old snapshot tells us which node that is.
          graph = None
          if review.record_type == "change":
              target_id = (review.old_meta or {}).get("meta_id")
              has_target = target_id is not None and str(target_id).strip() != ""
              if has_target:
                  try:
                      graph = meta_impact_graph(int(target_id))
                  except Exception as graph_error:
                      # Best effort: the detail view still works without the graph.
                      logger.warning(f"获取影响图谱失败: {graph_error}")
          detail["impact_graph"] = graph

          return jsonify(success(detail))
      except Exception as e:
          logger.error(f"审核记录详情查询失败: {str(e)}")
          return jsonify(failed("审核记录详情查询失败", error=str(e)))
  @bp.route("/review/resolve", methods=["POST"])
  def metadata_review_resolve():
      """
      Resolve a pending metadata review record.

      Body (JSON object):
          - id: review record id (required)
          - action: alias | create_new | accept_change | reject_change | ignore
          - payload: action parameters (optional)
          - resolved_by: who resolved the record (optional)
          - notes: free-text remark (optional)

      action=alias:
          payload: { candidate_meta_id: int }
          Links the record's business domain to candidate_meta_id with an
          INCLUDES relationship and stores alias_name_zh / alias_name_en
          (taken from new_meta) on that relationship.

      action=create_new:
          payload: { new_name_zh: str }
          Creates a new DataMeta node (distinguished by its Chinese name) and
          links it to the record's business domain.

      action=accept_change:
          payload: { meta_id?: int }
          Writes new_meta back to the target DataMeta node, replaces its tags
          when new_meta carries tag_ids, and adds a MetadataVersionHistory row.

      action=reject_change / ignore:
          Only updates the review record.

      A graph write that matches no node is reported as a failure and leaves
      the record pending. Returns a JSON response built from
      ``success(record.to_dict())`` or ``failed(...)``; on an unexpected
      exception the SQL session is rolled back.
      """
      try:
          # Imported locally, as the individual branches did before; done once
          # here so every graph action shares the same names.
          from app.core.meta_data import get_formatted_time
          from app.services.neo4j_driver import neo4j_driver

          payload = request.get_json() or {}
          if not isinstance(payload, dict):
              return jsonify(failed("请求数据格式错误,应为 JSON 对象"))

          def is_blank(value):
              """True for None or an empty/whitespace-only string; 0 is a usable id."""
              return value is None or (isinstance(value, str) and not value.strip())

          record_id = payload.get("id")
          action = payload.get("action")
          action_payload = payload.get("payload") or {}
          resolved_by = payload.get("resolved_by")
          notes = payload.get("notes")

          if not record_id:
              return jsonify(failed("id 不能为空"))
          if not action:
              return jsonify(failed("action 不能为空"))

          record = MetadataReviewRecord.query.get(int(record_id))
          if not record:
              return jsonify(failed("记录不存在"))
          if record.status != "pending":
              return jsonify(failed("记录已处理,无法重复处理"))

          # Context shared by the actions that touch the graph.
          bd_id = record.business_domain_id
          new_meta = record.new_meta or {}

          if action == "alias":
              candidate_meta_id = action_payload.get("candidate_meta_id")
              if is_blank(bd_id):
                  return jsonify(failed("记录缺少 business_domain_id,无法执行 alias"))
              if is_blank(candidate_meta_id):
                  return jsonify(failed("payload.candidate_meta_id 不能为空"))

              domain_id = int(bd_id)
              candidate_id = int(candidate_meta_id)
              alias_name_zh = (new_meta.get("name_zh") or "").strip()
              alias_name_en = (new_meta.get("name_en") or "").strip()

              # Neo4j: create/reuse the INCLUDES relationship and store the alias.
              with neo4j_driver.get_session() as session:
                  linked = session.run(
                      """
                      MATCH (n:BusinessDomain), (m:DataMeta)
                      WHERE id(n) = $domain_id AND id(m) = $meta_id
                      MERGE (n)-[r:INCLUDES]->(m)
                      SET r.alias_name_zh = $alias_name_zh,
                          r.alias_name_en = $alias_name_en
                      RETURN id(r) AS rel_id
                      """,
                      {
                          "domain_id": domain_id,
                          "meta_id": candidate_id,
                          "alias_name_zh": alias_name_zh,
                          "alias_name_en": alias_name_en,
                      },
                  ).single()

              # No row means the MATCH found no domain/meta pair, so nothing was
              # written; do not mark the record as resolved in that case.
              if linked is None:
                  return jsonify(failed("业务领域或候选元数据不存在,无法建立别名关系"))

              update_review_record_resolution(
                  record,
                  action="alias",
                  payload={"candidate_meta_id": candidate_id},
                  resolved_by=resolved_by,
                  notes=notes,
              )
              db.session.commit()
              return jsonify(success(record.to_dict()))

          if action == "create_new":
              new_name_zh = (action_payload.get("new_name_zh") or "").strip()
              if is_blank(bd_id):
                  return jsonify(
                      failed("记录缺少 business_domain_id,无法执行 create_new")
                  )
              if not new_name_zh:
                  return jsonify(failed("payload.new_name_zh 不能为空"))

              with neo4j_driver.get_session() as session:
                  # Create the new DataMeta (never overwriting an existing node)
                  # and link it in the same statement: when the business domain
                  # does not exist, nothing is created and no orphan is left.
                  created = session.run(
                      """
                      MATCH (n:BusinessDomain)
                      WHERE id(n) = $domain_id
                      CREATE (m:DataMeta {
                          name_zh: $name_zh,
                          name_en: $name_en,
                          data_type: $data_type,
                          create_time: $create_time,
                          status: true
                      })
                      MERGE (n)-[:INCLUDES]->(m)
                      RETURN m
                      """,
                      {
                          "domain_id": int(bd_id),
                          "name_zh": new_name_zh,
                          "name_en": (new_meta.get("name_en") or "").strip(),
                          "data_type": (new_meta.get("data_type") or "varchar(255)"),
                          "create_time": get_formatted_time(),
                      },
                  ).single()

              if created is None or created["m"] is None:
                  return jsonify(failed("创建新元数据失败"))

              update_review_record_resolution(
                  record,
                  action="create_new",
                  payload={"new_name_zh": new_name_zh},
                  resolved_by=resolved_by,
                  notes=notes,
              )
              db.session.commit()
              return jsonify(success(record.to_dict()))

          if action == "accept_change":
              old_meta = record.old_meta or {}
              # An explicit meta_id in the payload wins; otherwise use the one
              # stored with the old snapshot.
              meta_id = action_payload.get("meta_id")
              if is_blank(meta_id):
                  meta_id = old_meta.get("meta_id")
              if is_blank(meta_id):
                  return jsonify(failed("无法确定需要更新的 meta_id"))

              target_meta_id = int(meta_id)
              before_snapshot = old_meta.get("snapshot") or {}
              after_snapshot = new_meta

              name_zh_val = (
                  after_snapshot.get("name_zh")
                  or before_snapshot.get("name_zh")
                  or ""
              ).strip()
              name_en_val = (after_snapshot.get("name_en") or "").strip()
              data_type_val = after_snapshot.get("data_type") or "varchar(255)"

              # Neo4j: update the DataMeta properties, then sync the tag set.
              with neo4j_driver.get_session() as session:
                  updated = session.run(
                      """
                      MATCH (m:DataMeta)
                      WHERE id(m) = $meta_id
                      SET m.name_zh = $name_zh,
                          m.name_en = $name_en,
                          m.data_type = $data_type,
                          m.updateTime = $update_time,
                          m.status = true
                      RETURN id(m) AS meta_id
                      """,
                      {
                          "meta_id": target_meta_id,
                          "name_zh": name_zh_val,
                          "name_en": name_en_val,
                          "data_type": data_type_val,
                          "update_time": get_formatted_time(),
                      },
                  ).single()

                  # Nothing matched: do not record history for a change that
                  # was never applied, and leave the record pending.
                  if updated is None:
                      return jsonify(failed("目标元数据不存在,无法应用变更"))

                  tag_ids = [
                      int(t)
                      for t in (after_snapshot.get("tag_ids") or [])
                      if t is not None
                  ]
                  # Tags are replaced only when the new snapshot lists some.
                  if tag_ids:
                      session.run(
                          """
                          MATCH (m:DataMeta)-[r:LABEL]->(:DataLabel)
                          WHERE id(m) = $meta_id
                          DELETE r
                          """,
                          {"meta_id": target_meta_id},
                      )
                      session.run(
                          """
                          MATCH (m:DataMeta)
                          WHERE id(m) = $meta_id
                          WITH m
                          UNWIND $tag_ids AS tid
                          MATCH (t:DataLabel) WHERE id(t) = tid
                          MERGE (m)-[:LABEL]->(t)
                          """,
                          {"meta_id": target_meta_id, "tag_ids": tag_ids},
                      )

              # Version history row (SQL side), committed with the record update.
              history = MetadataVersionHistory()
              history.meta_id = target_meta_id
              history.change_source = "ddl"
              history.before_snapshot = before_snapshot
              history.after_snapshot = after_snapshot
              history.created_by = resolved_by if resolved_by is not None else ""
              db.session.add(history)

              update_review_record_resolution(
                  record,
                  action="accept_change",
                  payload={"meta_id": target_meta_id},
                  resolved_by=resolved_by,
                  notes=notes,
              )
              db.session.commit()
              return jsonify(success(record.to_dict()))

          if action in ("reject_change", "ignore"):
              update_review_record_resolution(
                  record,
                  action=action,
                  payload=action_payload,
                  resolved_by=resolved_by,
                  notes=notes,
              )
              db.session.commit()
              return jsonify(success(record.to_dict()))

          return jsonify(failed(f"不支持的action: {action}"))
      except Exception as e:
          logger.error(f"处理审核记录失败: {str(e)}")
          db.session.rollback()
          return jsonify(failed("处理审核记录失败", error=str(e)))