||
- """
- 数据服务 API 路由
- 提供数据产品列表、数据预览、Excel下载等接口
- """
- import json
- import logging
- from flask import request, send_file
- from app.api.data_service import bp
- from app.core.data_service.data_product_service import DataProductService
- from app.core.graph.graph_operations import MyEncoder
- from app.models.result import failed, success
- logger = logging.getLogger(__name__)
- # ==================== 数据产品列表接口 ====================
- @bp.route("/products", methods=["GET"])
- def get_products():
- """
- 获取数据产品列表
- Query Parameters:
- page: 页码,默认 1
- page_size: 每页数量,默认 20
- search: 搜索关键词
- status: 状态过滤 (active/inactive/error)
- """
- try:
- page = request.args.get("page", 1, type=int)
- page_size = request.args.get("page_size", 20, type=int)
- search = request.args.get("search", "")
- status = request.args.get("status")
- result = DataProductService.get_data_products(
- page=page,
- page_size=page_size,
- search=search,
- status=status,
- )
- res = success(result, "获取数据产品列表成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据产品列表失败: {str(e)}")
- res = failed(f"获取数据产品列表失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/products/<int:product_id>", methods=["GET"])
- def get_product(product_id: int):
- """
- 获取数据产品详情
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- product = DataProductService.get_product_by_id(product_id)
- if not product:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(product.to_dict(), "获取数据产品详情成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据产品详情失败: {str(e)}")
- res = failed(f"获取数据产品详情失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 数据预览接口 ====================
- @bp.route("/products/<int:product_id>/preview", methods=["GET"])
- def get_product_preview(product_id: int):
- """
- 获取数据产品的数据预览(默认200条)
- Path Parameters:
- product_id: 数据产品ID
- Query Parameters:
- limit: 预览数据条数,默认200,最大1000
- """
- try:
- limit = request.args.get("limit", 200, type=int)
- # 限制最大预览条数
- limit = min(limit, 1000)
- result = DataProductService.get_product_preview(
- product_id=product_id,
- limit=limit,
- )
- # 自动标记为已查看
- DataProductService.mark_as_viewed(product_id)
- res = success(result, "获取数据预览成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except ValueError as ve:
- logger.warning(f"获取数据预览参数错误: {str(ve)}")
- res = failed(str(ve), code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据预览失败: {str(e)}")
- res = failed(f"获取数据预览失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== Excel下载接口 ====================
- @bp.route("/products/<int:product_id>/download", methods=["GET"])
- def download_product_excel(product_id: int):
- """
- 下载数据产品数据为Excel文件
- Path Parameters:
- product_id: 数据产品ID
- Query Parameters:
- limit: 导出数据条数,默认200,最大10000
- """
- try:
- limit = request.args.get("limit", 200, type=int)
- # 限制最大导出条数
- limit = min(limit, 10000)
- excel_file, filename = DataProductService.export_to_excel(
- product_id=product_id,
- limit=limit,
- )
- # 标记为已查看
- DataProductService.mark_as_viewed(product_id)
- return send_file(
- excel_file,
- mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- as_attachment=True,
- download_name=filename,
- )
- except ValueError as ve:
- logger.warning(f"下载Excel参数错误: {str(ve)}")
- res = failed(str(ve), code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"下载Excel失败: {str(e)}")
- res = failed(f"下载Excel失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 标记已查看接口 ====================
- @bp.route("/products/<int:product_id>/viewed", methods=["POST"])
- def mark_product_viewed(product_id: int):
- """
- 标记数据产品为已查看(消除更新提示)
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- product = DataProductService.mark_as_viewed(product_id)
- if not product:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(product.to_dict(), "标记已查看成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"标记已查看失败: {str(e)}")
- res = failed(f"标记已查看失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 刷新统计信息接口 ====================
- @bp.route("/products/<int:product_id>/refresh", methods=["POST"])
- def refresh_product_stats(product_id: int):
- """
- 刷新数据产品的统计信息
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- product = DataProductService.refresh_product_stats(product_id)
- if not product:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(product.to_dict(), "刷新统计信息成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"刷新统计信息失败: {str(e)}")
- res = failed(f"刷新统计信息失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 删除数据产品接口 ====================
- @bp.route("/products/<int:product_id>", methods=["DELETE"])
- def delete_product(product_id: int):
- """
- 删除数据产品
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- result = DataProductService.delete_product(product_id)
- if not result:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success({}, "删除数据产品成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"删除数据产品失败: {str(e)}")
- res = failed(f"删除数据产品失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 手动注册数据产品接口 ====================
- @bp.route("/products", methods=["POST"])
- def register_product():
- """
- 手动注册数据产品
- Request Body:
- product_name: 数据产品名称(必填)
- product_name_en: 数据产品英文名(必填)
- target_table: 目标表名(必填)
- target_schema: 目标schema(可选,默认public)
- description: 描述(可选)
- """
- try:
- data = request.get_json()
- if not data:
- res = failed("请求数据不能为空", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # 验证必填字段
- required_fields = ["product_name", "product_name_en", "target_table"]
- for field in required_fields:
- if not data.get(field):
- res = failed(f"缺少必填字段: {field}", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- product = DataProductService.register_data_product(
- product_name=data["product_name"],
- product_name_en=data["product_name_en"],
- target_table=data["target_table"],
- target_schema=data.get("target_schema", "public"),
- description=data.get("description"),
- source_dataflow_id=data.get("source_dataflow_id"),
- source_dataflow_name=data.get("source_dataflow_name"),
- created_by=data.get("created_by", "manual"),
- )
- res = success(product.to_dict(), "注册数据产品成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"注册数据产品失败: {str(e)}")
- res = failed(f"注册数据产品失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
|