""" 数据服务 API 路由 提供数据产品列表、数据预览、Excel下载等接口 提供数据订单创建、分析、审批等接口 """ import json import logging from flask import request, send_file from app.api.data_service import bp from app.core.data_service.data_product_service import ( DataOrderService, DataProductService, ) from app.core.graph.graph_operations import MyEncoder from app.models.result import failed, success logger = logging.getLogger(__name__) # ==================== 数据产品列表接口 ==================== @bp.route("/products", methods=["GET"]) def get_products(): """ 获取数据产品列表 Query Parameters: page: 页码,默认 1 page_size: 每页数量,默认 20 search: 搜索关键词 status: 状态过滤 (active/inactive/error) """ try: page = request.args.get("page", 1, type=int) page_size = request.args.get("page_size", 20, type=int) search = request.args.get("search", "") status = request.args.get("status") result = DataProductService.get_data_products( page=page, page_size=page_size, search=search, status=status, ) res = success(result, "获取数据产品列表成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取数据产品列表失败: {str(e)}") res = failed(f"获取数据产品列表失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) @bp.route("/products/", methods=["GET"]) def get_product(product_id: int): """ 获取数据产品详情 Path Parameters: product_id: 数据产品ID """ try: product = DataProductService.get_product_by_id(product_id) if not product: res = failed("数据产品不存在", code=404) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) res = success(product.to_dict(), "获取数据产品详情成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取数据产品详情失败: {str(e)}") res = failed(f"获取数据产品详情失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 数据预览接口 ==================== @bp.route("/products//preview", methods=["GET"]) def get_product_preview(product_id: int): """ 获取数据产品的数据预览(默认200条) Path Parameters: product_id: 数据产品ID Query Parameters: limit: 预览数据条数,默认200,最大1000 """ try: limit 
= request.args.get("limit", 200, type=int) # 限制最大预览条数 limit = min(limit, 1000) result = DataProductService.get_product_preview( product_id=product_id, limit=limit, ) # 自动标记为已查看 DataProductService.mark_as_viewed(product_id) res = success(result, "获取数据预览成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except ValueError as ve: logger.warning(f"获取数据预览参数错误: {str(ve)}") res = failed(str(ve), code=404) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取数据预览失败: {str(e)}") res = failed(f"获取数据预览失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 数据加工可视化接口 ==================== @bp.route("/products//lineage-visualization", methods=["POST"]) def get_lineage_visualization(product_id: int): """ 获取数据产品的血缘可视化数据 通过数据产品关联的 BusinessDomain 节点,追溯其 INPUT/OUTPUT 血缘关系, 直到到达具有 DataResource 标签的源节点。同时将样例数据的键值映射到各节点字段。 Path Parameters: product_id: 数据产品ID Request Body: sample_data: 单条样例数据(JSON对象,key为中文字段名) Returns: nodes: 节点列表,包含 BusinessDomain 和 DataFlow 节点 lines: 关系列表,包含 INPUT 和 OUTPUT 关系 lineage_depth: 血缘追溯深度 """ try: data = request.get_json() if not data: res = failed("请求数据不能为空", code=400) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) sample_data = data.get("sample_data") if not sample_data or not isinstance(sample_data, dict): res = failed("sample_data 必须是非空的 JSON 对象", code=400) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) result = DataProductService.get_data_lineage_visualization( product_id=product_id, sample_data=sample_data, ) res = success(result, "获取血缘可视化数据成功") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except ValueError as ve: logger.warning(f"获取血缘可视化参数错误: {str(ve)}") res = failed(str(ve), code=404) return json.dumps(res, ensure_ascii=False, cls=MyEncoder) except Exception as e: logger.error(f"获取血缘可视化数据失败: {str(e)}") res = failed(f"获取血缘可视化数据失败: {str(e)}") return json.dumps(res, ensure_ascii=False, cls=MyEncoder) # ==================== 
# ==================== Excel download endpoint ====================


