routes.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. from flask import request, jsonify
  2. from app.api.data_model import bp
  3. from app.models.result import success, failed
  4. from app.api.graph.routes import MyEncoder
  5. from app.core.data_model import model as model_functions
  6. import json
  7. import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
  10. # 2024.09.11 数据模型血缘关系(传入数据资源id)
  11. @bp.route('/model/data/relation', methods=['POST'])
  12. def data_model_relation():
  13. try:
  14. # 传入请求参数
  15. receiver = request.get_json()
  16. resource_ids = receiver['id'] # 给定一个数据资源的id
  17. response_data = model_functions.handle_model_relation(resource_ids)
  18. res = success(response_data, "success")
  19. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  20. except Exception as e:
  21. res = failed(str(e), 500, {})
  22. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  23. # 传入一个数据资源的id,返回多个有血缘关系的数据资源
  24. @bp.route('/model/relatives/relation', methods=['POST'])
  25. def data_relatives_relation():
  26. try:
  27. # 传入请求参数
  28. receiver = request.get_json()
  29. page = int(receiver.get('current', 1))
  30. page_size = int(receiver.get('size', 10))
  31. id = receiver['id']
  32. name_zh_filter = receiver.get('name_zh', None)
  33. category = receiver.get('category', None)
  34. time = receiver.get('time', None)
  35. # 计算跳过的记录的数量
  36. skip_count = (page - 1) * page_size
  37. data, total = model_functions.model_resource_list(skip_count, page_size, name_zh_filter, id, category, time)
  38. response_data = {'records': data, 'total': total, 'size': page_size, 'current': page}
  39. res = success(response_data, "success")
  40. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  41. except Exception as e:
  42. res = failed(str(e), 500, {})
  43. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  44. # DDL数据模型保存
  45. @bp.route('/data/model/save', methods=['POST'])
  46. def data_model_save():
  47. # 传入请求参数
  48. receiver = request.get_json()
  49. data_model = receiver['name_zh']
  50. # DDL新增时,id_list(包含resource_id)不是必填项
  51. id_list = receiver.get('id_list', [])
  52. data_source = receiver.get('data_source') # 获取data_source节点ID
  53. # resource_id和meta_id构成json格式
  54. result = json.dumps(id_list, ensure_ascii=False)
  55. try:
  56. # 从DDL中选取保存数据模型(支持data_source参数)
  57. result_list = [receiver['name_en']]
  58. id, data_model_node = model_functions.handle_data_model(data_model, result_list, result, receiver)
  59. # 只有在id_list不为空时才处理资源关系
  60. if id_list:
  61. model_functions.handle_no_meta_data_model(id_list, receiver, data_model_node)
  62. model_functions.calculate_model_level(id)
  63. # 创建BusinessDomain节点及其关联关系
  64. try:
  65. bd_id, bd_node = model_functions.handle_businessdomain_node(
  66. data_model, result_list, result, receiver, id_list
  67. )
  68. logger.info(f"成功创建BusinessDomain节点,ID: {bd_id}")
  69. except Exception as bd_error:
  70. # BusinessDomain创建失败不应该影响主流程
  71. logger.error(f"创建BusinessDomain节点失败(不中断主流程): {str(bd_error)}")
  72. # 查询节点的实际属性(data_model_node 可能只是整数ID)
  73. from app.services.neo4j_driver import neo4j_driver
  74. with neo4j_driver.get_session() as session:
  75. node_query = """
  76. MATCH (n:DataModel) WHERE id(n) = $node_id
  77. RETURN n.name_zh as name_zh, n.name_en as name_en, n.description as description,
  78. n.category as category, n.create_time as create_time, n.level as level,
  79. n.tag as tag, n.leader as leader, n.origin as origin, n.frequency as frequency,
  80. n.organization as organization, n.data_sensitivity as data_sensitivity, n.status as status
  81. """
  82. node_result = session.run(node_query, node_id=int(id))
  83. node_record = node_result.single()
  84. # 构建响应数据 - data_model包装格式
  85. response_data = {
  86. "data_model": {
  87. "id": id,
  88. "name_zh": node_record['name_zh'] if node_record else None,
  89. "name_en": node_record['name_en'] if node_record else None,
  90. "description": node_record['description'] if node_record else None,
  91. "category": node_record['category'] if node_record else None,
  92. "create_time": node_record['create_time'] if node_record else None,
  93. "level": node_record['level'] if node_record else None,
  94. "tag": node_record['tag'] if node_record else None,
  95. "leader": node_record['leader'] if node_record else None,
  96. "origin": node_record['origin'] if node_record else None,
  97. "frequency": node_record['frequency'] if node_record else None,
  98. "organization": node_record['organization'] if node_record else None,
  99. "data_sensitivity": node_record['data_sensitivity'] if node_record else None,
  100. "status": node_record['status'] if node_record else None,
  101. "data_source": data_source # 数据源节点ID
  102. }
  103. }
  104. res = success(response_data, "success")
  105. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  106. except Exception as e:
  107. res = failed(str(e), 500, {})
  108. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  109. # 新建数据模型请求接口(从数据资源中选取)
  110. # @bp.route('/model/data/search', methods=['POST'])
  111. @bp.route('/data/search', methods=['POST'])
  112. def data_model_search():
  113. # 传入请求参数
  114. receiver = request.get_json()
  115. data_model = receiver['name_zh']
  116. id_list = receiver['id_list']
  117. data_source = receiver.get('data_source') # 获取data_source节点ID
  118. # resource_id和meta_id构成json格式
  119. result = json.dumps(id_list, ensure_ascii=False)
  120. try:
  121. # 从数据资源中选取保存数据模型(支持data_source参数)
  122. from app.core.meta_data import translate_and_parse
  123. result_list = translate_and_parse(data_model)
  124. id, data_model_node = model_functions.handle_data_model(data_model, result_list, result, receiver)
  125. if id_list:
  126. model_functions.resource_handle_meta_data_model(id_list, id)
  127. model_functions.calculate_model_level(id)
  128. # 查询节点的实际属性(data_model_node 可能只是整数ID)
  129. from app.services.neo4j_driver import neo4j_driver
  130. with neo4j_driver.get_session() as session:
  131. node_query = """
  132. MATCH (n:DataModel) WHERE id(n) = $node_id
  133. RETURN n.name_zh as name_zh, n.name_en as name_en, n.description as description,
  134. n.category as category, n.create_time as create_time, n.level as level,
  135. n.tag as tag, n.leader as leader, n.origin as origin, n.frequency as frequency,
  136. n.organization as organization, n.data_sensitivity as data_sensitivity, n.status as status
  137. """
  138. node_result = session.run(node_query, node_id=int(id))
  139. node_record = node_result.single()
  140. # 构建响应数据 - data_model包装格式
  141. response_data = {
  142. "data_model": {
  143. "id": id,
  144. "name_zh": node_record['name_zh'] if node_record else None,
  145. "name_en": node_record['name_en'] if node_record else None,
  146. "description": node_record['description'] if node_record else None,
  147. "category": node_record['category'] if node_record else None,
  148. "create_time": node_record['create_time'] if node_record else None,
  149. "level": node_record['level'] if node_record else None,
  150. "tag": node_record['tag'] if node_record else None,
  151. "leader": node_record['leader'] if node_record else None,
  152. "origin": node_record['origin'] if node_record else None,
  153. "frequency": node_record['frequency'] if node_record else None,
  154. "organization": node_record['organization'] if node_record else None,
  155. "data_sensitivity": node_record['data_sensitivity'] if node_record else None,
  156. "status": node_record['status'] if node_record else None,
  157. "data_source": data_source # 数据源节点ID
  158. }
  159. }
  160. res = success(response_data, "success")
  161. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  162. except Exception as e:
  163. res = failed(str(e), 500, {})
  164. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  165. # 新建数据模型请求接口(从数据模型中选取)
  166. @bp.route('/model/data/model/add', methods=['POST'])
  167. def data_model_model_add():
  168. # 传入请求参数
  169. receiver = request.get_json()
  170. data_model = receiver['name_zh']
  171. id_list = receiver['id_list']
  172. data_source = receiver.get('data_source') # 获取data_source节点ID
  173. # model_id和meta_id构成json格式
  174. result = json.dumps(id_list, ensure_ascii=False)
  175. try:
  176. # 从数据模型中选取保存数据模型
  177. # handle_data_model 已经处理了 data_source 关系创建(支持 int/dict/string 格式)
  178. from app.core.meta_data import translate_and_parse
  179. result_list = translate_and_parse(data_model)
  180. node_id, data_model_node = model_functions.handle_data_model(data_model, result_list, result, receiver)
  181. model_functions.model_handle_meta_data_model(id_list, node_id)
  182. model_functions.calculate_model_level(node_id)
  183. # 查询节点的实际属性(data_model_node 可能只是整数ID)
  184. from app.services.neo4j_driver import neo4j_driver
  185. with neo4j_driver.get_session() as session:
  186. node_query = """
  187. MATCH (n:DataModel) WHERE id(n) = $node_id
  188. RETURN n.name_zh as name_zh, n.name_en as name_en, n.description as description,
  189. n.category as category, n.create_time as create_time, n.level as level,
  190. n.tag as tag, n.leader as leader, n.origin as origin, n.frequency as frequency,
  191. n.organization as organization, n.data_sensitivity as data_sensitivity, n.status as status
  192. """
  193. node_result = session.run(node_query, node_id=int(node_id))
  194. node_record = node_result.single()
  195. # 构建响应数据 - data_model包装格式
  196. response_data = {
  197. "data_model": {
  198. "id": node_id,
  199. "name_zh": node_record['name_zh'] if node_record else None,
  200. "name_en": node_record['name_en'] if node_record else None,
  201. "description": node_record['description'] if node_record else None,
  202. "category": node_record['category'] if node_record else None,
  203. "create_time": node_record['create_time'] if node_record else None,
  204. "level": node_record['level'] if node_record else None,
  205. "tag": node_record['tag'] if node_record else None,
  206. "leader": node_record['leader'] if node_record else None,
  207. "origin": node_record['origin'] if node_record else None,
  208. "frequency": node_record['frequency'] if node_record else None,
  209. "organization": node_record['organization'] if node_record else None,
  210. "data_sensitivity": node_record['data_sensitivity'] if node_record else None,
  211. "status": node_record['status'] if node_record else None,
  212. "data_source": data_source # 数据源节点ID,与name_zh在同一级别
  213. }
  214. }
  215. res = success(response_data, "success")
  216. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  217. except Exception as e:
  218. res = failed(str(e), 500, {})
  219. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  220. # 数据模型-详情接口
  221. @bp.route('/data/model/detail', methods=['POST'])
  222. def data_model_detail():
  223. try:
  224. # 传入请求参数
  225. receiver = request.get_json()
  226. # 直接使用字符串ID,不做类型转换
  227. id = receiver.get('id')
  228. print(f"Received id from frontend: {id}")
  229. result_data = model_functions.handle_id_model(id)
  230. # handle_id_model 返回的数据格式是 {"data_model": {...}}
  231. # 提取内部的 data_model 数据
  232. model_data = result_data.get("data_model", {})
  233. # 查询关联的数据源信息
  234. from app.services.neo4j_driver import neo4j_driver
  235. data_source_id = None
  236. try:
  237. model_id = int(id)
  238. with neo4j_driver.get_session() as session:
  239. # 查询数据模型关联的数据源节点
  240. ds_cypher = """
  241. MATCH (m:DataModel)-[:COME_FROM]->(ds:DataSource)
  242. WHERE id(m) = $model_id
  243. RETURN id(ds) as ds_id
  244. """
  245. ds_result = session.run(ds_cypher, model_id=model_id)
  246. ds_record = ds_result.single()
  247. if ds_record:
  248. # 如果存在数据源关联,只返回ID
  249. data_source_id = ds_record['ds_id']
  250. logger.info(f"找到数据模型关联的数据源: model_id={model_id}, data_source_id={data_source_id}")
  251. else:
  252. logger.info(f"数据模型未关联数据源: model_id={model_id}")
  253. except Exception as e:
  254. # 数据源查询失败不应该影响主流程
  255. logger.error(f"查询数据源关联失败(不中断主流程): {str(e)}")
  256. # 删除 childrenId 字段(如果存在)
  257. if 'childrenId' in model_data:
  258. del model_data['childrenId']
  259. # 添加 data_source 字段
  260. model_data['data_source'] = data_source_id
  261. # 构建响应数据 - data_model包装格式
  262. response_data = {
  263. "data_model": model_data
  264. }
  265. res = success(response_data, "success")
  266. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  267. except Exception as e:
  268. import traceback
  269. traceback.print_exc()
  270. res = failed(str(e), 500, {})
  271. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  272. # 删除数据模型
  273. @bp.route('/data/model/delete', methods=['POST'])
  274. def data_model_delete():
  275. try:
  276. # 传入请求参数
  277. receiver = request.get_json()
  278. id = receiver.get('id')
  279. print(f"Deleting data model with id: {id}")
  280. # 首先删除数据模型节点
  281. from app.services.neo4j_driver import neo4j_driver
  282. query = """
  283. MATCH (n:DataModel) where id(n) = $nodeId
  284. detach delete n
  285. """
  286. with neo4j_driver.get_session() as session:
  287. session.run(query, nodeId=id)
  288. res = success({}, "success")
  289. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  290. except Exception as e:
  291. import traceback
  292. traceback.print_exc()
  293. res = failed(str(e), 500, {})
  294. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  295. # 列表 数据模型查询
  296. @bp.route('/data/model/list', methods=['POST'])
  297. def data_model_list():
  298. try:
  299. # 传入请求参数
  300. receiver = request.get_json()
  301. page = int(receiver.get('current', 1))
  302. page_size = int(receiver.get('size', 10))
  303. name_zh_filter = receiver.get('name_zh', None)
  304. name_en_filter = receiver.get('name_en', None)
  305. category = receiver.get('category', None)
  306. tag = receiver.get('tag', None)
  307. level = receiver.get('level', None)
  308. # 计算跳过的记录的数量
  309. skip_count = (page - 1) * page_size
  310. data, total = model_functions.model_list(skip_count, page_size, name_en_filter,
  311. name_zh_filter, category, tag, level)
  312. response_data = {'records': data, 'total': total, 'size': page_size, 'current': page}
  313. res = success(response_data, "success")
  314. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  315. except Exception as e:
  316. res = failed(str(e), 500, {})
  317. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  318. # 数据模型的图谱(血缘关系Kinship+影响关系Impact+所有关系all+社区关系community)
  319. @bp.route('/data/model/graph/all', methods=['POST'])
  320. def data_model_graph_all():
  321. try:
  322. # 传入请求参数
  323. receiver = request.get_json()
  324. type = receiver['type'] # kinship/impact/all/community
  325. if type == 'community':
  326. # 社区图谱查询,提取tag参数
  327. tag = receiver.get('tag', None)
  328. result = model_functions.model_community(tag)
  329. else:
  330. # 非社区查询时才提取nodeid和meta参数
  331. nodeid = receiver.get('id')
  332. meta = receiver.get('meta', False) # true/false 是否返回元数据
  333. if type == 'kinship':
  334. result = model_functions.model_kinship_graph(nodeid, meta)
  335. elif type == 'impact':
  336. result = model_functions.model_impact_graph(nodeid, meta)
  337. else:
  338. result = model_functions.model_all_graph(nodeid, meta)
  339. return json.dumps(success(result, "success"), ensure_ascii=False, cls=MyEncoder)
  340. except Exception as e:
  341. return json.dumps(failed(str(e), 500, {}), ensure_ascii=False, cls=MyEncoder)
  342. # 数据模型的列表图谱
  343. @bp.route('/data/model/list/graph', methods=['POST'])
  344. def data_model_list_graph():
  345. try:
  346. # 传入请求参数
  347. receiver = request.get_json()
  348. if not receiver or 'tag' not in receiver:
  349. raise ValueError("Missing 'tag' parameter in request body")
  350. nodeid = receiver['tag']
  351. # 构建查询条件
  352. params = {}
  353. where_clause = ""
  354. if nodeid is not None:
  355. where_clause = "MATCH (n)-[:LABEL]->(la) WHERE id(la) = $nodeId"
  356. params['nodeId'] = nodeid
  357. from app.services.neo4j_driver import neo4j_driver
  358. query = f"""
  359. MATCH (n:DataModel)
  360. OPTIONAL MATCH (n)-[:child]->(child)
  361. {where_clause}
  362. WITH
  363. collect(DISTINCT {{id: toString(id(n)), text: n.name_zh, type: split(labels(n)[0], '_')[1]}}) AS nodes,
  364. collect(DISTINCT {{id: toString(id(child)), text: child.name_zh, type: split(labels(child)[0], '_')[1]}}) AS nodes2,
  365. collect(DISTINCT {{from: toString(id(n)), to: toString(id(child)), text: '下级'}}) AS lines
  366. RETURN nodes + nodes2 AS nodes, lines AS lines
  367. """
  368. with neo4j_driver.get_session() as session:
  369. result = session.run(query, **params)
  370. data = result.data()
  371. if len(data) > 0:
  372. # 过滤掉空节点(即 id 为 null 的节点)
  373. nodes = []
  374. for node in data[0]['nodes']:
  375. if node['id'] != 'null':
  376. nodes.append(node)
  377. lines = data[0]['lines']
  378. response_data = {
  379. 'nodes': nodes,
  380. 'edges': lines
  381. }
  382. return json.dumps(success(response_data, "success"), ensure_ascii=False, cls=MyEncoder)
  383. else:
  384. return json.dumps(success({'nodes': [], 'edges': []}, "No data found"), ensure_ascii=False, cls=MyEncoder)
  385. except Exception as e:
  386. return json.dumps(failed(str(e), 500, {}), ensure_ascii=False, cls=MyEncoder)
  387. # 更新数据模型
  388. @bp.route('/data/model/update', methods=['POST'])
  389. def data_model_update():
  390. try:
  391. # 传入请求参数
  392. receiver = request.get_json()
  393. result = model_functions.data_model_edit(receiver)
  394. return json.dumps(success(result, "success"), ensure_ascii=False, cls=MyEncoder)
  395. except Exception as e:
  396. return json.dumps(failed(str(e), 500, {}), ensure_ascii=False, cls=MyEncoder)
  397. # 数据模型关联元数据搜索
  398. @bp.route('/search', methods=['POST'])
  399. def data_model_metadata_search():
  400. """数据模型关联元数据搜索"""
  401. try:
  402. # 获取分页和筛选参数
  403. receiver = request.get_json()
  404. page = int(receiver.get('current', 1))
  405. page_size = int(receiver.get('size', 10))
  406. model_id = receiver.get('id')
  407. name_en_filter = receiver.get('name_en')
  408. name_zh_filter = receiver.get('name_zh')
  409. category_filter = receiver.get('category')
  410. tag_filter = receiver.get('tag')
  411. if model_id is None:
  412. return json.dumps(failed("模型ID不能为空", 400, {}), ensure_ascii=False, cls=MyEncoder)
  413. # 确保传入的ID为整数
  414. try:
  415. model_id = int(model_id)
  416. except (ValueError, TypeError):
  417. return json.dumps(failed(f"模型ID必须为整数, 收到的是: {model_id}", 400, {}), ensure_ascii=False, cls=MyEncoder)
  418. # 记录请求信息
  419. logger.info(f"获取数据模型关联元数据请求,ID: {model_id}")
  420. # 调用业务逻辑查询关联元数据
  421. metadata_list, total_count = model_functions.model_search_list(
  422. model_id,
  423. page,
  424. page_size,
  425. name_en_filter,
  426. name_zh_filter,
  427. category_filter,
  428. tag_filter
  429. )
  430. # 返回结果
  431. response_data = {
  432. "records": metadata_list,
  433. "total": total_count,
  434. "size": page_size,
  435. "current": page
  436. }
  437. res = success(response_data, "success")
  438. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  439. except Exception as e:
  440. logger.error(f"数据模型关联元数据搜索失败: {str(e)}")
  441. res = failed(str(e), 500, {})
  442. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)