from flask import request from app import app from app.models.result import success, failed from app.routes.graph_routes import MyEncoder from app.core.data_model import model as model_functions import json # 2024.09.11 数据模型血缘关系(传入数据资源id) @app.route('/model/data/relation', methods=['POST']) def data_model_relation(): try: # 传入请求参数 receiver = request.get_json() resource_ids = receiver['id'] # 给定一个数据资源的id response_data = model_functions.handle_model_relation(resource_ids) res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 传入一个数据资源的id,返回多个有血缘关系的数据资源 @app.route('/model/relatives/relation', methods=['POST']) def data_relatives_relation(): try: # 传入请求参数 receiver = request.get_json() page = int(receiver.get('current', 1)) page_size = int(receiver.get('size', 10)) id = receiver['id'] name_filter = receiver.get('name', None) category = receiver.get('category', None) time = receiver.get('time', None) # 计算跳过的记录的数量 skip_count = (page - 1) * page_size data, total = model_functions.model_resource_list(skip_count, page_size, name_filter, id, category, time) response_data = {'records': data, 'total': total, 'size': page_size, 'current': page} res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # DDL数据模型保存 @app.route('/data/model/save', methods=['POST']) def data_model_save(): # 传入请求参数 receiver = request.get_json() data_model = receiver['name'] id_list = receiver['id_list'] # resource_id和meta_id构成json格式 result = json.dumps(id_list, ensure_ascii=False) try: # 从DDL中选取保存数据模型 result_list = [receiver['en_name']] id, data_model_node = model_functions.handle_data_model(data_model, result_list, result, receiver) model_functions.handle_no_meta_data_model(id_list, receiver, data_model_node) model_functions.calculate_model_level(id) res = success({}, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 新建数据模型请求接口(从数据资源中选取) @app.route('/model/data/search', methods=['POST']) def data_model_search(): # 传入请求参数 receiver = request.get_json() data_model = receiver['name'] id_list = receiver['id_list'] # resource_id和meta_id构成json格式 result = json.dumps(id_list, ensure_ascii=False) try: from app.core.meta_data import translate_and_parse result_list = translate_and_parse(data_model) id, data_model_node = model_functions.handle_data_model(data_model, result_list, result, receiver) if id_list: model_functions.resource_handle_meta_data_model(id_list, id) model_functions.calculate_model_level(id) res = success({}, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 新建数据模型请求接口(从数据模型中选取) @app.route('/model/data/model/add', methods=['POST']) def data_model_model_add(): # 传入请求参数 receiver = request.get_json() data_model = receiver['name'] id_list = receiver['id_list'] # model_id和meta_id构成json格式 result = json.dumps(id_list, ensure_ascii=False) try: from app.core.meta_data import translate_and_parse result_list = translate_and_parse(data_model) id, data_model_node = model_functions.handle_data_model(data_model, result_list, result, receiver) model_functions.model_handle_meta_data_model(id_list, id) model_functions.calculate_model_level(id) res = success({}, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 数据模型-详情接口 @app.route('/data/model/detail', methods=['POST']) def data_model_detail(): try: # 传入请求参数 receiver = request.get_json() id = int(receiver.get('id')) response_data = model_functions.handle_id_model(id) res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 删除数据模型 @app.route('/data/model/delete', methods=['POST']) def data_model_delete(): try: # 传入请求参数 receiver = request.get_json() id = receiver['id'] # 首先删除数据模型节点 from app.services.neo4j_driver import neo4j_driver query = """ MATCH (n:data_model) where id(n) = $nodeId detach delete n """ with neo4j_driver.get_session() as session: session.run(query, nodeId=id) res = success({}, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 列表 数据模型查询 @app.route('/data/model/list', methods=['POST']) def data_model_list(): try: # 传入请求参数 receiver = request.get_json() page = int(receiver.get('current', 1)) page_size = int(receiver.get('size', 10)) name_filter = receiver.get('name', None) en_name_filter = receiver.get('en_name', None) category = receiver.get('category', None) tag = receiver.get('tag', None) level = receiver.get('level', None) # 计算跳过的记录的数量 skip_count = (page - 1) * page_size data, total = model_functions.model_list(skip_count, page_size, en_name_filter, name_filter, category, tag, level) response_data = {'records': data, 'total': total, 'size': page_size, 'current': page} res = success(response_data, "success") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: res = failed({}, {"error": f"{e}"}) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 数据模型的图谱(血缘关系Kinship+影响关系Impact+所有关系all) @app.route('/data/model/graph/all', methods=['POST']) def data_model_graph_all(): try: # 传入请求参数 receiver = request.get_json() nodeid = receiver['id'] type = receiver['type'] # kinship/impact/all meta = receiver['meta'] # true/false 是否返回元数据 if type == 'kinship': result = model_functions.model_kinship_graph(nodeid, meta) elif type == 'impact': result = model_functions.model_impact_graph(nodeid, meta) else: result = model_functions.model_all_graph(nodeid, meta) return json.dumps(success(result, "success"), ensure_ascii=False, cls=MyEncoder) except Exception as e: return json.dumps(failed({}, str(e)), ensure_ascii=False, cls=MyEncoder) # 数据模型的列表图谱 @app.route('/data/model/list/graph', methods=['POST']) def data_model_list_graph(): try: # 传入请求参数 receiver = request.get_json() if not receiver or 'tag' not in receiver: raise ValueError("Missing 'tag' parameter in request body") nodeid = receiver['tag'] # 构建查询条件 params = {} where_clause = "" if nodeid is not None: where_clause = "MATCH (n)-[:label]->(la) WHERE id(la) = $nodeId" params['nodeId'] = nodeid from app.services.neo4j_driver import neo4j_driver query = f""" MATCH (n:data_model) OPTIONAL MATCH (n)-[:child]->(child) {where_clause} WITH collect(DISTINCT {{id: toString(id(n)), text: n.name, type: split(labels(n)[0], '_')[1]}}) AS nodes, collect(DISTINCT {{id: toString(id(child)), text: child.name, type: split(labels(child)[0], '_')[1]}}) AS nodes2, collect(DISTINCT {{from: toString(id(n)), to: toString(id(child)), text: '下级'}}) AS lines RETURN nodes + nodes2 AS nodes, lines AS lines """ with neo4j_driver.get_session() as session: result = session.run(query, **params) data = result.data() if len(data) > 0: # 过滤掉空节点(即 id 为 null 的节点) nodes = [] for node in data[0]['nodes']: if node['id'] != 'null': nodes.append(node) lines = data[0]['lines'] response_data = { 'nodes': nodes, 'edges': lines } return json.dumps(success(response_data, "success"), ensure_ascii=False, cls=MyEncoder) else: return json.dumps(success({'nodes': [], 'edges': []}, "No data found"), ensure_ascii=False, cls=MyEncoder) except Exception as e: return json.dumps(failed({}, str(e)), ensure_ascii=False, cls=MyEncoder) # 更新数据模型 @app.route('/data/model/update', methods=['POST']) def data_model_update(): try: # 传入请求参数 receiver = request.get_json() result = model_functions.data_model_edit(receiver) return json.dumps(success(result, "success"), ensure_ascii=False, cls=MyEncoder) except Exception as e: return json.dumps(failed({}, str(e)), ensure_ascii=False, cls=MyEncoder)