routes.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. from flask import request, jsonify
  2. import logging
  3. import json
  4. from datetime import datetime
  5. from app.models.result import success, failed
  6. from app.api.data_source import bp
  7. from app.core.graph.graph_operations import (
  8. create_or_get_node, execute_cypher_query
  9. )
  10. from sqlalchemy import create_engine, text, URL
  11. logger = logging.getLogger(__name__)
  12. # 创建数据源时使用此api
  13. @bp.route('/save', methods=['POST'])
  14. def data_source_save():
  15. """保存数据源"""
  16. try:
  17. # 获取表单数据
  18. data = request.json
  19. log_data = json.dumps(data, ensure_ascii=False) if data else 'None'
  20. logger.debug(f"保存数据源请求数据: {log_data}")
  21. # 检查必填参数
  22. required_fields = [
  23. 'database', 'host', 'port', 'username',
  24. 'password', 'name_en', 'type'
  25. ]
  26. if not data:
  27. missing_fields = required_fields
  28. else:
  29. missing_fields = [
  30. field for field in required_fields if not data.get(field)
  31. ]
  32. if missing_fields:
  33. error_msg = f"缺少必填参数: {', '.join(missing_fields)}"
  34. logger.error(error_msg)
  35. return jsonify(failed(error_msg))
  36. # 此时 data 一定不为 None
  37. assert data is not None
  38. # 检查name_en是否已存在
  39. check_query = """
  40. MATCH (n:DataSource)
  41. WHERE n.name_en = $name_en
  42. RETURN n
  43. """
  44. result = execute_cypher_query(
  45. check_query, {'name_en': data['name_en']}
  46. )
  47. # 添加创建时间
  48. data['create_dt'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  49. if result:
  50. # 如果存在,更新节点
  51. node = result[0]['n']
  52. node_id = node['_id']
  53. # 更新属性
  54. update_query = """
  55. MATCH (n:DataSource)
  56. WHERE id(n) = $node_id
  57. SET n += $properties
  58. RETURN id(n) as node_id
  59. """
  60. result = execute_cypher_query(update_query, {
  61. 'node_id': node_id,
  62. 'properties': data
  63. })
  64. message = "数据源更新成功"
  65. else:
  66. # 如果不存在,创建新节点
  67. node_id = create_or_get_node('DataSource', **data)
  68. message = "数据源创建成功"
  69. # 返回成功结果
  70. return jsonify(success({
  71. "id": node_id,
  72. "message": message
  73. }))
  74. except Exception as e:
  75. logger.error(f"保存数据源失败: {str(e)}")
  76. return jsonify(failed(str(e)))
  77. # 获取数据源列表 或根据id获取数据源信息
  78. @bp.route('/list', methods=['POST'])
  79. def data_source_list():
  80. """获取数据源列表或指定数据源信息"""
  81. try:
  82. # 获取请求参数
  83. data = request.json
  84. # 构建查询条件
  85. where_conditions = []
  86. params = {}
  87. # 如果指定了id
  88. if data and 'id' in data:
  89. where_conditions.append("id(n) = $id")
  90. params['id'] = int(data['id'])
  91. # 如果有其他属性
  92. elif data:
  93. for key, value in data.items():
  94. if value: # 只处理非空值
  95. where_conditions.append(f"n.{key} = ${key}")
  96. params[key] = value
  97. # 构建WHERE子句
  98. if where_conditions:
  99. where_clause = " WHERE " + " AND ".join(where_conditions)
  100. else:
  101. where_clause = ""
  102. # 构建查询语句
  103. cypher = f"""
  104. MATCH (n:DataSource)
  105. {where_clause}
  106. RETURN n
  107. """
  108. # 执行查询
  109. result = execute_cypher_query(cypher, params)
  110. # 格式化结果
  111. data_sources = []
  112. for record in result:
  113. node = record['n']
  114. node['id'] = node['_id']
  115. data_sources.append(node)
  116. # 返回结果
  117. return jsonify(success({
  118. "data_source": data_sources,
  119. "total": len(data_sources)
  120. }))
  121. except Exception as e:
  122. logger.error(f"获取数据源列表失败: {str(e)}")
  123. return jsonify(failed(str(e)))
  124. @bp.route('/delete', methods=['POST'])
  125. def data_source_delete():
  126. """删除数据源"""
  127. try:
  128. # 获取请求参数
  129. data = request.json
  130. log_data = json.dumps(data, ensure_ascii=False) if data else 'None'
  131. logger.debug(f"删除数据源请求数据: {log_data}")
  132. # 检查参数
  133. if not data or ('id' not in data and 'name_en' not in data):
  134. error_msg = "必须提供id或name_en参数"
  135. logger.error(error_msg)
  136. return jsonify(failed(error_msg))
  137. # 构建删除条件
  138. if 'id' in data:
  139. where_clause = "id(n) = $id"
  140. params = {'id': int(data['id'])}
  141. else:
  142. where_clause = "n.name_en = $name_en"
  143. params = {'name_en': data['name_en']}
  144. # 构建删除语句
  145. delete_query = f"""
  146. MATCH (n:DataSource)
  147. WHERE {where_clause}
  148. WITH n
  149. OPTIONAL MATCH (n)-[r]-()
  150. DELETE r, n
  151. RETURN count(n) as deleted_count
  152. """
  153. # 执行删除
  154. result = execute_cypher_query(delete_query, params)
  155. if result and result[0]['deleted_count'] > 0:
  156. return jsonify(success({
  157. "message": "数据源删除成功",
  158. "deleted_count": result[0]['deleted_count']
  159. }))
  160. else:
  161. error_msg = "未找到指定的数据源"
  162. logger.error(error_msg)
  163. return jsonify(failed(error_msg))
  164. except Exception as e:
  165. logger.error(f"删除数据源失败: {str(e)}")
  166. return jsonify(failed(str(e)))
  167. @bp.route('/parse', methods=['POST'])
  168. def data_source_connstr_parse():
  169. """解析数据源连接字符串"""
  170. try:
  171. # 获取请求参数
  172. data = request.json
  173. log_data = json.dumps(data, ensure_ascii=False) if data else 'None'
  174. logger.debug(f"解析连接字符串请求数据: {log_data}")
  175. # 检查参数
  176. if not data or 'conn_str' not in data:
  177. error_msg = "缺少连接字符串参数"
  178. logger.error(error_msg)
  179. return jsonify(failed(error_msg))
  180. # 创建DDLParser实例并解析连接字符串
  181. from app.core.llm.ddl_parser import DDLParser
  182. parser = DDLParser()
  183. result = parser.parse_db_conn_str(data['conn_str'])
  184. # 检查解析结果
  185. is_error = (isinstance(result, dict) and
  186. 'code' in result and result['code'] == 500)
  187. if is_error:
  188. msg = result.get('message', '未知错误')
  189. error_msg = f"解析连接字符串失败: {msg}"
  190. logger.error(error_msg)
  191. return jsonify(failed(error_msg))
  192. # 返回成功结果
  193. return jsonify(success(result))
  194. except Exception as e:
  195. logger.error(f"解析连接字符串失败: {str(e)}")
  196. return jsonify(failed(str(e)))
  197. @bp.route('/valid', methods=['POST'])
  198. def data_source_connstr_valid():
  199. """验证数据源连接信息"""
  200. try:
  201. # 获取请求参数
  202. data = request.json
  203. log_data = json.dumps(data, ensure_ascii=False) if data else 'None'
  204. logger.debug(f"验证连接信息请求数据: {log_data}")
  205. # 检查参数
  206. if not data:
  207. error_msg = "缺少连接信息参数"
  208. logger.error(error_msg)
  209. return jsonify(failed(error_msg))
  210. # 检查密码是否为空
  211. if not data.get('password'):
  212. error_msg = "密码不能为空"
  213. logger.error(error_msg)
  214. return jsonify(failed(error_msg))
  215. # 创建DDLParser实例并验证连接信息
  216. from app.core.llm.ddl_parser import DDLParser
  217. parser = DDLParser()
  218. result = parser.valid_db_conn_str(data)
  219. # 根据验证结果返回响应
  220. if result == "success":
  221. # 检查数据源是否已存在
  222. check_query = """
  223. MATCH (n:DataSource)
  224. WHERE n.name_en = $name_en
  225. RETURN n
  226. """
  227. existing_source = execute_cypher_query(
  228. check_query, {'name_en': data['name_en']}
  229. )
  230. if existing_source:
  231. msg = "连接信息验证通过,但该数据源的定义已经存在,如果保存则会更新该数据源"
  232. return jsonify(success(msg))
  233. else:
  234. return jsonify(success("连接信息验证通过"))
  235. else:
  236. return jsonify(failed("连接信息验证失败"))
  237. except Exception as e:
  238. logger.error(f"验证连接信息失败: {str(e)}")
  239. return jsonify(failed(str(e)))
  240. @bp.route('/conntest', methods=['POST'])
  241. def data_source_conn_test():
  242. """测试数据源连接"""
  243. try:
  244. # 获取请求参数
  245. data = request.json
  246. log_data = json.dumps(data, ensure_ascii=False) if data else 'None'
  247. logger.debug(f"测试连接请求数据: {log_data}")
  248. # 检查必需参数
  249. required_fields = [
  250. 'type', 'username', 'host', 'port', 'database', 'password'
  251. ]
  252. if not data:
  253. missing_fields = required_fields
  254. else:
  255. missing_fields = [
  256. field for field in required_fields if not data.get(field)
  257. ]
  258. if missing_fields:
  259. error_msg = f"缺少必需参数: {', '.join(missing_fields)}"
  260. logger.error(error_msg)
  261. return jsonify(failed(error_msg))
  262. # 此时 data 一定不为 None
  263. assert data is not None
  264. # 构建数据库URL
  265. db_url = URL.create(
  266. drivername=data['type'],
  267. username=data['username'],
  268. password=data.get('password', ''),
  269. host=data['host'],
  270. port=data['port'],
  271. database=data['database']
  272. )
  273. # 创建数据库引擎
  274. engine = create_engine(db_url, connect_args={'connect_timeout': 5})
  275. # 测试连接
  276. try:
  277. with engine.connect() as conn:
  278. # 执行一个简单的查询来测试连接
  279. conn.execute(text("SELECT 1"))
  280. return jsonify(success({
  281. "message": f"{data['type']}连接测试成功",
  282. "connected": True
  283. }))
  284. except Exception as e:
  285. return jsonify(failed(f"连接测试失败: {str(e)}"))
  286. except Exception as e:
  287. logger.error(f"测试连接失败: {str(e)}")
  288. return jsonify(failed(str(e)))
  289. @bp.route('/graph', methods=['POST'])
  290. def data_source_graph_relationship():
  291. """获取数据源关系图"""
  292. # TODO: 待实现
  293. return jsonify(failed("该功能尚未实现"))