"""HTTP routes of the data model blueprint.

Every endpoint is a POST that reads a JSON body and returns a JSON *string*
built from the project's ``success`` / ``failed`` wrappers and serialized with
``MyEncoder``. Failures are reported inside that JSON payload rather than as
raised exceptions.
"""

import json
import logging

from flask import request, jsonify

from app.api.data_model import bp
from app.models.result import success, failed
from app.api.graph.routes import MyEncoder
from app.core.data_model import model as model_functions

# Configure logger
logger = logging.getLogger(__name__)

# DataModel node properties echoed back to the client after a model is created.
_DATA_MODEL_FIELDS = (
    "name_zh",
    "name_en",
    "description",
    "category",
    "create_time",
    "level",
    "tag",
    "leader",
    "origin",
    "frequency",
    "organization",
    "data_sensitivity",
    "status",
)

# Reads the properties listed in _DATA_MODEL_FIELDS for one DataModel node.
_DATA_MODEL_NODE_QUERY = """
MATCH (n:DataModel)
WHERE id(n) = $node_id
RETURN n.name_zh as name_zh,
       n.name_en as name_en,
       n.description as description,
       n.category as category,
       n.create_time as create_time,
       n.level as level,
       n.tag as tag,
       n.leader as leader,
       n.origin as origin,
       n.frequency as frequency,
       n.organization as organization,
       n.data_sensitivity as data_sensitivity,
       n.status as status
"""

# Finds the DataSource node a DataModel node points at through COME_FROM.
_DATA_SOURCE_QUERY = """
MATCH (m:DataModel)-[:COME_FROM]->(ds:DataSource)
WHERE id(m) = $model_id
RETURN id(ds) as ds_id
"""


def _to_json(payload):
    """Serialize a result payload the way every endpoint in this module does."""
    return json.dumps(payload, ensure_ascii=False, cls=MyEncoder)


def _error_dict_response(exc):
    """Return a failure response whose message is ``{"error": "<text of exc>"}``."""
    return _to_json(failed({}, {"error": f"{exc}"}))


def _error_text_response(exc):
    """Return a failure response whose message is the plain text of ``exc``."""
    return _to_json(failed({}, str(exc)))


def _page_params(receiver):
    """Return ``(page, page_size)`` read from the ``current`` / ``size`` keys.

    Defaults are page 1 and 10 records per page. ``int()`` raises
    ``ValueError`` / ``TypeError`` for values it cannot convert.
    """
    return int(receiver.get('current', 1)), int(receiver.get('size', 10))


def _is_null_id(value):
    """Tell whether a stringified node id coming back from Cypher is missing.

    ``toString(id(x))`` yields a real null (``None`` in Python) when ``x`` is
    unbound; the literal string ``'null'`` is accepted as well because the
    original filter checked for it.
    """
    return value is None or value == 'null'


def _build_data_model_payload(node_id, data_source):
    """Build the ``{"data_model": {...}}`` payload for a freshly saved model.

    The node is re-read from Neo4j because ``handle_data_model`` may hand back
    only an integer id rather than a node object. When the node cannot be
    found every property is reported as ``None``.

    Args:
        node_id: Id of the DataModel node; echoed back unchanged under ``id``
            and converted with ``int()`` for the query.
        data_source: Value of the request's ``data_source`` field, echoed back.
    """
    # Imported lazily, exactly as the original handlers did.
    from app.services.neo4j_driver import neo4j_driver

    with neo4j_driver.get_session() as session:
        record = session.run(_DATA_MODEL_NODE_QUERY, node_id=int(node_id)).single()
        data_model = {"id": node_id}
        for field in _DATA_MODEL_FIELDS:
            data_model[field] = record[field] if record else None
    data_model["data_source"] = data_source  # data source node id
    return {"data_model": data_model}


def _find_data_source_id(raw_model_id):
    """Return the id of the DataSource linked to a model, or ``None``.

    This lookup is best effort: any failure (including an id that is not an
    integer) is logged and reported as ``None`` so that it never breaks the
    detail endpoint.
    """
    from app.services.neo4j_driver import neo4j_driver

    try:
        model_id = int(raw_model_id)
        with neo4j_driver.get_session() as session:
            ds_record = session.run(_DATA_SOURCE_QUERY, model_id=model_id).single()
        if ds_record:
            # Only the id of the data source is returned to the caller.
            data_source_id = ds_record['ds_id']
            logger.info(f"找到数据模型关联的数据源: model_id={model_id}, data_source_id={data_source_id}")
            return data_source_id
        logger.info(f"数据模型未关联数据源: model_id={model_id}")
    except Exception as e:
        # A failed data source lookup must not interrupt the main flow.
        logger.error(f"查询数据源关联失败(不中断主流程): {str(e)}")
    return None


