routes.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. """
  2. Business Domain API 路由模块
  3. 提供业务领域相关的 RESTful API 接口
  4. """
  5. import io
  6. import json
  7. import logging
  8. import time
  9. import traceback
  10. import urllib.parse
  11. from flask import current_app, jsonify, request, send_file
  12. from minio import Minio
  13. from minio.error import S3Error
  14. from app.api.business_domain import bp
  15. from app.core.business_domain import (
  16. business_domain_compose,
  17. business_domain_graph_all,
  18. business_domain_label_list,
  19. business_domain_list,
  20. business_domain_search_list,
  21. delete_business_domain,
  22. get_business_domain_by_id,
  23. save_business_domain,
  24. update_business_domain,
  25. )
  26. from app.core.llm.ddl_parser import DDLParser
  27. from app.models.result import failed, success
  28. from app.services.neo4j_driver import neo4j_driver
# Module-level logger; the handlers and helpers in this module log through the "app" logger.
logger = logging.getLogger("app")
  30. # ----------------------- MinIO helpers -----------------------
  31. def get_minio_client():
  32. """获取 MinIO 客户端实例"""
  33. return Minio(
  34. current_app.config["MINIO_HOST"],
  35. access_key=current_app.config["MINIO_USER"],
  36. secret_key=current_app.config["MINIO_PASSWORD"],
  37. secure=current_app.config["MINIO_SECURE"],
  38. )
  39. def get_minio_config():
  40. """获取 MinIO 配置"""
  41. return {
  42. "MINIO_BUCKET": current_app.config["MINIO_BUCKET"],
  43. "PREFIX": current_app.config.get("BUSINESS_DOMAIN_PREFIX", "business_domain"),
  44. "ALLOWED_EXTENSIONS": current_app.config["ALLOWED_EXTENSIONS"],
  45. }
  46. def allowed_file(filename):
  47. """检查文件扩展名是否允许"""
  48. if "." not in filename:
  49. return False
  50. ext = filename.rsplit(".", 1)[1].lower()
  51. return ext in get_minio_config()["ALLOWED_EXTENSIONS"]
  52. # ----------------------- Business Domain APIs -----------------------
  53. @bp.route("/list", methods=["POST"])
  54. def bd_list():
  55. """获取业务领域列表"""
  56. try:
  57. if not request.json:
  58. return jsonify(failed("请求数据不能为空"))
  59. page = int(request.json.get("current", 1))
  60. page_size = int(request.json.get("size", 10))
  61. name_en_filter = request.json.get("name_en")
  62. name_zh_filter = request.json.get("name_zh")
  63. type_filter = request.json.get("type", "all")
  64. category_filter = request.json.get("category")
  65. tag_filter = request.json.get("tag")
  66. domains, total_count = business_domain_list(
  67. page,
  68. page_size,
  69. name_en_filter,
  70. name_zh_filter,
  71. type_filter,
  72. category_filter,
  73. tag_filter,
  74. )
  75. return jsonify(
  76. success(
  77. {
  78. "records": domains,
  79. "total": total_count,
  80. "size": page_size,
  81. "current": page,
  82. }
  83. )
  84. )
  85. except Exception as e:
  86. logger.error(f"获取业务领域列表失败: {str(e)}")
  87. return jsonify(failed("获取业务领域列表失败", error=str(e)))
  88. @bp.route("/detail", methods=["POST"])
  89. def bd_detail():
  90. """获取业务领域详情"""
  91. try:
  92. if not request.json:
  93. return jsonify(failed("请求数据不能为空"))
  94. domain_id = request.json.get("id")
  95. if domain_id is None:
  96. return jsonify(failed("业务领域ID不能为空"))
  97. try:
  98. domain_id = int(domain_id)
  99. except (ValueError, TypeError):
  100. return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {domain_id}"))
  101. domain_data = get_business_domain_by_id(domain_id)
  102. if not domain_data:
  103. return jsonify(failed("业务领域不存在"))
  104. return jsonify(success(domain_data))
  105. except Exception as e:
  106. logger.error(f"获取业务领域详情失败: {str(e)}")
  107. return jsonify(failed("获取业务领域详情失败", error=str(e)))
  108. @bp.route("/delete", methods=["POST"])
  109. def bd_delete():
  110. """删除业务领域"""
  111. try:
  112. if not request.json:
  113. return jsonify(failed("请求数据不能为空"))
  114. domain_id = request.json.get("id")
  115. if domain_id is None:
  116. return jsonify(failed("业务领域ID不能为空"))
  117. result = delete_business_domain(domain_id)
  118. if result:
  119. return jsonify(success({"message": "业务领域删除成功"}))
  120. return jsonify(failed("业务领域删除失败"))
  121. except Exception as e:
  122. logger.error(f"删除业务领域失败: {str(e)}")
  123. return jsonify(failed("删除业务领域失败", error=str(e)))
  124. @bp.route("/save", methods=["POST"])
  125. def bd_save():
  126. """保存业务领域(新建或更新)"""
  127. try:
  128. data = request.json
  129. if not data:
  130. return jsonify(failed("请求数据不能为空"))
  131. if not data.get("id") and (not data.get("name_zh") or not data.get("name_en")):
  132. return jsonify(failed("新建时 name_zh 和 name_en 为必填项"))
  133. saved_data = save_business_domain(data)
  134. return jsonify(success(saved_data))
  135. except Exception as e:
  136. logger.error(f"保存业务领域失败: {str(e)}")
  137. return jsonify(failed("保存业务领域失败", error=str(e)))
  138. @bp.route("/update", methods=["POST"])
  139. def bd_update():
  140. """更新业务领域"""
  141. try:
  142. data = request.json
  143. if not data or "id" not in data:
  144. return jsonify(failed("参数不完整"))
  145. updated_data = update_business_domain(data)
  146. return jsonify(success(updated_data))
  147. except Exception as e:
  148. logger.error(f"更新业务领域失败: {str(e)}")
  149. return jsonify(failed("更新业务领域失败", error=str(e)))
  150. @bp.route("/upload", methods=["POST"])
  151. def bd_upload():
  152. """上传业务领域相关文件"""
  153. response = None
  154. try:
  155. if "file" not in request.files:
  156. return jsonify(failed("没有找到上传的文件"))
  157. file = request.files["file"]
  158. if file.filename == "":
  159. return jsonify(failed("未选择文件"))
  160. if not allowed_file(file.filename):
  161. return jsonify(failed("不支持的文件类型"))
  162. minio_client = get_minio_client()
  163. config = get_minio_config()
  164. file_content = file.read()
  165. file_size = len(file_content)
  166. filename = file.filename or ""
  167. file_type = filename.rsplit(".", 1)[1].lower()
  168. filename_without_ext = filename.rsplit(".", 1)[0]
  169. timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
  170. object_name = (
  171. f"{config['PREFIX']}/{filename_without_ext}_{timestamp}.{file_type}"
  172. )
  173. minio_client.put_object(
  174. config["MINIO_BUCKET"],
  175. object_name,
  176. io.BytesIO(file_content),
  177. file_size,
  178. content_type=f"application/{file_type}",
  179. )
  180. logger.info(f"文件上传成功: {object_name}, 大小: {file_size}")
  181. return jsonify(
  182. success(
  183. {
  184. "filename": file.filename,
  185. "size": file_size,
  186. "type": file_type,
  187. "url": object_name,
  188. }
  189. )
  190. )
  191. except Exception as e:
  192. logger.error(f"文件上传失败: {str(e)}")
  193. return jsonify(failed("文件上传失败", error=str(e)))
  194. finally:
  195. if response:
  196. response.close()
  197. response.release_conn()
  198. @bp.route("/download", methods=["GET"])
  199. def bd_download():
  200. """下载业务领域相关文件"""
  201. response = None
  202. try:
  203. object_name = request.args.get("url")
  204. if not object_name:
  205. return jsonify(failed("文件路径不能为空"))
  206. object_name = urllib.parse.unquote(object_name)
  207. logger.info(f"下载文件请求: {object_name}")
  208. minio_client = get_minio_client()
  209. config = get_minio_config()
  210. try:
  211. response = minio_client.get_object(config["MINIO_BUCKET"], object_name)
  212. file_data = response.read()
  213. except S3Error as e:
  214. logger.error(f"MinIO获取文件失败: {str(e)}")
  215. return jsonify(failed(f"文件获取失败: {str(e)}"))
  216. file_name = object_name.split("/")[-1]
  217. file_stream = io.BytesIO(file_data)
  218. return send_file(
  219. file_stream,
  220. as_attachment=True,
  221. download_name=file_name,
  222. mimetype="application/octet-stream",
  223. )
  224. except Exception as e:
  225. logger.error(f"文件下载失败: {str(e)}")
  226. return jsonify(failed("文件下载失败", error=str(e)))
  227. finally:
  228. if response:
  229. response.close()
  230. response.release_conn()
  231. @bp.route("/graphall", methods=["POST"])
  232. def bd_graph_all():
  233. """获取业务领域完整关系图谱"""
  234. try:
  235. if not request.json:
  236. return jsonify(failed("请求数据不能为空"))
  237. domain_id = request.json.get("id")
  238. include_meta = request.json.get("meta", True)
  239. if domain_id is None:
  240. return jsonify(failed("业务领域ID不能为空"))
  241. try:
  242. domain_id = int(domain_id)
  243. except (ValueError, TypeError):
  244. return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {domain_id}"))
  245. graph_data = business_domain_graph_all(domain_id, include_meta)
  246. return jsonify(success(graph_data))
  247. except Exception as e:
  248. logger.error(f"获取业务领域图谱失败: {str(e)}")
  249. return jsonify(failed("获取业务领域图谱失败", error=str(e)))
  250. def _get_file_extension(filename: str) -> str:
  251. """获取文件扩展名(小写)"""
  252. if "." not in filename:
  253. return ""
  254. return filename.rsplit(".", 1)[1].lower()
  255. def _check_table_existence(table_list: list) -> list:
  256. """
  257. 检查表在 Neo4j 中的存在状态
  258. Args:
  259. table_list: 表信息列表
  260. Returns:
  261. 更新了 exist 字段的表信息列表
  262. """
  263. table_names = []
  264. for table_item in table_list:
  265. if isinstance(table_item, dict) and "table_info" in table_item:
  266. table_name = table_item["table_info"].get("name_en")
  267. if table_name:
  268. table_names.append(table_name)
  269. # 初始化 exist 字段
  270. for table_item in table_list:
  271. if isinstance(table_item, dict):
  272. table_item["exist"] = False
  273. if table_names:
  274. try:
  275. with neo4j_driver.get_session() as session:
  276. table_query = """
  277. UNWIND $names AS name
  278. OPTIONAL MATCH (n:BusinessDomain {name_en: name})
  279. RETURN name, n IS NOT NULL AS exists
  280. """
  281. table_results = session.run(table_query, names=table_names)
  282. exist_map = {}
  283. for record in table_results:
  284. t_name = record["name"]
  285. exists = record["exists"]
  286. exist_map[t_name] = exists
  287. for table_item in table_list:
  288. if isinstance(table_item, dict) and "table_info" in table_item:
  289. info = table_item["table_info"]
  290. t_name = info.get("name_en")
  291. if t_name and t_name in exist_map:
  292. table_item["exist"] = exist_map[t_name]
  293. except Exception as e:
  294. logger.error(f"检查业务领域存在状态失败: {str(e)}")
  295. return table_list
  296. # 支持的文件类型
  297. ALLOWED_DDL_EXTENSIONS = {"sql", "xlsx", "xls", "docx", "doc", "pdf"}
  298. @bp.route("/ddlparse", methods=["POST"])
  299. def bd_ddl_parse():
  300. """
  301. 解析文件内容,提取数据表定义信息
  302. 支持的文件类型:
  303. - SQL文件 (.sql): 解析DDL建表语句
  304. - Excel文件 (.xlsx, .xls): 解析表格中的表结构定义
  305. - Word文件 (.docx, .doc): 解析文档中的表结构定义
  306. - PDF文件 (.pdf): 解析PDF中的表结构定义
  307. 返回:
  308. JSON数组格式的表结构信息
  309. """
  310. try:
  311. if "file" not in request.files:
  312. return jsonify(failed("没有找到上传的文件,请上传一个文件"))
  313. file = request.files["file"]
  314. if not file or not file.filename:
  315. return jsonify(failed("未选择文件"))
  316. filename = file.filename
  317. file_ext = _get_file_extension(filename)
  318. if file_ext not in ALLOWED_DDL_EXTENSIONS:
  319. return jsonify(
  320. failed(
  321. f"不支持的文件类型: .{file_ext},"
  322. f"支持的类型: {', '.join('.' + ext for ext in ALLOWED_DDL_EXTENSIONS)}"
  323. )
  324. )
  325. file_content = file.read()
  326. logger.info(f"接收到文件上传,文件名: {filename}, 类型: {file_ext}")
  327. parser = DDLParser()
  328. ddl_list = []
  329. # 根据文件类型选择不同的解析方法
  330. if file_ext == "sql":
  331. # SQL 文件直接解析 DDL
  332. sql_content = file_content.decode("utf-8")
  333. ddl_list = parser.parse_ddl(sql_content)
  334. elif file_ext in {"xlsx", "xls"}:
  335. # Excel 文件解析
  336. ddl_list = parser.parse_excel_content(file_content)
  337. elif file_ext in {"docx", "doc"}:
  338. # Word 文件解析
  339. if file_ext == "doc":
  340. return jsonify(
  341. failed("暂不支持 .doc 格式,请转换为 .docx 格式后重新上传")
  342. )
  343. ddl_list = parser.parse_word_content(file_content)
  344. elif file_ext == "pdf":
  345. # PDF 文件解析
  346. ddl_list = parser.parse_pdf_content(file_content)
  347. # 验证解析结果
  348. if not ddl_list:
  349. return jsonify(failed("未找到有效的数据表定义信息"))
  350. # 确保结果是列表格式
  351. if isinstance(ddl_list, dict):
  352. if "table_info" in ddl_list:
  353. ddl_list = [ddl_list]
  354. else:
  355. # 兼容旧格式(字典形式的多表)
  356. table_names = list(ddl_list.keys())
  357. converted_list = []
  358. for table_name in table_names:
  359. table_data = ddl_list[table_name]
  360. if isinstance(table_data, dict):
  361. table_data["exist"] = False
  362. converted_list.append(table_data)
  363. ddl_list = converted_list
  364. # 检查表在 Neo4j 中的存在状态
  365. ddl_list = _check_table_existence(ddl_list)
  366. logger.debug(f"识别到的数据表: {json.dumps(ddl_list, ensure_ascii=False)}")
  367. return jsonify(success(ddl_list))
  368. except ValueError as e:
  369. logger.error(f"文件解析失败: {str(e)}")
  370. return jsonify(failed(str(e)))
  371. except Exception as e:
  372. logger.error(f"解析文件失败: {str(e)}")
  373. logger.error(traceback.format_exc())
  374. return jsonify(failed("解析文件失败", error=str(e)))
  375. @bp.route("/search", methods=["POST"])
  376. def bd_search():
  377. """搜索业务领域关联的元数据"""
  378. try:
  379. if not request.json:
  380. return jsonify(failed("请求数据不能为空"))
  381. page = int(request.json.get("current", 1))
  382. page_size = int(request.json.get("size", 10))
  383. domain_id = request.json.get("id")
  384. name_en_filter = request.json.get("name_en")
  385. name_zh_filter = request.json.get("name_zh")
  386. category_filter = request.json.get("category")
  387. tag_filter = request.json.get("tag")
  388. if domain_id is None:
  389. return jsonify(failed("业务领域ID不能为空"))
  390. try:
  391. domain_id = int(domain_id)
  392. except (ValueError, TypeError):
  393. return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {domain_id}"))
  394. metadata_list, total_count = business_domain_search_list(
  395. domain_id,
  396. page,
  397. page_size,
  398. name_en_filter,
  399. name_zh_filter,
  400. category_filter,
  401. tag_filter,
  402. )
  403. return jsonify(
  404. success(
  405. {
  406. "records": metadata_list,
  407. "total": total_count,
  408. "size": page_size,
  409. "current": page,
  410. }
  411. )
  412. )
  413. except Exception as e:
  414. logger.error(f"业务领域关联元数据搜索失败: {str(e)}")
  415. return jsonify(failed("业务领域关联元数据搜索失败", error=str(e)))
  416. @bp.route("/compose", methods=["POST"])
  417. def bd_compose():
  418. """从已有业务领域中组合创建新的业务领域"""
  419. try:
  420. data = request.json
  421. if not data:
  422. return jsonify(failed("请求数据不能为空"))
  423. if not data.get("name_zh"):
  424. return jsonify(failed("name_zh 为必填项"))
  425. if not data.get("id_list"):
  426. return jsonify(failed("id_list 为必填项"))
  427. result_data = business_domain_compose(data)
  428. response_data = {"business_domain": result_data}
  429. return jsonify(success(response_data))
  430. except Exception as e:
  431. logger.error(f"组合创建业务领域失败: {str(e)}")
  432. return jsonify(failed("组合创建业务领域失败", error=str(e)))
  433. @bp.route("/labellist", methods=["POST"])
  434. def bd_label_list():
  435. """获取数据标签列表(用于业务领域关联)"""
  436. try:
  437. if not request.json:
  438. return jsonify(failed("请求数据不能为空"))
  439. page = int(request.json.get("current", 1))
  440. page_size = int(request.json.get("size", 10))
  441. name_en_filter = request.json.get("name_en")
  442. name_zh_filter = request.json.get("name_zh")
  443. category_filter = request.json.get("category")
  444. group_filter = request.json.get("group")
  445. labels, total_count = business_domain_label_list(
  446. page,
  447. page_size,
  448. name_en_filter,
  449. name_zh_filter,
  450. category_filter,
  451. group_filter,
  452. )
  453. return jsonify(
  454. success(
  455. {
  456. "records": labels,
  457. "total": total_count,
  458. "size": page_size,
  459. "current": page,
  460. }
  461. )
  462. )
  463. except Exception as e:
  464. logger.error(f"获取标签列表失败: {str(e)}")
  465. return jsonify(failed("获取标签列表失败", error=str(e)))