routes.py 24 KB


  1. from io import BytesIO, StringIO
  2. import os
  3. import pandas as pd
  4. from flask import request, jsonify, send_file, current_app
  5. from app.api.data_resource import bp
  6. from app.models.result import success, failed
  7. import logging
  8. import json
  9. import re
  10. from minio import Minio
  11. from app.core.graph.graph_operations import MyEncoder
  12. from app.services.neo4j_driver import neo4j_driver
  13. from app.core.data_resource.resource import (
  14. resource_list,
  15. handle_node,
  16. resource_kinship_graph,
  17. resource_impact_all_graph,
  18. model_resource_list,
  19. select_create_ddl,
  20. data_resource_edit,
  21. handle_id_resource,
  22. id_data_search_list,
  23. table_sql,
  24. select_sql,
  25. id_resource_graph,
  26. status_query
  27. )
  28. from app.core.meta_data import (
  29. translate_and_parse,
  30. infer_column_type,
  31. text_resource_solve,
  32. get_file_content,
  33. get_formatted_time
  34. )
  35. import traceback
  36. from app.core.system.auth import require_auth
  37. from app.core.llm.ddl_parser import DDLParser
  38. logger = logging.getLogger("app")
  39. def get_minio_client():
  40. """获取 MinIO 客户端实例"""
  41. return Minio(
  42. current_app.config['MINIO_HOST'],
  43. access_key=current_app.config['MINIO_USER'],
  44. secret_key=current_app.config['MINIO_PASSWORD'],
  45. secure=current_app.config['MINIO_SECURE']
  46. )
  47. def get_minio_config():
  48. """获取 MinIO 配置"""
  49. return {
  50. 'bucket_name': current_app.config['BUCKET_NAME'],
  51. 'prefix': current_app.config['PREFIX'],
  52. 'allowed_extensions': current_app.config['ALLOWED_EXTENSIONS']
  53. }
  54. def is_english(text):
  55. """检查文本是否为英文"""
  56. return text.isascii() and bool(re.match(r'^[a-zA-Z0-9_\s.,;:!?()\'"-]+$', text))
  57. @bp.route('/translate', methods=['POST'])
  58. def data_resource_translate():
  59. # 获取表单数据
  60. data_resource = request.form.get('data_resource')
  61. meta_data = request.form.get('meta_data')
  62. file = request.files.get('file')
  63. if not data_resource or not file:
  64. return jsonify(failed("缺少必要参数:data_resource 或文件"))
  65. # 处理meta_data可能为None的情况
  66. if meta_data:
  67. try:
  68. meta_data_list = json.loads(meta_data)
  69. except json.JSONDecodeError:
  70. logger.error(f"解析meta_data失败: {meta_data}")
  71. meta_data_list = []
  72. else:
  73. logger.warning("meta_data为空,将使用空列表")
  74. meta_data_list = []
  75. # 构建翻译后的内容组合
  76. translated_meta_data_list = []
  77. for meta_item in meta_data_list:
  78. if is_english(meta_item): # 检查是否为英文
  79. translated_meta_data_list.append(meta_item) # 如果是英文,则直接添加
  80. else:
  81. translated_meta_data_list.append(translate_and_parse(meta_item)[0]) # 否则翻译后添加
  82. # 对 data_resource 进行翻译
  83. translated_data_resource = translate_and_parse(data_resource)
  84. if translated_data_resource and len(translated_data_resource) > 0:
  85. translated_data_resource = translated_data_resource[0]
  86. else:
  87. translated_data_resource = data_resource # 翻译失败时使用原值
  88. try:
  89. # 构建最终的翻译结果
  90. resource = {"name": data_resource, "en_name": translated_data_resource}
  91. parsed_data = []
  92. # 读取文件内容
  93. file_content = file.read()
  94. # 重置文件指针
  95. file.seek(0)
  96. try:
  97. df = pd.read_excel(BytesIO(file_content))
  98. except Exception as e:
  99. return jsonify(failed(f"文件格式错误: {str(e)}"))
  100. # 获取列名和对应的数据类型
  101. # 如果meta_data为空,使用DataFrame的列名
  102. if not meta_data_list and not df.empty:
  103. meta_data_list = df.columns.tolist()
  104. translated_meta_data_list = []
  105. for col in meta_data_list:
  106. if is_english(col):
  107. translated_meta_data_list.append(col)
  108. else:
  109. translated_meta_data_list.append(translate_and_parse(col)[0])
  110. columns_and_types = infer_column_type(df)
  111. for i in range(len(meta_data_list)):
  112. zh = meta_data_list[i]
  113. en = translated_meta_data_list[i]
  114. data_type = columns_and_types[i] if i < len(columns_and_types) else "varchar(255)"
  115. parsed_item = {"name": zh, "en_name": en, "data_type": data_type}
  116. parsed_data.append(parsed_item)
  117. response_data = {
  118. "head_data": parsed_data,
  119. "data_resource": resource
  120. }
  121. return jsonify(success(response_data, "success"))
  122. except Exception as e:
  123. logger.error(f"翻译处理失败: {str(e)}", exc_info=True)
  124. return jsonify(failed({}, str(e)))
  125. @bp.route('/save', methods=['POST'])
  126. def data_resource_save():
  127. """保存数据资源"""
  128. try:
  129. # 获取表单数据
  130. # 表单以 receiver 开头时使用下面的方法:
  131. # receiver = request.json.get('receiver', {})
  132. receiver = request.get_json()
  133. # 检查receiver是否存在
  134. if not receiver:
  135. return jsonify(failed("参数不完整:缺少receiver"))
  136. # 检查url是否存在
  137. if 'url' not in receiver:
  138. return jsonify(failed("参数不完整:缺少url"))
  139. additional_info = receiver['additional_info']
  140. if not additional_info:
  141. return jsonify(failed("参数不完整: 缺少additional_info"))
  142. data_resource = additional_info['data_resource']
  143. if not data_resource:
  144. return jsonify(failed("参数不完整: 缺少data_resource"))
  145. file_extension = receiver['url'].split('.')[-1]
  146. head_data = additional_info['head_data']
  147. if file_extension == 'xlsx' or file_extension == 'xls' or file_extension == 'csv':
  148. # 如果文件是excel或csv,则需要检查storage_location是否存在
  149. storage_location = receiver.get('storage_location', '')
  150. if not storage_location:
  151. return jsonify(failed("参数不完整:缺少storage_location或storage_location为空"))
  152. # 调用业务逻辑处理数据资源创建
  153. resource_id = handle_node(receiver, head_data, data_resource)
  154. elif file_extension == 'sql':
  155. data_source = receiver['data_source']
  156. # 如果是ddl,则需要检查data_source是否存在
  157. if not data_source or (isinstance(data_source, dict) and not data_source.get("en_name")):
  158. return jsonify(failed("数据源信息不完整或无效"))
  159. resource_id = handle_node(receiver, head_data, data_resource, data_source)
  160. else:
  161. return jsonify(failed("文件格式错误"))
  162. return jsonify(success({"id": resource_id}))
  163. except Exception as e:
  164. logger.error(f"保存数据资源失败: {str(e)}")
  165. return jsonify(failed(str(e)))
  166. @bp.route('/delete', methods=['POST'])
  167. def data_resource_delete():
  168. """删除数据资源"""
  169. try:
  170. # 获取资源ID
  171. resource_id = request.json.get('id')
  172. if not resource_id:
  173. return jsonify(failed("资源ID不能为空"))
  174. with neo4j_driver.get_session() as session:
  175. # 删除数据资源节点及其关系
  176. cypher = """
  177. MATCH (n:data_resource)
  178. WHERE id(n) = $resource_id
  179. DETACH DELETE n
  180. """
  181. session.run(cypher, resource_id=int(resource_id))
  182. return jsonify(success({"message": "数据资源删除成功"}))
  183. except Exception as e:
  184. logger.error(f"删除数据资源失败: {str(e)}")
  185. return jsonify(failed(str(e)))
  186. @bp.route('/update', methods=['POST'])
  187. def data_resource_update():
  188. """更新数据资源"""
  189. try:
  190. # 获取更新数据
  191. data = request.json
  192. if not data or "id" not in data:
  193. return jsonify(failed("参数不完整"))
  194. # 调用业务逻辑更新数据资源
  195. updated_data = data_resource_edit(data)
  196. return jsonify(success(updated_data))
  197. except Exception as e:
  198. logger.error(f"更新数据资源失败: {str(e)}")
  199. return jsonify(failed(str(e)))
  200. # 解析ddl,使用正则表达式匹配,但没有进行翻译,也没有对注释进行识别
  201. # 使用ddl创建数据资源时,调用该API
  202. @bp.route('/ddl', methods=['POST'])
  203. def id_data_ddl():
  204. """解析数据资源的DDL"""
  205. try:
  206. # 获取SQL内容
  207. sql_content = request.json.get('sql', '')
  208. if not sql_content:
  209. return jsonify(failed("SQL内容不能为空"))
  210. # 记录原始SQL用于调试
  211. logger.debug(f"原始SQL: {sql_content}")
  212. # 提取创建表的DDL语句
  213. create_ddl_list = select_create_ddl(sql_content)
  214. if not create_ddl_list:
  215. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  216. # 解析每个表定义
  217. tables_dict = {} # 最终返回的表字典
  218. for ddl in create_ddl_list:
  219. table_info = table_sql(ddl)
  220. if table_info:
  221. # table_info格式: {"table_name": {"exist": bool, "meta": [...], "table_comment": "..."}}
  222. # 合并到结果字典中
  223. tables_dict.update(table_info)
  224. if not tables_dict:
  225. return jsonify(failed("解析表结构失败"))
  226. # 记录结果
  227. logger.debug(f"解析结果: {json.dumps(tables_dict, ensure_ascii=False)}")
  228. # 直接返回解析结果
  229. return jsonify(success(tables_dict))
  230. except Exception as e:
  231. logger.error(f"解析DDL失败: {str(e)}")
  232. logger.error(traceback.format_exc()) # 添加详细错误堆栈
  233. return jsonify(failed(str(e)))
  234. @bp.route('/list', methods=['POST'])
  235. def data_resource_list():
  236. """获取数据资源列表"""
  237. try:
  238. # 获取分页和筛选参数
  239. page = int(request.json.get('current', 1))
  240. page_size = int(request.json.get('size', 10))
  241. en_name_filter = request.json.get('en_name')
  242. name_filter = request.json.get('name')
  243. type_filter = request.json.get('type', 'all')
  244. category_filter = request.json.get('category')
  245. tag_filter = request.json.get('tag')
  246. # 调用业务逻辑查询数据资源列表
  247. resources, total_count = resource_list(
  248. page,
  249. page_size,
  250. en_name_filter,
  251. name_filter,
  252. type_filter,
  253. category_filter,
  254. tag_filter
  255. )
  256. # 返回结果
  257. return jsonify(success({
  258. "records": resources,
  259. "total": total_count,
  260. "size": page_size,
  261. "current": page
  262. }))
  263. except Exception as e:
  264. logger.error(f"获取数据资源列表失败: {str(e)}")
  265. return jsonify(failed(str(e)))
  266. @bp.route('/search', methods=['POST'])
  267. def id_data_search():
  268. """搜索数据资源关联的元数据"""
  269. try:
  270. # 获取参数
  271. resource_id = request.json.get('id')
  272. if not resource_id:
  273. return jsonify(failed("资源ID不能为空"))
  274. page = int(request.json.get('current', 1))
  275. page_size = int(request.json.get('size', 10))
  276. en_name_filter = request.json.get('en_name')
  277. name_filter = request.json.get('name')
  278. category_filter = request.json.get('category')
  279. tag_filter = request.json.get('tag')
  280. # 调用业务逻辑查询关联元数据
  281. metadata_list, total_count = id_data_search_list(
  282. resource_id,
  283. page,
  284. page_size,
  285. en_name_filter,
  286. name_filter,
  287. category_filter,
  288. tag_filter
  289. )
  290. # 返回结果
  291. return jsonify(success({
  292. "records": metadata_list,
  293. "total": total_count,
  294. "size": page_size,
  295. "current": page
  296. }))
  297. except Exception as e:
  298. logger.error(f"搜索数据资源关联的元数据失败: {str(e)}")
  299. return jsonify(failed(str(e)))
  300. def dynamic_type_conversion(value, target_type):
  301. """动态类型转换"""
  302. if value is None:
  303. return None
  304. if target_type == "int" or target_type == "INT":
  305. return int(value)
  306. elif target_type == "float" or target_type == "FLOAT" or target_type == "double" or target_type == "DOUBLE":
  307. return float(value)
  308. elif target_type == "bool" or target_type == "BOOL" or target_type == "boolean" or target_type == "BOOLEAN":
  309. if isinstance(value, str):
  310. return value.lower() in ('true', 'yes', '1', 't', 'y')
  311. return bool(value)
  312. else:
  313. return str(value)
  314. @bp.route('/graph/all', methods=['POST'])
  315. def data_resource_graph_all():
  316. """获取数据资源完整图谱"""
  317. try:
  318. # 获取参数
  319. resource_id = request.json.get('id')
  320. meta = request.json.get('meta', True)
  321. if not resource_id:
  322. return jsonify(failed("资源ID不能为空"))
  323. # 调用业务逻辑获取图谱
  324. graph_data = resource_impact_all_graph(resource_id, meta)
  325. return jsonify(success(graph_data))
  326. except Exception as e:
  327. logger.error(f"获取数据资源完整图谱失败: {str(e)}")
  328. return jsonify(failed(str(e)))
  329. @bp.route('/graph', methods=['POST'])
  330. def data_resource_list_graph():
  331. """获取数据资源亲缘关系图谱"""
  332. try:
  333. # 获取参数
  334. resource_id = request.json.get('id')
  335. meta = request.json.get('meta', True)
  336. if not resource_id:
  337. return jsonify(failed("资源ID不能为空"))
  338. # 调用业务逻辑获取图谱
  339. graph_data = resource_kinship_graph(resource_id, meta)
  340. return jsonify(success(graph_data))
  341. except Exception as e:
  342. logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
  343. return jsonify(failed(str(e)))
  344. @bp.route('/save/metadata', methods=['POST'])
  345. def id_data_save():
  346. """保存数据资源关联的元数据"""
  347. try:
  348. # 获取参数
  349. resource_id = request.json.get('id')
  350. metadata_list = request.json.get('data', [])
  351. if not resource_id:
  352. return jsonify(failed("资源ID不能为空"))
  353. if not metadata_list:
  354. return jsonify(failed("元数据列表不能为空"))
  355. # 处理元数据保存
  356. with neo4j_driver.get_session() as session:
  357. # 先删除现有关系
  358. cypher_delete = """
  359. MATCH (n:data_resource)-[r:contain]->()
  360. WHERE id(n) = $resource_id
  361. DELETE r
  362. """
  363. session.run(cypher_delete, resource_id=int(resource_id))
  364. # 添加新关系
  365. for meta in metadata_list:
  366. # 创建元数据节点
  367. meta_cypher = """
  368. MERGE (m:Metadata {name: $name})
  369. ON CREATE SET m.en_name = $en_name,
  370. m.createTime = $create_time,
  371. m.type = $type
  372. ON MATCH SET m.type = $type
  373. RETURN m
  374. """
  375. create_time = get_formatted_time()
  376. meta_result = session.run(
  377. meta_cypher,
  378. name=meta["name"],
  379. en_name=meta["en_name"],
  380. create_time=create_time,
  381. type=meta["data_type"]
  382. )
  383. meta_node = meta_result.single()["m"]
  384. meta_id = meta_node.id
  385. # 打印节点ID信息,便于调试
  386. logger.info(f"元数据节点ID: {meta_id}, 类型: {type(meta_id)}")
  387. logger.info(f"数据资源节点ID: {resource_id}, 类型: {type(resource_id)}")
  388. # 使用明确的属性名匹配而不是ID
  389. rel_cypher = """
  390. MATCH (a:data_resource {name: $r_name}), (m:Metadata {name: $m_name})
  391. MERGE (a)-[r:contain]->(m)
  392. RETURN r
  393. """
  394. rel_result = session.run(
  395. rel_cypher,
  396. r_name=receiver['name'],
  397. m_name=meta["name"]
  398. )
  399. # 检查关系是否创建成功
  400. if rel_result.single():
  401. logger.info(f"成功创建关系: {receiver['name']} -> {meta['name']}")
  402. else:
  403. logger.warning(f"关系创建结果为空")
  404. # 额外验证关系是否创建
  405. verify_cypher = """
  406. MATCH (a:data_resource {name: $r_name})-[r:contain]->(m:Metadata {name: $m_name})
  407. RETURN count(r) as rel_count
  408. """
  409. verify_result = session.run(
  410. verify_cypher,
  411. r_name=receiver['name'],
  412. m_name=meta["name"]
  413. )
  414. count = verify_result.single()["rel_count"]
  415. logger.info(f"验证关系数量: {count}")
  416. return jsonify(success({"message": "元数据保存成功"}))
  417. except Exception as e:
  418. logger.error(f"保存数据资源关联的元数据失败: {str(e)}")
  419. return jsonify(failed(str(e)))
  420. @bp.route('/sql/test', methods=['POST'])
  421. def sql_test():
  422. """测试SQL查询"""
  423. try:
  424. # 获取参数
  425. sql_query = request.json.get('sql', '')
  426. if not sql_query:
  427. return jsonify(failed("SQL查询不能为空"))
  428. # 解析SQL
  429. parsed_sql = select_sql(sql_query)
  430. if not parsed_sql:
  431. return jsonify(failed("解析SQL失败"))
  432. # 返回解析结果
  433. return jsonify(success(parsed_sql))
  434. except Exception as e:
  435. logger.error(f"测试SQL查询失败: {str(e)}")
  436. return jsonify(failed(str(e)))
  437. # 使用LLM识别DDL语句,用来代替原来的正则的方式
  438. # 用于在数据资源创建时,识别DDL语句 /api/resource/ddl/parse
  439. @bp.route('/ddl/parse', methods=['POST'])
  440. def ddl_identify():
  441. """识别DDL语句"""
  442. try:
  443. # 获取参数
  444. sql_content = request.json.get('sql', '')
  445. if not sql_content:
  446. return jsonify(failed("SQL内容不能为空"))
  447. parser = DDLParser()
  448. # 提取创建表的DDL语句
  449. ddl_list = parser.parse_ddl(sql_content)
  450. if not ddl_list:
  451. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  452. # 为每个表名添加exist字段
  453. if isinstance(ddl_list, dict):
  454. # 检查是否有data_source键
  455. data_source = None
  456. if "data_source" in ddl_list:
  457. # 临时移除data_source,以便只遍历表
  458. data_source = ddl_list.pop("data_source", None)
  459. # 获取所有表名 - 过滤掉可能的非表结构键
  460. table_names = []
  461. for key, value in list(ddl_list.items()):
  462. # 检查值是否是字典且包含meta键,这表明它是一个表结构
  463. if isinstance(value, dict) and "meta" in value:
  464. table_names.append(key)
  465. # 如果不是表结构,则不处理
  466. # 只有在有表名时才调用status_query
  467. if table_names:
  468. try:
  469. # 调用status_query获取表的存在状态
  470. status_results = status_query(table_names)
  471. # status_query返回的可能是单个值或嵌套列表,需要平展处理
  472. flat_results = []
  473. if status_results:
  474. # 如果是嵌套列表(通常只有一层嵌套),则拍平
  475. if isinstance(status_results, list):
  476. if len(status_results) == 1 and isinstance(status_results[0], list):
  477. flat_results = status_results[0] # 拍平一层嵌套
  478. else:
  479. flat_results = status_results # 已经是平的列表
  480. # 将状态添加到每个表
  481. for i, table_name in enumerate(table_names):
  482. if i < len(flat_results):
  483. ddl_list[table_name]["exist"] = flat_results[i]
  484. else:
  485. ddl_list[table_name]["exist"] = False
  486. except Exception as e:
  487. logger.error(f"检查表存在状态失败: {str(e)}")
  488. # 如果status_query失败,所有表默认为不存在
  489. for table_name in table_names:
  490. ddl_list[table_name]["exist"] = False
  491. # 恢复data_source
  492. if data_source:
  493. ddl_list["data_source"] = data_source
  494. logger.debug(f"识别到的DDL语句: {ddl_list}")
  495. return jsonify(success(ddl_list))
  496. except Exception as e:
  497. logger.error(f"识别DDL语句失败: {str(e)}")
  498. logger.error(traceback.format_exc()) # 添加详细错误堆栈
  499. return jsonify(failed(str(e)))
  500. # 废弃的识别DDL语句方法,该API 与 ddl API 功能类似,但功能简化了
  501. @bp.route('/ddl/identify', methods=['POST'])
  502. def sql_ddl_identify():
  503. """识别DDL语句"""
  504. try:
  505. # 获取参数
  506. sql_content = request.json.get('sql', '')
  507. if not sql_content:
  508. return jsonify(failed("SQL内容不能为空"))
  509. # 提取创建表的DDL语句
  510. create_ddl_list = select_create_ddl(sql_content)
  511. if not create_ddl_list:
  512. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  513. return jsonify(success({"count": len(create_ddl_list)}))
  514. except Exception as e:
  515. logger.error(f"识别DDL语句失败: {str(e)}")
  516. return jsonify(failed(str(e)))
  517. @bp.route('/model/list', methods=['POST'])
  518. def resource_model_list():
  519. """获取模型资源列表"""
  520. try:
  521. # 获取分页和筛选参数
  522. page = int(request.json.get('current', 1))
  523. page_size = int(request.json.get('size', 10))
  524. name_filter = request.json.get('name')
  525. # 调用业务逻辑查询模型资源列表
  526. resources, total_count = model_resource_list(page, page_size, name_filter)
  527. # 返回结果
  528. return jsonify(success({
  529. "records": resources,
  530. "total": total_count,
  531. "size": page_size,
  532. "current": page
  533. }))
  534. except Exception as e:
  535. logger.error(f"获取模型资源列表失败: {str(e)}")
  536. return jsonify(failed(str(e)))
  537. @bp.route('/detail', methods=['POST'])
  538. def data_resource_detail():
  539. """获取数据资源详情"""
  540. try:
  541. # 获取资源ID
  542. resource_id = request.json.get('id')
  543. if not resource_id:
  544. return jsonify(failed("资源ID不能为空"))
  545. # 调用业务逻辑查询数据资源详情
  546. resource_data = handle_id_resource(resource_id)
  547. if not resource_data:
  548. return jsonify(failed("资源不存在"))
  549. return jsonify(success(resource_data))
  550. except Exception as e:
  551. logger.error(f"获取数据资源详情失败: {str(e)}")
  552. return jsonify(failed(str(e)))
  553. @bp.route('/config', methods=['GET'])
  554. @require_auth
  555. def get_resource_config():
  556. """获取数据资源配置信息"""
  557. config = get_minio_config()
  558. return jsonify({
  559. 'allowed_extensions': list(config['allowed_extensions']),
  560. 'bucket_name': config['bucket_name'],
  561. 'prefix': config['prefix']
  562. })