123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- from flask import request, jsonify
- from app.api.data_model import bp
- from app.models.result import success, failed
- from app.api.graph.routes import MyEncoder
- from app.core.data_model import model as model_functions
- import json
- import logging
- # Configure logger
- logger = logging.getLogger(__name__)
# 2024.09.11 Data model lineage (takes a data resource id)
@bp.route('/model/data/relation', methods=['POST'])
def data_model_relation():
    """Return the lineage data for the data resource named in the request.

    Reads the required ``id`` field from the JSON request body and hands it
    to ``model_functions.handle_model_relation``.

    Returns:
        A JSON string: the ``success`` envelope wrapping the helper's result,
        or the ``failed`` envelope carrying the error text if anything raises.
    """
    try:
        payload = request.get_json()
        # 'id' identifies a single data resource.
        relation = model_functions.handle_model_relation(payload['id'])
        return json.dumps(success(relation, "success"), ensure_ascii=False, cls=MyEncoder)
    except Exception as exc:
        return json.dumps(failed({}, {"error": f"{exc}"}), ensure_ascii=False, cls=MyEncoder)
# Given one data resource id, return the data resources related to it by lineage
@bp.route('/model/relatives/relation', methods=['POST'])
def data_relatives_relation():
    """Return one page of resources related to the requested data resource.

    Request JSON fields read here: ``current`` (page number, default 1),
    ``size`` (page size, default 10), ``id`` (required), and the optional
    ``name``, ``category`` and ``time`` values, which are forwarded unchanged
    to ``model_functions.model_resource_list``.

    Returns:
        A JSON string: a ``success`` envelope holding ``records``, ``total``,
        ``size`` and ``current``, or a ``failed`` envelope with the error text
        if anything raises.
    """
    try:
        payload = request.get_json()
        current_page = int(payload.get('current', 1))
        per_page = int(payload.get('size', 10))
        resource_id = payload['id']
        name_kw = payload.get('name')
        category_kw = payload.get('category')
        time_kw = payload.get('time')
        # Number of records that precede the requested page.
        offset = (current_page - 1) * per_page
        records, total = model_functions.model_resource_list(
            offset, per_page, name_kw, resource_id, category_kw, time_kw
        )
        page_body = {
            'records': records,
            'total': total,
            'size': per_page,
            'current': current_page,
        }
        return json.dumps(success(page_body, "success"), ensure_ascii=False, cls=MyEncoder)
    except Exception as exc:
        return json.dumps(failed({}, {"error": f"{exc}"}), ensure_ascii=False, cls=MyEncoder)
