from flask import request, jsonify import logging import json from datetime import datetime from app.models.result import success, failed from app.api.data_source import bp from app.core.graph.graph_operations import create_or_get_node, execute_cypher_query from flask_sqlalchemy import SQLAlchemy from sqlalchemy import create_engine, text, URL logger = logging.getLogger(__name__) # 创建数据源时使用此api @bp.route('/save', methods=['POST']) def data_source_save(): """保存数据源""" try: # 获取表单数据 data = request.json logger.debug(f"保存数据源请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}") # 检查必填参数 required_fields = ['database', 'host', 'port', 'username', 'password', 'en_name', 'type'] missing_fields = [field for field in required_fields if not data.get(field)] if missing_fields: error_msg = f"缺少必填参数: {', '.join(missing_fields)}" logger.error(error_msg) return jsonify(failed(error_msg)) # 检查en_name是否已存在 check_query = """ MATCH (n:DataSource) WHERE n.en_name = $en_name RETURN n """ result = execute_cypher_query(check_query, {'en_name': data['en_name']}) # 添加创建时间 data['create_dt'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') if result: # 如果存在,更新节点 node = result[0]['n'] node_id = node['_id'] # 更新属性 update_query = """ MATCH (n:DataSource) WHERE id(n) = $node_id SET n += $properties RETURN id(n) as node_id """ result = execute_cypher_query(update_query, { 'node_id': node_id, 'properties': data }) message = "数据源更新成功" else: # 如果不存在,创建新节点 node_id = create_or_get_node('DataSource', **data) message = "数据源创建成功" # 返回成功结果 return jsonify(success({ "id": node_id, "message": message })) except Exception as e: logger.error(f"保存数据源失败: {str(e)}") return jsonify(failed(str(e))) # 获取数据源列表 或根据id获取数据源信息 @bp.route('/list', methods=['POST']) def data_source_list(): """获取数据源列表或指定数据源信息""" try: # 获取请求参数 data = request.json # 构建查询条件 where_conditions = [] params = {} # 如果指定了id if data and 'id' in data: where_conditions.append("id(n) = $id") params['id'] = int(data['id']) # 如果有其他属性 elif data: for key, value in data.items(): if value: # 只处理非空值 where_conditions.append(f"n.{key} = ${key}") params[key] = value # 构建WHERE子句 where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else "" # 构建查询语句 cypher = f""" MATCH (n:DataSource) {where_clause} RETURN n """ # 执行查询 result = execute_cypher_query(cypher, params) # 格式化结果 data_sources = [] for record in result: node = record['n'] node['id'] = node['_id'] data_sources.append(node) # 返回结果 return jsonify(success({ "data_source": data_sources, "total": len(data_sources) })) except Exception as e: logger.error(f"获取数据源列表失败: {str(e)}") return jsonify(failed(str(e))) @bp.route('/delete', methods=['POST']) def data_source_delete(): """删除数据源""" try: # 获取请求参数 data = request.json logger.debug(f"删除数据源请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}") # 检查参数 if not data or ('id' not in data and 'en_name' not in data): error_msg = "必须提供id或en_name参数" logger.error(error_msg) return jsonify(failed(error_msg)) # 构建删除条件 if 'id' in data: where_clause = "id(n) = $id" params = {'id': int(data['id'])} else: where_clause = "n.en_name = $en_name" params = {'en_name': data['en_name']} # 构建删除语句 delete_query = f""" MATCH (n:DataSource) WHERE {where_clause} WITH n OPTIONAL MATCH (n)-[r]-() DELETE r, n RETURN count(n) as deleted_count """ # 执行删除 result = execute_cypher_query(delete_query, params) if result and result[0]['deleted_count'] > 0: return jsonify(success({ "message": "数据源删除成功", "deleted_count": result[0]['deleted_count'] })) else: error_msg = "未找到指定的数据源" logger.error(error_msg) return jsonify(failed(error_msg)) except Exception as e: logger.error(f"删除数据源失败: {str(e)}") return jsonify(failed(str(e))) @bp.route('/parse', methods=['POST']) def data_source_connstr_parse(): """解析数据源连接字符串""" try: # 获取请求参数 data = request.json logger.debug(f"解析连接字符串请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}") # 检查参数 if not data or 'conn_str' not in data: error_msg = "缺少连接字符串参数" logger.error(error_msg) return jsonify(failed(error_msg)) # 创建DDLParser实例并解析连接字符串 from app.core.llm.ddl_parser import DDLParser parser = DDLParser() result = parser.parse_db_conn_str(data['conn_str']) # 检查解析结果 if isinstance(result, dict) and 'code' in result and result['code'] == 500: error_msg = f"解析连接字符串失败: {result.get('message', '未知错误')}" logger.error(error_msg) return jsonify(failed(error_msg)) # 返回成功结果 return jsonify(success(result)) except Exception as e: logger.error(f"解析连接字符串失败: {str(e)}") return jsonify(failed(str(e))) @bp.route('/valid', methods=['POST']) def data_source_connstr_valid(): """验证数据源连接信息""" try: # 获取请求参数 data = request.json logger.debug(f"验证连接信息请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}") # 检查参数 if not data: error_msg = "缺少连接信息参数" logger.error(error_msg) return jsonify(failed(error_msg)) # 检查密码是否为空 if not data.get('password'): error_msg = "密码不能为空" logger.error(error_msg) return jsonify(failed(error_msg)) # 创建DDLParser实例并验证连接信息 from app.core.llm.ddl_parser import DDLParser parser = DDLParser() result = parser.valid_db_conn_str(data) # 根据验证结果返回响应 if result == "success": # 检查数据源是否已存在 check_query = """ MATCH (n:DataSource) WHERE n.en_name = $en_name RETURN n """ existing_source = execute_cypher_query(check_query, {'en_name': data['en_name']}) if existing_source: return jsonify(success({ "message": "连接信息验证通过,但该数据源的定义已经存在,如果保存则会更新该数据源", "valid": True, "exists": True })) else: return jsonify(success({ "message": "连接信息验证通过", "valid": True, "exists": False })) else: return jsonify(failed({ "message": "连接信息验证失败", "valid": False, "exists": False })) except Exception as e: logger.error(f"验证连接信息失败: {str(e)}") return jsonify(failed(str(e))) @bp.route('/conntest', methods=['POST']) def data_source_conn_test(): """测试数据源连接""" try: # 获取请求参数 data = request.json logger.debug(f"测试连接请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}") # 检查必需参数 required_fields = ['type', 'username', 'host', 'port', 'database', 'password'] missing_fields = [field for field in required_fields if not data.get(field)] if missing_fields: error_msg = f"缺少必需参数: {', '.join(missing_fields)}" logger.error(error_msg) return jsonify(failed(error_msg)) # 构建数据库URL db_url = URL.create( drivername=data['type'], username=data['username'], password=data.get('password', ''), host=data['host'], port=data['port'], database=data['database'] ) # 创建数据库引擎 engine = create_engine(db_url, connect_args={'connect_timeout': 5}) # 测试连接 try: with engine.connect() as conn: # 执行一个简单的查询来测试连接 conn.execute(text("SELECT 1")) return jsonify(success({ "message": f"{data['type']}连接测试成功", "connected": True })) except Exception as e: return jsonify(failed({ "message": f"连接测试失败: {str(e)}", "connected": False })) except Exception as e: logger.error(f"测试连接失败: {str(e)}") return jsonify(failed(str(e))) @bp.route('/graph', methods=['POST']) def data_source_graph_relationship(): pass