# routes.py

from io import BytesIO, StringIO
import os
import pandas as pd
from flask import request, jsonify, send_file, current_app
from app.api.data_resource import bp
from app.models.result import success, failed
import logging
import json
import re
from minio import Minio
from app.core.graph.graph_operations import MyEncoder
from app.services.neo4j_driver import neo4j_driver
from app.core.data_resource.resource import (
    resource_list,
    handle_node,
    resource_kinship_graph,
    resource_impact_all_graph,
    model_resource_list,
    select_create_ddl,
    data_resource_edit,
    handle_id_resource,
    id_data_search_list,
    table_sql,
    select_sql,
    id_resource_graph
)
from app.core.meta_data import (
    translate_and_parse,
    infer_column_type,
    text_resource_solve,
    get_file_content,
    get_formatted_time
)
import traceback
from app.core.system.auth import require_auth

logger = logging.getLogger("app")


def get_minio_client():
    """Get a MinIO client instance."""
    return Minio(
        current_app.config['MINIO_HOST'],
        access_key=current_app.config['MINIO_USER'],
        secret_key=current_app.config['MINIO_PASSWORD'],
        secure=current_app.config['MINIO_SECURE']
    )


def get_minio_config():
    """Get the MinIO configuration."""
    return {
        'bucket_name': current_app.config['BUCKET_NAME'],
        'prefix': current_app.config['PREFIX'],
        'allowed_extensions': current_app.config['ALLOWED_EXTENSIONS']
    }


def is_english(text):
    """Check whether the text contains only English characters."""
    return text.isascii() and bool(re.match(r'^[a-zA-Z0-9_\s.,;:!?()\'"-]+$', text))
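
# Illustrative examples (not part of the original module):
#   is_english("customer_id")  -> True
#   is_english("客户编号")      -> False  (non-ASCII input)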


@bp.route('/translate', methods=['POST'])
def data_resource_translate():
    # Get the form data
    data_resource = request.form.get('data_resource')
    meta_data = request.form.get('meta_data')
    file = request.files.get('file')

    if not data_resource or not file:
        return jsonify(failed("缺少必要参数:data_resource 或文件"))

    # Handle the case where meta_data may be None
    if meta_data:
        try:
            meta_data_list = json.loads(meta_data)
        except json.JSONDecodeError:
            logger.error(f"解析meta_data失败: {meta_data}")
            meta_data_list = []
    else:
        logger.warning("meta_data为空,将使用空列表")
        meta_data_list = []

    # Build the list of translated metadata names
    translated_meta_data_list = []
    for meta_item in meta_data_list:
        if is_english(meta_item):  # Already English, keep as-is
            translated_meta_data_list.append(meta_item)
        else:
            translated_meta_data_list.append(translate_and_parse(meta_item)[0])  # Translate, then append

    # Translate data_resource
    translated_data_resource = translate_and_parse(data_resource)
    if translated_data_resource and len(translated_data_resource) > 0:
        translated_data_resource = translated_data_resource[0]
    else:
        translated_data_resource = data_resource  # Fall back to the original value if translation fails

    try:
        # Build the final translation result
        resource = {"name": data_resource, "en_name": translated_data_resource}
        parsed_data = []

        # Read the file content, then reset the file pointer
        file_content = file.read()
        file.seek(0)

        try:
            df = pd.read_excel(BytesIO(file_content))
        except Exception as e:
            return jsonify(failed(f"文件格式错误: {str(e)}"))

        # Get the column names and their corresponding data types.
        # If meta_data is empty, use the DataFrame's column names instead.
        if not meta_data_list and not df.empty:
            meta_data_list = df.columns.tolist()
            translated_meta_data_list = []
            for col in meta_data_list:
                if is_english(col):
                    translated_meta_data_list.append(col)
                else:
                    translated_meta_data_list.append(translate_and_parse(col)[0])

        columns_and_types = infer_column_type(df)
        for i in range(len(meta_data_list)):
            zh = meta_data_list[i]
            en = translated_meta_data_list[i]
            data_type = columns_and_types[i] if i < len(columns_and_types) else "varchar(255)"
            parsed_item = {"name": zh, "en_name": en, "data_type": data_type}
            parsed_data.append(parsed_item)

        response_data = {
            "head_data": parsed_data,
            "data_resource": resource
        }
        return jsonify(success(response_data, "success"))
    except Exception as e:
        logger.error(f"翻译处理失败: {str(e)}", exc_info=True)
        return jsonify(failed({}, str(e)))


@bp.route('/save', methods=['POST'])
def data_resource_save():
    """Save a data resource."""
    try:
        # Get the request payload.
        # If the payload is wrapped in a "receiver" field, use instead:
        # receiver = request.json.get('receiver', {})
        receiver = request.get_json()

        # Check that the receiver exists before accessing its fields
        if not receiver:
            return jsonify(failed("参数不完整:缺少receiver"))
        additional_info = receiver['additional_info']

        # Check that the url exists
        if 'url' not in receiver:
            return jsonify(failed("参数不完整:缺少url"))

        file_extension = receiver['url'].split('.')[-1]
        if file_extension in ('xlsx', 'xls', 'csv'):
            head_data = additional_info['head_data']
            data_resource = additional_info['data_resource']
            if not data_resource:
                return jsonify(failed("参数不完整"))
            # Call the business logic to create the data resource
            resource_id = handle_node(receiver, head_data, data_resource)
        else:
            return jsonify(failed("文件格式错误"))

        return jsonify(success({"id": resource_id}))
    except Exception as e:
        logger.error(f"保存数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))
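
# Illustrative request body for /save, derived from the field accesses above
# (all values are placeholders):
#   {
#       "url": "datasets/sales.xlsx",
#       "additional_info": {
#           "head_data": [{"name": "...", "en_name": "...", "data_type": "varchar(255)"}],
#           "data_resource": {"name": "...", "en_name": "..."}
#       }
#   }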


@bp.route('/delete', methods=['POST'])
def data_resource_delete():
    """Delete a data resource."""
    try:
        # Get the resource ID
        resource_id = request.json.get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        with neo4j_driver.get_session() as session:
            # Delete the data resource node and its relationships
            cypher = """
            MATCH (n:data_resource)
            WHERE id(n) = $resource_id
            DETACH DELETE n
            """
            session.run(cypher, resource_id=int(resource_id))

        return jsonify(success({"message": "数据资源删除成功"}))
    except Exception as e:
        logger.error(f"删除数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/update', methods=['POST'])
def data_resource_update():
    """Update a data resource."""
    try:
        # Get the update payload
        data = request.json
        if not data or "id" not in data:
            return jsonify(failed("参数不完整"))

        # Call the business logic to update the data resource
        updated_data = data_resource_edit(data)
        return jsonify(success(updated_data))
    except Exception as e:
        logger.error(f"更新数据资源失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/ddl', methods=['POST'])
def id_data_ddl():
    """Parse the DDL of a data resource."""
    try:
        # Get the SQL content
        sql_content = request.json.get('sql', '')
        if not sql_content:
            return jsonify(failed("SQL内容不能为空"))

        # Log the raw SQL for debugging
        logger.debug(f"原始SQL: {sql_content}")

        # Extract the CREATE TABLE DDL statements
        create_ddl_list = select_create_ddl(sql_content)
        if not create_ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        # Parse each table definition
        tables_dict = {}  # Final dictionary of tables to return
        for ddl in create_ddl_list:
            table_info = table_sql(ddl)
            if table_info:
                # table_info format: {"table_name": {"exist": bool, "meta": [...], "table_comment": "..."}}
                # Merge it into the result dictionary
                tables_dict.update(table_info)

        if not tables_dict:
            return jsonify(failed("解析表结构失败"))

        # Log the result
        logger.debug(f"解析结果: {json.dumps(tables_dict, ensure_ascii=False)}")

        # Return the parsed result directly
        return jsonify(success(tables_dict))
    except Exception as e:
        logger.error(f"解析DDL失败: {str(e)}")
        logger.error(traceback.format_exc())  # Log the full stack trace
        return jsonify(failed(str(e)))
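
# Illustrative /ddl response payload, based on the table_info format noted above
# (actual keys and values depend on select_create_ddl/table_sql):
#   {"users": {"exist": false, "meta": [...], "table_comment": "..."}}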


@bp.route('/list', methods=['POST'])
def data_resource_list():
    """Get the list of data resources."""
    try:
        # Get the pagination and filter parameters
        page = int(request.json.get('current', 1))
        page_size = int(request.json.get('size', 10))
        en_name_filter = request.json.get('en_name')
        name_filter = request.json.get('name')
        type_filter = request.json.get('type', 'all')
        category_filter = request.json.get('category')
        tag_filter = request.json.get('tag')

        # Call the business logic to query the data resource list
        resources, total_count = resource_list(
            page,
            page_size,
            en_name_filter,
            name_filter,
            type_filter,
            category_filter,
            tag_filter
        )

        # Return the result
        return jsonify(success({
            "records": resources,
            "total": total_count,
            "size": page_size,
            "current": page
        }))
    except Exception as e:
        logger.error(f"获取数据资源列表失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/search', methods=['POST'])
def id_data_search():
    """Search the metadata associated with a data resource."""
    try:
        # Get the parameters
        resource_id = request.json.get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        page = int(request.json.get('current', 1))
        page_size = int(request.json.get('size', 10))
        en_name_filter = request.json.get('en_name')
        name_filter = request.json.get('name')
        category_filter = request.json.get('category')
        tag_filter = request.json.get('tag')

        # Call the business logic to query the associated metadata
        metadata_list, total_count = id_data_search_list(
            resource_id,
            page,
            page_size,
            en_name_filter,
            name_filter,
            category_filter,
            tag_filter
        )

        # Return the result
        return jsonify(success({
            "records": metadata_list,
            "total": total_count,
            "size": page_size,
            "current": page
        }))
    except Exception as e:
        logger.error(f"搜索数据资源关联的元数据失败: {str(e)}")
        return jsonify(failed(str(e)))


def dynamic_type_conversion(value, target_type):
    """Dynamically convert a value to the given target type."""
    if value is None:
        return None

    if target_type in ("int", "INT"):
        return int(value)
    elif target_type in ("float", "FLOAT", "double", "DOUBLE"):
        return float(value)
    elif target_type in ("bool", "BOOL", "boolean", "BOOLEAN"):
        if isinstance(value, str):
            return value.lower() in ('true', 'yes', '1', 't', 'y')
        return bool(value)
    else:
        return str(value)
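
# Illustrative conversions (not part of the original module):
#   dynamic_type_conversion("42", "INT")      -> 42
#   dynamic_type_conversion("yes", "BOOLEAN") -> True
#   dynamic_type_conversion(3.14, "varchar")  -> "3.14"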


@bp.route('/graph/all', methods=['POST'])
def data_resource_graph_all():
    """Get the full graph of a data resource."""
    try:
        # Get the parameters
        resource_id = request.json.get('id')
        meta = request.json.get('meta', True)
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        # Call the business logic to get the graph
        graph_data = resource_impact_all_graph(resource_id, meta)
        return jsonify(success(graph_data))
    except Exception as e:
        logger.error(f"获取数据资源完整图谱失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/graph', methods=['POST'])
def data_resource_list_graph():
    """Get the kinship graph of a data resource."""
    try:
        # Get the parameters
        resource_id = request.json.get('id')
        meta = request.json.get('meta', True)
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        # Call the business logic to get the graph
        graph_data = resource_kinship_graph(resource_id, meta)
        return jsonify(success(graph_data))
    except Exception as e:
        logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/save/metadata', methods=['POST'])
def id_data_save():
    """Save the metadata associated with a data resource."""
    try:
        # Get the parameters
        resource_id = request.json.get('id')
        metadata_list = request.json.get('data', [])
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))
        if not metadata_list:
            return jsonify(failed("元数据列表不能为空"))

        # Persist the metadata
        with neo4j_driver.get_session() as session:
            # First delete the existing relationships
            cypher_delete = """
            MATCH (n:data_resource)-[r:contain]->()
            WHERE id(n) = $resource_id
            DELETE r
            """
            session.run(cypher_delete, resource_id=int(resource_id))

            # Then add the new relationships
            for meta in metadata_list:
                # Create or fetch the metadata node
                meta_cypher = """
                MERGE (m:Metadata {name: $name})
                ON CREATE SET m.en_name = $en_name, m.createTime = $create_time
                RETURN m
                """
                meta_result = session.run(
                    meta_cypher,
                    name=meta["name"],
                    en_name=meta["en_name"],
                    create_time=meta.get("createTime", get_formatted_time())
                )
                meta_node = meta_result.single()["m"]

                # Create the relationship
                rel_cypher = """
                MATCH (n:data_resource), (m:Metadata)
                WHERE id(n) = $resource_id AND id(m) = $meta_id
                CREATE (n)-[r:contain]->(m)
                RETURN r
                """
                session.run(
                    rel_cypher,
                    resource_id=int(resource_id),
                    meta_id=meta_node.id
                )

        return jsonify(success({"message": "元数据保存成功"}))
    except Exception as e:
        logger.error(f"保存数据资源关联的元数据失败: {str(e)}")
        return jsonify(failed(str(e)))
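
# Illustrative request body for /save/metadata, derived from the accesses above
# (values are placeholders; createTime is optional):
#   {"id": 123, "data": [{"name": "客户编号", "en_name": "customer_id"}]}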


@bp.route('/sql/test', methods=['POST'])
def sql_test():
    """Test a SQL query."""
    try:
        # Get the parameters
        sql_query = request.json.get('sql', '')
        if not sql_query:
            return jsonify(failed("SQL查询不能为空"))

        # Parse the SQL
        parsed_sql = select_sql(sql_query)
        if not parsed_sql:
            return jsonify(failed("解析SQL失败"))

        # Return the parsed result
        return jsonify(success(parsed_sql))
    except Exception as e:
        logger.error(f"测试SQL查询失败: {str(e)}")
        return jsonify(failed(str(e)))


# Deprecated DDL-identification endpoint: similar to the /ddl API above,
# but with simplified functionality.
@bp.route('/ddl/identify', methods=['POST'])
def sql_ddl_identify():
    """Identify DDL statements."""
    try:
        # Get the parameters
        sql_content = request.json.get('sql', '')
        if not sql_content:
            return jsonify(failed("SQL内容不能为空"))

        # Extract the CREATE TABLE DDL statements
        create_ddl_list = select_create_ddl(sql_content)
        if not create_ddl_list:
            return jsonify(failed("未找到有效的CREATE TABLE语句"))

        return jsonify(success({"count": len(create_ddl_list)}))
    except Exception as e:
        logger.error(f"识别DDL语句失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/model/list', methods=['POST'])
def resource_model_list():
    """Get the list of model resources."""
    try:
        # Get the pagination and filter parameters
        page = int(request.json.get('current', 1))
        page_size = int(request.json.get('size', 10))
        name_filter = request.json.get('name')

        # Call the business logic to query the model resource list
        resources, total_count = model_resource_list(page, page_size, name_filter)

        # Return the result
        return jsonify(success({
            "records": resources,
            "total": total_count,
            "size": page_size,
            "current": page
        }))
    except Exception as e:
        logger.error(f"获取模型资源列表失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/detail', methods=['POST'])
def data_resource_detail():
    """Get the details of a data resource."""
    try:
        # Get the resource ID
        resource_id = request.json.get('id')
        if not resource_id:
            return jsonify(failed("资源ID不能为空"))

        # Call the business logic to query the data resource details
        resource_data = handle_id_resource(resource_id)
        if not resource_data:
            return jsonify(failed("资源不存在"))

        return jsonify(success(resource_data))
    except Exception as e:
        logger.error(f"获取数据资源详情失败: {str(e)}")
        return jsonify(failed(str(e)))


@bp.route('/config', methods=['GET'])
@require_auth
def get_resource_config():
    """Get the data resource configuration."""
    config = get_minio_config()
    return jsonify({
        'allowed_extensions': list(config['allowed_extensions']),
        'bucket_name': config['bucket_name'],
        'prefix': config['prefix']
    })


# NOTE: the original source is missing the function header here; the name and
# signature below are assumed from the body, which parses a single CREATE TABLE
# statement. clean_type() is referenced but not defined in this module; it is
# assumed to be provided elsewhere.
def parse_table_definition(sql):
    """Parse a table-definition SQL statement, with or without a schema prefix."""
    try:
        # Supported formats:
        # 1. CREATE TABLE tablename
        # 2. CREATE TABLE "tablename"
        # 3. CREATE TABLE schema.tablename
        # 4. CREATE TABLE "schema"."tablename"
        table_name_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|([^"\s\.]+))\.)?(?:"([^"]+)"|([^"\s\(]+))'
        table_name_match = re.search(table_name_pattern, sql, re.IGNORECASE)
        if not table_name_match:
            return None

        # Get the table name, preferring the quoted form and falling back to the unquoted one
        schema = table_name_match.group(1) or table_name_match.group(2)  # schema is optional
        table_name = table_name_match.group(3) or table_name_match.group(4)  # actual table name

        # Extract the field definitions (greedy match up to the last ')' so that
        # parenthesised types such as VARCHAR(50) are not cut off)
        fields_pattern = r'CREATE\s+TABLE[^(]*\((.*)\)'
        fields_match = re.search(fields_pattern, sql, re.DOTALL | re.IGNORECASE)
        if not fields_match:
            return None
        fields_text = fields_match.group(1)

        # Split the field definitions on top-level commas
        field_definitions = []
        in_parenthesis = 0
        current_field = ""
        for char in fields_text:
            if char == '(':
                in_parenthesis += 1
                current_field += char
            elif char == ')':
                in_parenthesis -= 1
                current_field += char
            elif char == ',' and in_parenthesis == 0:
                field_definitions.append(current_field.strip())
                current_field = ""
            else:
                current_field += char
        if current_field.strip():
            field_definitions.append(current_field.strip())

        # Parse each field
        fields = []
        primary_keys = []
        for field_def in field_definitions:
            # Skip constraint definitions such as PRIMARY KEY, UNIQUE, FOREIGN KEY, CHECK, CONSTRAINT
            if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
                # Extract the primary-key column
                pk_pattern = r'PRIMARY\s+KEY\s*\(\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s*\)'
                pk_match = re.search(pk_pattern, field_def, re.IGNORECASE)
                if pk_match:
                    pk = next((g for g in pk_match.groups() if g is not None), "")
                    primary_keys.append(pk)
                continue

            # Parse a regular field definition
            field_pattern = r'^\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s+([A-Za-z0-9_]+(?:\s*\([^)]*\))?)'
            field_match = re.search(field_pattern, field_def)
            if field_match:
                # Extract the field name and type
                field_name = next((g for g in field_match.groups()[:4] if g is not None), "")
                field_type = field_match.group(5)

                # Check whether it is a primary key
                is_primary = "PRIMARY KEY" in field_def.upper()
                if is_primary:
                    primary_keys.append(field_name)

                # Check whether it is NOT NULL
                not_null = "NOT NULL" in field_def.upper()

                # Check for a default value
                default_match = re.search(r'DEFAULT\s+([^,\s]+)', field_def, re.IGNORECASE)
                default_value = default_match.group(1) if default_match else None

                # Add the field information
                field_info = {
                    "name": field_name,
                    "type": clean_type(field_type),
                    "is_primary": is_primary,
                    "not_null": not_null
                }
                if default_value:
                    field_info["default"] = default_value
                fields.append(field_info)

        # Update the primary-key flags
        for field in fields:
            if field["name"] in primary_keys and not field["is_primary"]:
                field["is_primary"] = True

        # Build the result, including the schema when present
        result = {
            "table_name": table_name,
            "fields": fields
        }
        if schema:
            result["schema"] = schema
        return result
    except Exception as e:
        logger.error(f"解析表定义SQL失败: {str(e)}")
        return None
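
# Illustrative call (using the assumed function name noted above) and result shape:
#   parse_table_definition('CREATE TABLE "public"."users" (id INT PRIMARY KEY, name VARCHAR(50) NOT NULL)')
#   -> {"table_name": "users",
#       "fields": [{"name": "id",   "type": clean_type("INT"),         "is_primary": True,  "not_null": False},
#                  {"name": "name", "type": clean_type("VARCHAR(50)"), "is_primary": False, "not_null": True}],
#       "schema": "public"}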