routes.py 18 KB


  1. from io import BytesIO, StringIO
  2. import os
  3. import pandas as pd
  4. from flask import request, jsonify, send_file
  5. from app.api.data_resource import bp
  6. from app.models.result import success, failed
  7. import logging
  8. import json
  9. import re
  10. from minio import Minio
  11. from app.config.config import Config
  12. from app.core.graph.graph_operations import MyEncoder
  13. from app.services.neo4j_driver import neo4j_driver
  14. from app.core.data_resource.resource import (
  15. resource_list,
  16. handle_node,
  17. resource_kinship_graph,
  18. resource_impact_all_graph,
  19. model_resource_list,
  20. select_create_ddl,
  21. data_resource_edit,
  22. handle_id_resource,
  23. id_data_search_list,
  24. table_sql,
  25. select_sql,
  26. id_resource_graph
  27. )
  28. from app.core.meta_data import (
  29. translate_and_parse,
  30. infer_column_type,
  31. text_resource_solve,
  32. get_file_content,
  33. get_formatted_time
  34. )
# Module-level logger, looked up under the fixed name "app".
logger = logging.getLogger("app")
# Configure the MinIO client from the application Config.
# secure=True asks the client to use a TLS (HTTPS) connection.
minio_client = Minio(
    Config.MINIO_HOST,
    access_key=Config.MINIO_USER,
    secret_key=Config.MINIO_PASSWORD,
    secure=True
)
# File-upload related settings, copied from Config.
UPLOAD_FOLDER = Config.UPLOAD_FOLDER
bucket_name = Config.BUCKET_NAME
prefix = Config.PREFIX
  47. def is_english(text):
  48. """检查文本是否为英文"""
  49. return text.isascii() and bool(re.match(r'^[a-zA-Z0-9_\s.,;:!?()\'"-]+$', text))
  50. @bp.route('/translate', methods=['POST'])
  51. def data_resource_translate():
  52. # 获取表单数据
  53. data_resource = request.form.get('data_resource')
  54. meta_data = request.form.get('meta_data')
  55. meta_data_list = json.loads(meta_data)
  56. file = request.files.get('file')
  57. if not data_resource or not meta_data or not file:
  58. return jsonify(failed("缺少必要参数"))
  59. # 构建翻译后的内容组合
  60. translated_meta_data_list = []
  61. for meta_item in meta_data_list:
  62. if is_english(meta_item): # 检查是否为英文
  63. translated_meta_data_list.append(meta_item) # 如果是英文,则直接添加
  64. else:
  65. translated_meta_data_list.append(translate_and_parse(meta_item)[0]) # 否则翻译后添加
  66. # 对 data_resource 进行翻译
  67. translated_data_resource = translate_and_parse(data_resource)
  68. if translated_data_resource and len(translated_data_resource) > 0:
  69. translated_data_resource = translated_data_resource[0]
  70. else:
  71. translated_data_resource = data_resource # 翻译失败时使用原值
  72. try:
  73. # 构建最终的翻译结果
  74. # meta_en = translated_meta_data_list
  75. resource = {"name": data_resource, "en_name": translated_data_resource}
  76. parsed_data = []
  77. # 读取文件内容
  78. file_content = file.read()
  79. # 重置文件指针
  80. file.seek(0)
  81. try:
  82. df = pd.read_excel(BytesIO(file_content))
  83. except Exception as e:
  84. return jsonify(failed(f"文件格式错误: {str(e)}"))
  85. # 获取列名和对应的数据类型
  86. columns_and_types = infer_column_type(df)
  87. for i in range(len(meta_data_list)):
  88. zh = meta_data_list[i]
  89. en = translated_meta_data_list[i]
  90. data_type = columns_and_types[i] if i < len(columns_and_types) else "varchar(255)"
  91. parsed_item = {"name": zh, "en_name": en, "data_type": data_type}
  92. parsed_data.append(parsed_item)
  93. response_data = {
  94. "head_data": parsed_data,
  95. "data_resource": resource
  96. }
  97. return jsonify(success(response_data, "success"))
  98. except Exception as e:
  99. return jsonify(failed({}, str(e)))
  100. # 废弃的翻译方法
  101. # """数据资源翻译"""
  102. # try:
  103. # # 获取表单数据
  104. # name = request.json.get('name', '')
  105. # en_name = request.json.get('en_name', '')
  106. # data_type = request.json.get('data_type', 'table')
  107. # is_file = request.json.get('is_file', False)
  108. # # 验证输入
  109. # if not name:
  110. # return jsonify(failed("名称不能为空"))
  111. # # 如果已经提供了英文名,则直接使用
  112. # if en_name and is_english(en_name):
  113. # translated = True
  114. # return jsonify(success({"name": name, "en_name": en_name, "translated": translated}))
  115. # # 否则进行翻译
  116. # try:
  117. # if data_type == 'table':
  118. # prompt = f"""将以下数据表名(中文)翻译成英文,不需要额外说明:
  119. # 中文:{name}
  120. # 英文(snake_case格式):
  121. # """
  122. # result = text_resource_solve(None, name, "")
  123. # translated = True
  124. # return jsonify(success({"name": name, "en_name": result["en_name"], "translated": translated}))
  125. # else:
  126. # result = text_resource_solve(None, name, "")
  127. # translated = True
  128. # return jsonify(success({"name": name, "en_name": result["en_name"], "translated": translated}))
  129. # except Exception as e:
  130. # logger.error(f"翻译失败: {str(e)}")
  131. # return jsonify(failed(f"翻译失败: {str(e)}"))
  132. # except Exception as e:
  133. # logger.error(f"处理数据资源翻译请求失败: {str(e)}")
  134. # return jsonify(failed(str(e)))
  135. @bp.route('/save', methods=['POST'])
  136. def data_resource_save():
  137. """保存数据资源"""
  138. try:
  139. # 获取表单数据
  140. receiver = request.json.get('receiver', {})
  141. head_data = request.json.get('head_data', [])
  142. data_resource = request.json.get('data_resource', {})
  143. if not receiver or not data_resource:
  144. return jsonify(failed("参数不完整"))
  145. # 调用业务逻辑处理数据资源创建
  146. resource_id = handle_node(receiver, head_data, data_resource)
  147. return jsonify(success({"id": resource_id}))
  148. except Exception as e:
  149. logger.error(f"保存数据资源失败: {str(e)}")
  150. return jsonify(failed(str(e)))
  151. @bp.route('/delete', methods=['POST'])
  152. def data_resource_delete():
  153. """删除数据资源"""
  154. try:
  155. # 获取资源ID
  156. resource_id = request.json.get('id')
  157. if not resource_id:
  158. return jsonify(failed("资源ID不能为空"))
  159. with neo4j_driver.get_session() as session:
  160. # 删除数据资源节点及其关系
  161. cypher = """
  162. MATCH (n:data_resource)
  163. WHERE id(n) = $resource_id
  164. DETACH DELETE n
  165. """
  166. session.run(cypher, resource_id=int(resource_id))
  167. return jsonify(success({"message": "数据资源删除成功"}))
  168. except Exception as e:
  169. logger.error(f"删除数据资源失败: {str(e)}")
  170. return jsonify(failed(str(e)))
  171. @bp.route('/update', methods=['POST'])
  172. def data_resource_update():
  173. """更新数据资源"""
  174. try:
  175. # 获取更新数据
  176. data = request.json
  177. if not data or "id" not in data:
  178. return jsonify(failed("参数不完整"))
  179. # 调用业务逻辑更新数据资源
  180. updated_data = data_resource_edit(data)
  181. return jsonify(success(updated_data))
  182. except Exception as e:
  183. logger.error(f"更新数据资源失败: {str(e)}")
  184. return jsonify(failed(str(e)))
  185. @bp.route('/ddl', methods=['POST'])
  186. def id_data_ddl():
  187. """解析数据资源的DDL"""
  188. try:
  189. # 获取SQL内容
  190. sql_content = request.json.get('sql', '')
  191. if not sql_content:
  192. return jsonify(failed("SQL内容不能为空"))
  193. # 提取创建表的DDL语句
  194. create_ddl_list = select_create_ddl(sql_content)
  195. if not create_ddl_list:
  196. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  197. # 解析每个表定义
  198. tables = []
  199. for ddl in create_ddl_list:
  200. table_info = table_sql(ddl)
  201. if table_info:
  202. tables.append(table_info)
  203. if not tables:
  204. return jsonify(failed("解析表结构失败"))
  205. # 转换结果为前端需要的格式
  206. result = []
  207. for table in tables:
  208. table_result = {
  209. "name": table["table_name"],
  210. "en_name": table["table_name"], # 初始使用原名
  211. "fields": []
  212. }
  213. # 处理每个字段
  214. for field in table["fields"]:
  215. field_info = {
  216. "name": field["name"],
  217. "en_name": field["name"], # 初始使用原名
  218. "type": field["type"],
  219. "is_primary": field["is_primary"],
  220. "nullable": not field.get("not_null", False)
  221. }
  222. if "default" in field:
  223. field_info["default"] = field["default"]
  224. table_result["fields"].append(field_info)
  225. result.append(table_result)
  226. return jsonify(success(result))
  227. except Exception as e:
  228. logger.error(f"解析DDL失败: {str(e)}")
  229. return jsonify(failed(str(e)))
  230. @bp.route('/list', methods=['POST'])
  231. def data_resource_list():
  232. """获取数据资源列表"""
  233. try:
  234. # 获取分页和筛选参数
  235. page = int(request.json.get('current', 1))
  236. page_size = int(request.json.get('size', 10))
  237. en_name_filter = request.json.get('en_name')
  238. name_filter = request.json.get('name')
  239. type_filter = request.json.get('type', 'all')
  240. category_filter = request.json.get('category')
  241. tag_filter = request.json.get('tag')
  242. # 调用业务逻辑查询数据资源列表
  243. resources, total_count = resource_list(
  244. page,
  245. page_size,
  246. en_name_filter,
  247. name_filter,
  248. type_filter,
  249. category_filter,
  250. tag_filter
  251. )
  252. # 返回结果
  253. return jsonify(success({
  254. "records": resources,
  255. "total": total_count,
  256. "size": page_size,
  257. "current": page
  258. }))
  259. except Exception as e:
  260. logger.error(f"获取数据资源列表失败: {str(e)}")
  261. return jsonify(failed(str(e)))
  262. @bp.route('/search', methods=['POST'])
  263. def id_data_search():
  264. """搜索数据资源关联的元数据"""
  265. try:
  266. # 获取参数
  267. resource_id = request.json.get('id')
  268. if not resource_id:
  269. return jsonify(failed("资源ID不能为空"))
  270. page = int(request.json.get('current', 1))
  271. page_size = int(request.json.get('size', 10))
  272. en_name_filter = request.json.get('en_name')
  273. name_filter = request.json.get('name')
  274. category_filter = request.json.get('category')
  275. tag_filter = request.json.get('tag')
  276. # 调用业务逻辑查询关联元数据
  277. metadata_list, total_count = id_data_search_list(
  278. resource_id,
  279. page,
  280. page_size,
  281. en_name_filter,
  282. name_filter,
  283. category_filter,
  284. tag_filter
  285. )
  286. # 返回结果
  287. return jsonify(success({
  288. "records": metadata_list,
  289. "total": total_count,
  290. "size": page_size,
  291. "current": page
  292. }))
  293. except Exception as e:
  294. logger.error(f"搜索数据资源关联的元数据失败: {str(e)}")
  295. return jsonify(failed(str(e)))
  296. def dynamic_type_conversion(value, target_type):
  297. """动态类型转换"""
  298. if value is None:
  299. return None
  300. if target_type == "int" or target_type == "INT":
  301. return int(value)
  302. elif target_type == "float" or target_type == "FLOAT" or target_type == "double" or target_type == "DOUBLE":
  303. return float(value)
  304. elif target_type == "bool" or target_type == "BOOL" or target_type == "boolean" or target_type == "BOOLEAN":
  305. if isinstance(value, str):
  306. return value.lower() in ('true', 'yes', '1', 't', 'y')
  307. return bool(value)
  308. else:
  309. return str(value)
  310. @bp.route('/graph/all', methods=['POST'])
  311. def data_resource_graph_all():
  312. """获取数据资源完整图谱"""
  313. try:
  314. # 获取参数
  315. resource_id = request.json.get('id')
  316. meta = request.json.get('meta', True)
  317. if not resource_id:
  318. return jsonify(failed("资源ID不能为空"))
  319. # 调用业务逻辑获取图谱
  320. graph_data = resource_impact_all_graph(resource_id, meta)
  321. return jsonify(success(graph_data))
  322. except Exception as e:
  323. logger.error(f"获取数据资源完整图谱失败: {str(e)}")
  324. return jsonify(failed(str(e)))
  325. @bp.route('/graph', methods=['POST'])
  326. def data_resource_list_graph():
  327. """获取数据资源亲缘关系图谱"""
  328. try:
  329. # 获取参数
  330. resource_id = request.json.get('id')
  331. meta = request.json.get('meta', True)
  332. if not resource_id:
  333. return jsonify(failed("资源ID不能为空"))
  334. # 调用业务逻辑获取图谱
  335. graph_data = resource_kinship_graph(resource_id, meta)
  336. return jsonify(success(graph_data))
  337. except Exception as e:
  338. logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
  339. return jsonify(failed(str(e)))
  340. @bp.route('/save/metadata', methods=['POST'])
  341. def id_data_save():
  342. """保存数据资源关联的元数据"""
  343. try:
  344. # 获取参数
  345. resource_id = request.json.get('id')
  346. metadata_list = request.json.get('data', [])
  347. if not resource_id:
  348. return jsonify(failed("资源ID不能为空"))
  349. if not metadata_list:
  350. return jsonify(failed("元数据列表不能为空"))
  351. # 处理元数据保存
  352. with neo4j_driver.get_session() as session:
  353. # 先删除现有关系
  354. cypher_delete = """
  355. MATCH (n:data_resource)-[r:contain]->()
  356. WHERE id(n) = $resource_id
  357. DELETE r
  358. """
  359. session.run(cypher_delete, resource_id=int(resource_id))
  360. # 添加新关系
  361. for meta in metadata_list:
  362. # 创建或获取元数据节点
  363. meta_cypher = """
  364. MERGE (m:Metadata {name: $name})
  365. ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
  366. RETURN m
  367. """
  368. meta_result = session.run(
  369. meta_cypher,
  370. name=meta["name"],
  371. en_name=meta["en_name"],
  372. create_time=meta.get("createTime", get_formatted_time())
  373. )
  374. meta_node = meta_result.single()["m"]
  375. # 创建关系
  376. rel_cypher = """
  377. MATCH (n:data_resource), (m:Metadata)
  378. WHERE id(n) = $resource_id AND id(m) = $meta_id
  379. CREATE (n)-[r:contain]->(m)
  380. RETURN r
  381. """
  382. session.run(
  383. rel_cypher,
  384. resource_id=int(resource_id),
  385. meta_id=meta_node.id
  386. )
  387. return jsonify(success({"message": "元数据保存成功"}))
  388. except Exception as e:
  389. logger.error(f"保存数据资源关联的元数据失败: {str(e)}")
  390. return jsonify(failed(str(e)))
  391. @bp.route('/sql/test', methods=['POST'])
  392. def sql_test():
  393. """测试SQL查询"""
  394. try:
  395. # 获取参数
  396. sql_query = request.json.get('sql', '')
  397. if not sql_query:
  398. return jsonify(failed("SQL查询不能为空"))
  399. # 解析SQL
  400. parsed_sql = select_sql(sql_query)
  401. if not parsed_sql:
  402. return jsonify(failed("解析SQL失败"))
  403. # 返回解析结果
  404. return jsonify(success(parsed_sql))
  405. except Exception as e:
  406. logger.error(f"测试SQL查询失败: {str(e)}")
  407. return jsonify(failed(str(e)))
  408. @bp.route('/ddl/identify', methods=['POST'])
  409. def sql_ddl_identify():
  410. """识别DDL语句"""
  411. try:
  412. # 获取参数
  413. sql_content = request.json.get('sql', '')
  414. if not sql_content:
  415. return jsonify(failed("SQL内容不能为空"))
  416. # 提取创建表的DDL语句
  417. create_ddl_list = select_create_ddl(sql_content)
  418. if not create_ddl_list:
  419. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  420. return jsonify(success({"count": len(create_ddl_list)}))
  421. except Exception as e:
  422. logger.error(f"识别DDL语句失败: {str(e)}")
  423. return jsonify(failed(str(e)))
  424. @bp.route('/model/list', methods=['POST'])
  425. def resource_model_list():
  426. """获取模型资源列表"""
  427. try:
  428. # 获取分页和筛选参数
  429. page = int(request.json.get('current', 1))
  430. page_size = int(request.json.get('size', 10))
  431. name_filter = request.json.get('name')
  432. # 调用业务逻辑查询模型资源列表
  433. resources, total_count = model_resource_list(page, page_size, name_filter)
  434. # 返回结果
  435. return jsonify(success({
  436. "records": resources,
  437. "total": total_count,
  438. "size": page_size,
  439. "current": page
  440. }))
  441. except Exception as e:
  442. logger.error(f"获取模型资源列表失败: {str(e)}")
  443. return jsonify(failed(str(e)))
  444. @bp.route('/detail', methods=['POST'])
  445. def data_resource_detail():
  446. """获取数据资源详情"""
  447. try:
  448. # 获取资源ID
  449. resource_id = request.json.get('id')
  450. if not resource_id:
  451. return jsonify(failed("资源ID不能为空"))
  452. # 调用业务逻辑查询数据资源详情
  453. resource_data = handle_id_resource(resource_id)
  454. if not resource_data:
  455. return jsonify(failed("资源不存在"))
  456. return jsonify(success(resource_data))
  457. except Exception as e:
  458. logger.error(f"获取数据资源详情失败: {str(e)}")
  459. return jsonify(failed(str(e)))