routes.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. from flask import request, jsonify
  2. import logging
  3. import json
  4. from datetime import datetime
  5. from app.models.result import success, failed
  6. from app.api.data_source import bp
  7. from app.core.graph.graph_operations import create_or_get_node, execute_cypher_query
  8. from flask_sqlalchemy import SQLAlchemy
  9. from sqlalchemy import create_engine, text, URL
  10. logger = logging.getLogger(__name__)
  11. # 创建数据源时使用此api
  12. @bp.route('/save', methods=['POST'])
  13. def data_source_save():
  14. """保存数据源"""
  15. try:
  16. # 获取表单数据
  17. data = request.json
  18. logger.debug(f"保存数据源请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
  19. # 检查必填参数
  20. required_fields = ['database', 'host', 'port', 'username', 'password', 'en_name', 'type']
  21. missing_fields = [field for field in required_fields if not data.get(field)]
  22. if missing_fields:
  23. error_msg = f"缺少必填参数: {', '.join(missing_fields)}"
  24. logger.error(error_msg)
  25. return jsonify(failed(error_msg))
  26. # 检查en_name是否已存在
  27. check_query = """
  28. MATCH (n:DataSource)
  29. WHERE n.en_name = $en_name
  30. RETURN n
  31. """
  32. result = execute_cypher_query(check_query, {'en_name': data['en_name']})
  33. # 添加创建时间
  34. data['create_dt'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  35. if result:
  36. # 如果存在,更新节点
  37. node = result[0]['n']
  38. node_id = node['_id']
  39. # 更新属性
  40. update_query = """
  41. MATCH (n:DataSource)
  42. WHERE id(n) = $node_id
  43. SET n += $properties
  44. RETURN id(n) as node_id
  45. """
  46. result = execute_cypher_query(update_query, {
  47. 'node_id': node_id,
  48. 'properties': data
  49. })
  50. message = "数据源更新成功"
  51. else:
  52. # 如果不存在,创建新节点
  53. node_id = create_or_get_node('DataSource', **data)
  54. message = "数据源创建成功"
  55. # 返回成功结果
  56. return jsonify(success({
  57. "id": node_id,
  58. "message": message
  59. }))
  60. except Exception as e:
  61. logger.error(f"保存数据源失败: {str(e)}")
  62. return jsonify(failed(str(e)))
  63. # 获取数据源列表 或根据id获取数据源信息
  64. @bp.route('/list', methods=['POST'])
  65. def data_source_list():
  66. """获取数据源列表或指定数据源信息"""
  67. try:
  68. # 获取请求参数
  69. data = request.json
  70. # 构建查询条件
  71. where_conditions = []
  72. params = {}
  73. # 如果指定了id
  74. if data and 'id' in data:
  75. where_conditions.append("id(n) = $id")
  76. params['id'] = int(data['id'])
  77. # 如果有其他属性
  78. elif data:
  79. for key, value in data.items():
  80. if value: # 只处理非空值
  81. where_conditions.append(f"n.{key} = ${key}")
  82. params[key] = value
  83. # 构建WHERE子句
  84. where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
  85. # 构建查询语句
  86. cypher = f"""
  87. MATCH (n:DataSource)
  88. {where_clause}
  89. RETURN n
  90. """
  91. # 执行查询
  92. result = execute_cypher_query(cypher, params)
  93. # 格式化结果
  94. data_sources = []
  95. for record in result:
  96. node = record['n']
  97. node['id'] = node['_id']
  98. data_sources.append(node)
  99. # 返回结果
  100. return jsonify(success({
  101. "data_source": data_sources,
  102. "total": len(data_sources)
  103. }))
  104. except Exception as e:
  105. logger.error(f"获取数据源列表失败: {str(e)}")
  106. return jsonify(failed(str(e)))
  107. @bp.route('/delete', methods=['POST'])
  108. def data_source_delete():
  109. """删除数据源"""
  110. try:
  111. # 获取请求参数
  112. data = request.json
  113. logger.debug(f"删除数据源请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
  114. # 检查参数
  115. if not data or ('id' not in data and 'en_name' not in data):
  116. error_msg = "必须提供id或en_name参数"
  117. logger.error(error_msg)
  118. return jsonify(failed(error_msg))
  119. # 构建删除条件
  120. if 'id' in data:
  121. where_clause = "id(n) = $id"
  122. params = {'id': int(data['id'])}
  123. else:
  124. where_clause = "n.en_name = $en_name"
  125. params = {'en_name': data['en_name']}
  126. # 构建删除语句
  127. delete_query = f"""
  128. MATCH (n:DataSource)
  129. WHERE {where_clause}
  130. WITH n
  131. OPTIONAL MATCH (n)-[r]-()
  132. DELETE r, n
  133. RETURN count(n) as deleted_count
  134. """
  135. # 执行删除
  136. result = execute_cypher_query(delete_query, params)
  137. if result and result[0]['deleted_count'] > 0:
  138. return jsonify(success({
  139. "message": "数据源删除成功",
  140. "deleted_count": result[0]['deleted_count']
  141. }))
  142. else:
  143. error_msg = "未找到指定的数据源"
  144. logger.error(error_msg)
  145. return jsonify(failed(error_msg))
  146. except Exception as e:
  147. logger.error(f"删除数据源失败: {str(e)}")
  148. return jsonify(failed(str(e)))
  149. @bp.route('/parse', methods=['POST'])
  150. def data_source_connstr_parse():
  151. """解析数据源连接字符串"""
  152. try:
  153. # 获取请求参数
  154. data = request.json
  155. logger.debug(f"解析连接字符串请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
  156. # 检查参数
  157. if not data or 'conn_str' not in data:
  158. error_msg = "缺少连接字符串参数"
  159. logger.error(error_msg)
  160. return jsonify(failed(error_msg))
  161. # 创建DDLParser实例并解析连接字符串
  162. from app.core.llm.ddl_parser import DDLParser
  163. parser = DDLParser()
  164. result = parser.parse_db_conn_str(data['conn_str'])
  165. # 检查解析结果
  166. if isinstance(result, dict) and 'code' in result and result['code'] == 500:
  167. error_msg = f"解析连接字符串失败: {result.get('message', '未知错误')}"
  168. logger.error(error_msg)
  169. return jsonify(failed(error_msg))
  170. # 返回成功结果
  171. return jsonify(success(result))
  172. except Exception as e:
  173. logger.error(f"解析连接字符串失败: {str(e)}")
  174. return jsonify(failed(str(e)))
  175. @bp.route('/valid', methods=['POST'])
  176. def data_source_connstr_valid():
  177. """验证数据源连接信息"""
  178. try:
  179. # 获取请求参数
  180. data = request.json
  181. logger.debug(f"验证连接信息请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
  182. # 检查参数
  183. if not data:
  184. error_msg = "缺少连接信息参数"
  185. logger.error(error_msg)
  186. return jsonify(failed(error_msg))
  187. # 检查密码是否为空
  188. if not data.get('password'):
  189. error_msg = "密码不能为空"
  190. logger.error(error_msg)
  191. return jsonify(failed(error_msg))
  192. # 创建DDLParser实例并验证连接信息
  193. from app.core.llm.ddl_parser import DDLParser
  194. parser = DDLParser()
  195. result = parser.valid_db_conn_str(data)
  196. # 根据验证结果返回响应
  197. if result == "success":
  198. # 检查数据源是否已存在
  199. check_query = """
  200. MATCH (n:DataSource)
  201. WHERE n.en_name = $en_name
  202. RETURN n
  203. """
  204. existing_source = execute_cypher_query(check_query, {'en_name': data['en_name']})
  205. if existing_source:
  206. return jsonify(success({
  207. "message": "连接信息验证通过,但该数据源的定义已经存在,如果保存则会更新该数据源",
  208. "valid": True,
  209. "exists": True
  210. }))
  211. else:
  212. return jsonify(success({
  213. "message": "连接信息验证通过",
  214. "valid": True,
  215. "exists": False
  216. }))
  217. else:
  218. return jsonify(failed({
  219. "message": "连接信息验证失败",
  220. "valid": False,
  221. "exists": False
  222. }))
  223. except Exception as e:
  224. logger.error(f"验证连接信息失败: {str(e)}")
  225. return jsonify(failed(str(e)))
  226. @bp.route('/conntest', methods=['POST'])
  227. def data_source_conn_test():
  228. """测试数据源连接"""
  229. try:
  230. # 获取请求参数
  231. data = request.json
  232. logger.debug(f"测试连接请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
  233. # 检查必需参数
  234. required_fields = ['type', 'username', 'host', 'port', 'database', 'password']
  235. missing_fields = [field for field in required_fields if not data.get(field)]
  236. if missing_fields:
  237. error_msg = f"缺少必需参数: {', '.join(missing_fields)}"
  238. logger.error(error_msg)
  239. return jsonify(failed(error_msg))
  240. # 构建数据库URL
  241. db_url = URL.create(
  242. drivername=data['type'],
  243. username=data['username'],
  244. password=data.get('password', ''),
  245. host=data['host'],
  246. port=data['port'],
  247. database=data['database']
  248. )
  249. # 创建数据库引擎
  250. engine = create_engine(db_url, connect_args={'connect_timeout': 5})
  251. # 测试连接
  252. try:
  253. with engine.connect() as conn:
  254. # 执行一个简单的查询来测试连接
  255. conn.execute(text("SELECT 1"))
  256. return jsonify(success({
  257. "message": f"{data['type']}连接测试成功",
  258. "connected": True
  259. }))
  260. except Exception as e:
  261. return jsonify(failed({
  262. "message": f"连接测试失败: {str(e)}",
  263. "connected": False
  264. }))
  265. except Exception as e:
  266. logger.error(f"测试连接失败: {str(e)}")
  267. return jsonify(failed(str(e)))
  268. @bp.route('/graph', methods=['POST'])
  269. def data_source_graph_relationship():
  270. pass