# Save a data model built from DDL
@bp.route('/data/model/save', methods=['POST'])
def data_model_save():
    """Create a data model from the DDL-derived fields in the request.

    Request JSON fields read here: ``name``, ``id_list`` and ``en_name``
    (all required). The whole payload is also forwarded to the
    ``model_functions`` helpers.

    Request parsing is done inside the ``try`` so that a missing field or an
    unusable body yields the same ``failed`` envelope as any other error,
    instead of escaping as an unhandled exception.

    Returns:
        A JSON string: an empty ``success`` envelope, or a ``failed`` envelope
        carrying the error text.
    """
    try:
        receiver = request.get_json()
        data_model = receiver['name']
        id_list = receiver['id_list']
        # JSON text of the id list (resource_id / meta_id entries per the
        # original note), stored alongside the model.
        result = json.dumps(id_list, ensure_ascii=False)
        # The model's English name comes straight from the request here.
        result_list = [receiver['en_name']]
        model_id, data_model_node = model_functions.handle_data_model(
            data_model, result_list, result, receiver
        )
        model_functions.handle_no_meta_data_model(id_list, receiver, data_model_node)
        model_functions.calculate_model_level(model_id)
        res = success({}, "success")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        # Record the traceback; the client only receives the message text.
        logger.exception("Failed to save data model")
        res = failed({}, {"error": f"{e}"})
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
# Create a data model (selected from data resources)
# @bp.route('/model/data/search', methods=['POST'])
@bp.route('/data/search', methods=['POST'])
def data_model_search():
    """Create a data model whose contents are picked from data resources.

    Request JSON fields read here: ``name`` and ``id_list`` (both required).
    The name is passed through ``translate_and_parse`` and the result is
    handed to ``model_functions.handle_data_model`` together with the whole
    payload. Resources are linked only when ``id_list`` is non-empty.

    Request parsing is done inside the ``try`` so that a missing field or an
    unusable body yields the same ``failed`` envelope as any other error,
    instead of escaping as an unhandled exception.

    Returns:
        A JSON string: an empty ``success`` envelope, or a ``failed`` envelope
        carrying the error text.
    """
    try:
        receiver = request.get_json()
        data_model = receiver['name']
        id_list = receiver['id_list']
        # JSON text of the id list (resource_id / meta_id entries per the
        # original note), stored alongside the model.
        result = json.dumps(id_list, ensure_ascii=False)
        from app.core.meta_data import translate_and_parse
        result_list = translate_and_parse(data_model)
        model_id, data_model_node = model_functions.handle_data_model(
            data_model, result_list, result, receiver
        )
        if id_list:
            model_functions.resource_handle_meta_data_model(id_list, model_id)
        model_functions.calculate_model_level(model_id)
        res = success({}, "success")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        # Record the traceback; the client only receives the message text.
        logger.exception("Failed to create data model from data resources")
        res = failed({}, {"error": f"{e}"})
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
# Create a data model (selected from existing data models)
@bp.route('/model/data/model/add', methods=['POST'])
def data_model_model_add():
    """Create a data model from existing data models and return its fields.

    Request JSON fields read here: ``name`` and ``id_list`` (both required).
    The name is passed through ``translate_and_parse`` and the result is
    handed to ``model_functions.handle_data_model`` together with the whole
    payload.

    Request parsing is done inside the ``try`` so that a missing field or an
    unusable body yields the same ``failed`` envelope as any other error,
    instead of escaping as an unhandled exception.

    Returns:
        A JSON string: a ``success`` envelope holding the new node's id and
        the attributes read from the created node, or a ``failed`` envelope
        carrying the error text.
    """
    try:
        receiver = request.get_json()
        data_model = receiver['name']
        id_list = receiver['id_list']
        # JSON text of the id list (model_id / meta_id entries per the
        # original note), stored alongside the model.
        result = json.dumps(id_list, ensure_ascii=False)
        from app.core.meta_data import translate_and_parse
        result_list = translate_and_parse(data_model)
        node_id, data_model_node = model_functions.handle_data_model(
            data_model, result_list, result, receiver
        )
        model_functions.model_handle_meta_data_model(id_list, node_id)
        model_functions.calculate_model_level(node_id)
        # Build the response from the created node's attributes.
        response_data = {
            "id": node_id,
            "name": data_model_node.get('name'),
            "en_name": data_model_node.get('en_name'),
            "description": data_model_node.get('description'),
            "category": data_model_node.get('category'),
            "time": data_model_node.get('time'),
            "level": data_model_node.get('level'),
            "tag": data_model_node.get('tag'),
            "childrenId": data_model_node.get('childrenId', []),
            "leader": data_model_node.get('leader'),
            "origin": data_model_node.get('origin'),
            "frequency": data_model_node.get('frequency'),
            "organization": data_model_node.get('organization'),
            "data_sensitivity": data_model_node.get('data_sensitivity'),
            "status": data_model_node.get('status')
        }
        res = success(response_data, "success")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        # Record the traceback; the client only receives the message text.
        logger.exception("Failed to create data model from existing models")
        res = failed({}, {"error": f"{e}"})
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
# Data model - detail endpoint
@bp.route('/data/model/detail', methods=['POST'])
def data_model_detail():
    """Return the detail record for one data model.

    Reads the ``id`` field from the JSON request body and passes it, without
    any type conversion, to ``model_functions.handle_id_model``.

    Diagnostics go through the module logger rather than ``print`` /
    ``traceback.print_exc`` so they follow the application's logging
    configuration.

    Returns:
        A JSON string: the ``success`` envelope wrapping the helper's result,
        or the ``failed`` envelope carrying the error text if anything raises.
    """
    try:
        receiver = request.get_json()
        # Use the id exactly as received (a string, per the original note);
        # no type conversion is applied here.
        model_id = receiver.get('id')
        logger.info("Received id from frontend: %s", model_id)
        response_data = model_functions.handle_id_model(model_id)
        res = success(response_data, "success")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        # logger.exception records the full traceback of the active error.
        logger.exception("Failed to load data model detail")
        res = failed({}, {"error": f"{e}"})
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
# Delete a data model
@bp.route('/data/model/delete', methods=['POST'])
def data_model_delete():
    """Delete the ``DataModel`` node whose internal id is given in the request.

    Reads the ``id`` field from the JSON request body. The Cypher query
    compares it with ``id(n)``, which is an integer, so the value is coerced
    to ``int`` first; a missing or non-integer id is rejected with a
    ``failed`` envelope instead of running a query that matches nothing and
    reporting success.

    Returns:
        A JSON string: an empty ``success`` envelope once the delete query
        has been issued, or a ``failed`` envelope carrying the error text.
    """
    try:
        receiver = request.get_json()
        raw_id = receiver.get('id')
        if raw_id is None:
            res = failed({}, {"error": "Missing 'id' parameter in request body"})
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
        try:
            node_id = int(raw_id)
        except (TypeError, ValueError):
            res = failed({}, {"error": f"'id' must be an integer, got: {raw_id}"})
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
        logger.info("Deleting data model with id: %s", node_id)
        # Delete the data model node together with its relationships.
        from app.services.neo4j_driver import neo4j_driver
        query = """
        MATCH (n:DataModel) where id(n) = $nodeId
        detach delete n
        """
        with neo4j_driver.get_session() as session:
            session.run(query, nodeId=node_id)
        res = success({}, "success")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        # logger.exception records the full traceback of the active error.
        logger.exception("Failed to delete data model")
        res = failed({}, {"error": f"{e}"})
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
# List query for data models
@bp.route('/data/model/list', methods=['POST'])
def data_model_list():
    """Return one page of data models matching the request's filters.

    Request JSON fields read here: ``current`` (page number, default 1),
    ``size`` (page size, default 10) and the optional ``name``, ``en_name``,
    ``category``, ``tag`` and ``level`` values, which are forwarded unchanged
    to ``model_functions.model_list``.

    Returns:
        A JSON string: a ``success`` envelope holding ``records``, ``total``,
        ``size`` and ``current``, or a ``failed`` envelope with the error text
        if anything raises.
    """
    try:
        payload = request.get_json()
        current_page = int(payload.get('current', 1))
        per_page = int(payload.get('size', 10))
        name_kw = payload.get('name')
        en_name_kw = payload.get('en_name')
        category_kw = payload.get('category')
        tag_kw = payload.get('tag')
        level_kw = payload.get('level')
        # Number of records that precede the requested page.
        offset = (current_page - 1) * per_page
        records, total = model_functions.model_list(
            offset, per_page, en_name_kw, name_kw, category_kw, tag_kw, level_kw
        )
        page_body = {
            'records': records,
            'total': total,
            'size': per_page,
            'current': current_page,
        }
        return json.dumps(success(page_body, "success"), ensure_ascii=False, cls=MyEncoder)
    except Exception as exc:
        return json.dumps(failed({}, {"error": f"{exc}"}), ensure_ascii=False, cls=MyEncoder)
