routes.py 53 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522
  1. import io
  2. import logging
  3. from flask import current_app, jsonify, request, send_file
  4. from minio import Minio
  5. from minio.error import S3Error
  6. from sqlalchemy import or_
  7. from app import db
  8. from app.api.meta_data import bp
  9. from app.core.meta_data import (
  10. check_redundancy_for_add,
  11. check_redundancy_for_update,
  12. convert_tag_ids_to_tags,
  13. get_file_content,
  14. get_formatted_time,
  15. handle_id_unstructured,
  16. handle_txt_graph,
  17. meta_impact_graph,
  18. meta_kinship_graph,
  19. meta_list,
  20. normalize_tag_inputs,
  21. parse_text,
  22. solve_unstructured_data,
  23. text_resource_solve,
  24. )
  25. from app.core.system.auth import require_auth
  26. from app.models.metadata_review import (
  27. MetadataReviewRecord,
  28. MetadataVersionHistory,
  29. update_review_record_resolution,
  30. )
  31. from app.models.result import failed, success
  32. from app.services.neo4j_driver import neo4j_driver
  33. logger = logging.getLogger("app")
  34. def get_minio_client():
  35. """获取 MinIO 客户端实例"""
  36. return Minio(
  37. current_app.config["MINIO_HOST"],
  38. access_key=current_app.config["MINIO_USER"],
  39. secret_key=current_app.config["MINIO_PASSWORD"],
  40. secure=current_app.config["MINIO_SECURE"],
  41. )
  42. def get_minio_config():
  43. """获取 MinIO 配置"""
  44. return {
  45. "MINIO_BUCKET": current_app.config["MINIO_BUCKET"],
  46. "PREFIX": current_app.config["PREFIX"],
  47. "ALLOWED_EXTENSIONS": current_app.config["ALLOWED_EXTENSIONS"],
  48. }
  49. def allowed_file(filename):
  50. """检查文件扩展名是否允许"""
  51. if "." not in filename:
  52. return False
  53. ext = filename.rsplit(".", 1)[1].lower()
  54. return ext in get_minio_config()["ALLOWED_EXTENSIONS"]
  55. # 元数据列表
  56. @bp.route("/node/list", methods=["POST"])
  57. def meta_node_list():
  58. try:
  59. payload = request.get_json() or {}
  60. if not isinstance(payload, dict):
  61. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  62. def to_int(value, default):
  63. try:
  64. return int(value)
  65. except (TypeError, ValueError):
  66. return default
  67. # 分页参数
  68. page = to_int(payload.get("current", 1), 1)
  69. page_size = to_int(payload.get("size", 10), 10)
  70. # 过滤参数
  71. name_en_filter = payload.get("name_en") or None
  72. name_zh_filter = payload.get("name_zh") or None
  73. category_filter = payload.get("category") or None
  74. time_filter = payload.get("time") or None
  75. logger.info(
  76. f"[node/list] 过滤参数: name_zh={name_zh_filter}, "
  77. f"name_en={name_en_filter}, category={category_filter}"
  78. )
  79. tag_filter = payload.get("tag")
  80. if tag_filter is not None and not isinstance(tag_filter, list):
  81. tag_filter = None
  82. # 调用核心业务逻辑
  83. result, total_count = meta_list(
  84. page,
  85. page_size,
  86. "",
  87. name_en_filter,
  88. name_zh_filter,
  89. category_filter,
  90. time_filter,
  91. tag_filter,
  92. )
  93. # 返回结果
  94. return jsonify(
  95. success(
  96. {
  97. "records": result,
  98. "total": total_count,
  99. "size": page_size,
  100. "current": page,
  101. }
  102. )
  103. )
  104. except Exception as e:
  105. logger.error(f"获取元数据列表失败: {str(e)}")
  106. return jsonify(failed(str(e)))
  107. # 元数据图谱
  108. @bp.route("/node/graph", methods=["POST"])
  109. def meta_node_graph():
  110. try:
  111. if not request.json:
  112. return jsonify(failed("请求数据不能为空"))
  113. # 从请求中获取节点ID
  114. node_id = request.json.get("nodeId")
  115. if node_id is None:
  116. return jsonify(failed("nodeId 不能为空"))
  117. try:
  118. node_id_int = int(node_id)
  119. except (TypeError, ValueError):
  120. return jsonify(failed("nodeId 必须为整数"))
  121. # 调用核心业务逻辑
  122. graph = meta_kinship_graph(node_id_int)
  123. is_dict = isinstance(graph, dict)
  124. nodes = graph.get("nodes", []) if is_dict else []
  125. relationships = graph.get("relationships", []) if is_dict else []
  126. # 当前节点属性
  127. node_info = next(
  128. (n for n in nodes if n.get("id") == node_id_int),
  129. {},
  130. )
  131. # 关联节点(包含属性,便于前端展示名称等)
  132. related_nodes = [n for n in nodes if n.get("id") != node_id_int]
  133. payload = {
  134. "node": node_info,
  135. "related_nodes": related_nodes,
  136. "relationships": relationships,
  137. }
  138. return jsonify(success(payload))
  139. except Exception as e:
  140. logger.error(f"获取元数据图谱失败: {str(e)}")
  141. return jsonify(failed(str(e)))
  142. # 删除元数据
  143. @bp.route("/node/delete", methods=["POST"])
  144. def meta_node_delete():
  145. try:
  146. if not request.json:
  147. return jsonify(failed("请求数据不能为空"))
  148. # 从请求中获取节点ID
  149. node_id = request.json.get("id")
  150. # 删除节点逻辑
  151. with neo4j_driver.get_session() as session:
  152. cypher = "MATCH (n) WHERE id(n) = $node_id DETACH DELETE n"
  153. session.run(cypher, node_id=int(node_id))
  154. # 返回结果
  155. return jsonify(success({}))
  156. except Exception as e:
  157. logger.error(f"删除元数据失败: {str(e)}")
  158. return jsonify(failed(str(e)))
  159. # 编辑元数据
  160. @bp.route("/node/edit", methods=["POST"])
  161. def meta_node_edit():
  162. try:
  163. if not request.json:
  164. return jsonify(failed("请求数据不能为空"))
  165. # 从请求中获取节点ID
  166. node_id = request.json.get("id")
  167. if not node_id:
  168. return jsonify(failed("节点ID不能为空"))
  169. # 获取节点
  170. with neo4j_driver.get_session() as session:
  171. # 查询节点信息
  172. cypher = """
  173. MATCH (n:DataMeta)
  174. WHERE id(n) = $node_id
  175. RETURN n
  176. """
  177. result = session.run(cypher, node_id=int(node_id))
  178. node = result.single()
  179. if not node or not node["n"]:
  180. return jsonify(failed("节点不存在"))
  181. # 获取节点数据
  182. node_data = dict(node["n"])
  183. node_data["id"] = node["n"].id
  184. # 获取标签信息
  185. tag_cypher = """
  186. MATCH (n:DataMeta)-[:LABEL]->(t:DataLabel)
  187. WHERE id(n) = $node_id
  188. RETURN t
  189. """
  190. tag_result = session.run(tag_cypher, node_id=int(node_id))
  191. tags: list[dict] = []
  192. for record in tag_result:
  193. tag_node = record.get("t")
  194. if tag_node:
  195. tags.append(
  196. {
  197. "id": tag_node.id,
  198. "name_zh": tag_node.get("name_zh", ""),
  199. "name_en": tag_node.get("name_en", ""),
  200. }
  201. )
  202. # 获取主数据信息
  203. master_data_cypher = """
  204. MATCH (n:DataMeta)-[:master_data]->(m:master_data)
  205. WHERE id(n) = $node_id
  206. RETURN m
  207. """
  208. master_data_result = session.run(master_data_cypher, node_id=int(node_id))
  209. master_data = master_data_result.single()
  210. # 构建返回数据
  211. response_data = [
  212. {
  213. "master_data": (
  214. master_data["m"].id
  215. if master_data and master_data["m"]
  216. else None
  217. ),
  218. "name_zh": node_data.get("name_zh", ""),
  219. "name_en": node_data.get("name_en", ""),
  220. "create_time": node_data.get("create_time", ""),
  221. "update_time": node_data.get("update_time", ""),
  222. "status": bool(node_data.get("status", True)),
  223. "data_type": node_data.get("data_type", ""),
  224. "tag": tags,
  225. "affiliation": node_data.get("affiliation"),
  226. "category": node_data.get("category"),
  227. "alias": node_data.get("alias"),
  228. "describe": node_data.get("describe"),
  229. }
  230. ]
  231. logger.info(f"成功获取元数据节点: ID={node_data['id']}")
  232. return jsonify(success(response_data))
  233. except Exception as e:
  234. logger.error(f"获取元数据节点失败: {str(e)}")
  235. return jsonify(failed(str(e)))
  236. # 增加元数据
  237. @bp.route("/check", methods=["GET"])
  238. def meta_check():
  239. """
  240. 检查元数据中文名是否已存在
  241. 请求参数:
  242. - name_zh: 元数据中文名(URL参数)
  243. 返回:
  244. - exists: true/false 表示是否存在
  245. """
  246. try:
  247. name_zh = request.args.get("name_zh")
  248. if not name_zh:
  249. return jsonify(failed("缺少name_zh参数"))
  250. # 查询数据库检查是否存在
  251. with neo4j_driver.get_session() as session:
  252. cypher = """
  253. MATCH (n:DataMeta {name_zh: $name_zh})
  254. RETURN count(n) > 0 as exists
  255. """
  256. result = session.run(cypher, name_zh=name_zh)
  257. record = result.single()
  258. if record:
  259. exists = record["exists"]
  260. logger.info(f"检查元数据 '{name_zh}': {'存在' if exists else '不存在'}")
  261. return jsonify(
  262. success({"exists": exists, "name_zh": name_zh}, "查询成功")
  263. )
  264. else:
  265. return jsonify(
  266. success({"exists": False, "name_zh": name_zh}, "查询成功")
  267. )
  268. except Exception as e:
  269. logger.error(f"检查元数据失败: {str(e)}")
  270. return jsonify(failed(f"检查失败: {str(e)}"))
  271. @bp.route("/node/add", methods=["POST"])
  272. def meta_node_add():
  273. """
  274. 新增元数据节点
  275. 在创建前会进行冗余检测:
  276. - 如果存在完全匹配的元数据,返回已存在的节点信息
  277. - 如果存在疑似重复的元数据,创建审核记录并返回提示
  278. - 如果无重复,正常创建新节点
  279. """
  280. try:
  281. if not request.json:
  282. return jsonify(failed("请求数据不能为空"))
  283. # 从请求中获取节点信息
  284. node_name_zh = request.json.get("name_zh")
  285. node_type = request.json.get("data_type")
  286. node_category = request.json.get("category")
  287. node_alias = request.json.get("alias")
  288. node_affiliation = request.json.get("affiliation")
  289. node_tag = request.json.get("tag")
  290. node_desc = request.json.get("describe")
  291. node_status = bool(request.json.get("status", True))
  292. node_name_en = request.json.get("name_en")
  293. # 是否强制创建(跳过冗余检测)
  294. force_create = bool(request.json.get("force_create", False))
  295. if not node_name_zh:
  296. return jsonify(failed("节点名称不能为空"))
  297. if not node_type:
  298. return jsonify(failed("节点类型不能为空"))
  299. # 统一处理标签ID
  300. tag_ids = normalize_tag_inputs(node_tag)
  301. # ========== 冗余检测 ==========
  302. if not force_create:
  303. redundancy_result = check_redundancy_for_add(
  304. name_zh=node_name_zh,
  305. name_en=node_name_en or "",
  306. data_type=node_type,
  307. tag_ids=tag_ids,
  308. )
  309. # 存在完全匹配的元数据
  310. if redundancy_result["has_exact_match"]:
  311. exact_id = redundancy_result["exact_match_id"]
  312. logger.info(
  313. f"元数据已存在(完全匹配): name_zh={node_name_zh}, "
  314. f"existing_id={exact_id}"
  315. )
  316. # 返回已存在的节点信息
  317. with neo4j_driver.get_session() as session:
  318. existing = session.run(
  319. "MATCH (n:DataMeta) WHERE id(n) = $id RETURN n",
  320. {"id": exact_id},
  321. ).single()
  322. if existing and existing["n"]:
  323. existing_data = dict(existing["n"])
  324. existing_data["id"] = existing["n"].id
  325. return jsonify(
  326. success(existing_data, message="元数据已存在,返回已有节点")
  327. )
  328. return jsonify(failed(f"元数据已存在(ID={exact_id}),请勿重复创建"))
  329. # 存在疑似重复的元数据,已创建审核记录
  330. if redundancy_result["review_created"]:
  331. candidates = redundancy_result["candidates"]
  332. candidate_names = [c.get("name_zh", "") for c in candidates[:3]]
  333. logger.info(
  334. f"发现疑似重复元数据: name_zh={node_name_zh}, "
  335. f"candidates={candidate_names}"
  336. )
  337. return jsonify(
  338. failed(
  339. f"发现疑似重复元数据,已创建审核记录。"
  340. f"疑似重复: {', '.join(candidate_names)}。"
  341. f"请前往元数据审核页面处理,或使用 force_create=true 强制创建。"
  342. )
  343. )
  344. # ========== 创建节点 ==========
  345. with neo4j_driver.get_session() as session:
  346. cypher = """
  347. MERGE (n:DataMeta {name_zh: $name_zh})
  348. ON CREATE SET n.name_en = $name_en,
  349. n.data_type = $data_type,
  350. n.category = $category,
  351. n.alias = $alias,
  352. n.affiliation = $affiliation,
  353. n.describe = $describe,
  354. n.create_time = $create_time,
  355. n.updateTime = $update_time,
  356. n.status = $status,
  357. n.name_en = $name_en
  358. ON MATCH SET n.data_type = $data_type,
  359. n.category = $category,
  360. n.alias = $alias,
  361. n.affiliation = $affiliation,
  362. n.describe = $describe,
  363. n.updateTime = $update_time,
  364. n.status = $status,
  365. n.name_en = $name_en
  366. RETURN n
  367. """
  368. create_time = update_time = get_formatted_time()
  369. result = session.run(
  370. cypher,
  371. name_zh=node_name_zh,
  372. data_type=node_type,
  373. category=node_category,
  374. alias=node_alias,
  375. affiliation=node_affiliation,
  376. describe=node_desc,
  377. create_time=create_time,
  378. update_time=update_time,
  379. status=node_status,
  380. name_en=node_name_en,
  381. )
  382. node = result.single()
  383. if node and node["n"]:
  384. node_data = dict(node["n"])
  385. node_data["id"] = node["n"].id
  386. # 如果提供了标签列表,创建标签关系
  387. tag_nodes = []
  388. if tag_ids:
  389. for tag_id in tag_ids:
  390. # 获取标签节点信息
  391. tag_fetch = session.run(
  392. "MATCH (t:DataLabel) WHERE id(t) = $tag_id RETURN t",
  393. tag_id=tag_id,
  394. ).single()
  395. if not tag_fetch or not tag_fetch.get("t"):
  396. logger.warning(f"未找到标签节点: {tag_id}")
  397. continue
  398. tag_node = tag_fetch["t"]
  399. tag_nodes.append(
  400. {
  401. "id": tag_node.id,
  402. "name_zh": tag_node.get("name_zh", ""),
  403. "name_en": tag_node.get("name_en", ""),
  404. }
  405. )
  406. tag_cypher = """
  407. MATCH (n:DataMeta), (t:DataLabel)
  408. WHERE id(n) = $node_id AND id(t) = $tag_id
  409. MERGE (n)-[r:LABEL]->(t)
  410. RETURN r
  411. """
  412. session.run(tag_cypher, node_id=node["n"].id, tag_id=tag_id)
  413. node_data["tag"] = tag_nodes
  414. logger.info(
  415. f"成功创建或更新元数据节点: "
  416. f"ID={node_data['id']}, name={node_name_zh}"
  417. )
  418. return jsonify(success(node_data))
  419. else:
  420. logger.error(f"创建元数据节点失败: {node_name_zh}")
  421. return jsonify(failed("创建元数据节点失败"))
  422. except Exception as e:
  423. logger.error(f"添加元数据失败: {str(e)}")
  424. return jsonify(failed(str(e)))
  425. # 搜索元数据
  426. @bp.route("/search", methods=["GET"])
  427. def search_metadata_route():
  428. try:
  429. keyword = request.args.get("keyword", "")
  430. if not keyword:
  431. return jsonify(success([]))
  432. cypher = """
  433. MATCH (n:DataMeta)
  434. WHERE n.name_zh CONTAINS $keyword
  435. RETURN n LIMIT 100
  436. """
  437. with neo4j_driver.get_session() as session:
  438. result = session.run(cypher, keyword=keyword)
  439. metadata_list = [dict(record["n"]) for record in result]
  440. return jsonify(success(metadata_list))
  441. except Exception as e:
  442. logger.error(f"搜索元数据失败: {str(e)}")
  443. return jsonify(failed(str(e)))
  444. # 全文检索查询
  445. @bp.route("/full/text/query", methods=["POST"])
  446. def full_text_query():
  447. try:
  448. if not request.json:
  449. return jsonify(failed("请求数据不能为空"))
  450. # 获取查询条件
  451. search_term = request.json.get("query", "")
  452. if not search_term:
  453. return jsonify(failed("查询条件不能为空"))
  454. # 执行Neo4j全文索引查询
  455. with neo4j_driver.get_session() as session:
  456. cypher = """
  457. CALL db.index.fulltext.queryNodes("DataMetaFulltext", $term)
  458. YIELD node, score
  459. RETURN node, score
  460. ORDER BY score DESC
  461. LIMIT 20
  462. """
  463. result = session.run(cypher, term=search_term)
  464. # 处理查询结果
  465. search_results = []
  466. for record in result:
  467. node_data = dict(record["node"])
  468. node_data["id"] = record["node"].id
  469. node_data["score"] = record["score"]
  470. search_results.append(node_data)
  471. return jsonify(success(search_results))
  472. except Exception as e:
  473. logger.error(f"全文检索查询失败: {str(e)}")
  474. return jsonify(failed(str(e)))
  475. # 非结构化文本查询
  476. @bp.route("/unstructure/text/query", methods=["POST"])
  477. def unstructure_text_query():
  478. try:
  479. if not request.json:
  480. return jsonify(failed("请求数据不能为空"))
  481. # 获取查询参数
  482. node_id = request.json.get("id")
  483. if not node_id:
  484. return jsonify(failed("节点ID不能为空"))
  485. # 获取节点信息
  486. node_data = handle_id_unstructured(node_id)
  487. if not node_data:
  488. return jsonify(failed("节点不存在"))
  489. # 获取对象路径
  490. object_name = node_data.get("url")
  491. if not object_name:
  492. return jsonify(failed("文档路径不存在"))
  493. # 获取 MinIO 配置
  494. minio_client = get_minio_client()
  495. config = get_minio_config()
  496. bucket_name = config["MINIO_BUCKET"]
  497. # 从MinIO获取文件内容
  498. file_content = get_file_content(minio_client, bucket_name, object_name)
  499. # 解析文本内容
  500. parsed_data = parse_text(file_content)
  501. # 返回结果
  502. result = {
  503. "node": node_data,
  504. "parsed": parsed_data,
  505. "content": (
  506. file_content[:1000] + "..."
  507. if len(file_content) > 1000
  508. else file_content
  509. ),
  510. }
  511. return jsonify(success(result))
  512. except Exception as e:
  513. logger.error(f"非结构化文本查询失败: {str(e)}")
  514. return jsonify(failed(str(e)))
  515. # 文件上传
  516. @bp.route("/resource/upload", methods=["POST"])
  517. def upload_file():
  518. try:
  519. # 检查请求中是否有文件
  520. if "file" not in request.files:
  521. return jsonify(failed("没有找到上传的文件"))
  522. file = request.files["file"]
  523. # 检查文件名
  524. if not file.filename:
  525. return jsonify(failed("未选择文件"))
  526. # 保存文件名到本地变量(确保类型安全)
  527. filename = file.filename
  528. # 检查文件类型
  529. if not allowed_file(filename):
  530. return jsonify(failed("不支持的文件类型"))
  531. # 获取 MinIO 配置
  532. minio_client = get_minio_client()
  533. config = get_minio_config()
  534. # 上传到MinIO
  535. file_content = file.read()
  536. file_size = len(file_content)
  537. file_type = filename.rsplit(".", 1)[1].lower()
  538. # 提取文件名(不包含扩展名)
  539. filename_without_ext = filename.rsplit(".", 1)[0]
  540. # 生成紧凑的时间戳 (yyyyMMddHHmmss)
  541. import time
  542. timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
  543. # 生成唯一文件名
  544. object_name = (
  545. f"{config['PREFIX']}/{filename_without_ext}_{timestamp}.{file_type}"
  546. )
  547. # 上传文件
  548. minio_client.put_object(
  549. config["MINIO_BUCKET"],
  550. object_name,
  551. io.BytesIO(file_content),
  552. file_size,
  553. content_type=f"application/{file_type}",
  554. )
  555. # 返回结果
  556. return jsonify(
  557. success(
  558. {
  559. "filename": file.filename,
  560. "size": file_size,
  561. "type": file_type,
  562. "url": object_name,
  563. }
  564. )
  565. )
  566. except Exception as e:
  567. logger.error(f"文件上传失败: {str(e)}")
  568. return jsonify(failed(str(e)))
  569. # 文件下载显示
  570. @bp.route("/resource/display", methods=["POST"])
  571. def upload_file_display():
  572. response = None
  573. try:
  574. if not request.json:
  575. return jsonify(failed("请求数据不能为空"))
  576. object_name = request.json.get("url")
  577. if not object_name:
  578. return jsonify(failed("文件路径不能为空"))
  579. # 获取 MinIO 配置
  580. minio_client = get_minio_client()
  581. config = get_minio_config()
  582. # 获取文件内容
  583. response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
  584. file_data = response.read()
  585. # 获取文件名
  586. file_name = object_name.split("/")[-1]
  587. # 确定文件类型
  588. file_extension = file_name.split(".")[-1].lower()
  589. # 为不同文件类型设置合适的MIME类型
  590. mime_types = {
  591. "pdf": "application/pdf",
  592. "doc": "application/msword",
  593. "docx": (
  594. "application/vnd.openxmlformats-"
  595. "officedocument.wordprocessingml.document"
  596. ),
  597. "xls": "application/vnd.ms-excel",
  598. "xlsx": (
  599. "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
  600. ),
  601. "txt": "text/plain",
  602. "csv": "text/csv",
  603. }
  604. content_type = mime_types.get(file_extension, "application/octet-stream")
  605. # 返回结果
  606. return jsonify(
  607. success(
  608. {
  609. "filename": file_name,
  610. "type": file_extension,
  611. "contentType": content_type,
  612. "size": len(file_data),
  613. "url": f"/api/meta/resource/download?url={object_name}",
  614. }
  615. )
  616. )
  617. except S3Error as e:
  618. logger.error(f"MinIO操作失败: {str(e)}")
  619. return jsonify(failed(f"文件访问失败: {str(e)}"))
  620. except Exception as e:
  621. logger.error(f"文件显示信息获取失败: {str(e)}")
  622. return jsonify(failed(str(e)))
  623. finally:
  624. if response:
  625. response.close()
  626. response.release_conn()
  627. # 文件下载接口
  628. @bp.route("/resource/download", methods=["GET"])
  629. def download_file():
  630. response = None
  631. try:
  632. object_name = request.args.get("url")
  633. if not object_name:
  634. return jsonify(failed("文件路径不能为空"))
  635. # URL解码,处理特殊字符
  636. import urllib.parse
  637. object_name = urllib.parse.unquote(object_name)
  638. # 记录下载请求信息,便于调试
  639. logger.info(f"下载文件请求: {object_name}")
  640. # 获取 MinIO 配置
  641. minio_client = get_minio_client()
  642. config = get_minio_config()
  643. # 获取文件
  644. try:
  645. response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
  646. file_data = response.read()
  647. except S3Error as e:
  648. logger.error(f"MinIO获取文件失败: {str(e)}")
  649. return jsonify(failed(f"文件获取失败: {str(e)}"))
  650. # 获取文件名,并处理特殊字符
  651. file_name = object_name.split("/")[-1]
  652. # 直接从内存返回文件,不创建临时文件
  653. file_stream = io.BytesIO(file_data)
  654. # 返回文件
  655. return send_file(
  656. file_stream,
  657. as_attachment=True,
  658. download_name=file_name,
  659. mimetype="application/octet-stream",
  660. )
  661. except Exception as e:
  662. logger.error(f"文件下载失败: {str(e)}")
  663. return jsonify(failed(str(e)))
  664. finally:
  665. if response:
  666. response.close()
  667. response.release_conn()
  668. # 文本资源翻译
  669. @bp.route("/resource/translate", methods=["POST"])
  670. def text_resource_translate():
  671. try:
  672. if not request.json:
  673. return jsonify(failed("请求数据不能为空"))
  674. # 获取参数
  675. name_zh = request.json.get("name_zh", "")
  676. keyword = request.json.get("keyword", "")
  677. if not name_zh:
  678. return jsonify(failed("名称不能为空"))
  679. # 调用资源处理逻辑
  680. result = text_resource_solve(None, name_zh, keyword)
  681. return jsonify(success(result))
  682. except Exception as e:
  683. logger.error(f"文本资源翻译失败: {str(e)}")
  684. return jsonify(failed(str(e)))
  685. # 创建文本资源节点
  686. @bp.route("/resource/node", methods=["POST"])
  687. def text_resource_node():
  688. try:
  689. if not request.json:
  690. return jsonify(failed("请求数据不能为空"))
  691. # 获取参数
  692. name_zh = request.json.get("name_zh", "")
  693. name_en = request.json.get("name_en", "")
  694. keywords = request.json.get("keywords", [])
  695. keywords_en = request.json.get("keywords_en", [])
  696. object_name = request.json.get("url", "")
  697. if not name_zh or not name_en or not object_name:
  698. return jsonify(failed("参数不完整"))
  699. # 创建节点
  700. with neo4j_driver.get_session() as session:
  701. # 创建资源节点
  702. cypher = """
  703. CREATE (n:DataMeta {
  704. name_zh: $name_zh,
  705. name_en: $name_en,
  706. keywords: $keywords,
  707. keywords_en: $keywords_en,
  708. url: $object_name,
  709. create_time: $create_time,
  710. updateTime: $update_time
  711. })
  712. RETURN n
  713. """
  714. create_time = update_time = get_formatted_time()
  715. result = session.run(
  716. cypher,
  717. name_zh=name_zh,
  718. name_en=name_en,
  719. keywords=keywords,
  720. keywords_en=keywords_en,
  721. object_name=object_name,
  722. create_time=create_time,
  723. update_time=update_time,
  724. )
  725. record = result.single()
  726. if not record:
  727. return jsonify(failed("创建节点失败"))
  728. node = record["n"]
  729. # 为每个关键词创建标签节点并关联
  730. for i, keyword in enumerate(keywords):
  731. if keyword:
  732. # 创建标签节点
  733. tag_cypher = """
  734. MERGE (t:Tag {name_zh: $name_zh})
  735. ON CREATE SET t.name_en = $name_en,
  736. t.create_time = $create_time
  737. RETURN t
  738. """
  739. tag_result = session.run(
  740. tag_cypher,
  741. name_zh=keyword,
  742. name_en=keywords_en[i] if i < len(keywords_en) else "",
  743. create_time=create_time,
  744. )
  745. tag_record = tag_result.single()
  746. if not tag_record:
  747. continue
  748. tag_node = tag_record["t"]
  749. # 创建关系
  750. rel_cypher = """
  751. MATCH (n), (t)
  752. WHERE id(n) = $node_id AND id(t) = $tag_id
  753. CREATE (n)-[r:HAS_TAG]->(t)
  754. RETURN r
  755. """
  756. session.run(rel_cypher, node_id=node.id, tag_id=tag_node.id)
  757. # 返回创建的节点
  758. return jsonify(success(dict(node)))
  759. except Exception as e:
  760. logger.error(f"创建文本资源节点失败: {str(e)}")
  761. return jsonify(failed(str(e)))
  762. # 处理非结构化数据
  763. @bp.route("/unstructured/process", methods=["POST"])
  764. def processing_unstructured_data():
  765. try:
  766. if not request.json:
  767. return jsonify(failed("请求数据不能为空"))
  768. # 获取参数
  769. node_id = request.json.get("id")
  770. if not node_id:
  771. return jsonify(failed("节点ID不能为空"))
  772. # 获取 MinIO 配置
  773. minio_client = get_minio_client()
  774. config = get_minio_config()
  775. prefix = config["PREFIX"]
  776. # 调用处理逻辑
  777. result = solve_unstructured_data(node_id, minio_client, prefix)
  778. if result:
  779. return jsonify(success({"message": "处理成功"}))
  780. else:
  781. return jsonify(failed("处理失败"))
  782. except Exception as e:
  783. logger.error(f"处理非结构化数据失败: {str(e)}")
  784. return jsonify(failed(str(e)))
  785. # 创建文本图谱
  786. @bp.route("/text/graph", methods=["POST"])
  787. def create_text_graph():
  788. try:
  789. if not request.json:
  790. return jsonify(failed("请求数据不能为空"))
  791. # 获取参数
  792. node_id = request.json.get("id")
  793. entity_zh = request.json.get("entity_zh")
  794. entity_en = request.json.get("entity_en")
  795. if not all([node_id, entity_zh, entity_en]):
  796. return jsonify(failed("参数不完整"))
  797. # 创建图谱
  798. result = handle_txt_graph(node_id, entity_zh, entity_en)
  799. if result:
  800. return jsonify(success({"message": "图谱创建成功"}))
  801. else:
  802. return jsonify(failed("图谱创建失败"))
  803. except Exception as e:
  804. logger.error(f"创建文本图谱失败: {str(e)}")
  805. return jsonify(failed(str(e)))
  806. @bp.route("/config", methods=["GET"])
  807. @require_auth
  808. def get_meta_config():
  809. """获取元数据配置信息"""
  810. config = get_minio_config()
  811. return jsonify(
  812. {
  813. "bucket_name": config["MINIO_BUCKET"],
  814. "prefix": config["PREFIX"],
  815. "allowed_extensions": list(config["ALLOWED_EXTENSIONS"]),
  816. }
  817. )
  818. # 更新元数据
  819. @bp.route("/node/update", methods=["POST"])
  820. def meta_node_update():
  821. """
  822. 更新元数据节点
  823. 在更新前会进行冗余检测(如果修改了 name_zh/name_en):
  824. - 如果更新后的名称与其他节点完全匹配,返回错误
  825. - 如果存在疑似重复的元数据,创建审核记录并返回提示
  826. - 如果无重复,正常更新节点
  827. """
  828. try:
  829. if not request.json:
  830. return jsonify(failed("请求数据不能为空"))
  831. # 从请求中获取节点ID和更新数据
  832. node_id = request.json.get("id")
  833. if not node_id:
  834. return jsonify(failed("节点ID不能为空"))
  835. # 验证并转换节点ID为整数
  836. try:
  837. node_id = int(node_id)
  838. except (ValueError, TypeError):
  839. return jsonify(failed(f"节点ID必须为整数,当前值: {node_id}"))
  840. # 是否强制更新(跳过冗余检测)
  841. force_update = bool(request.json.get("force_update", False))
  842. # 更新节点
  843. with neo4j_driver.get_session() as session:
  844. # 检查节点是否存在并获取当前值
  845. check_cypher = """
  846. MATCH (n:DataMeta)
  847. WHERE id(n) = $node_id
  848. RETURN n
  849. """
  850. result = session.run(check_cypher, node_id=node_id)
  851. node = result.single()
  852. if not node or not node["n"]:
  853. return jsonify(failed("节点不存在"))
  854. # 获取当前节点属性
  855. current_node = dict(node["n"])
  856. # 处理每个可能的更新字段
  857. fields_to_update = {
  858. "name_zh": request.json.get("name_zh"),
  859. "category": request.json.get("category"),
  860. "alias": request.json.get("alias"),
  861. "affiliation": request.json.get("affiliation"),
  862. "data_type": request.json.get("data_type"),
  863. "describe": request.json.get("describe"),
  864. "status": request.json.get("status"),
  865. "name_en": request.json.get("name_en"),
  866. }
  867. # 计算更新后的值(用于冗余检测)
  868. updated_name_zh = (
  869. fields_to_update["name_zh"]
  870. if fields_to_update["name_zh"] is not None
  871. else current_node.get("name_zh", "")
  872. )
  873. updated_name_en = (
  874. fields_to_update["name_en"]
  875. if fields_to_update["name_en"] is not None
  876. else current_node.get("name_en", "")
  877. )
  878. updated_data_type = (
  879. fields_to_update["data_type"]
  880. if fields_to_update["data_type"] is not None
  881. else current_node.get("data_type", "varchar(255)")
  882. )
  883. # 处理标签
  884. tag = request.json.get("tag")
  885. tag_ids = normalize_tag_inputs(tag) if tag is not None else []
  886. # ========== 冗余检测(仅当修改了 name_zh 或 name_en 时)==========
  887. name_changed = (
  888. fields_to_update["name_zh"] is not None
  889. and fields_to_update["name_zh"] != current_node.get("name_zh")
  890. ) or (
  891. fields_to_update["name_en"] is not None
  892. and fields_to_update["name_en"] != current_node.get("name_en")
  893. )
  894. if name_changed and not force_update:
  895. redundancy_result = check_redundancy_for_update(
  896. node_id=node_id,
  897. name_zh=updated_name_zh,
  898. name_en=updated_name_en,
  899. data_type=updated_data_type,
  900. tag_ids=tag_ids,
  901. )
  902. # 存在完全匹配的其他元数据
  903. if redundancy_result["has_exact_match"]:
  904. exact_id = redundancy_result["exact_match_id"]
  905. logger.warning(
  906. f"更新后元数据与其他节点完全匹配: "
  907. f"node_id={node_id}, existing_id={exact_id}"
  908. )
  909. return jsonify(
  910. failed(
  911. f"更新后的元数据与已有节点(ID={exact_id})完全相同,"
  912. f"请检查是否需要合并或修改名称。"
  913. )
  914. )
  915. # 存在疑似重复的元数据,已创建审核记录
  916. if redundancy_result["review_created"]:
  917. candidates = redundancy_result["candidates"]
  918. candidate_names = [c.get("name_zh", "") for c in candidates[:3]]
  919. logger.info(
  920. f"更新元数据发现疑似重复: node_id={node_id}, "
  921. f"candidates={candidate_names}"
  922. )
  923. return jsonify(
  924. failed(
  925. f"发现疑似重复元数据,已创建审核记录。"
  926. f"疑似重复: {', '.join(candidate_names)}。"
  927. f"请前往元数据审核页面处理,或使用 force_update=true 强制更新。"
  928. )
  929. )
  930. # ========== 执行更新 ==========
  931. # 构建更新语句,只更新提供的属性
  932. update_cypher = """
  933. MATCH (n:DataMeta)
  934. WHERE id(n) = $node_id
  935. SET n.updateTime = $update_time
  936. """
  937. # 准备更新参数
  938. update_params = {"node_id": node_id, "update_time": get_formatted_time()}
  939. # 只更新提供了新值的字段
  940. for field, new_value in fields_to_update.items():
  941. if new_value is not None:
  942. # 特殊处理 data_type 字段映射
  943. if field == "data_type":
  944. update_cypher += f", n.data_type = ${field}\n"
  945. else:
  946. update_cypher += f", n.{field} = ${field}\n"
  947. update_params[field] = new_value
  948. update_cypher += "RETURN n"
  949. result = session.run(
  950. update_cypher, # type: ignore[arg-type]
  951. **update_params,
  952. )
  953. updated_node = result.single()
  954. if updated_node and updated_node["n"]:
  955. node_data = dict(updated_node["n"])
  956. node_data["id"] = updated_node["n"].id
  957. # 如果更新了标签,处理标签关系(支持列表)
  958. if tag is not None:
  959. # 先删除现有标签关系
  960. delete_tag_cypher = """
  961. MATCH (n:DataMeta)-[r:LABEL]->(t:DataLabel)
  962. WHERE id(n) = $node_id
  963. DELETE r
  964. """
  965. session.run(delete_tag_cypher, node_id=node_id)
  966. for tag_id in tag_ids:
  967. create_tag_cypher = """
  968. MATCH (n:DataMeta), (t:DataLabel)
  969. WHERE id(n) = $node_id AND id(t) = $tag_id
  970. MERGE (n)-[r:LABEL]->(t)
  971. RETURN r
  972. """
  973. session.run(create_tag_cypher, node_id=node_id, tag_id=tag_id)
  974. logger.info(f"成功更新元数据节点: ID={node_data['id']}")
  975. return jsonify(success(node_data))
  976. else:
  977. logger.error(f"更新元数据节点失败: ID={node_id}")
  978. return jsonify(failed("更新元数据节点失败"))
  979. except Exception as e:
  980. logger.error(f"更新元数据失败: {str(e)}")
  981. return jsonify(failed(str(e)))
  982. @bp.route("/review/list", methods=["POST"])
  983. def metadata_review_list():
  984. """
  985. 审核记录列表:疑似冗余/变动
  986. Body:
  987. - current: 页码(默认1)
  988. - size: 每页数量(默认10)
  989. - record_type: redundancy|change(可选)
  990. - status: pending|resolved|ignored(可选)
  991. - business_domain_id: 业务领域ID(可选)
  992. - keyword: 关键字(可选,匹配 new_meta.name_zh/name_en)
  993. """
  994. try:
  995. payload = request.get_json() or {}
  996. if not isinstance(payload, dict):
  997. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  998. def to_int(value, default):
  999. try:
  1000. return int(value)
  1001. except (TypeError, ValueError):
  1002. return default
  1003. page = to_int(payload.get("current", 1), 1)
  1004. page_size = to_int(payload.get("size", 10), 10)
  1005. record_type = payload.get("record_type")
  1006. status = payload.get("status")
  1007. business_domain_id = payload.get("business_domain_id")
  1008. keyword = (payload.get("keyword") or "").strip()
  1009. query = MetadataReviewRecord.query
  1010. if record_type:
  1011. query = query.filter(MetadataReviewRecord.record_type == record_type)
  1012. if status:
  1013. query = query.filter(MetadataReviewRecord.status == status)
  1014. if business_domain_id is not None and str(business_domain_id).strip() != "":
  1015. bd_id_int = int(business_domain_id)
  1016. query = query.filter(MetadataReviewRecord.business_domain_id == bd_id_int)
  1017. if keyword:
  1018. # 兼容:使用JSONB ->> 提取进行模糊匹配
  1019. name_zh_col = MetadataReviewRecord.new_meta["name_zh"].astext
  1020. name_en_col = MetadataReviewRecord.new_meta["name_en"].astext
  1021. query = query.filter(
  1022. or_(
  1023. name_zh_col.contains(keyword),
  1024. name_en_col.contains(keyword),
  1025. )
  1026. )
  1027. total = query.count()
  1028. records = (
  1029. query.order_by(MetadataReviewRecord.created_at.desc())
  1030. .offset((page - 1) * page_size)
  1031. .limit(page_size)
  1032. .all()
  1033. )
  1034. # 将 tag_ids 转换为 tags
  1035. records_data = [convert_tag_ids_to_tags(r.to_dict()) for r in records]
  1036. return jsonify(
  1037. success(
  1038. {
  1039. "records": records_data,
  1040. "total": total,
  1041. "size": page_size,
  1042. "current": page,
  1043. }
  1044. )
  1045. )
  1046. except Exception as e:
  1047. logger.error(f"审核记录列表查询失败: {str(e)}")
  1048. return jsonify(failed("审核记录列表查询失败", error=str(e)))
  1049. @bp.route("/review/detail", methods=["GET"])
  1050. def metadata_review_detail():
  1051. """
  1052. 审核记录详情
  1053. Query:
  1054. - id: 记录ID
  1055. """
  1056. try:
  1057. record_id = request.args.get("id")
  1058. if not record_id:
  1059. return jsonify(failed("缺少id参数"))
  1060. record = MetadataReviewRecord.query.get(int(record_id))
  1061. if not record:
  1062. return jsonify(failed("记录不存在"))
  1063. # 将 tag_ids 转换为 tags
  1064. data = convert_tag_ids_to_tags(record.to_dict())
  1065. # change 场景:返回受影响元数据的影响关系图谱(若有 meta_id)
  1066. impact_graph = None
  1067. if record.record_type == "change":
  1068. old_meta = record.old_meta or {}
  1069. meta_id = old_meta.get("meta_id")
  1070. if meta_id is not None and str(meta_id).strip() != "":
  1071. try:
  1072. impact_graph = meta_impact_graph(int(meta_id))
  1073. except Exception as e:
  1074. logger.warning(f"获取影响图谱失败: {e}")
  1075. data["impact_graph"] = impact_graph
  1076. return jsonify(success(data))
  1077. except Exception as e:
  1078. logger.error(f"审核记录详情查询失败: {str(e)}")
  1079. return jsonify(failed("审核记录详情查询失败", error=str(e)))
  1080. @bp.route("/review/resolve", methods=["POST"])
  1081. def metadata_review_resolve():
  1082. """
  1083. 处理审核记录
  1084. Body:
  1085. - id: 记录ID
  1086. - action: alias | create_new | accept_change | reject_change | ignore
  1087. - payload: 动作参数(可选)
  1088. - resolved_by: 处理人(可选)
  1089. - notes: 备注(可选)
  1090. action=alias:
  1091. payload: { candidate_meta_id: int }
  1092. 行为:为业务领域建立 INCLUDES 到 candidate_meta_id,关系上写入 alias_name_zh/alias_name_en
  1093. action=create_new:
  1094. payload: { new_name_zh: str }
  1095. 行为:创建新的 DataMeta(中文名区分)并关联业务领域
  1096. action=accept_change:
  1097. payload: { meta_id?: int }
  1098. 行为:把 new_meta 写回目标 DataMeta,并写入 metadata_version_history(PG)
  1099. action=reject_change/ignore:
  1100. 行为:仅更新审核记录状态
  1101. """
  1102. try:
  1103. payload = request.get_json() or {}
  1104. if not isinstance(payload, dict):
  1105. return jsonify(failed("请求数据格式错误,应为 JSON 对象"))
  1106. record_id = payload.get("id")
  1107. action = payload.get("action")
  1108. action_payload = payload.get("payload") or {}
  1109. resolved_by = payload.get("resolved_by")
  1110. notes = payload.get("notes")
  1111. if not record_id:
  1112. return jsonify(failed("id 不能为空"))
  1113. if not action:
  1114. return jsonify(failed("action 不能为空"))
  1115. record = MetadataReviewRecord.query.get(int(record_id))
  1116. if not record:
  1117. return jsonify(failed("记录不存在"))
  1118. if record.status != "pending":
  1119. return jsonify(failed("记录已处理,无法重复处理"))
  1120. # 需要业务领域上下文的动作
  1121. bd_id = record.business_domain_id
  1122. new_meta = record.new_meta or {}
  1123. if action == "alias":
  1124. candidate_meta_id = action_payload.get("candidate_meta_id")
  1125. if not bd_id:
  1126. return jsonify(failed("记录缺少 business_domain_id,无法执行 alias"))
  1127. if not candidate_meta_id:
  1128. return jsonify(failed("payload.candidate_meta_id 不能为空"))
  1129. # 写入 Neo4j:建立 INCLUDES,并记录别名
  1130. from app.services.neo4j_driver import neo4j_driver
  1131. alias_name_zh = (new_meta.get("name_zh") or "").strip()
  1132. alias_name_en = (new_meta.get("name_en") or "").strip()
  1133. with neo4j_driver.get_session() as session:
  1134. session.run(
  1135. """
  1136. MATCH (n:BusinessDomain), (m:DataMeta)
  1137. WHERE id(n) = $domain_id AND id(m) = $meta_id
  1138. MERGE (n)-[r:INCLUDES]->(m)
  1139. SET r.alias_name_zh = $alias_name_zh,
  1140. r.alias_name_en = $alias_name_en
  1141. """,
  1142. {
  1143. "domain_id": int(bd_id),
  1144. "meta_id": int(candidate_meta_id),
  1145. "alias_name_zh": alias_name_zh,
  1146. "alias_name_en": alias_name_en,
  1147. },
  1148. )
  1149. update_review_record_resolution(
  1150. record,
  1151. action="alias",
  1152. payload={"candidate_meta_id": int(candidate_meta_id)},
  1153. resolved_by=resolved_by,
  1154. notes=notes,
  1155. )
  1156. db.session.commit()
  1157. return jsonify(success(record.to_dict()))
  1158. if action == "create_new":
  1159. new_name_zh = (action_payload.get("new_name_zh") or "").strip()
  1160. if not bd_id:
  1161. return jsonify(
  1162. failed("记录缺少 business_domain_id,无法执行 create_new")
  1163. )
  1164. if not new_name_zh:
  1165. return jsonify(failed("payload.new_name_zh 不能为空"))
  1166. from app.core.meta_data import get_formatted_time
  1167. from app.services.neo4j_driver import neo4j_driver
  1168. with neo4j_driver.get_session() as session:
  1169. # 创建新 DataMeta(避免覆盖旧节点)
  1170. result = session.run(
  1171. """
  1172. CREATE (m:DataMeta {
  1173. name_zh: $name_zh,
  1174. name_en: $name_en,
  1175. data_type: $data_type,
  1176. create_time: $create_time,
  1177. status: true
  1178. })
  1179. RETURN m
  1180. """,
  1181. {
  1182. "name_zh": new_name_zh,
  1183. "name_en": (new_meta.get("name_en") or "").strip(),
  1184. "data_type": (new_meta.get("data_type") or "varchar(255)"),
  1185. "create_time": get_formatted_time(),
  1186. },
  1187. ).single()
  1188. if not result or not result.get("m"):
  1189. return jsonify(failed("创建新元数据失败"))
  1190. new_meta_id = int(result["m"].id)
  1191. session.run(
  1192. """
  1193. MATCH (n:BusinessDomain), (m:DataMeta)
  1194. WHERE id(n) = $domain_id AND id(m) = $meta_id
  1195. MERGE (n)-[:INCLUDES]->(m)
  1196. """,
  1197. {"domain_id": int(bd_id), "meta_id": new_meta_id},
  1198. )
  1199. update_review_record_resolution(
  1200. record,
  1201. action="create_new",
  1202. payload={"new_name_zh": new_name_zh},
  1203. resolved_by=resolved_by,
  1204. notes=notes,
  1205. )
  1206. db.session.commit()
  1207. return jsonify(success(record.to_dict()))
  1208. if action == "accept_change":
  1209. old_meta = record.old_meta or {}
  1210. meta_id = action_payload.get("meta_id") or old_meta.get("meta_id")
  1211. if not meta_id:
  1212. return jsonify(failed("无法确定需要更新的 meta_id"))
  1213. from app.core.meta_data import get_formatted_time
  1214. from app.services.neo4j_driver import neo4j_driver
  1215. before_snapshot = old_meta.get("snapshot") or {}
  1216. after_snapshot = new_meta
  1217. # 写入 Neo4j:更新 DataMeta 属性,并尝试同步标签集合
  1218. with neo4j_driver.get_session() as session:
  1219. name_zh_val = (
  1220. after_snapshot.get("name_zh")
  1221. or before_snapshot.get("name_zh")
  1222. or ""
  1223. ).strip()
  1224. name_en_val = (after_snapshot.get("name_en") or "").strip()
  1225. data_type_val = after_snapshot.get("data_type") or "varchar(255)"
  1226. session.run(
  1227. """
  1228. MATCH (m:DataMeta)
  1229. WHERE id(m) = $meta_id
  1230. SET m.name_zh = $name_zh,
  1231. m.name_en = $name_en,
  1232. m.data_type = $data_type,
  1233. m.updateTime = $update_time,
  1234. m.status = true
  1235. """,
  1236. {
  1237. "meta_id": int(meta_id),
  1238. "name_zh": name_zh_val,
  1239. "name_en": name_en_val,
  1240. "data_type": data_type_val,
  1241. "update_time": get_formatted_time(),
  1242. },
  1243. )
  1244. tag_ids = after_snapshot.get("tag_ids") or []
  1245. tag_ids = [int(t) for t in tag_ids if t is not None]
  1246. if tag_ids:
  1247. session.run(
  1248. """
  1249. MATCH (m:DataMeta)-[r:LABEL]->(:DataLabel)
  1250. WHERE id(m) = $meta_id
  1251. DELETE r
  1252. """,
  1253. {"meta_id": int(meta_id)},
  1254. )
  1255. session.run(
  1256. """
  1257. MATCH (m:DataMeta)
  1258. WHERE id(m) = $meta_id
  1259. WITH m
  1260. UNWIND $tag_ids AS tid
  1261. MATCH (t:DataLabel) WHERE id(t) = tid
  1262. MERGE (m)-[:LABEL]->(t)
  1263. """,
  1264. {"meta_id": int(meta_id), "tag_ids": tag_ids},
  1265. )
  1266. # 写入版本历史(PG)
  1267. history = MetadataVersionHistory()
  1268. history.meta_id = int(meta_id) if meta_id is not None else 0
  1269. history.change_source = "ddl"
  1270. history.before_snapshot = (
  1271. before_snapshot if before_snapshot is not None else {}
  1272. )
  1273. history.after_snapshot = (
  1274. after_snapshot if after_snapshot is not None else {}
  1275. )
  1276. history.created_by = resolved_by if resolved_by is not None else ""
  1277. db.session.add(history)
  1278. update_review_record_resolution(
  1279. record,
  1280. action="accept_change",
  1281. payload={"meta_id": int(meta_id)},
  1282. resolved_by=resolved_by,
  1283. notes=notes,
  1284. )
  1285. db.session.commit()
  1286. return jsonify(success(record.to_dict()))
  1287. if action in ("reject_change", "ignore"):
  1288. update_review_record_resolution(
  1289. record,
  1290. action=action,
  1291. payload=action_payload,
  1292. resolved_by=resolved_by,
  1293. notes=notes,
  1294. )
  1295. db.session.commit()
  1296. return jsonify(success(record.to_dict()))
  1297. return jsonify(failed(f"不支持的action: {action}"))
  1298. except Exception as e:
  1299. logger.error(f"处理审核记录失败: {str(e)}")
  1300. db.session.rollback()
  1301. return jsonify(failed("处理审核记录失败", error=str(e)))