# 2024.09.11: data model lineage (takes a data resource id)
@bp.route('/model/data/relation', methods=['POST'])
def data_model_relation():
    """Return the lineage data for the data resource id given in ``id``."""
    try:
        receiver = request.get_json()
        resource_ids = receiver['id']  # id of one data resource
        response_data = model_functions.handle_model_relation(resource_ids)
        return _to_json(success(response_data, "success"))
    except Exception as e:
        return _error_dict_response(e)


# Takes one data resource id and returns the data resources related to it by lineage
@bp.route('/model/relatives/relation', methods=['POST'])
def data_relatives_relation():
    """Return one page of data resources related to the resource in ``id``.

    Reads ``current`` / ``size`` for paging and the optional filters
    ``name_zh``, ``category`` and ``time``.
    """
    try:
        receiver = request.get_json()
        page, page_size = _page_params(receiver)
        resource_id = receiver['id']
        name_zh_filter = receiver.get('name_zh', None)
        category = receiver.get('category', None)
        time = receiver.get('time', None)

        # Number of records to skip before the requested page.
        skip_count = (page - 1) * page_size
        data, total = model_functions.model_resource_list(
            skip_count, page_size, name_zh_filter, resource_id, category, time
        )
        response_data = {'records': data, 'total': total, 'size': page_size, 'current': page}
        return _to_json(success(response_data, "success"))
    except Exception as e:
        return _error_dict_response(e)


# Save a data model from DDL
@bp.route('/data/model/save', methods=['POST'])
def data_model_save():
    """Create a data model from DDL and return its stored properties.

    Reads ``name_zh``, ``name_en``, ``id_list`` and the optional
    ``data_source`` from the JSON body.
    """
    try:
        # Request parsing happens inside the try block so that a missing key
        # produces the regular failure payload instead of an unhandled error.
        receiver = request.get_json()
        data_model = receiver['name_zh']
        id_list = receiver['id_list']
        data_source = receiver.get('data_source')  # data source node id
        # resource_id / meta_id pairs serialized as JSON
        id_list_json = json.dumps(id_list, ensure_ascii=False)

        result_list = [receiver['name_en']]
        node_id, data_model_node = model_functions.handle_data_model(
            data_model, result_list, id_list_json, receiver
        )
        model_functions.handle_no_meta_data_model(id_list, receiver, data_model_node)
        model_functions.calculate_model_level(node_id)

        response_data = _build_data_model_payload(node_id, data_source)
        return _to_json(success(response_data, "success"))
    except Exception as e:
        return _error_dict_response(e)


# Create a data model (selected from data resources)
# NOTE: a '/model/data/search' route decorator was commented out in the
# original source; only '/data/search' is registered.
@bp.route('/data/search', methods=['POST'])
def data_model_search():
    """Create a data model from data resources and return its stored properties.

    Reads ``name_zh``, ``id_list`` and the optional ``data_source`` from the
    JSON body. The name list passed to ``handle_data_model`` comes from
    ``translate_and_parse(name_zh)``.
    """
    try:
        # Parsed inside the try block so bad input yields the failure payload.
        receiver = request.get_json()
        data_model = receiver['name_zh']
        id_list = receiver['id_list']
        data_source = receiver.get('data_source')  # data source node id
        # resource_id / meta_id pairs serialized as JSON
        id_list_json = json.dumps(id_list, ensure_ascii=False)

        from app.core.meta_data import translate_and_parse
        result_list = translate_and_parse(data_model)
        node_id, data_model_node = model_functions.handle_data_model(
            data_model, result_list, id_list_json, receiver
        )
        # Metadata links are only created when ids were supplied.
        if id_list:
            model_functions.resource_handle_meta_data_model(id_list, node_id)
        model_functions.calculate_model_level(node_id)

        response_data = _build_data_model_payload(node_id, data_source)
        return _to_json(success(response_data, "success"))
    except Exception as e:
        return _error_dict_response(e)


