routes.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
  1. """
  2. 数据服务 API 路由
  3. 提供数据产品列表、数据预览、Excel下载等接口
  4. 提供数据订单创建、分析、审批等接口
  5. """
  6. import json
  7. import logging
  8. from flask import request, send_file
  9. from app.api.data_service import bp
  10. from app.core.data_service.data_product_service import (
  11. DataOrderService,
  12. DataProductService,
  13. )
  14. from app.core.graph.graph_operations import MyEncoder
  15. from app.models.result import failed, success
  16. logger = logging.getLogger(__name__)
  17. # ==================== 数据产品列表接口 ====================
  18. @bp.route("/products", methods=["GET"])
  19. def get_products():
  20. """
  21. 获取数据产品列表
  22. Query Parameters:
  23. page: 页码,默认 1
  24. page_size: 每页数量,默认 20
  25. search: 搜索关键词
  26. status: 状态过滤 (active/inactive/error)
  27. """
  28. try:
  29. page = request.args.get("page", 1, type=int)
  30. page_size = request.args.get("page_size", 20, type=int)
  31. search = request.args.get("search", "")
  32. status = request.args.get("status")
  33. result = DataProductService.get_data_products(
  34. page=page,
  35. page_size=page_size,
  36. search=search,
  37. status=status,
  38. )
  39. res = success(result, "获取数据产品列表成功")
  40. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  41. except Exception as e:
  42. logger.error(f"获取数据产品列表失败: {str(e)}")
  43. res = failed(f"获取数据产品列表失败: {str(e)}")
  44. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  45. @bp.route("/products/<int:product_id>", methods=["GET"])
  46. def get_product(product_id: int):
  47. """
  48. 获取数据产品详情
  49. Path Parameters:
  50. product_id: 数据产品ID
  51. """
  52. try:
  53. product = DataProductService.get_product_by_id(product_id)
  54. if not product:
  55. res = failed("数据产品不存在", code=404)
  56. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  57. res = success(product.to_dict(), "获取数据产品详情成功")
  58. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  59. except Exception as e:
  60. logger.error(f"获取数据产品详情失败: {str(e)}")
  61. res = failed(f"获取数据产品详情失败: {str(e)}")
  62. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  63. # ==================== 数据预览接口 ====================
  64. @bp.route("/products/<int:product_id>/preview", methods=["GET"])
  65. def get_product_preview(product_id: int):
  66. """
  67. 获取数据产品的数据预览(默认200条)
  68. Path Parameters:
  69. product_id: 数据产品ID
  70. Query Parameters:
  71. limit: 预览数据条数,默认200,最大1000
  72. """
  73. try:
  74. limit = request.args.get("limit", 200, type=int)
  75. # 限制最大预览条数
  76. limit = min(limit, 1000)
  77. result = DataProductService.get_product_preview(
  78. product_id=product_id,
  79. limit=limit,
  80. )
  81. # 自动标记为已查看
  82. DataProductService.mark_as_viewed(product_id)
  83. res = success(result, "获取数据预览成功")
  84. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  85. except ValueError as ve:
  86. logger.warning(f"获取数据预览参数错误: {str(ve)}")
  87. res = failed(str(ve), code=404)
  88. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  89. except Exception as e:
  90. logger.error(f"获取数据预览失败: {str(e)}")
  91. res = failed(f"获取数据预览失败: {str(e)}")
  92. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  93. # ==================== 数据加工可视化接口 ====================
  94. @bp.route("/products/<int:product_id>/lineage-visualization", methods=["POST"])
  95. def get_lineage_visualization(product_id: int):
  96. """
  97. 获取数据产品的血缘可视化数据
  98. 通过数据产品关联的 BusinessDomain 节点,追溯其 INPUT/OUTPUT 血缘关系,
  99. 直到到达具有 DataResource 标签的源节点。同时将样例数据的键值映射到各节点字段。
  100. Path Parameters:
  101. product_id: 数据产品ID
  102. Request Body:
  103. sample_data: 单条样例数据(JSON对象,key为中文字段名)
  104. Returns:
  105. nodes: 节点列表,包含 BusinessDomain 和 DataFlow 节点
  106. lines: 关系列表,包含 INPUT 和 OUTPUT 关系
  107. lineage_depth: 血缘追溯深度
  108. """
  109. try:
  110. data = request.get_json()
  111. if not data:
  112. res = failed("请求数据不能为空", code=400)
  113. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  114. sample_data = data.get("sample_data")
  115. if not sample_data or not isinstance(sample_data, dict):
  116. res = failed("sample_data 必须是非空的 JSON 对象", code=400)
  117. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  118. result = DataProductService.get_data_lineage_visualization(
  119. product_id=product_id,
  120. sample_data=sample_data,
  121. )
  122. res = success(result, "获取血缘可视化数据成功")
  123. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  124. except ValueError as ve:
  125. logger.warning(f"获取血缘可视化参数错误: {str(ve)}")
  126. res = failed(str(ve), code=404)
  127. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  128. except Exception as e:
  129. logger.error(f"获取血缘可视化数据失败: {str(e)}")
  130. res = failed(f"获取血缘可视化数据失败: {str(e)}")
  131. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  132. # ==================== Excel下载接口 ====================
  133. @bp.route("/products/<int:product_id>/download", methods=["GET"])
  134. def download_product_excel(product_id: int):
  135. """
  136. 下载数据产品数据为Excel文件
  137. Path Parameters:
  138. product_id: 数据产品ID
  139. Query Parameters:
  140. limit: 导出数据条数,默认200,最大10000
  141. """
  142. try:
  143. limit = request.args.get("limit", 200, type=int)
  144. # 限制最大导出条数
  145. limit = min(limit, 10000)
  146. excel_file, filename = DataProductService.export_to_excel(
  147. product_id=product_id,
  148. limit=limit,
  149. )
  150. # 标记为已查看
  151. DataProductService.mark_as_viewed(product_id)
  152. return send_file(
  153. excel_file,
  154. mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
  155. as_attachment=True,
  156. download_name=filename,
  157. )
  158. except ValueError as ve:
  159. logger.warning(f"下载Excel参数错误: {str(ve)}")
  160. res = failed(str(ve), code=404)
  161. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  162. except Exception as e:
  163. logger.error(f"下载Excel失败: {str(e)}")
  164. res = failed(f"下载Excel失败: {str(e)}")
  165. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  166. # ==================== 标记已查看接口 ====================
  167. @bp.route("/products/<int:product_id>/viewed", methods=["POST"])
  168. def mark_product_viewed(product_id: int):
  169. """
  170. 标记数据产品为已查看(消除更新提示)
  171. Path Parameters:
  172. product_id: 数据产品ID
  173. """
  174. try:
  175. product = DataProductService.mark_as_viewed(product_id)
  176. if not product:
  177. res = failed("数据产品不存在", code=404)
  178. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  179. res = success(product.to_dict(), "标记已查看成功")
  180. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  181. except Exception as e:
  182. logger.error(f"标记已查看失败: {str(e)}")
  183. res = failed(f"标记已查看失败: {str(e)}")
  184. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  185. # ==================== 刷新统计信息接口 ====================
  186. @bp.route("/products/<int:product_id>/refresh", methods=["POST"])
  187. def refresh_product_stats(product_id: int):
  188. """
  189. 刷新数据产品的统计信息
  190. Path Parameters:
  191. product_id: 数据产品ID
  192. """
  193. try:
  194. product = DataProductService.refresh_product_stats(product_id)
  195. if not product:
  196. res = failed("数据产品不存在", code=404)
  197. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  198. res = success(product.to_dict(), "刷新统计信息成功")
  199. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  200. except Exception as e:
  201. logger.error(f"刷新统计信息失败: {str(e)}")
  202. res = failed(f"刷新统计信息失败: {str(e)}")
  203. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  204. # ==================== 删除数据产品接口 ====================
  205. @bp.route("/products/<int:product_id>", methods=["DELETE"])
  206. def delete_product(product_id: int):
  207. """
  208. 删除数据产品
  209. Path Parameters:
  210. product_id: 数据产品ID
  211. """
  212. try:
  213. result = DataProductService.delete_product(product_id)
  214. if not result:
  215. res = failed("数据产品不存在", code=404)
  216. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  217. res = success({}, "删除数据产品成功")
  218. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  219. except Exception as e:
  220. logger.error(f"删除数据产品失败: {str(e)}")
  221. res = failed(f"删除数据产品失败: {str(e)}")
  222. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  223. # ==================== 手动注册数据产品接口 ====================
  224. @bp.route("/products", methods=["POST"])
  225. def register_product():
  226. """
  227. 手动注册数据产品
  228. Request Body:
  229. product_name: 数据产品名称(必填)
  230. product_name_en: 数据产品英文名(必填)
  231. target_table: 目标表名(必填)
  232. target_schema: 目标schema(可选,默认public)
  233. description: 描述(可选)
  234. """
  235. try:
  236. data = request.get_json()
  237. if not data:
  238. res = failed("请求数据不能为空", code=400)
  239. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  240. # 验证必填字段
  241. required_fields = ["product_name", "product_name_en", "target_table"]
  242. for field in required_fields:
  243. if not data.get(field):
  244. res = failed(f"缺少必填字段: {field}", code=400)
  245. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  246. product = DataProductService.register_data_product(
  247. product_name=data["product_name"],
  248. product_name_en=data["product_name_en"],
  249. target_table=data["target_table"],
  250. target_schema=data.get("target_schema", "public"),
  251. description=data.get("description"),
  252. source_dataflow_id=data.get("source_dataflow_id"),
  253. source_dataflow_name=data.get("source_dataflow_name"),
  254. created_by=data.get("created_by", "manual"),
  255. )
  256. res = success(product.to_dict(), "注册数据产品成功")
  257. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  258. except Exception as e:
  259. logger.error(f"注册数据产品失败: {str(e)}")
  260. res = failed(f"注册数据产品失败: {str(e)}")
  261. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  262. # ==================== 数据订单接口 ====================
  263. @bp.route("/orderlist", methods=["GET"])
  264. def get_orders():
  265. """
  266. 获取数据订单列表
  267. Query Parameters:
  268. page: 页码,默认 1
  269. page_size: 每页数量,默认 20
  270. search: 搜索关键词
  271. status: 状态过滤 (pending/analyzing/processing/completed/rejected等)
  272. """
  273. try:
  274. page = request.args.get("page", 1, type=int)
  275. page_size = request.args.get("page_size", 20, type=int)
  276. search = request.args.get("search", "")
  277. status = request.args.get("status")
  278. result = DataOrderService.get_orders(
  279. page=page,
  280. page_size=page_size,
  281. search=search,
  282. status=status,
  283. )
  284. res = success(result, "获取数据订单列表成功")
  285. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  286. except Exception as e:
  287. logger.error(f"获取数据订单列表失败: {str(e)}")
  288. res = failed(f"获取数据订单列表失败: {str(e)}")
  289. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  290. @bp.route("/orders/<int:order_id>", methods=["GET"])
  291. def get_order(order_id: int):
  292. """
  293. 获取数据订单详情
  294. Path Parameters:
  295. order_id: 数据订单ID
  296. """
  297. try:
  298. order = DataOrderService.get_order_by_id(order_id)
  299. if not order:
  300. res = failed("数据订单不存在", code=404)
  301. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  302. res = success(order.to_dict(), "获取数据订单详情成功")
  303. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  304. except Exception as e:
  305. logger.error(f"获取数据订单详情失败: {str(e)}")
  306. res = failed(f"获取数据订单详情失败: {str(e)}")
  307. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  308. @bp.route("/neworder", methods=["POST"])
  309. def create_order():
  310. """
  311. 创建数据订单
  312. Request Body:
  313. title: 订单标题(必填)
  314. description: 需求描述(必填)
  315. created_by: 创建人(可选,默认user)
  316. """
  317. try:
  318. data = request.get_json()
  319. if not data:
  320. res = failed("请求数据不能为空", code=400)
  321. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  322. # 验证必填字段
  323. required_fields = ["title", "description"]
  324. for field in required_fields:
  325. if not data.get(field):
  326. res = failed(f"缺少必填字段: {field}", code=400)
  327. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  328. order = DataOrderService.create_order(
  329. title=data["title"],
  330. description=data["description"],
  331. created_by=data.get("created_by", "user"),
  332. )
  333. res = success(order.to_dict(), "创建数据订单成功")
  334. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  335. except Exception as e:
  336. logger.error(f"创建数据订单失败: {str(e)}")
  337. res = failed(f"创建数据订单失败: {str(e)}")
  338. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  339. @bp.route("/orders/<int:order_id>/analyze", methods=["POST"])
  340. def analyze_order(order_id: int):
  341. """
  342. 分析数据订单(提取实体并检测图谱连通性)
  343. Path Parameters:
  344. order_id: 数据订单ID
  345. """
  346. try:
  347. order = DataOrderService.analyze_order(order_id)
  348. if not order:
  349. res = failed("数据订单不存在", code=404)
  350. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  351. res = success(order.to_dict(), "数据订单分析完成")
  352. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  353. except Exception as e:
  354. logger.error(f"分析数据订单失败: {str(e)}")
  355. res = failed(f"分析数据订单失败: {str(e)}")
  356. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  357. @bp.route("/orders/<int:order_id>/approve", methods=["POST"])
  358. def approve_order(order_id: int):
  359. """
  360. 审批通过数据订单
  361. Path Parameters:
  362. order_id: 数据订单ID
  363. Request Body:
  364. processed_by: 处理人(可选,默认admin)
  365. """
  366. try:
  367. data = request.get_json() or {}
  368. processed_by = data.get("processed_by", "admin")
  369. order = DataOrderService.approve_order(order_id, processed_by)
  370. if not order:
  371. res = failed("数据订单不存在", code=404)
  372. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  373. res = success(order.to_dict(), "数据订单审批通过")
  374. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  375. except ValueError as ve:
  376. logger.warning(f"审批数据订单参数错误: {str(ve)}")
  377. res = failed(str(ve), code=400)
  378. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  379. except Exception as e:
  380. logger.error(f"审批数据订单失败: {str(e)}")
  381. res = failed(f"审批数据订单失败: {str(e)}")
  382. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  383. @bp.route("/orders/<int:order_id>/reject", methods=["POST"])
  384. def reject_order(order_id: int):
  385. """
  386. 驳回数据订单
  387. Path Parameters:
  388. order_id: 数据订单ID
  389. Request Body:
  390. reason: 驳回原因(必填)
  391. processed_by: 处理人(可选,默认admin)
  392. """
  393. try:
  394. data = request.get_json()
  395. if not data:
  396. res = failed("请求数据不能为空", code=400)
  397. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  398. reason = data.get("reason")
  399. if not reason:
  400. res = failed("驳回原因不能为空", code=400)
  401. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  402. processed_by = data.get("processed_by", "admin")
  403. order = DataOrderService.reject_order(order_id, reason, processed_by)
  404. if not order:
  405. res = failed("数据订单不存在", code=404)
  406. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  407. res = success(order.to_dict(), "数据订单已驳回")
  408. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  409. except Exception as e:
  410. logger.error(f"驳回数据订单失败: {str(e)}")
  411. res = failed(f"驳回数据订单失败: {str(e)}")
  412. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  413. @bp.route("/orders/<int:order_id>/complete", methods=["POST"])
  414. def complete_order(order_id: int):
  415. """
  416. 完成数据订单
  417. Path Parameters:
  418. order_id: 数据订单ID
  419. Request Body:
  420. product_id: 生成的数据产品ID(可选)
  421. dataflow_id: 生成的数据流ID(可选)
  422. processed_by: 处理人(可选,默认system)
  423. """
  424. try:
  425. data = request.get_json() or {}
  426. order = DataOrderService.complete_order(
  427. order_id=order_id,
  428. product_id=data.get("product_id"),
  429. dataflow_id=data.get("dataflow_id"),
  430. processed_by=data.get("processed_by", "system"),
  431. )
  432. if not order:
  433. res = failed("数据订单不存在", code=404)
  434. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  435. res = success(order.to_dict(), "数据订单已完成")
  436. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  437. except Exception as e:
  438. logger.error(f"完成数据订单失败: {str(e)}")
  439. res = failed(f"完成数据订单失败: {str(e)}")
  440. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  441. @bp.route("/orders/<int:order_id>", methods=["DELETE"])
  442. def delete_order(order_id: int):
  443. """
  444. 删除数据订单
  445. Path Parameters:
  446. order_id: 数据订单ID
  447. """
  448. try:
  449. result = DataOrderService.delete_order(order_id)
  450. if not result:
  451. res = failed("数据订单不存在", code=404)
  452. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  453. res = success({}, "删除数据订单成功")
  454. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)
  455. except Exception as e:
  456. logger.error(f"删除数据订单失败: {str(e)}")
  457. res = failed(f"删除数据订单失败: {str(e)}")
  458. return json.dumps(res, ensure_ascii=False, cls=MyEncoder)