""" Business Domain API 路由模块 提供业务领域相关的 RESTful API 接口 """ import io import json import time import logging import traceback import urllib.parse from flask import request, jsonify, current_app, send_file from minio import Minio from minio.error import S3Error from app.api.business_domain import bp from app.models.result import success, failed from app.services.neo4j_driver import neo4j_driver from app.core.llm.ddl_parser import DDLParser from app.core.business_domain import ( business_domain_list, get_business_domain_by_id, delete_business_domain, update_business_domain, save_business_domain, business_domain_graph_all, business_domain_search_list, business_domain_compose, business_domain_label_list ) logger = logging.getLogger("app") def get_minio_client(): """获取 MinIO 客户端实例""" return Minio( current_app.config['MINIO_HOST'], access_key=current_app.config['MINIO_USER'], secret_key=current_app.config['MINIO_PASSWORD'], secure=current_app.config['MINIO_SECURE'] ) def get_minio_config(): """获取 MinIO 配置""" return { 'MINIO_BUCKET': current_app.config['MINIO_BUCKET'], 'PREFIX': current_app.config.get( 'BUSINESS_DOMAIN_PREFIX', 'business_domain' ), 'ALLOWED_EXTENSIONS': current_app.config['ALLOWED_EXTENSIONS'] } def allowed_file(filename): """检查文件扩展名是否允许""" if '.' not in filename: return False ext = filename.rsplit('.', 1)[1].lower() return ext in get_minio_config()['ALLOWED_EXTENSIONS'] @bp.route('/list', methods=['POST']) def bd_list(): """ 获取业务领域列表 请求参数 (JSON): - current: 当前页码,默认1 - size: 每页大小,默认10 - name_en: 英文名称过滤条件(可选) - name_zh: 中文名称过滤条件(可选) - type: 类型过滤条件,默认'all'表示不过滤(可选) - category: 分类过滤条件(可选) - tag: 标签过滤条件(可选) 返回: - success: 是否成功 - message: 消息 - data: - records: 业务领域列表 - total: 总数量 - size: 每页大小 - current: 当前页码 """ try: # 获取分页和筛选参数 if not request.json: return jsonify(failed('请求数据不能为空')) page = int(request.json.get('current', 1)) page_size = int(request.json.get('size', 10)) name_en_filter = request.json.get('name_en') name_zh_filter = request.json.get('name_zh') type_filter = request.json.get('type', 'all') category_filter = request.json.get('category') tag_filter = request.json.get('tag') # 调用业务逻辑查询业务领域列表 domains, total_count = business_domain_list( page, page_size, name_en_filter, name_zh_filter, type_filter, category_filter, tag_filter ) # 返回结果 return jsonify(success({ "records": domains, "total": total_count, "size": page_size, "current": page })) except Exception as e: logger.error(f"获取业务领域列表失败: {str(e)}") return jsonify(failed("获取业务领域列表失败", error=str(e))) @bp.route('/detail', methods=['POST']) def bd_detail(): """ 获取业务领域详情 请求参数 (JSON): - id: 业务领域节点ID(必填) 返回: - success: 是否成功 - message: 消息 - data: 业务领域详情 """ try: # 获取参数 if not request.json: return jsonify(failed('请求数据不能为空')) domain_id = request.json.get('id') if domain_id is None: return jsonify(failed("业务领域ID不能为空")) # 确保传入的ID为整数 try: domain_id = int(domain_id) except (ValueError, TypeError): return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {domain_id}")) # 调用业务逻辑查询业务领域详情 domain_data = get_business_domain_by_id(domain_id) if not domain_data: return jsonify(failed("业务领域不存在")) return jsonify(success(domain_data)) except Exception as e: logger.error(f"获取业务领域详情失败: {str(e)}") return jsonify(failed("获取业务领域详情失败", error=str(e))) @bp.route('/delete', methods=['POST']) def bd_delete(): """ 删除业务领域 请求参数 (JSON): - id: 业务领域节点ID(必填) 返回: - success: 是否成功 - message: 消息 - data: 删除结果 """ try: # 获取参数 if not request.json: return jsonify(failed("请求数据不能为空")) domain_id = request.json.get('id') if domain_id is None: return jsonify(failed("业务领域ID不能为空")) # 调用业务逻辑删除业务领域 result = delete_business_domain(domain_id) if result: return jsonify(success({"message": "业务领域删除成功"})) else: return jsonify(failed("业务领域删除失败")) except Exception as e: logger.error(f"删除业务领域失败: {str(e)}") return jsonify(failed("删除业务领域失败", error=str(e))) @bp.route('/save', methods=['POST']) def bd_save(): """ 保存业务领域(新建或更新) 请求参数 (JSON): - id: 业务领域节点ID(可选,有则更新,无则新建) - name_zh: 中文名称(新建时必填) - name_en: 英文名称(新建时必填) - describe: 描述(可选) - type: 类型(可选) - category: 分类(可选) - tag: 标签ID(可选) - data_source: 数据源ID(可选) - 其他属性字段... 返回: - success: 是否成功 - message: 消息 - data: 保存后的业务领域数据 """ try: # 获取保存数据 data = request.json if not data: return jsonify(failed("请求数据不能为空")) # 新建时校验必填字段 if not data.get("id"): if not data.get("name_zh") or not data.get("name_en"): return jsonify(failed("新建时 name_zh 和 name_en 为必填项")) # 调用业务逻辑保存业务领域 saved_data = save_business_domain(data) return jsonify(success(saved_data)) except Exception as e: logger.error(f"保存业务领域失败: {str(e)}") return jsonify(failed("保存业务领域失败", error=str(e))) @bp.route('/update', methods=['POST']) def bd_update(): """ 更新业务领域 请求参数 (JSON): - id: 业务领域节点ID(必填) - name_zh: 中文名称(可选) - name_en: 英文名称(可选) - describe: 描述(可选) - tag: 标签ID(可选) - data_source: 数据源ID(可选) - 其他属性字段... 返回: - success: 是否成功 - message: 消息 - data: 更新后的业务领域数据 """ try: # 获取更新数据 data = request.json if not data or "id" not in data: return jsonify(failed("参数不完整")) # 调用业务逻辑更新业务领域 updated_data = update_business_domain(data) return jsonify(success(updated_data)) except Exception as e: logger.error(f"更新业务领域失败: {str(e)}") return jsonify(failed("更新业务领域失败", error=str(e))) @bp.route('/upload', methods=['POST']) def bd_upload(): """ 上传业务领域相关文件 请求参数 (multipart/form-data): - file: 上传的文件(必填) 返回: - success: 是否成功 - message: 消息 - data: - filename: 原始文件名 - size: 文件大小(字节) - type: 文件类型 - url: 文件存储路径 """ try: # 检查请求中是否有文件 if 'file' not in request.files: return jsonify(failed("没有找到上传的文件")) file = request.files['file'] # 检查文件名 if file.filename == '': return jsonify(failed("未选择文件")) # 检查文件类型 if not allowed_file(file.filename): return jsonify(failed("不支持的文件类型")) # 获取 MinIO 配置 minio_client = get_minio_client() config = get_minio_config() # 读取文件内容 file_content = file.read() file_size = len(file_content) filename = file.filename or '' file_type = filename.rsplit('.', 1)[1].lower() # 提取文件名(不包含扩展名) filename_without_ext = filename.rsplit('.', 1)[0] # 生成紧凑的时间戳 (yyyyMMddHHmmss) timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime()) # 生成唯一文件名 prefix = config['PREFIX'] object_name = ( f"{prefix}/{filename_without_ext}_{timestamp}.{file_type}" ) # 上传文件到 MinIO minio_client.put_object( config['MINIO_BUCKET'], object_name, io.BytesIO(file_content), file_size, content_type=f"application/{file_type}" ) logger.info(f"文件上传成功: {object_name}, 大小: {file_size}") # 返回结果 return jsonify(success({ "filename": file.filename, "size": file_size, "type": file_type, "url": object_name })) except Exception as e: logger.error(f"文件上传失败: {str(e)}") return jsonify(failed("文件上传失败", error=str(e))) @bp.route('/download', methods=['GET']) def bd_download(): """ 下载业务领域相关文件 请求参数 (URL Query): - url: 文件存储路径(必填) 返回: - 文件流(作为附件下载) """ response = None try: # 获取文件路径参数 object_name = request.args.get('url') if not object_name: return jsonify(failed("文件路径不能为空")) # URL解码,处理特殊字符 object_name = urllib.parse.unquote(object_name) # 记录下载请求信息,便于调试 logger.info(f"下载文件请求: {object_name}") # 获取 MinIO 配置 minio_client = get_minio_client() config = get_minio_config() # 获取文件 try: response = minio_client.get_object( config['MINIO_BUCKET'], object_name ) file_data = response.read() except S3Error as e: logger.error(f"MinIO获取文件失败: {str(e)}") return jsonify(failed(f"文件获取失败: {str(e)}")) # 获取文件名 file_name = object_name.split('/')[-1] # 直接从内存返回文件,不创建临时文件 file_stream = io.BytesIO(file_data) # 返回文件 return send_file( file_stream, as_attachment=True, download_name=file_name, mimetype="application/octet-stream" ) except Exception as e: logger.error(f"文件下载失败: {str(e)}") return jsonify(failed("文件下载失败", error=str(e))) finally: if response: response.close() response.release_conn() @bp.route('/graphall', methods=['POST']) def bd_graph_all(): """ 获取业务领域完整关系图谱 请求参数 (JSON): - id: 业务领域节点ID(必填) - meta: 是否包含元数据节点,默认True(可选) 返回: - success: 是否成功 - message: 消息 - data: - nodes: 节点列表 - lines: 关系列表 """ try: # 获取参数 if not request.json: return jsonify(failed('请求数据不能为空')) domain_id = request.json.get('id') include_meta = request.json.get('meta', True) if domain_id is None: return jsonify(failed("业务领域ID不能为空")) # 确保传入的ID为整数 try: domain_id = int(domain_id) except (ValueError, TypeError): return jsonify(failed( f"业务领域ID必须为整数, 收到的是: {domain_id}" )) # 调用业务逻辑获取完整图谱 graph_data = business_domain_graph_all(domain_id, include_meta) return jsonify(success(graph_data)) except Exception as e: logger.error(f"获取业务领域图谱失败: {str(e)}") return jsonify(failed("获取业务领域图谱失败", error=str(e))) @bp.route('/ddlparse', methods=['POST']) def bd_ddl_parse(): """ 解析DDL语句,用于业务领域创建 请求参数: - file: SQL文件(multipart/form-data,可选) - sql: SQL内容(JSON,可选) 至少提供其中一种方式 返回: - success: 是否成功 - message: 消息 - data: 解析后的DDL列表,包含表信息和字段信息 """ try: # 获取参数 - 支持两种方式:上传文件或JSON sql_content = '' # 检查是否有文件上传 if 'file' in request.files: file = request.files['file'] # 检查文件是否存在且文件名不为空 if file and file.filename: # 检查是否是SQL文件 if not file.filename.lower().endswith('.sql'): return jsonify(failed("只接受SQL文件")) # 读取文件内容 sql_content = file.read().decode('utf-8') logger.info( f"从上传的文件中读取SQL内容,文件名: {file.filename}" ) # 如果没有文件上传,检查是否有JSON输入 elif request.is_json and request.json: sql_content = request.json.get('sql', '') # 如果两种方式都没有提供SQL内容,则返回错误 if not sql_content: return jsonify(failed( "SQL内容不能为空,请上传SQL文件或提供SQL内容" )) parser = DDLParser() # 提取创建表的DDL语句 ddl_list = parser.parse_ddl(sql_content) if not ddl_list: return jsonify(failed("未找到有效的CREATE TABLE语句")) # 处理表的存在状态 if isinstance(ddl_list, list): # 新格式:数组格式 # 获取所有表名 table_names = [] for table_item in ddl_list: if isinstance(table_item, dict) and 'table_info' in table_item: table_name = table_item['table_info'].get('name_en') if table_name: table_names.append(table_name) # 首先为所有表设置默认的exist状态 for table_item in ddl_list: if isinstance(table_item, dict): table_item["exist"] = False if table_names: try: # 查询业务领域是否存在 with neo4j_driver.get_session() as session: table_query = """ UNWIND $names AS name OPTIONAL MATCH (n:BusinessDomain {name_en: name}) RETURN name, n IS NOT NULL AS exists """ table_results = session.run( table_query, names=table_names ) # 创建存在状态映射 exist_map = {} for record in table_results: table_name = record["name"] exists = record["exists"] exist_map[table_name] = exists # 更新存在的表的状态 for table_item in ddl_list: if (isinstance(table_item, dict) and 'table_info' in table_item): info = table_item['table_info'] t_name = info.get('name_en') if t_name and t_name in exist_map: table_item["exist"] = exist_map[t_name] except Exception as e: logger.error(f"检查业务领域存在状态失败: {str(e)}") # 如果查询失败,所有表保持默认的False状态 elif isinstance(ddl_list, dict): # 兼容旧格式:字典格式(以表名为key) table_names = list(ddl_list.keys()) # 首先为所有表设置默认的exist状态 for table_name in table_names: if isinstance(ddl_list[table_name], dict): ddl_list[table_name]["exist"] = False else: logger.warning( f"表 {table_name} 的值不是字典类型: " f"{type(ddl_list[table_name])}" ) if table_names: try: # 查询业务领域是否存在 with neo4j_driver.get_session() as session: table_query = """ UNWIND $names AS name OPTIONAL MATCH (n:BusinessDomain {name_en: name}) RETURN name, n IS NOT NULL AS exists """ table_results = session.run( table_query, names=table_names ) # 更新存在的表的状态 for record in table_results: table_name = record["name"] exists = record["exists"] is_valid = ( table_name in ddl_list and isinstance(ddl_list[table_name], dict) ) if is_valid: ddl_list[table_name]["exist"] = exists except Exception as e: logger.error(f"检查业务领域存在状态失败: {str(e)}") # 如果查询失败,所有表保持默认的False状态 logger.debug( f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}" ) return jsonify(success(ddl_list)) except Exception as e: logger.error(f"解析DDL语句失败: {str(e)}") logger.error(traceback.format_exc()) return jsonify(failed("解析DDL语句失败", error=str(e))) @bp.route('/search', methods=['POST']) def bd_search(): """ 搜索业务领域关联的元数据 请求参数 (JSON): - id: 业务领域节点ID(必填) - current: 当前页码,默认1 - size: 每页大小,默认10 - name_en: 英文名称过滤条件(可选) - name_zh: 中文名称过滤条件(可选) - category: 分类过滤条件(可选) - tag: 标签过滤条件(可选) 返回: - success: 是否成功 - message: 消息 - data: - records: 元数据列表 - total: 总数量 - size: 每页大小 - current: 当前页码 """ try: # 获取分页和筛选参数 if not request.json: return jsonify(failed('请求数据不能为空')) page = int(request.json.get('current', 1)) page_size = int(request.json.get('size', 10)) domain_id = request.json.get('id') name_en_filter = request.json.get('name_en') name_zh_filter = request.json.get('name_zh') category_filter = request.json.get('category') tag_filter = request.json.get('tag') if domain_id is None: return jsonify(failed("业务领域ID不能为空")) # 确保传入的ID为整数 try: domain_id = int(domain_id) except (ValueError, TypeError): return jsonify(failed( f"业务领域ID必须为整数, 收到的是: {domain_id}" )) # 记录请求信息 logger.info(f"获取业务领域关联元数据请求,ID: {domain_id}") # 调用业务逻辑查询关联元数据 metadata_list, total_count = business_domain_search_list( domain_id, page, page_size, name_en_filter, name_zh_filter, category_filter, tag_filter ) # 返回结果 return jsonify(success({ "records": metadata_list, "total": total_count, "size": page_size, "current": page })) except Exception as e: logger.error(f"业务领域关联元数据搜索失败: {str(e)}") return jsonify(failed("业务领域关联元数据搜索失败", error=str(e))) @bp.route('/compose', methods=['POST']) def bd_compose(): """ 从已有业务领域中组合创建新的业务领域 请求参数 (JSON): - name_zh: 中文名称(必填) - name_en: 英文名称(可选,不提供则自动翻译) - id_list: 关联的业务领域和元数据列表(必填) 格式: [{"domain_id": 123, "metaData": [{"id": 456}, ...]}] - describe: 描述(可选) - type: 类型(可选) - category: 分类(可选) - tag: 标签ID(可选) - data_source: 数据源ID(可选) 返回: - success: 是否成功 - message: 消息 - data: 创建后的业务领域数据 """ try: # 获取请求数据 data = request.json if not data: return jsonify(failed("请求数据不能为空")) # 校验必填字段 if not data.get("name_zh"): return jsonify(failed("name_zh 为必填项")) if not data.get("id_list"): return jsonify(failed("id_list 为必填项")) # 调用业务逻辑组合创建业务领域 result_data = business_domain_compose(data) # 构建响应数据 response_data = { "business_domain": result_data } return jsonify(success(response_data)) except Exception as e: logger.error(f"组合创建业务领域失败: {str(e)}") return jsonify(failed("组合创建业务领域失败", error=str(e))) @bp.route('/labellist', methods=['POST']) def bd_label_list(): """ 获取数据标签列表(用于业务领域关联) 请求参数 (JSON): - current: 当前页码,默认1 - size: 每页大小,默认10 - name_en: 英文名称过滤条件(可选) - name_zh: 中文名称过滤条件(可选) - category: 分类过滤条件(可选) - group: 分组过滤条件(可选) 返回: - success: 是否成功 - message: 消息 - data: - records: 标签列表 - total: 总数量 - size: 每页大小 - current: 当前页码 """ try: # 获取分页和筛选参数 if not request.json: return jsonify(failed('请求数据不能为空')) page = int(request.json.get('current', 1)) page_size = int(request.json.get('size', 10)) name_en_filter = request.json.get('name_en') name_zh_filter = request.json.get('name_zh') category_filter = request.json.get('category') group_filter = request.json.get('group') # 调用业务逻辑查询标签列表 labels, total_count = business_domain_label_list( page, page_size, name_en_filter, name_zh_filter, category_filter, group_filter ) # 返回结果 return jsonify(success({ "records": labels, "total": total_count, "size": page_size, "current": page })) except Exception as e: logger.error(f"获取标签列表失败: {str(e)}") return jsonify(failed("获取标签列表失败", error=str(e)))