# Create a data model (selected from existing data models)
@bp.route('/model/data/model/add', methods=['POST'])
def data_model_model_add():
    """Create a data model from existing models and return its stored properties.

    Reads ``name_zh``, ``id_list`` and the optional ``data_source`` from the
    JSON body. Per the original comment, ``handle_data_model`` already takes
    care of the ``data_source`` relationship.
    """
    try:
        # Parsed inside the try block so bad input yields the failure payload.
        receiver = request.get_json()
        data_model = receiver['name_zh']
        id_list = receiver['id_list']
        data_source = receiver.get('data_source')  # data source node id
        # model_id / meta_id pairs serialized as JSON
        id_list_json = json.dumps(id_list, ensure_ascii=False)

        from app.core.meta_data import translate_and_parse
        result_list = translate_and_parse(data_model)
        node_id, data_model_node = model_functions.handle_data_model(
            data_model, result_list, id_list_json, receiver
        )
        model_functions.model_handle_meta_data_model(id_list, node_id)
        model_functions.calculate_model_level(node_id)

        response_data = _build_data_model_payload(node_id, data_source)
        return _to_json(success(response_data, "success"))
    except Exception as e:
        return _error_dict_response(e)


# Data model detail
@bp.route('/data/model/detail', methods=['POST'])
def data_model_detail():
    """Return the detail of the data model whose id is given in ``id``.

    The ``childrenId`` key is removed from the model data and a
    ``data_source`` key (linked DataSource id or ``None``) is added.
    """
    try:
        receiver = request.get_json()
        # The id is forwarded as received, without type conversion.
        model_id = receiver.get('id')
        logger.info(f"Received id from frontend: {model_id}")

        # handle_id_model returns {"data_model": {...}}; take the inner mapping.
        result_data = model_functions.handle_id_model(model_id)
        model_data = result_data.get("data_model", {})

        data_source_id = _find_data_source_id(model_id)

        model_data.pop('childrenId', None)
        model_data['data_source'] = data_source_id

        return _to_json(success({"data_model": model_data}, "success"))
    except Exception as e:
        logger.exception("Failed to load data model detail")
        return _error_dict_response(e)


# Delete a data model
@bp.route('/data/model/delete', methods=['POST'])
def data_model_delete():
    """Detach-delete the DataModel node whose id is given in ``id``.

    The id is converted to ``int`` first: ``id(n)`` is an integer in Cypher,
    so a string id would never match and the delete would silently do nothing.
    """
    try:
        receiver = request.get_json()
        raw_id = receiver.get('id')
        logger.info(f"Deleting data model with id: {raw_id}")
        try:
            node_id = int(raw_id)
        except (TypeError, ValueError):
            raise ValueError(f"Invalid data model id: {raw_id!r}") from None

        from app.services.neo4j_driver import neo4j_driver
        query = """
        MATCH (n:DataModel)
        WHERE id(n) = $nodeId
        DETACH DELETE n
        """
        with neo4j_driver.get_session() as session:
            # consume() waits for the statement to finish so that server-side
            # errors surface here instead of being dropped on session close.
            # NOTE(review): assumes get_session() yields a standard neo4j session.
            session.run(query, nodeId=node_id).consume()

        return _to_json(success({}, "success"))
    except Exception as e:
        logger.exception("Failed to delete data model")
        return _error_dict_response(e)


# Data model list query
@bp.route('/data/model/list', methods=['POST'])
def data_model_list():
    """Return one page of data models.

    Reads ``current`` / ``size`` for paging and the optional filters
    ``name_zh``, ``name_en``, ``category``, ``tag`` and ``level``.
    """
    try:
        receiver = request.get_json()
        page, page_size = _page_params(receiver)
        name_zh_filter = receiver.get('name_zh', None)
        name_en_filter = receiver.get('name_en', None)
        category = receiver.get('category', None)
        tag = receiver.get('tag', None)
        level = receiver.get('level', None)

        # Number of records to skip before the requested page.
        skip_count = (page - 1) * page_size
        data, total = model_functions.model_list(
            skip_count, page_size, name_en_filter, name_zh_filter, category, tag, level
        )
        response_data = {'records': data, 'total': total, 'size': page_size, 'current': page}
        return _to_json(success(response_data, "success"))
    except Exception as e:
        return _error_dict_response(e)


# Data model graphs (kinship / impact / all / community)
@bp.route('/data/model/graph/all', methods=['POST'])
def data_model_graph_all():
    """Return the graph selected by the required ``type`` field.

    ``community`` uses only the optional ``tag``; ``kinship`` and ``impact``
    use ``id`` and ``meta``; any other value falls back to the full graph.
    """
    try:
        receiver = request.get_json()
        graph_type = receiver['type']  # kinship / impact / all / community

        if graph_type == 'community':
            # Community graphs are selected by tag only.
            tag = receiver.get('tag', None)
            result = model_functions.model_community(tag)
        else:
            # Node id and meta flag are only read for non-community graphs.
            node_id = receiver.get('id')
            meta = receiver.get('meta', False)  # whether to return metadata
            if graph_type == 'kinship':
                result = model_functions.model_kinship_graph(node_id, meta)
            elif graph_type == 'impact':
                result = model_functions.model_impact_graph(node_id, meta)
            else:
                result = model_functions.model_all_graph(node_id, meta)
        return _to_json(success(result, "success"))
    except Exception as e:
        return _error_text_response(e)


