""" 数据服务 API 路由 提供数据产品列表、数据预览、Excel下载等接口 """ import json import logging from flask import request, send_file from app.api.data_service import bp from app.core.data_service.data_product_service import DataProductService from app.core.graph.graph_operations import MyEncoder from app.models.result import failed, success logger = logging.getLogger(__name__) # ==================== 数据产品列表接口 ==================== @bp.route("/products", methods=["GET"]) def get_products(): """ 获取数据产品列表 Query Parameters: page: 页码,默认 1 page_size: 每页数量,默认 20 search: 搜索关键词 status: 状态过滤 (active/inactive/error) """ try: page = request.args.get("page", 1, type=int) page_size = request.args.get("page_size", 20, type=int) search = request.args.get("search", "") status = request.args.get("status") result = DataProductService.get_data_products( page=page, page_size=page_size, search=search, status=status, ) res = success(result, "获取数据产品列表成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取数据产品列表失败: {str(e)}") res = failed(f"获取数据产品列表失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route("/products/", methods=["GET"]) def get_product(product_id: int): """ 获取数据产品详情 Path Parameters: product_id: 数据产品ID """ try: product = DataProductService.get_product_by_id(product_id) if not product: res = failed("数据产品不存在", code=404) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) res = success(product.to_dict(), "获取数据产品详情成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取数据产品详情失败: {str(e)}") res = failed(f"获取数据产品详情失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 数据预览接口 ==================== @bp.route("/products//preview", methods=["GET"]) def get_product_preview(product_id: int): """ 获取数据产品的数据预览(默认200条) Path Parameters: product_id: 数据产品ID Query Parameters: limit: 预览数据条数,默认200,最大1000 """ try: limit = request.args.get("limit", 200, type=int) # 限制最大预览条数 limit = min(limit, 1000) result = DataProductService.get_product_preview( product_id=product_id, limit=limit, ) # 自动标记为已查看 DataProductService.mark_as_viewed(product_id) res = success(result, "获取数据预览成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except ValueError as ve: logger.warning(f"获取数据预览参数错误: {str(ve)}") res = failed(str(ve), code=404) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取数据预览失败: {str(e)}") res = failed(f"获取数据预览失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== Excel下载接口 ==================== @bp.route("/products//download", methods=["GET"]) def download_product_excel(product_id: int): """ 下载数据产品数据为Excel文件 Path Parameters: product_id: 数据产品ID Query Parameters: limit: 导出数据条数,默认200,最大10000 """ try: limit = request.args.get("limit", 200, type=int) # 限制最大导出条数 limit = min(limit, 10000) excel_file, filename = DataProductService.export_to_excel( product_id=product_id, limit=limit, ) # 标记为已查看 DataProductService.mark_as_viewed(product_id) return send_file( excel_file, mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", as_attachment=True, download_name=filename, ) except ValueError as ve: logger.warning(f"下载Excel参数错误: {str(ve)}") res = failed(str(ve), code=404) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"下载Excel失败: {str(e)}") res = failed(f"下载Excel失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 标记已查看接口 ==================== @bp.route("/products//viewed", methods=["POST"]) def mark_product_viewed(product_id: int): """ 标记数据产品为已查看(消除更新提示) Path Parameters: product_id: 数据产品ID """ try: product = DataProductService.mark_as_viewed(product_id) if not product: res = failed("数据产品不存在", code=404) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) res = success(product.to_dict(), "标记已查看成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"标记已查看失败: {str(e)}") res = failed(f"标记已查看失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 刷新统计信息接口 ==================== @bp.route("/products//refresh", methods=["POST"]) def refresh_product_stats(product_id: int): """ 刷新数据产品的统计信息 Path Parameters: product_id: 数据产品ID """ try: product = DataProductService.refresh_product_stats(product_id) if not product: res = failed("数据产品不存在", code=404) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) res = success(product.to_dict(), "刷新统计信息成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"刷新统计信息失败: {str(e)}") res = failed(f"刷新统计信息失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 删除数据产品接口 ==================== @bp.route("/products/", methods=["DELETE"]) def delete_product(product_id: int): """ 删除数据产品 Path Parameters: product_id: 数据产品ID """ try: result = DataProductService.delete_product(product_id) if not result: res = failed("数据产品不存在", code=404) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) res = success({}, "删除数据产品成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"删除数据产品失败: {str(e)}") res = failed(f"删除数据产品失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 手动注册数据产品接口 ==================== @bp.route("/products", methods=["POST"]) def register_product(): """ 手动注册数据产品 Request Body: product_name: 数据产品名称(必填) product_name_en: 数据产品英文名(必填) target_table: 目标表名(必填) target_schema: 目标schema(可选,默认public) description: 描述(可选) """ try: data = request.get_json() if not data: res = failed("请求数据不能为空", code=400) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # 验证必填字段 required_fields = ["product_name", "product_name_en", "target_table"] for field in required_fields: if not data.get(field): res = failed(f"缺少必填字段: {field}", code=400) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) product = DataProductService.register_data_product( product_name=data["product_name"], product_name_en=data["product_name_en"], target_table=data["target_table"], target_schema=data.get("target_schema", "public"), description=data.get("description"), source_dataflow_id=data.get("source_dataflow_id"), source_dataflow_name=data.get("source_dataflow_name"), created_by=data.get("created_by", "manual"), ) res = success(product.to_dict(), "注册数据产品成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"注册数据产品失败: {str(e)}") res = failed(f"注册数据产品失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder)