| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- """
- 数据服务 API 路由
- 提供数据产品列表、数据预览、Excel下载等接口
- """
- import json
- import logging
- from flask import request, send_file
- from app.api.data_service import bp
- from app.core.data_service.data_product_service import DataProductService
- from app.core.graph.graph_operations import MyEncoder
- from app.models.result import failed, success
- logger = logging.getLogger(__name__)
- # ==================== 数据产品列表接口 ====================
- @bp.route("/products", methods=["GET"])
- def get_products():
- """
- 获取数据产品列表
- Query Parameters:
- page: 页码,默认 1
- page_size: 每页数量,默认 20
- search: 搜索关键词
- status: 状态过滤 (active/inactive/error)
- """
- try:
- page = request.args.get("page", 1, type=int)
- page_size = request.args.get("page_size", 20, type=int)
- search = request.args.get("search", "")
- status = request.args.get("status")
- result = DataProductService.get_data_products(
- page=page,
- page_size=page_size,
- search=search,
- status=status,
- )
- res = success(result, "获取数据产品列表成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据产品列表失败: {str(e)}")
- res = failed(f"获取数据产品列表失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/products/<int:product_id>", methods=["GET"])
- def get_product(product_id: int):
- """
- 获取数据产品详情
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- product = DataProductService.get_product_by_id(product_id)
- if not product:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(product.to_dict(), "获取数据产品详情成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据产品详情失败: {str(e)}")
- res = failed(f"获取数据产品详情失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 数据预览接口 ====================
- @bp.route("/products/<int:product_id>/preview", methods=["GET"])
- def get_product_preview(product_id: int):
- """
- 获取数据产品的数据预览(默认200条)
- Path Parameters:
- product_id: 数据产品ID
- Query Parameters:
- limit: 预览数据条数,默认200,最大1000
- """
- try:
- limit = request.args.get("limit", 200, type=int)
- # 限制最大预览条数
- limit = min(limit, 1000)
- result = DataProductService.get_product_preview(
- product_id=product_id,
- limit=limit,
- )
- # 自动标记为已查看
- DataProductService.mark_as_viewed(product_id)
- res = success(result, "获取数据预览成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except ValueError as ve:
- logger.warning(f"获取数据预览参数错误: {str(ve)}")
- res = failed(str(ve), code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据预览失败: {str(e)}")
- res = failed(f"获取数据预览失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== Excel下载接口 ====================
- @bp.route("/products/<int:product_id>/download", methods=["GET"])
- def download_product_excel(product_id: int):
- """
- 下载数据产品数据为Excel文件
- Path Parameters:
- product_id: 数据产品ID
- Query Parameters:
- limit: 导出数据条数,默认200,最大10000
- """
- try:
- limit = request.args.get("limit", 200, type=int)
- # 限制最大导出条数
- limit = min(limit, 10000)
- excel_file, filename = DataProductService.export_to_excel(
- product_id=product_id,
- limit=limit,
- )
- # 标记为已查看
- DataProductService.mark_as_viewed(product_id)
- return send_file(
- excel_file,
- mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- as_attachment=True,
- download_name=filename,
- )
- except ValueError as ve:
- logger.warning(f"下载Excel参数错误: {str(ve)}")
- res = failed(str(ve), code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"下载Excel失败: {str(e)}")
- res = failed(f"下载Excel失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 标记已查看接口 ====================
- @bp.route("/products/<int:product_id>/viewed", methods=["POST"])
- def mark_product_viewed(product_id: int):
- """
- 标记数据产品为已查看(消除更新提示)
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- product = DataProductService.mark_as_viewed(product_id)
- if not product:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(product.to_dict(), "标记已查看成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"标记已查看失败: {str(e)}")
- res = failed(f"标记已查看失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 刷新统计信息接口 ====================
- @bp.route("/products/<int:product_id>/refresh", methods=["POST"])
- def refresh_product_stats(product_id: int):
- """
- 刷新数据产品的统计信息
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- product = DataProductService.refresh_product_stats(product_id)
- if not product:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(product.to_dict(), "刷新统计信息成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"刷新统计信息失败: {str(e)}")
- res = failed(f"刷新统计信息失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 删除数据产品接口 ====================
- @bp.route("/products/<int:product_id>", methods=["DELETE"])
- def delete_product(product_id: int):
- """
- 删除数据产品
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- result = DataProductService.delete_product(product_id)
- if not result:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success({}, "删除数据产品成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"删除数据产品失败: {str(e)}")
- res = failed(f"删除数据产品失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 手动注册数据产品接口 ====================
- @bp.route("/products", methods=["POST"])
- def register_product():
- """
- 手动注册数据产品
- Request Body:
- product_name: 数据产品名称(必填)
- product_name_en: 数据产品英文名(必填)
- target_table: 目标表名(必填)
- target_schema: 目标schema(可选,默认public)
- description: 描述(可选)
- """
- try:
- data = request.get_json()
- if not data:
- res = failed("请求数据不能为空", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # 验证必填字段
- required_fields = ["product_name", "product_name_en", "target_table"]
- for field in required_fields:
- if not data.get(field):
- res = failed(f"缺少必填字段: {field}", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- product = DataProductService.register_data_product(
- product_name=data["product_name"],
- product_name_en=data["product_name_en"],
- target_table=data["target_table"],
- target_schema=data.get("target_schema", "public"),
- description=data.get("description"),
- source_dataflow_id=data.get("source_dataflow_id"),
- source_dataflow_name=data.get("source_dataflow_name"),
- created_by=data.get("created_by", "manual"),
- )
- res = success(product.to_dict(), "注册数据产品成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"注册数据产品失败: {str(e)}")
- res = failed(f"注册数据产品失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
|