# Graph for the data model list
@bp.route('/data/model/list/graph', methods=['POST'])
def data_model_list_graph():
    """Return DataModel nodes and their ``child`` edges as ``nodes`` / ``edges``.

    The body must contain a ``tag`` key. When its value is not ``None`` the
    models are restricted to those with a ``LABEL`` relationship to the node
    with that id.
    """
    try:
        receiver = request.get_json()
        if not receiver or 'tag' not in receiver:
            raise ValueError("Missing 'tag' parameter in request body")
        tag_node_id = receiver['tag']

        # Optional restriction to models attached to the given label node.
        params = {}
        where_clause = ""
        if tag_node_id is not None:
            where_clause = "MATCH (n)-[:LABEL]->(la) WHERE id(la) = $nodeId"
            params['nodeId'] = tag_node_id

        from app.services.neo4j_driver import neo4j_driver
        query = f"""
        MATCH (n:DataModel)
        OPTIONAL MATCH (n)-[:child]->(child)
        {where_clause}
        WITH
            collect(DISTINCT {{id: toString(id(n)), text: n.name_zh, type: split(labels(n)[0], '_')[1]}}) AS nodes,
            collect(DISTINCT {{id: toString(id(child)), text: child.name_zh, type: split(labels(child)[0], '_')[1]}}) AS nodes2,
            collect(DISTINCT {{from: toString(id(n)), to: toString(id(child)), text: '下级'}}) AS lines
        RETURN nodes + nodes2 AS nodes, lines AS lines
        """

        with neo4j_driver.get_session() as session:
            data = session.run(query, **params).data()

        if not data:
            return _to_json(success({'nodes': [], 'edges': []}, "No data found"))

        # Models without children produce a placeholder child whose id is a
        # real null (None), not the string 'null'; drop those nodes and the
        # edges that point at them.
        nodes = [node for node in data[0]['nodes'] if not _is_null_id(node['id'])]
        edges = [line for line in data[0]['lines'] if not _is_null_id(line['to'])]
        return _to_json(success({'nodes': nodes, 'edges': edges}, "success"))
    except Exception as e:
        return _error_text_response(e)


# Update a data model
@bp.route('/data/model/update', methods=['POST'])
def data_model_update():
    """Pass the JSON body to ``data_model_edit`` and return its result."""
    try:
        receiver = request.get_json()
        result = model_functions.data_model_edit(receiver)
        return _to_json(success(result, "success"))
    except Exception as e:
        return _error_text_response(e)


# Search the metadata associated with a data model
@bp.route('/search', methods=['POST'])
def data_model_metadata_search():
    """Return one page of metadata associated with the model in ``id``.

    Reads ``current`` / ``size`` for paging and the optional filters
    ``name_en``, ``name_zh``, ``category`` and ``tag``. ``id`` is required and
    must be convertible to an integer.
    """
    try:
        # Paging and filter parameters
        payload = request.json
        page, page_size = _page_params(payload)
        model_id = payload.get('id')
        name_en_filter = payload.get('name_en')
        name_zh_filter = payload.get('name_zh')
        category_filter = payload.get('category')
        tag_filter = payload.get('tag')

        if model_id is None:
            return _to_json(failed({}, "模型ID不能为空"))

        # The business layer expects an integer id.
        try:
            model_id = int(model_id)
        except (ValueError, TypeError):
            return _to_json(failed({}, f"模型ID必须为整数, 收到的是: {model_id}"))

        logger.info(f"获取数据模型关联元数据请求,ID: {model_id}")

        metadata_list, total_count = model_functions.model_search_list(
            model_id,
            page,
            page_size,
            name_en_filter,
            name_zh_filter,
            category_filter,
            tag_filter
        )

        response_data = {
            "records": metadata_list,
            "total": total_count,
            "size": page_size,
            "current": page
        }
        return _to_json(success(response_data, "success"))
    except Exception as e:
        logger.error(f"数据模型关联元数据搜索失败: {str(e)}")
        return _error_text_response(e)