routes.py 18 KB


  1. """
  2. Business Domain API 路由模块
  3. 提供业务领域相关的 RESTful API 接口
  4. """
  5. import io
  6. import json
  7. import time
  8. import logging
  9. import traceback
  10. import urllib.parse
  11. from flask import request, jsonify, current_app, send_file
  12. from minio import Minio
  13. from minio.error import S3Error
  14. from app.api.business_domain import bp
  15. from app.models.result import success, failed
  16. from app.services.neo4j_driver import neo4j_driver
  17. from app.core.llm.ddl_parser import DDLParser
  18. from app.core.business_domain import (
  19. business_domain_list,
  20. get_business_domain_by_id,
  21. delete_business_domain,
  22. update_business_domain,
  23. save_business_domain,
  24. business_domain_graph_all,
  25. business_domain_search_list,
  26. business_domain_compose,
  27. business_domain_label_list,
  28. )
  29. logger = logging.getLogger("app")
  30. # ----------------------- MinIO helpers -----------------------
  31. def get_minio_client():
  32. """获取 MinIO 客户端实例"""
  33. return Minio(
  34. current_app.config["MINIO_HOST"],
  35. access_key=current_app.config["MINIO_USER"],
  36. secret_key=current_app.config["MINIO_PASSWORD"],
  37. secure=current_app.config["MINIO_SECURE"],
  38. )
  39. def get_minio_config():
  40. """获取 MinIO 配置"""
  41. return {
  42. "MINIO_BUCKET": current_app.config["MINIO_BUCKET"],
  43. "PREFIX": current_app.config.get(
  44. "BUSINESS_DOMAIN_PREFIX", "business_domain"
  45. ),
  46. "ALLOWED_EXTENSIONS": current_app.config["ALLOWED_EXTENSIONS"],
  47. }
  48. def allowed_file(filename):
  49. """检查文件扩展名是否允许"""
  50. if "." not in filename:
  51. return False
  52. ext = filename.rsplit(".", 1)[1].lower()
  53. return ext in get_minio_config()["ALLOWED_EXTENSIONS"]
  54. # ----------------------- Business Domain APIs -----------------------
  55. @bp.route("/list", methods=["POST"])
  56. def bd_list():
  57. """获取业务领域列表"""
  58. try:
  59. if not request.json:
  60. return jsonify(failed("请求数据不能为空"))
  61. page = int(request.json.get("current", 1))
  62. page_size = int(request.json.get("size", 10))
  63. name_en_filter = request.json.get("name_en")
  64. name_zh_filter = request.json.get("name_zh")
  65. type_filter = request.json.get("type", "all")
  66. category_filter = request.json.get("category")
  67. tag_filter = request.json.get("tag")
  68. domains, total_count = business_domain_list(
  69. page,
  70. page_size,
  71. name_en_filter,
  72. name_zh_filter,
  73. type_filter,
  74. category_filter,
  75. tag_filter,
  76. )
  77. return jsonify(
  78. success(
  79. {
  80. "records": domains,
  81. "total": total_count,
  82. "size": page_size,
  83. "current": page,
  84. }
  85. )
  86. )
  87. except Exception as e:
  88. logger.error(f"获取业务领域列表失败: {str(e)}")
  89. return jsonify(failed("获取业务领域列表失败", error=str(e)))
  90. @bp.route("/detail", methods=["POST"])
  91. def bd_detail():
  92. """获取业务领域详情"""
  93. try:
  94. if not request.json:
  95. return jsonify(failed("请求数据不能为空"))
  96. domain_id = request.json.get("id")
  97. if domain_id is None:
  98. return jsonify(failed("业务领域ID不能为空"))
  99. try:
  100. domain_id = int(domain_id)
  101. except (ValueError, TypeError):
  102. return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {domain_id}"))
  103. domain_data = get_business_domain_by_id(domain_id)
  104. if not domain_data:
  105. return jsonify(failed("业务领域不存在"))
  106. return jsonify(success(domain_data))
  107. except Exception as e:
  108. logger.error(f"获取业务领域详情失败: {str(e)}")
  109. return jsonify(failed("获取业务领域详情失败", error=str(e)))
  110. @bp.route("/delete", methods=["POST"])
  111. def bd_delete():
  112. """删除业务领域"""
  113. try:
  114. if not request.json:
  115. return jsonify(failed("请求数据不能为空"))
  116. domain_id = request.json.get("id")
  117. if domain_id is None:
  118. return jsonify(failed("业务领域ID不能为空"))
  119. result = delete_business_domain(domain_id)
  120. if result:
  121. return jsonify(success({"message": "业务领域删除成功"}))
  122. return jsonify(failed("业务领域删除失败"))
  123. except Exception as e:
  124. logger.error(f"删除业务领域失败: {str(e)}")
  125. return jsonify(failed("删除业务领域失败", error=str(e)))
  126. @bp.route("/save", methods=["POST"])
  127. def bd_save():
  128. """保存业务领域(新建或更新)"""
  129. try:
  130. data = request.json
  131. if not data:
  132. return jsonify(failed("请求数据不能为空"))
  133. if not data.get("id"):
  134. if not data.get("name_zh") or not data.get("name_en"):
  135. return jsonify(failed("新建时 name_zh 和 name_en 为必填项"))
  136. saved_data = save_business_domain(data)
  137. return jsonify(success(saved_data))
  138. except Exception as e:
  139. logger.error(f"保存业务领域失败: {str(e)}")
  140. return jsonify(failed("保存业务领域失败", error=str(e)))
  141. @bp.route("/update", methods=["POST"])
  142. def bd_update():
  143. """更新业务领域"""
  144. try:
  145. data = request.json
  146. if not data or "id" not in data:
  147. return jsonify(failed("参数不完整"))
  148. updated_data = update_business_domain(data)
  149. return jsonify(success(updated_data))
  150. except Exception as e:
  151. logger.error(f"更新业务领域失败: {str(e)}")
  152. return jsonify(failed("更新业务领域失败", error=str(e)))
  153. @bp.route("/upload", methods=["POST"])
  154. def bd_upload():
  155. """上传业务领域相关文件"""
  156. response = None
  157. try:
  158. if "file" not in request.files:
  159. return jsonify(failed("没有找到上传的文件"))
  160. file = request.files["file"]
  161. if file.filename == "":
  162. return jsonify(failed("未选择文件"))
  163. if not allowed_file(file.filename):
  164. return jsonify(failed("不支持的文件类型"))
  165. minio_client = get_minio_client()
  166. config = get_minio_config()
  167. file_content = file.read()
  168. file_size = len(file_content)
  169. filename = file.filename or ""
  170. file_type = filename.rsplit(".", 1)[1].lower()
  171. filename_without_ext = filename.rsplit(".", 1)[0]
  172. timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
  173. object_name = (
  174. f"{config['PREFIX']}/"
  175. f"{filename_without_ext}_{timestamp}.{file_type}"
  176. )
  177. minio_client.put_object(
  178. config["MINIO_BUCKET"],
  179. object_name,
  180. io.BytesIO(file_content),
  181. file_size,
  182. content_type=f"application/{file_type}",
  183. )
  184. logger.info(f"文件上传成功: {object_name}, 大小: {file_size}")
  185. return jsonify(
  186. success(
  187. {
  188. "filename": file.filename,
  189. "size": file_size,
  190. "type": file_type,
  191. "url": object_name,
  192. }
  193. )
  194. )
  195. except Exception as e:
  196. logger.error(f"文件上传失败: {str(e)}")
  197. return jsonify(failed("文件上传失败", error=str(e)))
  198. finally:
  199. if response:
  200. response.close()
  201. response.release_conn()
  202. @bp.route("/download", methods=["GET"])
  203. def bd_download():
  204. """下载业务领域相关文件"""
  205. response = None
  206. try:
  207. object_name = request.args.get("url")
  208. if not object_name:
  209. return jsonify(failed("文件路径不能为空"))
  210. object_name = urllib.parse.unquote(object_name)
  211. logger.info(f"下载文件请求: {object_name}")
  212. minio_client = get_minio_client()
  213. config = get_minio_config()
  214. try:
  215. response = minio_client.get_object(
  216. config["MINIO_BUCKET"], object_name
  217. )
  218. file_data = response.read()
  219. except S3Error as e:
  220. logger.error(f"MinIO获取文件失败: {str(e)}")
  221. return jsonify(failed(f"文件获取失败: {str(e)}"))
  222. file_name = object_name.split("/")[-1]
  223. file_stream = io.BytesIO(file_data)
  224. return send_file(
  225. file_stream,
  226. as_attachment=True,
  227. download_name=file_name,
  228. mimetype="application/octet-stream",
  229. )
  230. except Exception as e:
  231. logger.error(f"文件下载失败: {str(e)}")
  232. return jsonify(failed("文件下载失败", error=str(e)))
  233. finally:
  234. if response:
  235. response.close()
  236. response.release_conn()
  237. @bp.route("/graphall", methods=["POST"])
  238. def bd_graph_all():
  239. """获取业务领域完整关系图谱"""
  240. try:
  241. if not request.json:
  242. return jsonify(failed("请求数据不能为空"))
  243. domain_id = request.json.get("id")
  244. include_meta = request.json.get("meta", True)
  245. if domain_id is None:
  246. return jsonify(failed("业务领域ID不能为空"))
  247. try:
  248. domain_id = int(domain_id)
  249. except (ValueError, TypeError):
  250. return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {domain_id}"))
  251. graph_data = business_domain_graph_all(domain_id, include_meta)
  252. return jsonify(success(graph_data))
  253. except Exception as e:
  254. logger.error(f"获取业务领域图谱失败: {str(e)}")
  255. return jsonify(failed("获取业务领域图谱失败", error=str(e)))
  256. @bp.route("/ddlparse", methods=["POST"])
  257. def bd_ddl_parse():
  258. """解析DDL语句,用于业务领域创建"""
  259. try:
  260. sql_content = ""
  261. if "file" in request.files:
  262. file = request.files["file"]
  263. if file and file.filename:
  264. if not file.filename.lower().endswith(".sql"):
  265. return jsonify(failed("只接受SQL文件"))
  266. sql_content = file.read().decode("utf-8")
  267. logger.info(f"从上传的文件中读取SQL内容,文件名: {file.filename}")
  268. elif request.is_json and request.json:
  269. sql_content = request.json.get("sql", "")
  270. if not sql_content:
  271. return jsonify(failed("SQL内容不能为空,请上传SQL文件或提供SQL内容"))
  272. parser = DDLParser()
  273. ddl_list = parser.parse_ddl(sql_content)
  274. if not ddl_list:
  275. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  276. if isinstance(ddl_list, list):
  277. table_names = []
  278. for table_item in ddl_list:
  279. if isinstance(table_item, dict) and "table_info" in table_item:
  280. table_name = table_item["table_info"].get("name_en")
  281. if table_name:
  282. table_names.append(table_name)
  283. for table_item in ddl_list:
  284. if isinstance(table_item, dict):
  285. table_item["exist"] = False
  286. if table_names:
  287. try:
  288. with neo4j_driver.get_session() as session:
  289. table_query = """
  290. UNWIND $names AS name
  291. OPTIONAL MATCH (n:BusinessDomain {name_en: name})
  292. RETURN name, n IS NOT NULL AS exists
  293. """
  294. table_results = session.run(
  295. table_query, names=table_names
  296. )
  297. exist_map = {}
  298. for record in table_results:
  299. t_name = record["name"]
  300. exists = record["exists"]
  301. exist_map[t_name] = exists
  302. for table_item in ddl_list:
  303. if (
  304. isinstance(table_item, dict)
  305. and "table_info" in table_item
  306. ):
  307. info = table_item["table_info"]
  308. t_name = info.get("name_en")
  309. if t_name and t_name in exist_map:
  310. table_item["exist"] = exist_map[t_name]
  311. except Exception as e:
  312. logger.error(f"检查业务领域存在状态失败: {str(e)}")
  313. elif isinstance(ddl_list, dict):
  314. table_names = list(ddl_list.keys())
  315. for table_name in table_names:
  316. if isinstance(ddl_list[table_name], dict):
  317. ddl_list[table_name]["exist"] = False
  318. else:
  319. logger.warning(
  320. f"表 {table_name} 的值不是字典类型: "
  321. f"{type(ddl_list[table_name])}"
  322. )
  323. if table_names:
  324. try:
  325. with neo4j_driver.get_session() as session:
  326. table_query = """
  327. UNWIND $names AS name
  328. OPTIONAL MATCH (n:BusinessDomain {name_en: name})
  329. RETURN name, n IS NOT NULL AS exists
  330. """
  331. table_results = session.run(
  332. table_query, names=table_names
  333. )
  334. for record in table_results:
  335. t_name = record["name"]
  336. exists = record["exists"]
  337. is_valid = (
  338. t_name in ddl_list
  339. and isinstance(ddl_list[t_name], dict)
  340. )
  341. if is_valid:
  342. ddl_list[t_name]["exist"] = exists
  343. except Exception as e:
  344. logger.error(f"检查业务领域存在状态失败: {str(e)}")
  345. logger.debug(
  346. f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}"
  347. )
  348. return jsonify(success(ddl_list))
  349. except Exception as e:
  350. logger.error(f"解析DDL语句失败: {str(e)}")
  351. logger.error(traceback.format_exc())
  352. return jsonify(failed("解析DDL语句失败", error=str(e)))
  353. @bp.route("/search", methods=["POST"])
  354. def bd_search():
  355. """搜索业务领域关联的元数据"""
  356. try:
  357. if not request.json:
  358. return jsonify(failed("请求数据不能为空"))
  359. page = int(request.json.get("current", 1))
  360. page_size = int(request.json.get("size", 10))
  361. domain_id = request.json.get("id")
  362. name_en_filter = request.json.get("name_en")
  363. name_zh_filter = request.json.get("name_zh")
  364. category_filter = request.json.get("category")
  365. tag_filter = request.json.get("tag")
  366. if domain_id is None:
  367. return jsonify(failed("业务领域ID不能为空"))
  368. try:
  369. domain_id = int(domain_id)
  370. except (ValueError, TypeError):
  371. return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {domain_id}"))
  372. metadata_list, total_count = business_domain_search_list(
  373. domain_id,
  374. page,
  375. page_size,
  376. name_en_filter,
  377. name_zh_filter,
  378. category_filter,
  379. tag_filter,
  380. )
  381. return jsonify(
  382. success(
  383. {
  384. "records": metadata_list,
  385. "total": total_count,
  386. "size": page_size,
  387. "current": page,
  388. }
  389. )
  390. )
  391. except Exception as e:
  392. logger.error(f"业务领域关联元数据搜索失败: {str(e)}")
  393. return jsonify(failed("业务领域关联元数据搜索失败", error=str(e)))
  394. @bp.route("/compose", methods=["POST"])
  395. def bd_compose():
  396. """从已有业务领域中组合创建新的业务领域"""
  397. try:
  398. data = request.json
  399. if not data:
  400. return jsonify(failed("请求数据不能为空"))
  401. if not data.get("name_zh"):
  402. return jsonify(failed("name_zh 为必填项"))
  403. if not data.get("id_list"):
  404. return jsonify(failed("id_list 为必填项"))
  405. result_data = business_domain_compose(data)
  406. response_data = {"business_domain": result_data}
  407. return jsonify(success(response_data))
  408. except Exception as e:
  409. logger.error(f"组合创建业务领域失败: {str(e)}")
  410. return jsonify(failed("组合创建业务领域失败", error=str(e)))
  411. @bp.route("/labellist", methods=["POST"])
  412. def bd_label_list():
  413. """获取数据标签列表(用于业务领域关联)"""
  414. try:
  415. if not request.json:
  416. return jsonify(failed("请求数据不能为空"))
  417. page = int(request.json.get("current", 1))
  418. page_size = int(request.json.get("size", 10))
  419. name_en_filter = request.json.get("name_en")
  420. name_zh_filter = request.json.get("name_zh")
  421. category_filter = request.json.get("category")
  422. group_filter = request.json.get("group")
  423. labels, total_count = business_domain_label_list(
  424. page,
  425. page_size,
  426. name_en_filter,
  427. name_zh_filter,
  428. category_filter,
  429. group_filter,
  430. )
  431. return jsonify(
  432. success(
  433. {
  434. "records": labels,
  435. "total": total_count,
  436. "size": page_size,
  437. "current": page,
  438. }
  439. )
  440. )
  441. except Exception as e:
  442. logger.error(f"获取标签列表失败: {str(e)}")
  443. return jsonify(failed("获取标签列表失败", error=str(e)))