||
- """
- 数据服务 API 路由
- 提供数据产品列表、数据预览、Excel下载等接口
- 提供数据订单创建、分析、审批等接口
- """
- import json
- import logging
- from flask import request, send_file
- from app.api.data_service import bp
- from app.core.data_service.data_product_service import (
- DataOrderService,
- DataProductService,
- )
- from app.core.graph.graph_operations import MyEncoder
- from app.models.result import failed, success
- logger = logging.getLogger(__name__)
- # ==================== 数据产品列表接口 ====================
- @bp.route("/products", methods=["GET"])
- def get_products():
- """
- 获取数据产品列表
- Query Parameters:
- page: 页码,默认 1
- page_size: 每页数量,默认 20
- search: 搜索关键词
- status: 状态过滤 (active/inactive/error)
- """
- try:
- page = request.args.get("page", 1, type=int)
- page_size = request.args.get("page_size", 20, type=int)
- search = request.args.get("search", "")
- status = request.args.get("status")
- result = DataProductService.get_data_products(
- page=page,
- page_size=page_size,
- search=search,
- status=status,
- )
- res = success(result, "获取数据产品列表成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据产品列表失败: {str(e)}")
- res = failed(f"获取数据产品列表失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/products/<int:product_id>", methods=["GET"])
- def get_product(product_id: int):
- """
- 获取数据产品详情
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- product = DataProductService.get_product_by_id(product_id)
- if not product:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(product.to_dict(), "获取数据产品详情成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据产品详情失败: {str(e)}")
- res = failed(f"获取数据产品详情失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 数据预览接口 ====================
- @bp.route("/products/<int:product_id>/preview", methods=["GET"])
- def get_product_preview(product_id: int):
- """
- 获取数据产品的数据预览(默认200条)
- Path Parameters:
- product_id: 数据产品ID
- Query Parameters:
- limit: 预览数据条数,默认200,最大1000
- """
- try:
- limit = request.args.get("limit", 200, type=int)
- # 限制最大预览条数
- limit = min(limit, 1000)
- result = DataProductService.get_product_preview(
- product_id=product_id,
- limit=limit,
- )
- # 自动标记为已查看
- DataProductService.mark_as_viewed(product_id)
- res = success(result, "获取数据预览成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except ValueError as ve:
- logger.warning(f"获取数据预览参数错误: {str(ve)}")
- res = failed(str(ve), code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据预览失败: {str(e)}")
- res = failed(f"获取数据预览失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 数据加工可视化接口 ====================
- @bp.route("/products/<int:product_id>/lineage-visualization", methods=["POST"])
- def get_lineage_visualization(product_id: int):
- """
- 获取数据产品的血缘可视化数据
- 通过数据产品关联的 BusinessDomain 节点,追溯其 INPUT/OUTPUT 血缘关系,
- 直到到达具有 DataResource 标签的源节点。同时将样例数据的键值映射到各节点字段。
- Path Parameters:
- product_id: 数据产品ID
- Request Body:
- sample_data: 单条样例数据(JSON对象,key为中文字段名)
- Returns:
- nodes: 节点列表,包含 BusinessDomain 和 DataFlow 节点
- lines: 关系列表,包含 INPUT 和 OUTPUT 关系
- lineage_depth: 血缘追溯深度
- """
- try:
- data = request.get_json()
- if not data:
- res = failed("请求数据不能为空", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- sample_data = data.get("sample_data")
- if not sample_data or not isinstance(sample_data, dict):
- res = failed("sample_data 必须是非空的 JSON 对象", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- result = DataProductService.get_data_lineage_visualization(
- product_id=product_id,
- sample_data=sample_data,
- )
- res = success(result, "获取血缘可视化数据成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except ValueError as ve:
- logger.warning(f"获取血缘可视化参数错误: {str(ve)}")
- res = failed(str(ve), code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取血缘可视化数据失败: {str(e)}")
- res = failed(f"获取血缘可视化数据失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== Excel下载接口 ====================
- @bp.route("/products/<int:product_id>/download", methods=["GET"])
- def download_product_excel(product_id: int):
- """
- 下载数据产品数据为Excel文件
- Path Parameters:
- product_id: 数据产品ID
- Query Parameters:
- limit: 导出数据条数,默认200,最大10000
- """
- try:
- limit = request.args.get("limit", 200, type=int)
- # 限制最大导出条数
- limit = min(limit, 10000)
- excel_file, filename = DataProductService.export_to_excel(
- product_id=product_id,
- limit=limit,
- )
- # 标记为已查看
- DataProductService.mark_as_viewed(product_id)
- return send_file(
- excel_file,
- mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- as_attachment=True,
- download_name=filename,
- )
- except ValueError as ve:
- logger.warning(f"下载Excel参数错误: {str(ve)}")
- res = failed(str(ve), code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"下载Excel失败: {str(e)}")
- res = failed(f"下载Excel失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 标记已查看接口 ====================
- @bp.route("/products/<int:product_id>/viewed", methods=["POST"])
- def mark_product_viewed(product_id: int):
- """
- 标记数据产品为已查看(消除更新提示)
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- product = DataProductService.mark_as_viewed(product_id)
- if not product:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(product.to_dict(), "标记已查看成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"标记已查看失败: {str(e)}")
- res = failed(f"标记已查看失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 刷新统计信息接口 ====================
- @bp.route("/products/<int:product_id>/refresh", methods=["POST"])
- def refresh_product_stats(product_id: int):
- """
- 刷新数据产品的统计信息
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- product = DataProductService.refresh_product_stats(product_id)
- if not product:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(product.to_dict(), "刷新统计信息成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"刷新统计信息失败: {str(e)}")
- res = failed(f"刷新统计信息失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 删除数据产品接口 ====================
- @bp.route("/products/<int:product_id>", methods=["DELETE"])
- def delete_product(product_id: int):
- """
- 删除数据产品
- Path Parameters:
- product_id: 数据产品ID
- """
- try:
- result = DataProductService.delete_product(product_id)
- if not result:
- res = failed("数据产品不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success({}, "删除数据产品成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"删除数据产品失败: {str(e)}")
- res = failed(f"删除数据产品失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 手动注册数据产品接口 ====================
- @bp.route("/products", methods=["POST"])
- def register_product():
- """
- 手动注册数据产品
- Request Body:
- product_name: 数据产品名称(必填)
- product_name_en: 数据产品英文名(必填)
- target_table: 目标表名(必填)
- target_schema: 目标schema(可选,默认public)
- description: 描述(可选)
- """
- try:
- data = request.get_json()
- if not data:
- res = failed("请求数据不能为空", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # 验证必填字段
- required_fields = ["product_name", "product_name_en", "target_table"]
- for field in required_fields:
- if not data.get(field):
- res = failed(f"缺少必填字段: {field}", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- product = DataProductService.register_data_product(
- product_name=data["product_name"],
- product_name_en=data["product_name_en"],
- target_table=data["target_table"],
- target_schema=data.get("target_schema", "public"),
- description=data.get("description"),
- source_dataflow_id=data.get("source_dataflow_id"),
- source_dataflow_name=data.get("source_dataflow_name"),
- created_by=data.get("created_by", "manual"),
- )
- res = success(product.to_dict(), "注册数据产品成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"注册数据产品失败: {str(e)}")
- res = failed(f"注册数据产品失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # ==================== 数据订单接口 ====================
- @bp.route("/orderlist", methods=["GET"])
- def get_orders():
- """
- 获取数据订单列表
- Query Parameters:
- page: 页码,默认 1
- page_size: 每页数量,默认 20
- search: 搜索关键词
- status: 状态过滤 (pending/analyzing/processing/completed/rejected等)
- """
- try:
- page = request.args.get("page", 1, type=int)
- page_size = request.args.get("page_size", 20, type=int)
- search = request.args.get("search", "")
- status = request.args.get("status")
- result = DataOrderService.get_orders(
- page=page,
- page_size=page_size,
- search=search,
- status=status,
- )
- res = success(result, "获取数据订单列表成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据订单列表失败: {str(e)}")
- res = failed(f"获取数据订单列表失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/orders/<int:order_id>/detail", methods=["GET"])
- def get_order(order_id: int):
- """
- 获取数据订单详情
- Path Parameters:
- order_id: 数据订单ID
- """
- try:
- order = DataOrderService.get_order_by_id(order_id)
- if not order:
- res = failed("数据订单不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(order.to_dict(), "获取数据订单详情成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"获取数据订单详情失败: {str(e)}")
- res = failed(f"获取数据订单详情失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/neworder", methods=["POST"])
- def create_order():
- """
- 创建数据订单
- Request Body:
- title: 订单标题(必填)
- description: 需求描述(必填)
- created_by: 创建人(可选,默认user)
- """
- try:
- data = request.get_json()
- if not data:
- res = failed("请求数据不能为空", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- # 验证必填字段
- required_fields = ["title", "description"]
- for field in required_fields:
- if not data.get(field):
- res = failed(f"缺少必填字段: {field}", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- order = DataOrderService.create_order(
- title=data["title"],
- description=data["description"],
- created_by=data.get("created_by", "user"),
- )
- res = success(order.to_dict(), "创建数据订单成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"创建数据订单失败: {str(e)}")
- res = failed(f"创建数据订单失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/orders/<int:order_id>/update", methods=["PUT"])
- def update_order(order_id: int):
- """
- 更新数据订单(支持修改描述和提取结果)
- 只允许在 pending、manual_review、need_supplement 状态下修改
- Path Parameters:
- order_id: 数据订单ID
- Request Body:
- title: 订单标题(可选)
- description: 需求描述(可选)
- extracted_domains: 提取的业务领域列表(可选)
- extracted_fields: 提取的数据字段列表(可选)
- extraction_purpose: 数据用途(可选)
- """
- try:
- data = request.get_json()
- if not data:
- res = failed("请求数据不能为空", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- order = DataOrderService.update_order(
- order_id=order_id,
- title=data.get("title"),
- description=data.get("description"),
- extracted_domains=data.get("extracted_domains"),
- extracted_fields=data.get("extracted_fields"),
- extraction_purpose=data.get("extraction_purpose"),
- )
- if not order:
- res = failed("数据订单不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(order.to_dict(), "更新数据订单成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except ValueError as ve:
- logger.warning(f"更新数据订单参数错误: {str(ve)}")
- res = failed(str(ve), code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"更新数据订单失败: {str(e)}")
- res = failed(f"更新数据订单失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/orders/<int:order_id>/analyze", methods=["POST"])
- def analyze_order(order_id: int):
- """
- 分析数据订单(提取实体并检测图谱连通性)
- Path Parameters:
- order_id: 数据订单ID
- """
- try:
- order = DataOrderService.analyze_order(order_id)
- if not order:
- res = failed("数据订单不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(order.to_dict(), "数据订单分析完成")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"分析数据订单失败: {str(e)}")
- res = failed(f"分析数据订单失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/orders/<int:order_id>/approve", methods=["POST"])
- def approve_order(order_id: int):
- """
- 审批通过数据订单,并自动生成 BusinessDomain 和 DataFlow 资源
- 只允许从 pending_approval 或 manual_review 状态审批
- Path Parameters:
- order_id: 数据订单ID
- Request Body:
- processed_by: 处理人(可选,默认admin)
- Returns:
- order: 更新后的订单信息
- generated_resources: 生成的资源信息(包含 dataflow_id、target_business_domain_id 等)
- """
- try:
- data = request.get_json() or {}
- processed_by = data.get("processed_by", "admin")
- result = DataOrderService.approve_order(order_id, processed_by)
- res = success(result, "数据订单审批通过,资源已生成")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except ValueError as ve:
- logger.warning(f"审批数据订单参数错误: {str(ve)}")
- res = failed(str(ve), code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"审批数据订单失败: {str(e)}")
- res = failed(f"审批数据订单失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/orders/<int:order_id>/reject", methods=["POST"])
- def reject_order(order_id: int):
- """
- 驳回数据订单
- Path Parameters:
- order_id: 数据订单ID
- Request Body:
- reason: 驳回原因(必填)
- processed_by: 处理人(可选,默认admin)
- """
- try:
- data = request.get_json()
- if not data:
- res = failed("请求数据不能为空", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- reason = data.get("reason")
- if not reason:
- res = failed("驳回原因不能为空", code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- processed_by = data.get("processed_by", "admin")
- order = DataOrderService.reject_order(order_id, reason, processed_by)
- if not order:
- res = failed("数据订单不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(order.to_dict(), "数据订单已驳回")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"驳回数据订单失败: {str(e)}")
- res = failed(f"驳回数据订单失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/orders/<int:order_id>/onboard", methods=["POST"])
- def onboard_order(order_id: int):
- """
- 数据工厂回调:设置订单为数据产品就绪状态
- 只允许从 processing 状态转换为 onboard 状态
- Path Parameters:
- order_id: 数据订单ID
- Request Body:
- product_id: 生成的数据产品ID(可选)
- dataflow_id: 数据流ID(可选)
- processed_by: 处理人(可选,默认n8n-workflow)
- """
- try:
- data = request.get_json() or {}
- order = DataOrderService.set_order_onboard(
- order_id=order_id,
- product_id=data.get("product_id"),
- dataflow_id=data.get("dataflow_id"),
- processed_by=data.get("processed_by", "n8n-workflow"),
- )
- if not order:
- res = failed("数据订单不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(order.to_dict(), "数据订单已设置为数据产品就绪状态")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except ValueError as ve:
- logger.warning(f"设置订单onboard状态参数错误: {str(ve)}")
- res = failed(str(ve), code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"设置订单onboard状态失败: {str(e)}")
- res = failed(f"设置订单onboard状态失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/orders/<int:order_id>/complete", methods=["POST"])
- def complete_order(order_id: int):
- """
- 标记数据订单为最终完成状态
- 只允许从 onboard(数据产品就绪)状态标记完成
- Path Parameters:
- order_id: 数据订单ID
- Request Body:
- processed_by: 处理人(可选,默认user)
- """
- try:
- data = request.get_json() or {}
- order = DataOrderService.complete_order(
- order_id=order_id,
- processed_by=data.get("processed_by", "user"),
- )
- if not order:
- res = failed("数据订单不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success(order.to_dict(), "数据订单已完成")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except ValueError as ve:
- logger.warning(f"完成数据订单参数错误: {str(ve)}")
- res = failed(str(ve), code=400)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"完成数据订单失败: {str(e)}")
- res = failed(f"完成数据订单失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- @bp.route("/orders/<int:order_id>/delete", methods=["PUT"])
- def delete_order(order_id: int):
- """
- 删除数据订单(软删除)
- Path Parameters:
- order_id: 数据订单ID
- """
- try:
- result = DataOrderService.delete_order(order_id)
- if not result:
- res = failed("数据订单不存在", code=404)
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- res = success({}, "删除数据订单成功")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
- except Exception as e:
- logger.error(f"删除数据订单失败: {str(e)}")
- res = failed(f"删除数据订单失败: {str(e)}")
- return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
|