# The URL variable must be declared in each rule below; otherwise Flask calls
# the view without its ID argument and the request fails with a TypeError.
@bp.route("/products/<int:product_id>/download", methods=["GET"])
def download_product_excel(product_id: int):
    """Download the data of a data product as an Excel file.

    Path Parameters:
        product_id: Data product ID.

    Query Parameters:
        limit: Number of rows to export, defaults to 200, capped at 10000.

    Returns:
        The Excel file as an attachment on success; otherwise a JSON string
        with a ``failed`` payload (code 404 for a ``ValueError`` raised by
        the service).
    """
    try:
        limit = request.args.get("limit", 200, type=int)
        # Cap the number of exported rows.
        limit = min(limit, 10000)

        excel_file, filename = DataProductService.export_to_excel(
            product_id=product_id,
            limit=limit,
        )

        # Downloading the data also marks the product as viewed.
        DataProductService.mark_as_viewed(product_id)

        return send_file(
            excel_file,
            mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            as_attachment=True,
            download_name=filename,
        )
    except ValueError as ve:
        logger.warning(f"下载Excel参数错误: {str(ve)}")
        res = failed(str(ve), code=404)
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"下载Excel失败: {str(e)}")
        res = failed(f"下载Excel失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


# ==================== Mark-as-viewed endpoint ====================


@bp.route("/products/<int:product_id>/viewed", methods=["POST"])
def mark_product_viewed(product_id: int):
    """Mark a data product as viewed (clears the update hint).

    Path Parameters:
        product_id: Data product ID.

    Returns:
        A JSON string; a ``failed`` payload with code 404 when the service
        returns no product, otherwise the product's ``to_dict()`` output.
    """
    try:
        product = DataProductService.mark_as_viewed(product_id)
        if not product:
            res = failed("数据产品不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        res = success(product.to_dict(), "标记已查看成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"标记已查看失败: {str(e)}")
        res = failed(f"标记已查看失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


# ==================== Refresh statistics endpoint ====================


@bp.route("/products/<int:product_id>/refresh", methods=["POST"])
def refresh_product_stats(product_id: int):
    """Refresh the statistics of a data product.

    Path Parameters:
        product_id: Data product ID.

    Returns:
        A JSON string; a ``failed`` payload with code 404 when the service
        returns no product, otherwise the product's ``to_dict()`` output.
    """
    try:
        product = DataProductService.refresh_product_stats(product_id)
        if not product:
            res = failed("数据产品不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        res = success(product.to_dict(), "刷新统计信息成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"刷新统计信息失败: {str(e)}")
        res = failed(f"刷新统计信息失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


# ==================== Delete data product endpoint ====================


@bp.route("/products/<int:product_id>", methods=["DELETE"])
def delete_product(product_id: int):
    """Delete a data product.

    Path Parameters:
        product_id: Data product ID.

    Returns:
        A JSON string; a ``failed`` payload with code 404 when the service
        reports a falsy result, otherwise an empty ``success`` payload.
    """
    try:
        result = DataProductService.delete_product(product_id)
        if not result:
            res = failed("数据产品不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        res = success({}, "删除数据产品成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"删除数据产品失败: {str(e)}")
        res = failed(f"删除数据产品失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


# ==================== Manual data product registration endpoint ====================


@bp.route("/products", methods=["POST"])
def register_product():
    """Manually register a data product.

    Request Body:
        product_name: Data product name (required).
        product_name_en: Data product English name (required).
        target_table: Target table name (required).
        target_schema: Target schema (optional, defaults to public).
        description: Description (optional).
        source_dataflow_id: Forwarded to the service as-is (optional).
        source_dataflow_name: Forwarded to the service as-is (optional).
        created_by: Creator (optional, defaults to manual).

    Returns:
        A JSON string; a ``failed`` payload with code 400 when the body is
        empty or a required field is missing, otherwise the registered
        product's ``to_dict()`` output.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None and is
        # answered with the explicit 400 below instead of a generic failure.
        data = request.get_json(silent=True)
        if not data:
            res = failed("请求数据不能为空", code=400)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        # Validate required fields.
        required_fields = ["product_name", "product_name_en", "target_table"]
        for field in required_fields:
            if not data.get(field):
                res = failed(f"缺少必填字段: {field}", code=400)
                return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        product = DataProductService.register_data_product(
            product_name=data["product_name"],
            product_name_en=data["product_name_en"],
            target_table=data["target_table"],
            target_schema=data.get("target_schema", "public"),
            description=data.get("description"),
            source_dataflow_id=data.get("source_dataflow_id"),
            source_dataflow_name=data.get("source_dataflow_name"),
            created_by=data.get("created_by", "manual"),
        )

        res = success(product.to_dict(), "注册数据产品成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"注册数据产品失败: {str(e)}")
        res = failed(f"注册数据产品失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


# ==================== Data order endpoints ====================


@bp.route("/orderlist", methods=["GET"])
def get_orders():
    """Return a paginated list of data orders.

    Query Parameters:
        page: Page number, defaults to 1.
        page_size: Number of items per page, defaults to 20.
        search: Search keyword, defaults to an empty string.
        status: Status filter
            (pending/analyzing/processing/completed/rejected, etc.).

    Returns:
        A JSON string encoding either the ``success`` payload wrapping the
        service result or the ``failed`` payload describing the error.
    """
    try:
        page = request.args.get("page", 1, type=int)
        page_size = request.args.get("page_size", 20, type=int)
        search = request.args.get("search", "")
        status = request.args.get("status")

        result = DataOrderService.get_orders(
            page=page,
            page_size=page_size,
            search=search,
            status=status,
        )

        res = success(result, "获取数据订单列表成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"获取数据订单列表失败: {str(e)}")
        res = failed(f"获取数据订单列表失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


@bp.route("/orders/<int:order_id>/detail", methods=["GET"])
def get_order(order_id: int):
    """Return the details of a single data order.

    Path Parameters:
        order_id: Data order ID.

    Returns:
        A JSON string; a ``failed`` payload with code 404 when the service
        returns no order, otherwise the order's ``to_dict()`` output.
    """
    try:
        order = DataOrderService.get_order_by_id(order_id)
        if not order:
            res = failed("数据订单不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        res = success(order.to_dict(), "获取数据订单详情成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"获取数据订单详情失败: {str(e)}")
        res = failed(f"获取数据订单详情失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


@bp.route("/neworder", methods=["POST"])
def create_order():
    """Create a data order.

    Request Body:
        title: Order title (required).
        description: Requirement description (required).
        created_by: Creator (optional, defaults to user).

    Returns:
        A JSON string; a ``failed`` payload with code 400 when the body is
        empty or a required field is missing, otherwise the created order's
        ``to_dict()`` output.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None and is
        # answered with the explicit 400 below instead of a generic failure.
        data = request.get_json(silent=True)
        if not data:
            res = failed("请求数据不能为空", code=400)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        # Validate required fields.
        required_fields = ["title", "description"]
        for field in required_fields:
            if not data.get(field):
                res = failed(f"缺少必填字段: {field}", code=400)
                return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        order = DataOrderService.create_order(
            title=data["title"],
            description=data["description"],
            created_by=data.get("created_by", "user"),
        )

        res = success(order.to_dict(), "创建数据订单成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"创建数据订单失败: {str(e)}")
        res = failed(f"创建数据订单失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


@bp.route("/orders/<int:order_id>/update", methods=["PUT"])
def update_order(order_id: int):
    """Update a data order (description and extraction results).

    Modification is only allowed while the order is in the pending,
    manual_review or need_supplement status.

    Path Parameters:
        order_id: Data order ID.

    Request Body:
        title: Order title (optional).
        description: Requirement description (optional).
        extracted_domains: Extracted business domain list (optional).
        extracted_fields: Extracted data field list (optional).
        extraction_purpose: Purpose of the data (optional).

    Returns:
        A JSON string; ``failed`` with code 400 for an empty body or a
        ``ValueError`` raised by the service, ``failed`` with code 404 when
        the service returns no order, otherwise the order's ``to_dict()``
        output.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None and is
        # answered with the explicit 400 below instead of a generic failure.
        data = request.get_json(silent=True)
        if not data:
            res = failed("请求数据不能为空", code=400)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        order = DataOrderService.update_order(
            order_id=order_id,
            title=data.get("title"),
            description=data.get("description"),
            extracted_domains=data.get("extracted_domains"),
            extracted_fields=data.get("extracted_fields"),
            extraction_purpose=data.get("extraction_purpose"),
        )

        if not order:
            res = failed("数据订单不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        res = success(order.to_dict(), "更新数据订单成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except ValueError as ve:
        logger.warning(f"更新数据订单参数错误: {str(ve)}")
        res = failed(str(ve), code=400)
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"更新数据订单失败: {str(e)}")
        res = failed(f"更新数据订单失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


@bp.route("/orders/<int:order_id>/analyze", methods=["POST"])
def analyze_order(order_id: int):
    """Analyze a data order (extract entities and check graph connectivity).

    Path Parameters:
        order_id: Data order ID.

    Returns:
        A JSON string; a ``failed`` payload with code 404 when the service
        returns no order, otherwise the order's ``to_dict()`` output.
    """
    try:
        order = DataOrderService.analyze_order(order_id)
        if not order:
            res = failed("数据订单不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        res = success(order.to_dict(), "数据订单分析完成")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"分析数据订单失败: {str(e)}")
        res = failed(f"分析数据订单失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


@bp.route("/orders/<int:order_id>/approve", methods=["POST"])
def approve_order(order_id: int):
    """Approve a data order and generate BusinessDomain and DataFlow resources.

    Approval is only allowed from the pending_approval or manual_review
    status.

    Path Parameters:
        order_id: Data order ID.

    Request Body:
        processed_by: Handler (optional, defaults to admin).

    Returns:
        A JSON string whose data section contains:
        order: The updated order information.
        generated_resources: Information about the generated resources
            (including dataflow_id, target_business_domain_id, etc.).
        A ``ValueError`` raised by the service is reported as ``failed``
        with code 400.
    """
    try:
        # The body is optional: silent=True makes an absent or non-JSON body
        # fall back to {} instead of raising inside get_json().
        data = request.get_json(silent=True) or {}
        processed_by = data.get("processed_by", "admin")

        result = DataOrderService.approve_order(order_id, processed_by)

        res = success(result, "数据订单审批通过,资源已生成")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except ValueError as ve:
        logger.warning(f"审批数据订单参数错误: {str(ve)}")
        res = failed(str(ve), code=400)
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"审批数据订单失败: {str(e)}")
        res = failed(f"审批数据订单失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


@bp.route("/orders/<int:order_id>/reject", methods=["POST"])
def reject_order(order_id: int):
    """Reject a data order.

    Path Parameters:
        order_id: Data order ID.

    Request Body:
        reason: Rejection reason (required).
        processed_by: Handler (optional, defaults to admin).

    Returns:
        A JSON string; ``failed`` with code 400 for an empty body or a
        missing reason, ``failed`` with code 404 when the service returns
        no order, otherwise the order's ``to_dict()`` output.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None and is
        # answered with the explicit 400 below instead of a generic failure.
        data = request.get_json(silent=True)
        if not data:
            res = failed("请求数据不能为空", code=400)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        reason = data.get("reason")
        if not reason:
            res = failed("驳回原因不能为空", code=400)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        processed_by = data.get("processed_by", "admin")
        order = DataOrderService.reject_order(order_id, reason, processed_by)

        if not order:
            res = failed("数据订单不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        res = success(order.to_dict(), "数据订单已驳回")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"驳回数据订单失败: {str(e)}")
        res = failed(f"驳回数据订单失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


@bp.route("/orders/<int:order_id>/onboard", methods=["POST"])
def onboard_order(order_id: int):
    """Data factory callback: set an order to the product-ready state.

    The transition is only allowed from the processing status to the
    onboard status.

    Path Parameters:
        order_id: Data order ID.

    Request Body:
        product_id: ID of the generated data product (optional).
        dataflow_id: Data flow ID (optional).
        processed_by: Handler (optional, defaults to n8n-workflow).

    Returns:
        A JSON string; ``failed`` with code 404 when the service returns no
        order, ``failed`` with code 400 for a ``ValueError`` raised by the
        service, otherwise the order's ``to_dict()`` output.
    """
    try:
        # The body is optional: silent=True makes an absent or non-JSON body
        # fall back to {} instead of raising inside get_json().
        data = request.get_json(silent=True) or {}

        order = DataOrderService.set_order_onboard(
            order_id=order_id,
            product_id=data.get("product_id"),
            dataflow_id=data.get("dataflow_id"),
            processed_by=data.get("processed_by", "n8n-workflow"),
        )

        if not order:
            res = failed("数据订单不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        res = success(order.to_dict(), "数据订单已设置为数据产品就绪状态")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except ValueError as ve:
        logger.warning(f"设置订单onboard状态参数错误: {str(ve)}")
        res = failed(str(ve), code=400)
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"设置订单onboard状态失败: {str(e)}")
        res = failed(f"设置订单onboard状态失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)


@bp.route("/orders/<int:order_id>/complete", methods=["POST"])
def complete_order(order_id: int):
    """Mark a data order as finally completed.

    Completion is only allowed from the onboard (product-ready) status.

    Path Parameters:
        order_id: Data order ID.

    Request Body:
        processed_by: Handler (optional, defaults to user).

    Returns:
        A JSON string; ``failed`` with code 404 when the service returns no
        order, ``failed`` with code 400 for a ``ValueError`` raised by the
        service, otherwise the order's ``to_dict()`` output.
    """
    try:
        # The body is optional: silent=True makes an absent or non-JSON body
        # fall back to {} instead of raising inside get_json().
        data = request.get_json(silent=True) or {}

        order = DataOrderService.complete_order(
            order_id=order_id,
            processed_by=data.get("processed_by", "user"),
        )

        if not order:
            res = failed("数据订单不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        res = success(order.to_dict(), "数据订单已完成")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except ValueError as ve:
        logger.warning(f"完成数据订单参数错误: {str(ve)}")
        res = failed(str(ve), code=400)
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"完成数据订单失败: {str(e)}")
        res = failed(f"完成数据订单失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
# The URL variable must be declared in the rule; otherwise Flask calls the
# view without ``order_id`` and the request fails with a TypeError.
@bp.route("/orders/<int:order_id>/delete", methods=["PUT"])
def delete_order(order_id: int):
    """Delete a data order (soft delete).

    Path Parameters:
        order_id: Data order ID.

    Returns:
        A JSON string; a ``failed`` payload with code 404 when the service
        reports a falsy result, otherwise an empty ``success`` payload.
    """
    try:
        result = DataOrderService.delete_order(order_id)
        if not result:
            res = failed("数据订单不存在", code=404)
            return json.dumps(res, ensure_ascii=False, cls=MyEncoder)

        res = success({}, "删除数据订单成功")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
    except Exception as e:
        logger.error(f"删除数据订单失败: {str(e)}")
        res = failed(f"删除数据订单失败: {str(e)}")
        return json.dumps(res, ensure_ascii=False, cls=MyEncoder)