|
- from flask import request, jsonify
- import logging
- import json
- from datetime import datetime
- from app.models.result import success, failed
- from app.api.data_source import bp
- from app.core.graph.graph_operations import create_or_get_node, execute_cypher_query
- from flask_sqlalchemy import SQLAlchemy
- from sqlalchemy import create_engine, text, URL
- logger = logging.getLogger(__name__)
- # 创建数据源时使用此api
- @bp.route('/save', methods=['POST'])
- def data_source_save():
- """保存数据源"""
- try:
- # 获取表单数据
- data = request.json
- logger.debug(f"保存数据源请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
- # 检查必填参数
- required_fields = ['database', 'host', 'port', 'username', 'password', 'en_name', 'type']
- missing_fields = [field for field in required_fields if not data.get(field)]
-
- if missing_fields:
- error_msg = f"缺少必填参数: {', '.join(missing_fields)}"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
- # 检查en_name是否已存在
- check_query = """
- MATCH (n:DataSource)
- WHERE n.en_name = $en_name
- RETURN n
- """
- result = execute_cypher_query(check_query, {'en_name': data['en_name']})
-
- # 添加创建时间
- data['create_dt'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-
- if result:
- # 如果存在,更新节点
- node = result[0]['n']
- node_id = node['_id']
- # 更新属性
- update_query = """
- MATCH (n:DataSource)
- WHERE id(n) = $node_id
- SET n += $properties
- RETURN id(n) as node_id
- """
- result = execute_cypher_query(update_query, {
- 'node_id': node_id,
- 'properties': data
- })
- message = "数据源更新成功"
- else:
- # 如果不存在,创建新节点
- node_id = create_or_get_node('DataSource', **data)
- message = "数据源创建成功"
-
- # 返回成功结果
- return jsonify(success({
- "id": node_id,
- "message": message
- }))
- except Exception as e:
- logger.error(f"保存数据源失败: {str(e)}")
- return jsonify(failed(str(e)))
-
-
- # 获取数据源列表 或根据id获取数据源信息
- @bp.route('/list', methods=['POST'])
- def data_source_list():
- """获取数据源列表或指定数据源信息"""
- try:
- # 获取请求参数
- data = request.json
-
- # 构建查询条件
- where_conditions = []
- params = {}
-
- # 如果指定了id
- if data and 'id' in data:
- where_conditions.append("id(n) = $id")
- params['id'] = int(data['id'])
- # 如果有其他属性
- elif data:
- for key, value in data.items():
- if value: # 只处理非空值
- where_conditions.append(f"n.{key} = ${key}")
- params[key] = value
-
- # 构建WHERE子句
- where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""
-
- # 构建查询语句
- cypher = f"""
- MATCH (n:DataSource)
- {where_clause}
- RETURN n
- """
-
- # 执行查询
- result = execute_cypher_query(cypher, params)
-
- # 格式化结果
- data_sources = []
- for record in result:
- node = record['n']
- node['id'] = node['_id']
- data_sources.append(node)
-
- # 返回结果
- return jsonify(success({
- "data_source": data_sources,
- "total": len(data_sources)
- }))
-
- except Exception as e:
- logger.error(f"获取数据源列表失败: {str(e)}")
- return jsonify(failed(str(e)))
-
- @bp.route('/delete', methods=['POST'])
- def data_source_delete():
- """删除数据源"""
- try:
- # 获取请求参数
- data = request.json
- logger.debug(f"删除数据源请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
- # 检查参数
- if not data or ('id' not in data and 'en_name' not in data):
- error_msg = "必须提供id或en_name参数"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
- # 构建删除条件
- if 'id' in data:
- where_clause = "id(n) = $id"
- params = {'id': int(data['id'])}
- else:
- where_clause = "n.en_name = $en_name"
- params = {'en_name': data['en_name']}
- # 构建删除语句
- delete_query = f"""
- MATCH (n:DataSource)
- WHERE {where_clause}
- WITH n
- OPTIONAL MATCH (n)-[r]-()
- DELETE r, n
- RETURN count(n) as deleted_count
- """
-
- # 执行删除
- result = execute_cypher_query(delete_query, params)
-
- if result and result[0]['deleted_count'] > 0:
- return jsonify(success({
- "message": "数据源删除成功",
- "deleted_count": result[0]['deleted_count']
- }))
- else:
- error_msg = "未找到指定的数据源"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
-
- except Exception as e:
- logger.error(f"删除数据源失败: {str(e)}")
- return jsonify(failed(str(e)))
-
- @bp.route('/parse', methods=['POST'])
- def data_source_connstr_parse():
- """解析数据源连接字符串"""
- try:
- # 获取请求参数
- data = request.json
- logger.debug(f"解析连接字符串请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
- # 检查参数
- if not data or 'conn_str' not in data:
- error_msg = "缺少连接字符串参数"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
- # 创建DDLParser实例并解析连接字符串
- from app.core.llm.ddl_parser import DDLParser
- parser = DDLParser()
- result = parser.parse_db_conn_str(data['conn_str'])
- # 检查解析结果
- if isinstance(result, dict) and 'code' in result and result['code'] == 500:
- error_msg = f"解析连接字符串失败: {result.get('message', '未知错误')}"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
- # 返回成功结果
- return jsonify(success(result))
- except Exception as e:
- logger.error(f"解析连接字符串失败: {str(e)}")
- return jsonify(failed(str(e)))
-
- @bp.route('/valid', methods=['POST'])
- def data_source_connstr_valid():
- """验证数据源连接信息"""
- try:
- # 获取请求参数
- data = request.json
- logger.debug(f"验证连接信息请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
- # 检查参数
- if not data:
- error_msg = "缺少连接信息参数"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
- # 检查密码是否为空
- if not data.get('password'):
- error_msg = "密码不能为空"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
- # 创建DDLParser实例并验证连接信息
- from app.core.llm.ddl_parser import DDLParser
- parser = DDLParser()
- result = parser.valid_db_conn_str(data)
- # 根据验证结果返回响应
- if result == "success":
- # 检查数据源是否已存在
- check_query = """
- MATCH (n:DataSource)
- WHERE n.en_name = $en_name
- RETURN n
- """
- existing_source = execute_cypher_query(check_query, {'en_name': data['en_name']})
-
- if existing_source:
- return jsonify(success("连接信息验证通过,但该数据源的定义已经存在,如果保存则会更新该数据源"))
- else:
- return jsonify(success("连接信息验证通过"))
- else:
- return jsonify(failed("连接信息验证失败"))
- except Exception as e:
- logger.error(f"验证连接信息失败: {str(e)}")
- return jsonify(failed(str(e)))
-
- @bp.route('/conntest', methods=['POST'])
- def data_source_conn_test():
- """测试数据源连接"""
- try:
- # 获取请求参数
- data = request.json
- logger.debug(f"测试连接请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")
- # 检查必需参数
- required_fields = ['type', 'username', 'host', 'port', 'database', 'password']
- missing_fields = [field for field in required_fields if not data.get(field)]
-
- if missing_fields:
- error_msg = f"缺少必需参数: {', '.join(missing_fields)}"
- logger.error(error_msg)
- return jsonify(failed(error_msg))
- # 构建数据库URL
- db_url = URL.create(
- drivername=data['type'],
- username=data['username'],
- password=data.get('password', ''),
- host=data['host'],
- port=data['port'],
- database=data['database']
- )
- # 创建数据库引擎
- engine = create_engine(db_url, connect_args={'connect_timeout': 5})
-
- # 测试连接
- try:
- with engine.connect() as conn:
- # 执行一个简单的查询来测试连接
- conn.execute(text("SELECT 1"))
- return jsonify(success({
- "message": f"{data['type']}连接测试成功",
- "connected": True
- }))
- except Exception as e:
- return jsonify(failed({
- "message": f"连接测试失败: {str(e)}",
- "connected": False
- }))
- except Exception as e:
- logger.error(f"测试连接失败: {str(e)}")
- return jsonify(failed(str(e)))
- @bp.route('/graph', methods=['POST'])
- def data_source_graph_relationship():
- pass
|