routes.py 21 KB


  1. from io import BytesIO, StringIO
  2. import os
  3. import pandas as pd
  4. from flask import request, jsonify, send_file
  5. from app.api.data_resource import bp
  6. from app.models.result import success, failed
  7. import logging
  8. import json
  9. import re
  10. from minio import Minio
  11. from app.config.config import Config
  12. from app.core.graph.graph_operations import MyEncoder
  13. from app.services.neo4j_driver import neo4j_driver
  14. from app.core.data_resource.resource import (
  15. resource_list,
  16. handle_node,
  17. resource_kinship_graph,
  18. resource_impact_all_graph,
  19. model_resource_list,
  20. select_create_ddl,
  21. data_resource_edit,
  22. handle_id_resource,
  23. id_data_search_list,
  24. table_sql,
  25. select_sql,
  26. id_resource_graph
  27. )
  28. from app.core.meta_data import (
  29. translate_and_parse,
  30. infer_column_type,
  31. text_resource_solve,
  32. get_file_content,
  33. get_formatted_time
  34. )
  35. import traceback
  36. logger = logging.getLogger("app")
  37. # 配置MinIO客户端
  38. minio_client = Minio(
  39. Config.MINIO_HOST,
  40. access_key=Config.MINIO_USER,
  41. secret_key=Config.MINIO_PASSWORD,
  42. secure=True
  43. )
  44. # 配置文件上传相关
  45. UPLOAD_FOLDER = Config.UPLOAD_FOLDER
  46. bucket_name = Config.BUCKET_NAME
  47. prefix = Config.PREFIX
  48. def is_english(text):
  49. """检查文本是否为英文"""
  50. return text.isascii() and bool(re.match(r'^[a-zA-Z0-9_\s.,;:!?()\'"-]+$', text))
  51. @bp.route('/translate', methods=['POST'])
  52. def data_resource_translate():
  53. # 获取表单数据
  54. data_resource = request.form.get('data_resource')
  55. meta_data = request.form.get('meta_data')
  56. meta_data_list = json.loads(meta_data)
  57. file = request.files.get('file')
  58. if not data_resource or not meta_data or not file:
  59. return jsonify(failed("缺少必要参数"))
  60. # 构建翻译后的内容组合
  61. translated_meta_data_list = []
  62. for meta_item in meta_data_list:
  63. if is_english(meta_item): # 检查是否为英文
  64. translated_meta_data_list.append(meta_item) # 如果是英文,则直接添加
  65. else:
  66. translated_meta_data_list.append(translate_and_parse(meta_item)[0]) # 否则翻译后添加
  67. # 对 data_resource 进行翻译
  68. translated_data_resource = translate_and_parse(data_resource)
  69. if translated_data_resource and len(translated_data_resource) > 0:
  70. translated_data_resource = translated_data_resource[0]
  71. else:
  72. translated_data_resource = data_resource # 翻译失败时使用原值
  73. try:
  74. # 构建最终的翻译结果
  75. # meta_en = translated_meta_data_list
  76. resource = {"name": data_resource, "en_name": translated_data_resource}
  77. parsed_data = []
  78. # 读取文件内容
  79. file_content = file.read()
  80. # 重置文件指针
  81. file.seek(0)
  82. try:
  83. df = pd.read_excel(BytesIO(file_content))
  84. except Exception as e:
  85. return jsonify(failed(f"文件格式错误: {str(e)}"))
  86. # 获取列名和对应的数据类型
  87. columns_and_types = infer_column_type(df)
  88. for i in range(len(meta_data_list)):
  89. zh = meta_data_list[i]
  90. en = translated_meta_data_list[i]
  91. data_type = columns_and_types[i] if i < len(columns_and_types) else "varchar(255)"
  92. parsed_item = {"name": zh, "en_name": en, "data_type": data_type}
  93. parsed_data.append(parsed_item)
  94. response_data = {
  95. "head_data": parsed_data,
  96. "data_resource": resource
  97. }
  98. return jsonify(success(response_data, "success"))
  99. except Exception as e:
  100. return jsonify(failed({}, str(e)))
  101. @bp.route('/save', methods=['POST'])
  102. def data_resource_save():
  103. """保存数据资源"""
  104. try:
  105. # 获取表单数据
  106. # 表单以 receiver 开头时使用下面的方法:
  107. # receiver = request.json.get('receiver', {})
  108. receiver = request.get_json()
  109. additional_info = receiver['additional_info']
  110. # 检查receiver是否存在
  111. if not receiver:
  112. return jsonify(failed("参数不完整:缺少receiver"))
  113. # 检查url是否存在
  114. if 'url' not in receiver:
  115. return jsonify(failed("参数不完整:缺少url"))
  116. file_extension = receiver['url'].split('.')[-1]
  117. if file_extension == 'xlsx' or file_extension == 'xls' or file_extension == 'csv':
  118. head_data = additional_info['head_data']
  119. data_resource = additional_info['data_resource']
  120. if not receiver or not data_resource:
  121. return jsonify(failed("参数不完整"))
  122. # 调用业务逻辑处理数据资源创建
  123. resource_id = handle_node(receiver, head_data, data_resource)
  124. else:
  125. return jsonify(failed("文件格式错误"))
  126. return jsonify(success({"id": resource_id}))
  127. except Exception as e:
  128. logger.error(f"保存数据资源失败: {str(e)}")
  129. return jsonify(failed(str(e)))
  130. @bp.route('/delete', methods=['POST'])
  131. def data_resource_delete():
  132. """删除数据资源"""
  133. try:
  134. # 获取资源ID
  135. resource_id = request.json.get('id')
  136. if not resource_id:
  137. return jsonify(failed("资源ID不能为空"))
  138. with neo4j_driver.get_session() as session:
  139. # 删除数据资源节点及其关系
  140. cypher = """
  141. MATCH (n:data_resource)
  142. WHERE id(n) = $resource_id
  143. DETACH DELETE n
  144. """
  145. session.run(cypher, resource_id=int(resource_id))
  146. return jsonify(success({"message": "数据资源删除成功"}))
  147. except Exception as e:
  148. logger.error(f"删除数据资源失败: {str(e)}")
  149. return jsonify(failed(str(e)))
  150. @bp.route('/update', methods=['POST'])
  151. def data_resource_update():
  152. """更新数据资源"""
  153. try:
  154. # 获取更新数据
  155. data = request.json
  156. if not data or "id" not in data:
  157. return jsonify(failed("参数不完整"))
  158. # 调用业务逻辑更新数据资源
  159. updated_data = data_resource_edit(data)
  160. return jsonify(success(updated_data))
  161. except Exception as e:
  162. logger.error(f"更新数据资源失败: {str(e)}")
  163. return jsonify(failed(str(e)))
  164. @bp.route('/ddl', methods=['POST'])
  165. def id_data_ddl():
  166. """解析数据资源的DDL"""
  167. try:
  168. # 获取SQL内容
  169. sql_content = request.json.get('sql', '')
  170. if not sql_content:
  171. return jsonify(failed("SQL内容不能为空"))
  172. # 记录原始SQL用于调试
  173. logger.debug(f"原始SQL: {sql_content}")
  174. # 提取创建表的DDL语句
  175. create_ddl_list = select_create_ddl(sql_content)
  176. if not create_ddl_list:
  177. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  178. # 解析每个表定义
  179. tables_dict = {} # 最终返回的表字典
  180. for ddl in create_ddl_list:
  181. table_info = table_sql(ddl)
  182. if table_info:
  183. # table_info格式: {"table_name": {"exist": bool, "meta": [...], "table_comment": "..."}}
  184. # 合并到结果字典中
  185. tables_dict.update(table_info)
  186. if not tables_dict:
  187. return jsonify(failed("解析表结构失败"))
  188. # 记录结果
  189. logger.debug(f"解析结果: {json.dumps(tables_dict, ensure_ascii=False)}")
  190. # 直接返回解析结果
  191. return jsonify(success(tables_dict))
  192. except Exception as e:
  193. logger.error(f"解析DDL失败: {str(e)}")
  194. logger.error(traceback.format_exc()) # 添加详细错误堆栈
  195. return jsonify(failed(str(e)))
  196. @bp.route('/list', methods=['POST'])
  197. def data_resource_list():
  198. """获取数据资源列表"""
  199. try:
  200. # 获取分页和筛选参数
  201. page = int(request.json.get('current', 1))
  202. page_size = int(request.json.get('size', 10))
  203. en_name_filter = request.json.get('en_name')
  204. name_filter = request.json.get('name')
  205. type_filter = request.json.get('type', 'all')
  206. category_filter = request.json.get('category')
  207. tag_filter = request.json.get('tag')
  208. # 调用业务逻辑查询数据资源列表
  209. resources, total_count = resource_list(
  210. page,
  211. page_size,
  212. en_name_filter,
  213. name_filter,
  214. type_filter,
  215. category_filter,
  216. tag_filter
  217. )
  218. # 返回结果
  219. return jsonify(success({
  220. "records": resources,
  221. "total": total_count,
  222. "size": page_size,
  223. "current": page
  224. }))
  225. except Exception as e:
  226. logger.error(f"获取数据资源列表失败: {str(e)}")
  227. return jsonify(failed(str(e)))
  228. @bp.route('/search', methods=['POST'])
  229. def id_data_search():
  230. """搜索数据资源关联的元数据"""
  231. try:
  232. # 获取参数
  233. resource_id = request.json.get('id')
  234. if not resource_id:
  235. return jsonify(failed("资源ID不能为空"))
  236. page = int(request.json.get('current', 1))
  237. page_size = int(request.json.get('size', 10))
  238. en_name_filter = request.json.get('en_name')
  239. name_filter = request.json.get('name')
  240. category_filter = request.json.get('category')
  241. tag_filter = request.json.get('tag')
  242. # 调用业务逻辑查询关联元数据
  243. metadata_list, total_count = id_data_search_list(
  244. resource_id,
  245. page,
  246. page_size,
  247. en_name_filter,
  248. name_filter,
  249. category_filter,
  250. tag_filter
  251. )
  252. # 返回结果
  253. return jsonify(success({
  254. "records": metadata_list,
  255. "total": total_count,
  256. "size": page_size,
  257. "current": page
  258. }))
  259. except Exception as e:
  260. logger.error(f"搜索数据资源关联的元数据失败: {str(e)}")
  261. return jsonify(failed(str(e)))
  262. def dynamic_type_conversion(value, target_type):
  263. """动态类型转换"""
  264. if value is None:
  265. return None
  266. if target_type == "int" or target_type == "INT":
  267. return int(value)
  268. elif target_type == "float" or target_type == "FLOAT" or target_type == "double" or target_type == "DOUBLE":
  269. return float(value)
  270. elif target_type == "bool" or target_type == "BOOL" or target_type == "boolean" or target_type == "BOOLEAN":
  271. if isinstance(value, str):
  272. return value.lower() in ('true', 'yes', '1', 't', 'y')
  273. return bool(value)
  274. else:
  275. return str(value)
  276. @bp.route('/graph/all', methods=['POST'])
  277. def data_resource_graph_all():
  278. """获取数据资源完整图谱"""
  279. try:
  280. # 获取参数
  281. resource_id = request.json.get('id')
  282. meta = request.json.get('meta', True)
  283. if not resource_id:
  284. return jsonify(failed("资源ID不能为空"))
  285. # 调用业务逻辑获取图谱
  286. graph_data = resource_impact_all_graph(resource_id, meta)
  287. return jsonify(success(graph_data))
  288. except Exception as e:
  289. logger.error(f"获取数据资源完整图谱失败: {str(e)}")
  290. return jsonify(failed(str(e)))
  291. @bp.route('/graph', methods=['POST'])
  292. def data_resource_list_graph():
  293. """获取数据资源亲缘关系图谱"""
  294. try:
  295. # 获取参数
  296. resource_id = request.json.get('id')
  297. meta = request.json.get('meta', True)
  298. if not resource_id:
  299. return jsonify(failed("资源ID不能为空"))
  300. # 调用业务逻辑获取图谱
  301. graph_data = resource_kinship_graph(resource_id, meta)
  302. return jsonify(success(graph_data))
  303. except Exception as e:
  304. logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
  305. return jsonify(failed(str(e)))
  306. @bp.route('/save/metadata', methods=['POST'])
  307. def id_data_save():
  308. """保存数据资源关联的元数据"""
  309. try:
  310. # 获取参数
  311. resource_id = request.json.get('id')
  312. metadata_list = request.json.get('data', [])
  313. if not resource_id:
  314. return jsonify(failed("资源ID不能为空"))
  315. if not metadata_list:
  316. return jsonify(failed("元数据列表不能为空"))
  317. # 处理元数据保存
  318. with neo4j_driver.get_session() as session:
  319. # 先删除现有关系
  320. cypher_delete = """
  321. MATCH (n:data_resource)-[r:contain]->()
  322. WHERE id(n) = $resource_id
  323. DELETE r
  324. """
  325. session.run(cypher_delete, resource_id=int(resource_id))
  326. # 添加新关系
  327. for meta in metadata_list:
  328. # 创建或获取元数据节点
  329. meta_cypher = """
  330. MERGE (m:Metadata {name: $name})
  331. ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
  332. RETURN m
  333. """
  334. meta_result = session.run(
  335. meta_cypher,
  336. name=meta["name"],
  337. en_name=meta["en_name"],
  338. create_time=meta.get("createTime", get_formatted_time())
  339. )
  340. meta_node = meta_result.single()["m"]
  341. # 创建关系
  342. rel_cypher = """
  343. MATCH (n:data_resource), (m:Metadata)
  344. WHERE id(n) = $resource_id AND id(m) = $meta_id
  345. CREATE (n)-[r:contain]->(m)
  346. RETURN r
  347. """
  348. session.run(
  349. rel_cypher,
  350. resource_id=int(resource_id),
  351. meta_id=meta_node.id
  352. )
  353. return jsonify(success({"message": "元数据保存成功"}))
  354. except Exception as e:
  355. logger.error(f"保存数据资源关联的元数据失败: {str(e)}")
  356. return jsonify(failed(str(e)))
  357. @bp.route('/sql/test', methods=['POST'])
  358. def sql_test():
  359. """测试SQL查询"""
  360. try:
  361. # 获取参数
  362. sql_query = request.json.get('sql', '')
  363. if not sql_query:
  364. return jsonify(failed("SQL查询不能为空"))
  365. # 解析SQL
  366. parsed_sql = select_sql(sql_query)
  367. if not parsed_sql:
  368. return jsonify(failed("解析SQL失败"))
  369. # 返回解析结果
  370. return jsonify(success(parsed_sql))
  371. except Exception as e:
  372. logger.error(f"测试SQL查询失败: {str(e)}")
  373. return jsonify(failed(str(e)))
  374. # 废弃的识别DDL语句方法,该API 与 ddl API 功能类似,但功能简化了
  375. @bp.route('/ddl/identify', methods=['POST'])
  376. def sql_ddl_identify():
  377. """识别DDL语句"""
  378. try:
  379. # 获取参数
  380. sql_content = request.json.get('sql', '')
  381. if not sql_content:
  382. return jsonify(failed("SQL内容不能为空"))
  383. # 提取创建表的DDL语句
  384. create_ddl_list = select_create_ddl(sql_content)
  385. if not create_ddl_list:
  386. return jsonify(failed("未找到有效的CREATE TABLE语句"))
  387. return jsonify(success({"count": len(create_ddl_list)}))
  388. except Exception as e:
  389. logger.error(f"识别DDL语句失败: {str(e)}")
  390. return jsonify(failed(str(e)))
  391. @bp.route('/model/list', methods=['POST'])
  392. def resource_model_list():
  393. """获取模型资源列表"""
  394. try:
  395. # 获取分页和筛选参数
  396. page = int(request.json.get('current', 1))
  397. page_size = int(request.json.get('size', 10))
  398. name_filter = request.json.get('name')
  399. # 调用业务逻辑查询模型资源列表
  400. resources, total_count = model_resource_list(page, page_size, name_filter)
  401. # 返回结果
  402. return jsonify(success({
  403. "records": resources,
  404. "total": total_count,
  405. "size": page_size,
  406. "current": page
  407. }))
  408. except Exception as e:
  409. logger.error(f"获取模型资源列表失败: {str(e)}")
  410. return jsonify(failed(str(e)))
  411. @bp.route('/detail', methods=['POST'])
  412. def data_resource_detail():
  413. """获取数据资源详情"""
  414. try:
  415. # 获取资源ID
  416. resource_id = request.json.get('id')
  417. if not resource_id:
  418. return jsonify(failed("资源ID不能为空"))
  419. # 调用业务逻辑查询数据资源详情
  420. resource_data = handle_id_resource(resource_id)
  421. if not resource_data:
  422. return jsonify(failed("资源不存在"))
  423. return jsonify(success(resource_data))
  424. except Exception as e:
  425. logger.error(f"获取数据资源详情失败: {str(e)}")
  426. return jsonify(failed(str(e)))
  427. """解析表定义SQL,支持带schema和不带schema两种格式"""
  428. try:
  429. # 支持以下格式:
  430. # 1. CREATE TABLE tablename
  431. # 2. CREATE TABLE "tablename"
  432. # 3. CREATE TABLE schema.tablename
  433. # 4. CREATE TABLE "schema"."tablename"
  434. table_name_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|([^"\s\.]+))\.)?(?:"([^"]+)"|([^"\s\(]+))'
  435. table_name_match = re.search(table_name_pattern, sql, re.IGNORECASE)
  436. if not table_name_match:
  437. return None
  438. # 获取表名,优先使用带引号的名称,如果没有则使用不带引号的
  439. schema = table_name_match.group(1) or table_name_match.group(2) # schema是可选的
  440. table_name = table_name_match.group(3) or table_name_match.group(4) # 实际表名
  441. # 提取字段定义
  442. fields_pattern = r'CREATE\s+TABLE[^(]*\(\s*(.*?)\s*\)'
  443. fields_match = re.search(fields_pattern, sql, re.DOTALL | re.IGNORECASE)
  444. if not fields_match:
  445. return None
  446. fields_text = fields_match.group(1)
  447. # 分割字段定义
  448. field_definitions = []
  449. in_parenthesis = 0
  450. current_field = ""
  451. for char in fields_text:
  452. if char == '(':
  453. in_parenthesis += 1
  454. current_field += char
  455. elif char == ')':
  456. in_parenthesis -= 1
  457. current_field += char
  458. elif char == ',' and in_parenthesis == 0:
  459. field_definitions.append(current_field.strip())
  460. current_field = ""
  461. else:
  462. current_field += char
  463. if current_field.strip():
  464. field_definitions.append(current_field.strip())
  465. # 解析每个字段
  466. fields = []
  467. primary_keys = []
  468. for field_def in field_definitions:
  469. # 忽略PRIMARY KEY等约束定义
  470. if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
  471. # 提取主键字段
  472. pk_pattern = r'PRIMARY\s+KEY\s*\(\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s*\)'
  473. pk_match = re.search(pk_pattern, field_def, re.IGNORECASE)
  474. if pk_match:
  475. pk = next((g for g in pk_match.groups() if g is not None), "")
  476. primary_keys.append(pk)
  477. continue
  478. # 解析常规字段定义
  479. field_pattern = r'^\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s+([A-Za-z0-9_]+(?:\s*\([^)]*\))?)'
  480. field_match = re.search(field_pattern, field_def)
  481. if field_match:
  482. # 提取字段名和类型
  483. field_name = next((g for g in field_match.groups()[:4] if g is not None), "")
  484. field_type = field_match.group(5)
  485. # 检查是否为主键
  486. is_primary = "PRIMARY KEY" in field_def.upper()
  487. if is_primary:
  488. primary_keys.append(field_name)
  489. # 检查是否为非空
  490. not_null = "NOT NULL" in field_def.upper()
  491. # 检查默认值
  492. default_match = re.search(r'DEFAULT\s+([^,\s]+)', field_def, re.IGNORECASE)
  493. default_value = default_match.group(1) if default_match else None
  494. # 添加字段信息
  495. field_info = {
  496. "name": field_name,
  497. "type": clean_type(field_type),
  498. "is_primary": is_primary,
  499. "not_null": not_null
  500. }
  501. if default_value:
  502. field_info["default"] = default_value
  503. fields.append(field_info)
  504. # 更新主键标记
  505. for field in fields:
  506. if field["name"] in primary_keys and not field["is_primary"]:
  507. field["is_primary"] = True
  508. # 返回结果,包含schema信息
  509. result = {
  510. "table_name": table_name,
  511. "fields": fields
  512. }
  513. # 如果有schema,添加到结果中
  514. if schema:
  515. result["schema"] = schema
  516. return result
  517. except Exception as e:
  518. logger.error(f"解析表定义SQL失败: {str(e)}")
  519. return None