# Graph of a data model (kinship + impact + all relations + community)
@bp.route('/data/model/graph/all', methods=['POST'])
def data_model_graph_all():
    """Return the graph selected by the request's ``type`` field.

    ``type`` is required. ``'community'`` reads the optional ``tag`` field
    and calls ``model_functions.model_community``. Any other value reads
    ``id`` and ``meta`` (default ``False``) and calls the kinship graph for
    ``'kinship'``, the impact graph for ``'impact'``, and the all-relations
    graph otherwise.

    Returns:
        A JSON string: the ``success`` envelope wrapping the graph, or the
        ``failed`` envelope with the error text if anything raises.
    """
    try:
        payload = request.get_json()
        graph_type = payload['type']  # kinship/impact/all/community
        if graph_type == 'community':
            # Community graphs are selected by tag only.
            graph = model_functions.model_community(payload.get('tag'))
            return json.dumps(success(graph, "success"), ensure_ascii=False, cls=MyEncoder)

        # Node id and the metadata flag are only read for non-community graphs.
        node_id = payload.get('id')
        with_meta = payload.get('meta', False)  # whether to include metadata
        if graph_type == 'kinship':
            graph = model_functions.model_kinship_graph(node_id, with_meta)
        elif graph_type == 'impact':
            graph = model_functions.model_impact_graph(node_id, with_meta)
        else:
            graph = model_functions.model_all_graph(node_id, with_meta)
        return json.dumps(success(graph, "success"), ensure_ascii=False, cls=MyEncoder)
    except Exception as exc:
        return json.dumps(failed({}, str(exc)), ensure_ascii=False, cls=MyEncoder)
