routes.py 24 KB


  1. """
  2. Business Domain API 路由模块
  3. 提供业务领域相关的 RESTful API 接口
  4. """
  5. import io
  6. import json
  7. import time
  8. import logging
  9. import traceback
  10. import urllib.parse
  11. from flask import request, jsonify, current_app, send_file
  12. from minio import Minio
  13. from minio.error import S3Error
  14. from app.api.business_domain import bp
  15. from app.models.result import success, failed
  16. from app.services.neo4j_driver import neo4j_driver
  17. from app.core.llm.ddl_parser import DDLParser
  18. from app.core.business_domain import (
  19. business_domain_list,
  20. get_business_domain_by_id,
  21. delete_business_domain,
  22. update_business_domain,
  23. save_business_domain,
  24. business_domain_graph_all,
  25. business_domain_search_list,
  26. business_domain_compose,
  27. business_domain_label_list
  28. )
  29. logger = logging.getLogger("app")
  30. def get_minio_client():
  31. """获取 MinIO 客户端实例"""
  32. return Minio(
  33. current_app.config['MINIO_HOST'],
  34. access_key=current_app.config['MINIO_USER'],
  35. secret_key=current_app.config['MINIO_PASSWORD'],
  36. secure=current_app.config['MINIO_SECURE']
  37. )
  38. def get_minio_config():
  39. """获取 MinIO 配置"""
  40. return {
  41. 'MINIO_BUCKET': current_app.config['MINIO_BUCKET'],
  42. 'PREFIX': current_app.config.get(
  43. 'BUSINESS_DOMAIN_PREFIX', 'business_domain'
  44. ),
  45. 'ALLOWED_EXTENSIONS': current_app.config['ALLOWED_EXTENSIONS']
  46. }
  47. def allowed_file(filename):
  48. """检查文件扩展名是否允许"""
  49. if '.' not in filename:
  50. return False
  51. ext = filename.rsplit('.', 1)[1].lower()
  52. return ext in get_minio_config()['ALLOWED_EXTENSIONS']
  53. @bp.route('/list', methods=['POST'])
  54. def bd_list():
  55. """
  56. 获取业务领域列表
  57. 请求参数 (JSON):
  58. - current: 当前页码,默认1
  59. - size: 每页大小,默认10
  60. - name_en: 英文名称过滤条件(可选)
  61. - name_zh: 中文名称过滤条件(可选)
  62. - type: 类型过滤条件,默认'all'表示不过滤(可选)
  63. - category: 分类过滤条件(可选)
  64. - tag: 标签过滤条件(可选)
  65. 返回:
  66. - success: 是否成功
  67. - message: 消息
  68. - data:
  69. - records: 业务领域列表
  70. - total: 总数量
  71. - size: 每页大小
  72. - current: 当前页码
  73. """
  74. try:
  75. # 获取分页和筛选参数
  76. if not request.json:
  77. return jsonify(failed('请求数据不能为空'))
  78. page = int(request.json.get('current', 1))
  79. page_size = int(request.json.get('size', 10))
  80. name_en_filter = request.json.get('name_en')
  81. name_zh_filter = request.json.get('name_zh')
  82. type_filter = request.json.get('type', 'all')
  83. category_filter = request.json.get('category')
  84. tag_filter = request.json.get('tag')
  85. # 调用业务逻辑查询业务领域列表
  86. domains, total_count = business_domain_list(
  87. page,
  88. page_size,
  89. name_en_filter,
  90. name_zh_filter,
  91. type_filter,
  92. category_filter,
  93. tag_filter
  94. )
  95. # 返回结果
  96. return jsonify(success({
  97. "records": domains,
  98. "total": total_count,
  99. "size": page_size,
  100. "current": page
  101. }))
  102. except Exception as e:
  103. logger.error(f"获取业务领域列表失败: {str(e)}")
  104. return jsonify(failed(str(e)))
  105. @bp.route('/detail', methods=['POST'])
  106. def bd_detail():
  107. """
  108. 获取业务领域详情
  109. 请求参数 (JSON):
  110. - id: 业务领域节点ID(必填)
  111. 返回:
  112. - success: 是否成功
  113. - message: 消息
  114. - data: 业务领域详情
  115. """
  116. try:
  117. # 获取参数
  118. if not request.json:
  119. return jsonify(failed('请求数据不能为空'))
  120. domain_id = request.json.get('id')
  121. if domain_id is None:
  122. return jsonify(failed("业务领域ID不能为空"))
  123. # 确保传入的ID为整数
  124. try:
  125. domain_id = int(domain_id)
  126. except (ValueError, TypeError):
  127. return jsonify(failed(f"业务领域ID必须为整数, 收到的是: {domain_id}"))
  128. # 调用业务逻辑查询业务领域详情
  129. domain_data = get_business_domain_by_id(domain_id)
  130. if not domain_data:
  131. return jsonify(failed("业务领域不存在"))
  132. return jsonify(success(domain_data))
  133. except Exception as e:
  134. logger.error(f"获取业务领域详情失败: {str(e)}")
  135. return jsonify(failed(str(e)))
  136. @bp.route('/delete', methods=['POST'])
  137. def bd_delete():
  138. """
  139. 删除业务领域
  140. 请求参数 (JSON):
  141. - id: 业务领域节点ID(必填)
  142. 返回:
  143. - success: 是否成功
  144. - message: 消息
  145. - data: 删除结果
  146. """
  147. try:
  148. # 获取参数
  149. if not request.json:
  150. return jsonify(failed("请求数据不能为空"))
  151. domain_id = request.json.get('id')
  152. if domain_id is None:
  153. return jsonify(failed("业务领域ID不能为空"))
  154. # 调用业务逻辑删除业务领域
  155. result = delete_business_domain(domain_id)
  156. if result:
  157. return jsonify(success({"message": "业务领域删除成功"}))
  158. else:
  159. return jsonify(failed("业务领域删除失败"))
  160. except Exception as e:
  161. logger.error(f"删除业务领域失败: {str(e)}")
  162. return jsonify(failed(str(e)))
  163. @bp.route('/save', methods=['POST'])
  164. def bd_save():
  165. """
  166. 保存业务领域(新建或更新)
  167. 请求参数 (JSON):
  168. - id: 业务领域节点ID(可选,有则更新,无则新建)
  169. - name_zh: 中文名称(新建时必填)
  170. - name_en: 英文名称(新建时必填)
  171. - describe: 描述(可选)
  172. - type: 类型(可选)
  173. - category: 分类(可选)
  174. - tag: 标签ID(可选)
  175. - data_source: 数据源ID(可选)
  176. - 其他属性字段...
  177. 返回:
  178. - success: 是否成功
  179. - message: 消息
  180. - data: 保存后的业务领域数据
  181. """
  182. try:
  183. # 获取保存数据
  184. data = request.json
  185. if not data:
  186. return jsonify(failed("请求数据不能为空"))
  187. # 新建时校验必填字段
  188. if not data.get("id"):
  189. if not data.get("name_zh") or not data.get("name_en"):
  190. return jsonify(failed("新建时 name_zh 和 name_en 为必填项"))
  191. # 调用业务逻辑保存业务领域
  192. saved_data = save_business_domain(data)
  193. return jsonify(success(saved_data))
  194. except Exception as e:
  195. logger.error(f"保存业务领域失败: {str(e)}")
  196. return jsonify(failed(str(e)))
  197. @bp.route('/update', methods=['POST'])
  198. def bd_update():
  199. """
  200. 更新业务领域
  201. 请求参数 (JSON):
  202. - id: 业务领域节点ID(必填)
  203. - name_zh: 中文名称(可选)
  204. - name_en: 英文名称(可选)
  205. - describe: 描述(可选)
  206. - tag: 标签ID(可选)
  207. - data_source: 数据源ID(可选)
  208. - 其他属性字段...
  209. 返回:
  210. - success: 是否成功
  211. - message: 消息
  212. - data: 更新后的业务领域数据
  213. """
  214. try:
  215. # 获取更新数据
  216. data = request.json
  217. if not data or "id" not in data:
  218. return jsonify(failed("参数不完整"))
  219. # 调用业务逻辑更新业务领域
  220. updated_data = update_business_domain(data)
  221. return jsonify(success(updated_data))
  222. except Exception as e:
  223. logger.error(f"更新业务领域失败: {str(e)}")
  224. return jsonify(failed(str(e)))
  225. @bp.route('/upload', methods=['POST'])
  226. def bd_upload():
  227. """
  228. 上传业务领域相关文件
  229. 请求参数 (multipart/form-data):
  230. - file: 上传的文件(必填)
  231. 返回:
  232. - success: 是否成功
  233. - message: 消息
  234. - data:
  235. - filename: 原始文件名
  236. - size: 文件大小(字节)
  237. - type: 文件类型
  238. - url: 文件存储路径
  239. """
  240. try:
  241. # 检查请求中是否有文件
  242. if 'file' not in request.files:
  243. return jsonify(failed("没有找到上传的文件"))
  244. file = request.files['file']
  245. # 检查文件名
  246. if file.filename == '':
  247. return jsonify(failed("未选择文件"))
  248. # 检查文件类型
  249. if not allowed_file(file.filename):
  250. return jsonify(failed("不支持的文件类型"))
  251. # 获取 MinIO 配置
  252. minio_client = get_minio_client()
  253. config = get_minio_config()
  254. # 读取文件内容
  255. file_content = file.read()
  256. file_size = len(file_content)
  257. filename = file.filename or ''
  258. file_type = filename.rsplit('.', 1)[1].lower()
  259. # 提取文件名(不包含扩展名)
  260. filename_without_ext = filename.rsplit('.', 1)[0]
  261. # 生成紧凑的时间戳 (yyyyMMddHHmmss)
  262. timestamp = time.strftime("%Y%m%d%H%M%S", time.localtime())
  263. # 生成唯一文件名
  264. prefix = config['PREFIX']
  265. object_name = (
  266. f"{prefix}/{filename_without_ext}_{timestamp}.{file_type}"
  267. )
  268. # 上传文件到 MinIO
  269. minio_client.put_object(
  270. config['MINIO_BUCKET'],
  271. object_name,
  272. io.BytesIO(file_content),
  273. file_size,
  274. content_type=f"application/{file_type}"
  275. )
  276. logger.info(f"文件上传成功: {object_name}, 大小: {file_size}")
  277. # 返回结果
  278. return jsonify(success({
  279. "filename": file.filename,
  280. "size": file_size,
  281. "type": file_type,
  282. "url": object_name
  283. }))
  284. except Exception as e:
  285. logger.error(f"文件上传失败: {str(e)}")
  286. return jsonify(failed(str(e)))
  287. @bp.route('/download', methods=['GET'])
  288. def bd_download():
  289. """
  290. 下载业务领域相关文件
  291. 请求参数 (URL Query):
  292. - url: 文件存储路径(必填)
  293. 返回:
  294. - 文件流(作为附件下载)
  295. """
  296. response = None
  297. try:
  298. # 获取文件路径参数
  299. object_name = request.args.get('url')
  300. if not object_name:
  301. return jsonify(failed("文件路径不能为空"))
  302. # URL解码,处理特殊字符
  303. object_name = urllib.parse.unquote(object_name)
  304. # 记录下载请求信息,便于调试
  305. logger.info(f"下载文件请求: {object_name}")
  306. # 获取 MinIO 配置
  307. minio_client = get_minio_client()
  308. config = get_minio_config()
  309. # 获取文件
  310. try:
  311. response = minio_client.get_object(
  312. config['MINIO_BUCKET'], object_name
  313. )
  314. file_data = response.read()
  315. except S3Error as e:
  316. logger.error(f"MinIO获取文件失败: {str(e)}")
  317. return jsonify(failed(f"文件获取失败: {str(e)}"))
  318. # 获取文件名
  319. file_name = object_name.split('/')[-1]
  320. # 直接从内存返回文件,不创建临时文件
  321. file_stream = io.BytesIO(file_data)
  322. # 返回文件
  323. return send_file(
  324. file_stream,
  325. as_attachment=True,
  326. download_name=file_name,
  327. mimetype="application/octet-stream"
  328. )
  329. except Exception as e:
  330. logger.error(f"文件下载失败: {str(e)}")
  331. return jsonify(failed(str(e)))
  332. finally:
  333. if response:
  334. response.close()
  335. response.release_conn()
  336. @bp.route('/graphall', methods=['POST'])
  337. def bd_graph_all():
  338. """
  339. 获取业务领域完整关系图谱
  340. 请求参数 (JSON):
  341. - id: 业务领域节点ID(必填)
  342. - meta: 是否包含元数据节点,默认True(可选)
  343. 返回:
  344. - success: 是否成功
  345. - message: 消息
  346. - data:
  347. - nodes: 节点列表
  348. - lines: 关系列表
  349. """
  350. try:
  351. # 获取参数
  352. if not request.json:
  353. return jsonify(failed('请求数据不能为空'))
  354. domain_id = request.json.get('id')
  355. include_meta = request.json.get('meta', True)
  356. if domain_id is None:
  357. return jsonify(failed("业务领域ID不能为空"))
  358. # 确保传入的ID为整数
  359. try:
  360. domain_id = int(domain_id)
  361. except (ValueError, TypeError):
  362. return jsonify(failed(
  363. f"业务领域ID必须为整数, 收到的是: {domain_id}"
  364. ))
  365. # 调用业务逻辑获取完整图谱
  366. graph_data = business_domain_graph_all(domain_id, include_meta)
  367. return jsonify(success(graph_data))
  368. except Exception as e:
  369. logger.error(f"获取业务领域图谱失败: {str(e)}")
  370. return jsonify(failed(str(e)))
  371. @bp.route('/ddlparse', methods=['POST'])
  372. def bd_ddl_parse():
  373. """
  374. 解析DDL语句,用于业务领域创建
  375. 请求参数:
  376. - file: SQL文件(multipart/form-data,可选)
  377. - sql: SQL内容(JSON,可选)
  378. 至少提供其中一种方式
  379. 返回:
  380. - success: 是否成功
  381. - message: 消息
  382. - data: 解析后的DDL列表,包含表信息和字段信息
  383. """
  384. try:
  385. # 获取参数 - 支持两种方式:上传文件或JSON
  386. sql_content = ''
  387. # 检查是否有文件上传
  388. if 'file' in request.files:
  389. file = request.files['file']
  390. # 检查文件是否存在且文件名不为空
  391. if file and file.filename:
  392. # 检查是否是SQL文件
  393. if not file.filename.lower().endswith('.sql'):
  394. return jsonify(failed("只接受SQL文件"))
  395. # 读取文件内容
  396. sql_content = file.read().decode('utf-8')
  397. logger.info(
  398. f"从上传的文件中读取SQL内容,文件名: {file.filename}"
  399. )
  400. # 如果没有文件上传,检查是否有JSON输入
  401. elif request.is_json and request.json:
  402. sql_content = request.json.get('sql', '')
  403. # 如果两种方式都没有提供SQL内容,则返回错误
  404. if not sql_content:
  405. return jsonify(failed(
  406. "SQL内容不能为空,请上传SQL文件或提供SQL内容"
  407. ))
  408. parser = DDLParser()
  409. # 提取创建表的DDL语句
  410. ddl_list = parser.parse_ddl(sql_content)
  411. if not ddl_list:
  412. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  413. # 处理表的存在状态
  414. if isinstance(ddl_list, list):
  415. # 新格式:数组格式
  416. # 获取所有表名
  417. table_names = []
  418. for table_item in ddl_list:
  419. if isinstance(table_item, dict) and 'table_info' in table_item:
  420. table_name = table_item['table_info'].get('name_en')
  421. if table_name:
  422. table_names.append(table_name)
  423. # 首先为所有表设置默认的exist状态
  424. for table_item in ddl_list:
  425. if isinstance(table_item, dict):
  426. table_item["exist"] = False
  427. if table_names:
  428. try:
  429. # 查询业务领域是否存在
  430. with neo4j_driver.get_session() as session:
  431. table_query = """
  432. UNWIND $names AS name
  433. OPTIONAL MATCH (n:BusinessDomain {name_en: name})
  434. RETURN name, n IS NOT NULL AS exists
  435. """
  436. table_results = session.run(
  437. table_query, names=table_names
  438. )
  439. # 创建存在状态映射
  440. exist_map = {}
  441. for record in table_results:
  442. table_name = record["name"]
  443. exists = record["exists"]
  444. exist_map[table_name] = exists
  445. # 更新存在的表的状态
  446. for table_item in ddl_list:
  447. if (isinstance(table_item, dict)
  448. and 'table_info' in table_item):
  449. info = table_item['table_info']
  450. t_name = info.get('name_en')
  451. if t_name and t_name in exist_map:
  452. table_item["exist"] = exist_map[t_name]
  453. except Exception as e:
  454. logger.error(f"检查业务领域存在状态失败: {str(e)}")
  455. # 如果查询失败,所有表保持默认的False状态
  456. elif isinstance(ddl_list, dict):
  457. # 兼容旧格式:字典格式(以表名为key)
  458. table_names = list(ddl_list.keys())
  459. # 首先为所有表设置默认的exist状态
  460. for table_name in table_names:
  461. if isinstance(ddl_list[table_name], dict):
  462. ddl_list[table_name]["exist"] = False
  463. else:
  464. logger.warning(
  465. f"表 {table_name} 的值不是字典类型: "
  466. f"{type(ddl_list[table_name])}"
  467. )
  468. if table_names:
  469. try:
  470. # 查询业务领域是否存在
  471. with neo4j_driver.get_session() as session:
  472. table_query = """
  473. UNWIND $names AS name
  474. OPTIONAL MATCH (n:BusinessDomain {name_en: name})
  475. RETURN name, n IS NOT NULL AS exists
  476. """
  477. table_results = session.run(
  478. table_query, names=table_names
  479. )
  480. # 更新存在的表的状态
  481. for record in table_results:
  482. table_name = record["name"]
  483. exists = record["exists"]
  484. is_valid = (
  485. table_name in ddl_list
  486. and isinstance(ddl_list[table_name], dict)
  487. )
  488. if is_valid:
  489. ddl_list[table_name]["exist"] = exists
  490. except Exception as e:
  491. logger.error(f"检查业务领域存在状态失败: {str(e)}")
  492. # 如果查询失败,所有表保持默认的False状态
  493. logger.debug(
  494. f"识别到的DDL语句: {json.dumps(ddl_list, ensure_ascii=False)}"
  495. )
  496. return jsonify(success(ddl_list))
  497. except Exception as e:
  498. logger.error(f"解析DDL语句失败: {str(e)}")
  499. logger.error(traceback.format_exc())
  500. return jsonify(failed(str(e)))
  501. @bp.route('/search', methods=['POST'])
  502. def bd_search():
  503. """
  504. 搜索业务领域关联的元数据
  505. 请求参数 (JSON):
  506. - id: 业务领域节点ID(必填)
  507. - current: 当前页码,默认1
  508. - size: 每页大小,默认10
  509. - name_en: 英文名称过滤条件(可选)
  510. - name_zh: 中文名称过滤条件(可选)
  511. - category: 分类过滤条件(可选)
  512. - tag: 标签过滤条件(可选)
  513. 返回:
  514. - success: 是否成功
  515. - message: 消息
  516. - data:
  517. - records: 元数据列表
  518. - total: 总数量
  519. - size: 每页大小
  520. - current: 当前页码
  521. """
  522. try:
  523. # 获取分页和筛选参数
  524. if not request.json:
  525. return jsonify(failed('请求数据不能为空'))
  526. page = int(request.json.get('current', 1))
  527. page_size = int(request.json.get('size', 10))
  528. domain_id = request.json.get('id')
  529. name_en_filter = request.json.get('name_en')
  530. name_zh_filter = request.json.get('name_zh')
  531. category_filter = request.json.get('category')
  532. tag_filter = request.json.get('tag')
  533. if domain_id is None:
  534. return jsonify(failed("业务领域ID不能为空"))
  535. # 确保传入的ID为整数
  536. try:
  537. domain_id = int(domain_id)
  538. except (ValueError, TypeError):
  539. return jsonify(failed(
  540. f"业务领域ID必须为整数, 收到的是: {domain_id}"
  541. ))
  542. # 记录请求信息
  543. logger.info(f"获取业务领域关联元数据请求,ID: {domain_id}")
  544. # 调用业务逻辑查询关联元数据
  545. metadata_list, total_count = business_domain_search_list(
  546. domain_id,
  547. page,
  548. page_size,
  549. name_en_filter,
  550. name_zh_filter,
  551. category_filter,
  552. tag_filter
  553. )
  554. # 返回结果
  555. return jsonify(success({
  556. "records": metadata_list,
  557. "total": total_count,
  558. "size": page_size,
  559. "current": page
  560. }))
  561. except Exception as e:
  562. logger.error(f"业务领域关联元数据搜索失败: {str(e)}")
  563. return jsonify(failed(str(e)))
  564. @bp.route('/compose', methods=['POST'])
  565. def bd_compose():
  566. """
  567. 从已有业务领域中组合创建新的业务领域
  568. 请求参数 (JSON):
  569. - name_zh: 中文名称(必填)
  570. - name_en: 英文名称(可选,不提供则自动翻译)
  571. - id_list: 关联的业务领域和元数据列表(必填)
  572. 格式: [{"domain_id": 123, "metaData": [{"id": 456}, ...]}]
  573. - describe: 描述(可选)
  574. - type: 类型(可选)
  575. - category: 分类(可选)
  576. - tag: 标签ID(可选)
  577. - data_source: 数据源ID(可选)
  578. 返回:
  579. - success: 是否成功
  580. - message: 消息
  581. - data: 创建后的业务领域数据
  582. """
  583. try:
  584. # 获取请求数据
  585. data = request.json
  586. if not data:
  587. return jsonify(failed("请求数据不能为空"))
  588. # 校验必填字段
  589. if not data.get("name_zh"):
  590. return jsonify(failed("name_zh 为必填项"))
  591. if not data.get("id_list"):
  592. return jsonify(failed("id_list 为必填项"))
  593. # 调用业务逻辑组合创建业务领域
  594. result_data = business_domain_compose(data)
  595. # 构建响应数据
  596. response_data = {
  597. "business_domain": result_data
  598. }
  599. return jsonify(success(response_data))
  600. except Exception as e:
  601. logger.error(f"组合创建业务领域失败: {str(e)}")
  602. return jsonify(failed(str(e)))
  603. @bp.route('/labellist', methods=['POST'])
  604. def bd_label_list():
  605. """
  606. 获取数据标签列表(用于业务领域关联)
  607. 请求参数 (JSON):
  608. - current: 当前页码,默认1
  609. - size: 每页大小,默认10
  610. - name_en: 英文名称过滤条件(可选)
  611. - name_zh: 中文名称过滤条件(可选)
  612. - category: 分类过滤条件(可选)
  613. - group: 分组过滤条件(可选)
  614. 返回:
  615. - success: 是否成功
  616. - message: 消息
  617. - data:
  618. - records: 标签列表
  619. - total: 总数量
  620. - size: 每页大小
  621. - current: 当前页码
  622. """
  623. try:
  624. # 获取分页和筛选参数
  625. if not request.json:
  626. return jsonify(failed('请求数据不能为空'))
  627. page = int(request.json.get('current', 1))
  628. page_size = int(request.json.get('size', 10))
  629. name_en_filter = request.json.get('name_en')
  630. name_zh_filter = request.json.get('name_zh')
  631. category_filter = request.json.get('category')
  632. group_filter = request.json.get('group')
  633. # 调用业务逻辑查询标签列表
  634. labels, total_count = business_domain_label_list(
  635. page,
  636. page_size,
  637. name_en_filter,
  638. name_zh_filter,
  639. category_filter,
  640. group_filter
  641. )
  642. # 返回结果
  643. return jsonify(success({
  644. "records": labels,
  645. "total": total_count,
  646. "size": page_size,
  647. "current": page
  648. }))
  649. except Exception as e:
  650. logger.error(f"获取标签列表失败: {str(e)}")
  651. return jsonify(failed(str(e)))