routes.py 16 KB


  1. from io import BytesIO, StringIO
  2. import os
  3. import pandas as pd
  4. from flask import request, jsonify, send_file
  5. from app.api.data_resource import bp
  6. from app.models.result import success, failed
  7. import logging
  8. import json
  9. import re
  10. from minio import Minio
  11. from app.config.config import Config
  12. from app.services.neo4j_driver import neo4j_driver
  13. from app.core.data_resource.resource import (
  14. resource_list,
  15. handle_node,
  16. resource_kinship_graph,
  17. resource_impact_all_graph,
  18. model_resource_list,
  19. select_create_ddl,
  20. data_resource_edit,
  21. handle_id_resource,
  22. id_data_search_list,
  23. table_sql,
  24. select_sql,
  25. id_resource_graph
  26. )
  27. from app.core.meta_data import (
  28. translate_and_parse,
  29. infer_column_type,
  30. text_resource_solve,
  31. get_file_content,
  32. get_formatted_time
  33. )
  34. logger = logging.getLogger("app")
  35. # 配置MinIO客户端
  36. minio_client = Minio(
  37. Config.MINIO_HOST,
  38. access_key=Config.MINIO_USER,
  39. secret_key=Config.MINIO_PASSWORD,
  40. secure=True
  41. )
  42. # 配置文件上传相关
  43. UPLOAD_FOLDER = Config.UPLOAD_FOLDER
  44. bucket_name = Config.BUCKET_NAME
  45. prefix = Config.PREFIX
  46. def is_english(text):
  47. """检查文本是否为英文"""
  48. return text.isascii() and bool(re.match(r'^[a-zA-Z0-9_\s.,;:!?()\'"-]+$', text))
  49. @bp.route('/translate', methods=['POST'])
  50. def data_resource_translate():
  51. """数据资源翻译"""
  52. try:
  53. # 获取表单数据
  54. name = request.json.get('name', '')
  55. en_name = request.json.get('en_name', '')
  56. data_type = request.json.get('data_type', 'table')
  57. is_file = request.json.get('is_file', False)
  58. # 验证输入
  59. if not name:
  60. return jsonify(failed("名称不能为空"))
  61. # 如果已经提供了英文名,则直接使用
  62. if en_name and is_english(en_name):
  63. translated = True
  64. return jsonify(success({"name": name, "en_name": en_name, "translated": translated}))
  65. # 否则进行翻译
  66. try:
  67. if data_type == 'table':
  68. prompt = f"""将以下数据表名(中文)翻译成英文,不需要额外说明:
  69. 中文:{name}
  70. 英文(snake_case格式):
  71. """
  72. result = text_resource_solve(None, name, "")
  73. translated = True
  74. return jsonify(success({"name": name, "en_name": result["en_name"], "translated": translated}))
  75. else:
  76. result = text_resource_solve(None, name, "")
  77. translated = True
  78. return jsonify(success({"name": name, "en_name": result["en_name"], "translated": translated}))
  79. except Exception as e:
  80. logger.error(f"翻译失败: {str(e)}")
  81. return jsonify(failed(f"翻译失败: {str(e)}"))
  82. except Exception as e:
  83. logger.error(f"处理数据资源翻译请求失败: {str(e)}")
  84. return jsonify(failed(str(e)))
  85. @bp.route('/save', methods=['POST'])
  86. def data_resource_save():
  87. """保存数据资源"""
  88. try:
  89. # 获取表单数据
  90. receiver = request.json.get('receiver', {})
  91. head_data = request.json.get('head_data', [])
  92. data_resource = request.json.get('data_resource', {})
  93. if not receiver or not data_resource:
  94. return jsonify(failed("参数不完整"))
  95. # 调用业务逻辑处理数据资源创建
  96. resource_id = handle_node(receiver, head_data, data_resource)
  97. return jsonify(success({"id": resource_id}))
  98. except Exception as e:
  99. logger.error(f"保存数据资源失败: {str(e)}")
  100. return jsonify(failed(str(e)))
  101. @bp.route('/delete', methods=['POST'])
  102. def data_resource_delete():
  103. """删除数据资源"""
  104. try:
  105. # 获取资源ID
  106. resource_id = request.json.get('id')
  107. if not resource_id:
  108. return jsonify(failed("资源ID不能为空"))
  109. with neo4j_driver.get_session() as session:
  110. # 删除数据资源节点及其关系
  111. cypher = """
  112. MATCH (n:data_resource)
  113. WHERE id(n) = $resource_id
  114. DETACH DELETE n
  115. """
  116. session.run(cypher, resource_id=int(resource_id))
  117. return jsonify(success({"message": "数据资源删除成功"}))
  118. except Exception as e:
  119. logger.error(f"删除数据资源失败: {str(e)}")
  120. return jsonify(failed(str(e)))
  121. @bp.route('/update', methods=['POST'])
  122. def data_resource_update():
  123. """更新数据资源"""
  124. try:
  125. # 获取更新数据
  126. data = request.json
  127. if not data or "id" not in data:
  128. return jsonify(failed("参数不完整"))
  129. # 调用业务逻辑更新数据资源
  130. updated_data = data_resource_edit(data)
  131. return jsonify(success(updated_data))
  132. except Exception as e:
  133. logger.error(f"更新数据资源失败: {str(e)}")
  134. return jsonify(failed(str(e)))
  135. @bp.route('/ddl', methods=['POST'])
  136. def id_data_ddl():
  137. """解析数据资源的DDL"""
  138. try:
  139. # 获取SQL内容
  140. sql_content = request.json.get('sql', '')
  141. if not sql_content:
  142. return jsonify(failed("SQL内容不能为空"))
  143. # 提取创建表的DDL语句
  144. create_ddl_list = select_create_ddl(sql_content)
  145. if not create_ddl_list:
  146. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  147. # 解析每个表定义
  148. tables = []
  149. for ddl in create_ddl_list:
  150. table_info = table_sql(ddl)
  151. if table_info:
  152. tables.append(table_info)
  153. if not tables:
  154. return jsonify(failed("解析表结构失败"))
  155. # 转换结果为前端需要的格式
  156. result = []
  157. for table in tables:
  158. table_result = {
  159. "name": table["table_name"],
  160. "en_name": table["table_name"], # 初始使用原名
  161. "fields": []
  162. }
  163. # 处理每个字段
  164. for field in table["fields"]:
  165. field_info = {
  166. "name": field["name"],
  167. "en_name": field["name"], # 初始使用原名
  168. "type": field["type"],
  169. "is_primary": field["is_primary"],
  170. "nullable": not field.get("not_null", False)
  171. }
  172. if "default" in field:
  173. field_info["default"] = field["default"]
  174. table_result["fields"].append(field_info)
  175. result.append(table_result)
  176. return jsonify(success(result))
  177. except Exception as e:
  178. logger.error(f"解析DDL失败: {str(e)}")
  179. return jsonify(failed(str(e)))
  180. @bp.route('/list', methods=['POST'])
  181. def data_resource_list():
  182. """获取数据资源列表"""
  183. try:
  184. # 获取分页和筛选参数
  185. page = int(request.json.get('current', 1))
  186. page_size = int(request.json.get('size', 10))
  187. en_name_filter = request.json.get('en_name')
  188. name_filter = request.json.get('name')
  189. type_filter = request.json.get('type', 'all')
  190. category_filter = request.json.get('category')
  191. tag_filter = request.json.get('tag')
  192. # 调用业务逻辑查询数据资源列表
  193. resources, total_count = resource_list(
  194. page,
  195. page_size,
  196. en_name_filter,
  197. name_filter,
  198. type_filter,
  199. category_filter,
  200. tag_filter
  201. )
  202. # 返回结果
  203. return jsonify(success({
  204. "records": resources,
  205. "total": total_count,
  206. "size": page_size,
  207. "current": page
  208. }))
  209. except Exception as e:
  210. logger.error(f"获取数据资源列表失败: {str(e)}")
  211. return jsonify(failed(str(e)))
  212. @bp.route('/search', methods=['POST'])
  213. def id_data_search():
  214. """搜索数据资源关联的元数据"""
  215. try:
  216. # 获取参数
  217. resource_id = request.json.get('id')
  218. if not resource_id:
  219. return jsonify(failed("资源ID不能为空"))
  220. page = int(request.json.get('current', 1))
  221. page_size = int(request.json.get('size', 10))
  222. en_name_filter = request.json.get('en_name')
  223. name_filter = request.json.get('name')
  224. category_filter = request.json.get('category')
  225. tag_filter = request.json.get('tag')
  226. # 调用业务逻辑查询关联元数据
  227. metadata_list, total_count = id_data_search_list(
  228. resource_id,
  229. page,
  230. page_size,
  231. en_name_filter,
  232. name_filter,
  233. category_filter,
  234. tag_filter
  235. )
  236. # 返回结果
  237. return jsonify(success({
  238. "records": metadata_list,
  239. "total": total_count,
  240. "size": page_size,
  241. "current": page
  242. }))
  243. except Exception as e:
  244. logger.error(f"搜索数据资源关联的元数据失败: {str(e)}")
  245. return jsonify(failed(str(e)))
  246. def dynamic_type_conversion(value, target_type):
  247. """动态类型转换"""
  248. if value is None:
  249. return None
  250. if target_type == "int" or target_type == "INT":
  251. return int(value)
  252. elif target_type == "float" or target_type == "FLOAT" or target_type == "double" or target_type == "DOUBLE":
  253. return float(value)
  254. elif target_type == "bool" or target_type == "BOOL" or target_type == "boolean" or target_type == "BOOLEAN":
  255. if isinstance(value, str):
  256. return value.lower() in ('true', 'yes', '1', 't', 'y')
  257. return bool(value)
  258. else:
  259. return str(value)
  260. @bp.route('/graph/all', methods=['POST'])
  261. def data_resource_graph_all():
  262. """获取数据资源完整图谱"""
  263. try:
  264. # 获取参数
  265. resource_id = request.json.get('id')
  266. meta = request.json.get('meta', True)
  267. if not resource_id:
  268. return jsonify(failed("资源ID不能为空"))
  269. # 调用业务逻辑获取图谱
  270. graph_data = resource_impact_all_graph(resource_id, meta)
  271. return jsonify(success(graph_data))
  272. except Exception as e:
  273. logger.error(f"获取数据资源完整图谱失败: {str(e)}")
  274. return jsonify(failed(str(e)))
  275. @bp.route('/graph', methods=['POST'])
  276. def data_resource_list_graph():
  277. """获取数据资源亲缘关系图谱"""
  278. try:
  279. # 获取参数
  280. resource_id = request.json.get('id')
  281. meta = request.json.get('meta', True)
  282. if not resource_id:
  283. return jsonify(failed("资源ID不能为空"))
  284. # 调用业务逻辑获取图谱
  285. graph_data = resource_kinship_graph(resource_id, meta)
  286. return jsonify(success(graph_data))
  287. except Exception as e:
  288. logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
  289. return jsonify(failed(str(e)))
  290. @bp.route('/save/metadata', methods=['POST'])
  291. def id_data_save():
  292. """保存数据资源关联的元数据"""
  293. try:
  294. # 获取参数
  295. resource_id = request.json.get('id')
  296. metadata_list = request.json.get('data', [])
  297. if not resource_id:
  298. return jsonify(failed("资源ID不能为空"))
  299. if not metadata_list:
  300. return jsonify(failed("元数据列表不能为空"))
  301. # 处理元数据保存
  302. with neo4j_driver.get_session() as session:
  303. # 先删除现有关系
  304. cypher_delete = """
  305. MATCH (n:data_resource)-[r:contain]->()
  306. WHERE id(n) = $resource_id
  307. DELETE r
  308. """
  309. session.run(cypher_delete, resource_id=int(resource_id))
  310. # 添加新关系
  311. for meta in metadata_list:
  312. # 创建或获取元数据节点
  313. meta_cypher = """
  314. MERGE (m:Metadata {name: $name})
  315. ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
  316. RETURN m
  317. """
  318. meta_result = session.run(
  319. meta_cypher,
  320. name=meta["name"],
  321. en_name=meta["en_name"],
  322. create_time=meta.get("createTime", get_formatted_time())
  323. )
  324. meta_node = meta_result.single()["m"]
  325. # 创建关系
  326. rel_cypher = """
  327. MATCH (n:data_resource), (m:Metadata)
  328. WHERE id(n) = $resource_id AND id(m) = $meta_id
  329. CREATE (n)-[r:contain]->(m)
  330. RETURN r
  331. """
  332. session.run(
  333. rel_cypher,
  334. resource_id=int(resource_id),
  335. meta_id=meta_node.id
  336. )
  337. return jsonify(success({"message": "元数据保存成功"}))
  338. except Exception as e:
  339. logger.error(f"保存数据资源关联的元数据失败: {str(e)}")
  340. return jsonify(failed(str(e)))
  341. @bp.route('/sql/test', methods=['POST'])
  342. def sql_test():
  343. """测试SQL查询"""
  344. try:
  345. # 获取参数
  346. sql_query = request.json.get('sql', '')
  347. if not sql_query:
  348. return jsonify(failed("SQL查询不能为空"))
  349. # 解析SQL
  350. parsed_sql = select_sql(sql_query)
  351. if not parsed_sql:
  352. return jsonify(failed("解析SQL失败"))
  353. # 返回解析结果
  354. return jsonify(success(parsed_sql))
  355. except Exception as e:
  356. logger.error(f"测试SQL查询失败: {str(e)}")
  357. return jsonify(failed(str(e)))
  358. @bp.route('/ddl/identify', methods=['POST'])
  359. def sql_ddl_identify():
  360. """识别DDL语句"""
  361. try:
  362. # 获取参数
  363. sql_content = request.json.get('sql', '')
  364. if not sql_content:
  365. return jsonify(failed("SQL内容不能为空"))
  366. # 提取创建表的DDL语句
  367. create_ddl_list = select_create_ddl(sql_content)
  368. if not create_ddl_list:
  369. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  370. return jsonify(success({"count": len(create_ddl_list)}))
  371. except Exception as e:
  372. logger.error(f"识别DDL语句失败: {str(e)}")
  373. return jsonify(failed(str(e)))
  374. @bp.route('/model/list', methods=['POST'])
  375. def resource_model_list():
  376. """获取模型资源列表"""
  377. try:
  378. # 获取分页和筛选参数
  379. page = int(request.json.get('current', 1))
  380. page_size = int(request.json.get('size', 10))
  381. name_filter = request.json.get('name')
  382. # 调用业务逻辑查询模型资源列表
  383. resources, total_count = model_resource_list(page, page_size, name_filter)
  384. # 返回结果
  385. return jsonify(success({
  386. "records": resources,
  387. "total": total_count,
  388. "size": page_size,
  389. "current": page
  390. }))
  391. except Exception as e:
  392. logger.error(f"获取模型资源列表失败: {str(e)}")
  393. return jsonify(failed(str(e)))
  394. @bp.route('/detail', methods=['POST'])
  395. def data_resource_detail():
  396. """获取数据资源详情"""
  397. try:
  398. # 获取资源ID
  399. resource_id = request.json.get('id')
  400. if not resource_id:
  401. return jsonify(failed("资源ID不能为空"))
  402. # 调用业务逻辑查询数据资源详情
  403. resource_data = handle_id_resource(resource_id)
  404. if not resource_data:
  405. return jsonify(failed("资源不存在"))
  406. return jsonify(success(resource_data))
  407. except Exception as e:
  408. logger.error(f"获取数据资源详情失败: {str(e)}")
  409. return jsonify(failed(str(e)))