# List-level graph of data models
@bp.route('/data/model/list/graph', methods=['POST'])
def data_model_list_graph():
    """Return the ``DataModel`` nodes and their ``child`` edges as a graph.

    The JSON request body must contain a ``tag`` key. When its value is not
    ``None`` the query is restricted to models that have a ``LABEL``
    relationship to the node with that internal id.

    Models without children make the ``OPTIONAL MATCH`` yield a null child,
    and Cypher's ``toString(null)`` is null, so the placeholder entries
    arrive in Python with ``None`` ids. They are dropped here, along with
    edges that have no endpoint (the original compared against the string
    ``'null'``, which never matched).

    Returns:
        A JSON string: a ``success`` envelope holding ``nodes`` and
        ``edges`` (both empty with the message "No data found" when the
        query returns no row), or a ``failed`` envelope with the error text.
    """
    try:
        receiver = request.get_json()
        if not receiver or 'tag' not in receiver:
            raise ValueError("Missing 'tag' parameter in request body")
        nodeid = receiver['tag']
        # Build the optional tag restriction.
        params = {}
        where_clause = ""
        if nodeid is not None:
            where_clause = "MATCH (n)-[:LABEL]->(la) WHERE id(la) = $nodeId"
            params['nodeId'] = nodeid
        from app.services.neo4j_driver import neo4j_driver
        query = f"""
        MATCH (n:DataModel)
        OPTIONAL MATCH (n)-[:child]->(child)
        {where_clause}
        WITH
        collect(DISTINCT {{id: toString(id(n)), text: n.name, type: split(labels(n)[0], '_')[1]}}) AS nodes,
        collect(DISTINCT {{id: toString(id(child)), text: child.name, type: split(labels(child)[0], '_')[1]}}) AS nodes2,
        collect(DISTINCT {{from: toString(id(n)), to: toString(id(child)), text: '下级'}}) AS lines
        RETURN nodes + nodes2 AS nodes, lines AS lines
        """
        with neo4j_driver.get_session() as session:
            # Materialise the rows while the session is still open.
            data = session.run(query, **params).data()

        if not data:
            return json.dumps(success({'nodes': [], 'edges': []}, "No data found"), ensure_ascii=False, cls=MyEncoder)

        # Ids that mark a placeholder produced by a null match.
        missing = (None, 'null')
        nodes = [node for node in data[0]['nodes'] if node['id'] not in missing]
        edges = [
            line for line in data[0]['lines']
            if line['from'] not in missing and line['to'] not in missing
        ]
        response_data = {
            'nodes': nodes,
            'edges': edges
        }
        return json.dumps(success(response_data, "success"), ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        return json.dumps(failed({}, str(e)), ensure_ascii=False, cls=MyEncoder)
# Update a data model
@bp.route('/data/model/update', methods=['POST'])
def data_model_update():
    """Apply the edits in the JSON request body to a data model.

    The whole request payload is handed to
    ``model_functions.data_model_edit``.

    Returns:
        A JSON string: the ``success`` envelope wrapping the helper's result,
        or the ``failed`` envelope with the error text if anything raises.
    """
    try:
        edited = model_functions.data_model_edit(request.get_json())
        envelope = success(edited, "success")
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
    except Exception as exc:
        envelope = failed({}, str(exc))
        return json.dumps(envelope, ensure_ascii=False, cls=MyEncoder)
# Search the metadata associated with a data model
@bp.route('/search', methods=['POST'])
def data_model_metadata_search():
    """Search the metadata associated with a data model, one page at a time.

    Request JSON fields read here: ``current`` (page number, default 1),
    ``size`` (page size, default 10), ``id`` (required, must convert to
    ``int``) and the optional ``en_name``, ``name``, ``category`` and ``tag``
    filters. The page number and page size are passed as-is to
    ``model_functions.model_search_list``.

    Returns:
        A JSON string: a ``success`` envelope holding ``records``, ``total``,
        ``size`` and ``current``; or a ``failed`` envelope when the id is
        missing, is not an integer, or anything else raises.
    """
    try:
        body = request.json
        # Paging and filter parameters.
        current_page = int(body.get('current', 1))
        per_page = int(body.get('size', 10))
        model_id = body.get('id')

        en_name_kw = body.get('en_name')
        name_kw = body.get('name')
        category_kw = body.get('category')
        tag_kw = body.get('tag')

        if model_id is None:
            return json.dumps(failed({}, "模型ID不能为空"), ensure_ascii=False, cls=MyEncoder)

        # The id must be an integer before it is handed to the query layer.
        try:
            model_id = int(model_id)
        except (ValueError, TypeError):
            return json.dumps(failed({}, f"模型ID必须为整数, 收到的是: {model_id}"), ensure_ascii=False, cls=MyEncoder)

        # Log the request.
        logger.info(f"获取数据模型关联元数据请求,ID: {model_id}")

        # Query the associated metadata through the business layer.
        records, total = model_functions.model_search_list(
            model_id, current_page, per_page,
            en_name_kw, name_kw, category_kw, tag_kw
        )

        page_body = {
            "records": records,
            "total": total,
            "size": per_page,
            "current": current_page
        }
        return json.dumps(success(page_body, "success"), ensure_ascii=False, cls=MyEncoder)
    except Exception as exc:
        logger.error(f"数据模型关联元数据搜索失败: {str(exc)}")
        return json.dumps(failed({}, str(exc)), ensure_ascii=False, cls=MyEncoder)
|