routes.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. """
  2. 数据服务 API 路由
  3. 提供数据产品列表、数据预览、Excel下载等接口
  4. """
  5. import json
  6. import logging
  7. from flask import request, send_file
  8. from app.api.data_service import bp
  9. from app.core.data_service.data_product_service import DataProductService
  10. from app.core.graph.graph_operations import MyEncoder
  11. from app.models.result import failed, success
  12. logger = logging.getLogger(__name__)
  13. # ==================== 数据产品列表接口 ====================
  14. @bp.route("/products", methods=["GET"])
  15. def get_products():
  16. """
  17. 获取数据产品列表
  18. Query Parameters:
  19. page: 页码,默认 1
  20. page_size: 每页数量,默认 20
  21. search: 搜索关键词
  22. status: 状态过滤 (active/inactive/error)
  23. """
  24. try:
  25. page = request.args.get("page", 1, type=int)
  26. page_size = request.args.get("page_size", 20, type=int)
  27. search = request.args.get("search", "")
  28. status = request.args.get("status")
  29. result = DataProductService.get_data_products(
  30. page=page,
  31. page_size=page_size,
  32. search=search,
  33. status=status,
  34. )
  35. res = success(result, "获取数据产品列表成功")
  36. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  37. except Exception as e:
  38. logger.error(f"获取数据产品列表失败: {str(e)}")
  39. res = failed(f"获取数据产品列表失败: {str(e)}")
  40. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  41. @bp.route("/products/<int:product_id>", methods=["GET"])
  42. def get_product(product_id: int):
  43. """
  44. 获取数据产品详情
  45. Path Parameters:
  46. product_id: 数据产品ID
  47. """
  48. try:
  49. product = DataProductService.get_product_by_id(product_id)
  50. if not product:
  51. res = failed("数据产品不存在", code=404)
  52. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  53. res = success(product.to_dict(), "获取数据产品详情成功")
  54. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  55. except Exception as e:
  56. logger.error(f"获取数据产品详情失败: {str(e)}")
  57. res = failed(f"获取数据产品详情失败: {str(e)}")
  58. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  59. # ==================== 数据预览接口 ====================
  60. @bp.route("/products/<int:product_id>/preview", methods=["GET"])
  61. def get_product_preview(product_id: int):
  62. """
  63. 获取数据产品的数据预览(默认200条)
  64. Path Parameters:
  65. product_id: 数据产品ID
  66. Query Parameters:
  67. limit: 预览数据条数,默认200,最大1000
  68. """
  69. try:
  70. limit = request.args.get("limit", 200, type=int)
  71. # 限制最大预览条数
  72. limit = min(limit, 1000)
  73. result = DataProductService.get_product_preview(
  74. product_id=product_id,
  75. limit=limit,
  76. )
  77. # 自动标记为已查看
  78. DataProductService.mark_as_viewed(product_id)
  79. res = success(result, "获取数据预览成功")
  80. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  81. except ValueError as ve:
  82. logger.warning(f"获取数据预览参数错误: {str(ve)}")
  83. res = failed(str(ve), code=404)
  84. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  85. except Exception as e:
  86. logger.error(f"获取数据预览失败: {str(e)}")
  87. res = failed(f"获取数据预览失败: {str(e)}")
  88. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  89. # ==================== Excel下载接口 ====================
  90. @bp.route("/products/<int:product_id>/download", methods=["GET"])
  91. def download_product_excel(product_id: int):
  92. """
  93. 下载数据产品数据为Excel文件
  94. Path Parameters:
  95. product_id: 数据产品ID
  96. Query Parameters:
  97. limit: 导出数据条数,默认200,最大10000
  98. """
  99. try:
  100. limit = request.args.get("limit", 200, type=int)
  101. # 限制最大导出条数
  102. limit = min(limit, 10000)
  103. excel_file, filename = DataProductService.export_to_excel(
  104. product_id=product_id,
  105. limit=limit,
  106. )
  107. # 标记为已查看
  108. DataProductService.mark_as_viewed(product_id)
  109. return send_file(
  110. excel_file,
  111. mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  112. as_attachment=True,
  113. download_name=filename,
  114. )
  115. except ValueError as ve:
  116. logger.warning(f"下载Excel参数错误: {str(ve)}")
  117. res = failed(str(ve), code=404)
  118. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  119. except Exception as e:
  120. logger.error(f"下载Excel失败: {str(e)}")
  121. res = failed(f"下载Excel失败: {str(e)}")
  122. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  123. # ==================== 标记已查看接口 ====================
  124. @bp.route("/products/<int:product_id>/viewed", methods=["POST"])
  125. def mark_product_viewed(product_id: int):
  126. """
  127. 标记数据产品为已查看(消除更新提示)
  128. Path Parameters:
  129. product_id: 数据产品ID
  130. """
  131. try:
  132. product = DataProductService.mark_as_viewed(product_id)
  133. if not product:
  134. res = failed("数据产品不存在", code=404)
  135. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  136. res = success(product.to_dict(), "标记已查看成功")
  137. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  138. except Exception as e:
  139. logger.error(f"标记已查看失败: {str(e)}")
  140. res = failed(f"标记已查看失败: {str(e)}")
  141. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  142. # ==================== 刷新统计信息接口 ====================
  143. @bp.route("/products/<int:product_id>/refresh", methods=["POST"])
  144. def refresh_product_stats(product_id: int):
  145. """
  146. 刷新数据产品的统计信息
  147. Path Parameters:
  148. product_id: 数据产品ID
  149. """
  150. try:
  151. product = DataProductService.refresh_product_stats(product_id)
  152. if not product:
  153. res = failed("数据产品不存在", code=404)
  154. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  155. res = success(product.to_dict(), "刷新统计信息成功")
  156. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  157. except Exception as e:
  158. logger.error(f"刷新统计信息失败: {str(e)}")
  159. res = failed(f"刷新统计信息失败: {str(e)}")
  160. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  161. # ==================== 删除数据产品接口 ====================
  162. @bp.route("/products/<int:product_id>", methods=["DELETE"])
  163. def delete_product(product_id: int):
  164. """
  165. 删除数据产品
  166. Path Parameters:
  167. product_id: 数据产品ID
  168. """
  169. try:
  170. result = DataProductService.delete_product(product_id)
  171. if not result:
  172. res = failed("数据产品不存在", code=404)
  173. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  174. res = success({}, "删除数据产品成功")
  175. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  176. except Exception as e:
  177. logger.error(f"删除数据产品失败: {str(e)}")
  178. res = failed(f"删除数据产品失败: {str(e)}")
  179. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  180. # ==================== 手动注册数据产品接口 ====================
  181. @bp.route("/products", methods=["POST"])
  182. def register_product():
  183. """
  184. 手动注册数据产品
  185. Request Body:
  186. product_name: 数据产品名称(必填)
  187. product_name_en: 数据产品英文名(必填)
  188. target_table: 目标表名(必填)
  189. target_schema: 目标schema(可选,默认public)
  190. description: 描述(可选)
  191. """
  192. try:
  193. data = request.get_json()
  194. if not data:
  195. res = failed("请求数据不能为空", code=400)
  196. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  197. # 验证必填字段
  198. required_fields = ["product_name", "product_name_en", "target_table"]
  199. for field in required_fields:
  200. if not data.get(field):
  201. res = failed(f"缺少必填字段: {field}", code=400)
  202. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  203. product = DataProductService.register_data_product(
  204. product_name=data["product_name"],
  205. product_name_en=data["product_name_en"],
  206. target_table=data["target_table"],
  207. target_schema=data.get("target_schema", "public"),
  208. description=data.get("description"),
  209. source_dataflow_id=data.get("source_dataflow_id"),
  210. source_dataflow_name=data.get("source_dataflow_name"),
  211. created_by=data.get("created_by", "manual"),
  212. )
  213. res = success(product.to_dict(), "注册数据产品成功")
  214. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  215. except Exception as e:
  216. logger.error(f"注册数据产品失败: {str(e)}")
  217. res = failed(f"注册数据产品失败: {str(e)}")
  218. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)