فهرست منبع

已经完成了data source api的开发,准备修改ddl导入

wangxq 1 ماه پیش
والد
کامیت
1e5d1a886c
5 فایل‌های تغییر یافته به همراه 597 افزوده شده و 3 حذف شده
  1. 2 1
      app/__init__.py
  2. 5 0
      app/api/data_source/__init__.py
  3. 321 0
      app/api/data_source/routes.py
  4. 181 2
      app/core/llm/ddl_parser.py
  5. 88 0
      migrate_meta_data_type.py

+ 2 - 1
app/__init__.py

@@ -27,6 +27,7 @@ def create_app():
     from app.api.production_line import bp as production_line_bp
     from app.api.graph import bp as graph_bp
     from app.api.system import bp as system_bp
+    from app.api.data_source import bp as data_source_bp
     
     app.register_blueprint(meta_bp, url_prefix='/api/meta')
     app.register_blueprint(resource_bp, url_prefix='/api/resource')
@@ -36,7 +37,7 @@ def create_app():
     app.register_blueprint(production_line_bp, url_prefix='/api/pipeline')
     app.register_blueprint(graph_bp, url_prefix='/api/graph')
     app.register_blueprint(system_bp, url_prefix='/api/system')
-    
+    app.register_blueprint(data_source_bp, url_prefix='/api/datasource')
     # Configure logging
     configure_logging(app)
     

+ 5 - 0
app/api/data_source/__init__.py

@@ -0,0 +1,5 @@
from flask import Blueprint

# Blueprint for the /api/datasource endpoints (registered in app/__init__.py).
bp = Blueprint('data_source', __name__)

# Imported at the bottom so routes.py can reference ``bp`` without a circular import.
from app.api.data_source import routes

+ 321 - 0
app/api/data_source/routes.py

@@ -0,0 +1,321 @@
+from flask import request, jsonify
+import logging
+import json
+from datetime import datetime
+from app.models.result import success, failed
+from app.api.data_source import bp
+from app.core.graph.graph_operations import create_or_get_node, execute_cypher_query
+from flask_sqlalchemy import SQLAlchemy
+from sqlalchemy import create_engine, text, URL
+logger = logging.getLogger(__name__)
+
# API used when creating (or updating) a data source
@bp.route('/save', methods=['POST'])
def data_source_save():
    """Create a DataSource node, or update the existing one matched by en_name.

    Request JSON must contain: database, host, port, username, password,
    en_name, type.  Returns the node id and a status message.
    """
    try:
        data = request.json
        logger.debug(f"保存数据源请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")

        # Validate required fields up front so the caller gets one clear error.
        required_fields = ['database', 'host', 'port', 'username', 'password', 'en_name', 'type']
        missing_fields = [field for field in required_fields if not data.get(field)]

        if missing_fields:
            error_msg = f"缺少必填参数: {', '.join(missing_fields)}"
            logger.error(error_msg)
            return jsonify(failed(error_msg))

        # en_name is the logical unique key for DataSource nodes.
        check_query = """
        MATCH (n:DataSource)
        WHERE n.en_name = $en_name
        RETURN n
        """
        result = execute_cypher_query(check_query, {'en_name': data['en_name']})

        if result:
            # Node already exists: update its properties.
            # BUGFIX: do NOT set create_dt here — the old code stamped
            # create_dt on every save, clobbering the original creation time.
            node = result[0]['n']
            node_id = node['_id']
            update_query = """
            MATCH (n:DataSource)
            WHERE id(n) = $node_id
            SET n += $properties
            RETURN id(n) as node_id
            """
            execute_cypher_query(update_query, {
                'node_id': node_id,
                'properties': data
            })
            message = "数据源更新成功"
        else:
            # New node: record its creation timestamp.
            data['create_dt'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            # NOTE(review): the password is persisted as-is into the graph —
            # consider encrypting it at rest; confirm with the project owners.
            node_id = create_or_get_node('DataSource', **data)
            message = "数据源创建成功"

        return jsonify(success({
            "id": node_id,
            "message": message
        }))
    except Exception as e:
        logger.error(f"保存数据源失败: {str(e)}")
        return jsonify(failed(str(e)))
+    
+    
# List data sources, or fetch one by id / property filters
@bp.route('/list', methods=['POST'])
def data_source_list():
    """Return DataSource nodes, optionally filtered by internal id or by
    property equality on any request-body field.
    """
    try:
        data = request.json

        where_conditions = []
        params = {}

        if data and 'id' in data:
            # Lookup by internal node id takes precedence over other filters.
            where_conditions.append("id(n) = $id")
            params['id'] = int(data['id'])
        elif data:
            for key, value in data.items():
                if not value:  # skip empty filter values
                    continue
                # SECURITY: property names cannot be parameterized in Cypher,
                # so they are interpolated into the query text.  Restrict them
                # to plain identifiers to prevent Cypher injection.
                if not key.isidentifier():
                    error_msg = f"非法的查询属性名: {key}"
                    logger.error(error_msg)
                    return jsonify(failed(error_msg))
                where_conditions.append(f"n.{key} = ${key}")
                params[key] = value

        where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""

        cypher = f"""
        MATCH (n:DataSource)
        {where_clause}
        RETURN n
        """

        result = execute_cypher_query(cypher, params)

        # Expose the internal node id under the conventional 'id' key.
        data_sources = []
        for record in result:
            node = record['n']
            node['id'] = node['_id']
            data_sources.append(node)

        return jsonify(success({
            "data_source": data_sources,
            "total": len(data_sources)
        }))

    except Exception as e:
        logger.error(f"获取数据源列表失败: {str(e)}")
        return jsonify(failed(str(e)))
+    
+
@bp.route('/delete', methods=['POST'])
def data_source_delete():
    """Delete a DataSource node identified by its internal id or by en_name."""
    try:
        data = request.json
        logger.debug(f"删除数据源请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")

        # One of the two identifying parameters is mandatory.
        if not data or ('id' not in data and 'en_name' not in data):
            error_msg = "必须提供id或en_name参数"
            logger.error(error_msg)
            return jsonify(failed(error_msg))

        if 'id' in data:
            where_clause = "id(n) = $id"
            params = {'id': int(data['id'])}
        else:
            where_clause = "n.en_name = $en_name"
            params = {'en_name': data['en_name']}

        # BUGFIX: the previous OPTIONAL MATCH + "DELETE r, n" produced one row
        # per relationship, so count(n) over-reported deleted nodes.  DETACH
        # DELETE removes the node with all its relationships in one step and
        # keeps the count per node.
        delete_query = f"""
        MATCH (n:DataSource)
        WHERE {where_clause}
        WITH n
        DETACH DELETE n
        RETURN count(n) as deleted_count
        """

        result = execute_cypher_query(delete_query, params)

        if result and result[0]['deleted_count'] > 0:
            return jsonify(success({
                "message": "数据源删除成功",
                "deleted_count": result[0]['deleted_count']
            }))
        else:
            error_msg = "未找到指定的数据源"
            logger.error(error_msg)
            return jsonify(failed(error_msg))

    except Exception as e:
        logger.error(f"删除数据源失败: {str(e)}")
        return jsonify(failed(str(e)))
+    
+
+
@bp.route('/parse', methods=['POST'])
def data_source_connstr_parse():
    """Parse a raw database connection string into structured fields via the LLM."""
    try:
        payload = request.json
        logger.debug(f"解析连接字符串请求数据: {json.dumps(payload, ensure_ascii=False) if payload else 'None'}")

        # The connection string itself is the only required input.
        if not payload or 'conn_str' not in payload:
            error_msg = "缺少连接字符串参数"
            logger.error(error_msg)
            return jsonify(failed(error_msg))

        # Imported lazily to keep module import light.
        from app.core.llm.ddl_parser import DDLParser
        parse_result = DDLParser().parse_db_conn_str(payload['conn_str'])

        # The parser signals failure with a {"code": 500, ...} dict.
        is_error = (
            isinstance(parse_result, dict)
            and parse_result.get('code') == 500
        )
        if is_error:
            error_msg = f"解析连接字符串失败: {parse_result.get('message', '未知错误')}"
            logger.error(error_msg)
            return jsonify(failed(error_msg))

        return jsonify(success(parse_result))

    except Exception as e:
        logger.error(f"解析连接字符串失败: {str(e)}")
        return jsonify(failed(str(e)))
+    
+
@bp.route('/valid', methods=['POST'])
def data_source_connstr_valid():
    """Validate connection info via the LLM and report whether a DataSource
    with the same en_name already exists in the graph.
    """
    try:
        data = request.json
        logger.debug(f"验证连接信息请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")

        if not data:
            error_msg = "缺少连接信息参数"
            logger.error(error_msg)
            return jsonify(failed(error_msg))

        # The password is required for a usable connection definition.
        if not data.get('password'):
            error_msg = "密码不能为空"
            logger.error(error_msg)
            return jsonify(failed(error_msg))

        from app.core.llm.ddl_parser import DDLParser
        parser = DDLParser()
        result = parser.valid_db_conn_str(data)

        if result == "success":
            # BUGFIX: use .get() — the old data['en_name'] raised KeyError when
            # the field was absent (an LLM "success" verdict does not guarantee
            # it).  A None en_name simply matches no existing node.
            check_query = """
            MATCH (n:DataSource)
            WHERE n.en_name = $en_name
            RETURN n
            """
            existing_source = execute_cypher_query(check_query, {'en_name': data.get('en_name')})

            if existing_source:
                return jsonify(success({
                    "message": "连接信息验证通过,但该数据源的定义已经存在,如果保存则会更新该数据源",
                    "valid": True,
                    "exists": True
                }))
            else:
                return jsonify(success({
                    "message": "连接信息验证通过",
                    "valid": True,
                    "exists": False
                }))
        else:
            return jsonify(failed({
                "message": "连接信息验证失败",
                "valid": False,
                "exists": False
            }))

    except Exception as e:
        logger.error(f"验证连接信息失败: {str(e)}")
        return jsonify(failed(str(e)))
+    
+
@bp.route('/conntest', methods=['POST'])
def data_source_conn_test():
    """Attempt a real connection to the described database and run SELECT 1."""
    try:
        data = request.json
        logger.debug(f"测试连接请求数据: {json.dumps(data, ensure_ascii=False) if data else 'None'}")

        required_fields = ['type', 'username', 'host', 'port', 'database', 'password']
        missing_fields = [field for field in required_fields if not data.get(field)]

        if missing_fields:
            error_msg = f"缺少必需参数: {', '.join(missing_fields)}"
            logger.error(error_msg)
            return jsonify(failed(error_msg))

        # Build the SQLAlchemy URL from the discrete fields.
        # NOTE(review): data['type'] is used verbatim as the drivername, so a
        # suitable DBAPI driver must be installed for each supported type.
        db_url = URL.create(
            drivername=data['type'],
            username=data['username'],
            password=data.get('password', ''),
            host=data['host'],
            port=data['port'],
            database=data['database']
        )

        # Short timeout so an unreachable host fails fast.
        engine = create_engine(db_url, connect_args={'connect_timeout': 5})

        try:
            with engine.connect() as conn:
                # Minimal round-trip query to prove the connection works.
                conn.execute(text("SELECT 1"))
                return jsonify(success({
                    "message": f"{data['type']}连接测试成功",
                    "connected": True
                }))
        except Exception as e:
            return jsonify(failed({
                "message": f"连接测试失败: {str(e)}",
                "connected": False
            }))
        finally:
            # BUGFIX: the per-request engine was never disposed, leaking the
            # connection pool on every test call.
            engine.dispose()

    except Exception as e:
        logger.error(f"测试连接失败: {str(e)}")
        return jsonify(failed(str(e)))
+
+
@bp.route('/graph', methods=['POST'])
def data_source_graph_relationship():
    # TODO: unimplemented stub — presumably intended to return graph
    # relationships for a data source.
    # NOTE(review): the view currently returns None, which Flask rejects as an
    # invalid response at runtime — confirm before exposing this endpoint.
    pass

+ 181 - 2
app/core/llm/ddl_parser.py

@@ -2,8 +2,11 @@ import os
 import requests
 import re
 import json
+import logging
 from flask import current_app
 
+logger = logging.getLogger(__name__)
+
 class DDLParser:
     def __init__(self, api_key=None):
         """
@@ -34,7 +37,7 @@ class DDLParser:
         返回:
             解析结果的JSON对象
         """
-        prompt = self._optimize_prompt()
+        prompt = self._optimize_ddl_prompt()
         payload = {
             "model": self.model_name,
             "messages": [
@@ -91,7 +94,76 @@ class DDLParser:
                 "message": f"API请求失败: {str(e)}"
             }
 
-    def _optimize_prompt(self):
+
+    def parse_db_conn_str(self, conn_str):
+        """
+        解析数据库连接字符串
+        
+        参数:
+            conn_str: 要解析的数据库连接字符串
+            
+        返回:
+            解析结果的JSON对象
+        """
+        prompt = self._optimize_connstr_parse_prompt()
+        payload = {
+            "model": self.model_name,
+            "messages": [
+                {
+                    "role": "system",
+                    "content": "你是一个专业的数据库连接字符串解析专家,擅长解析各种数据库的连接字符串并提取关键信息。"
+                },
+                {
+                    "role": "user", 
+                    "content": f"{prompt}\n\n{conn_str}"
+                }
+            ]
+        }
+        
+        try:
+            response = requests.post(
+                f"{self.base_url}/chat/completions",
+                headers=self.headers,
+                json=payload,
+                timeout=30
+            )
+            response.raise_for_status()
+            
+            result = response.json()
+            
+            if "choices" in result and len(result["choices"]) > 0:
+                content = result["choices"][0]["message"]["content"]
+                
+                try:
+                    json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
+                    if json_match:
+                        json_content = json_match.group(1)
+                    else:
+                        json_content = content
+                        
+                    parsed_result = json.loads(json_content)
+                    return parsed_result
+                except json.JSONDecodeError as e:
+                    return {
+                        "code": 500,
+                        "message": f"无法解析返回的JSON: {str(e)}",
+                        "original_response": content
+                    }
+            
+            return {
+                "code": 500,
+                "message": "无法获取有效响应",
+                "original_response": result
+            }
+            
+        except requests.RequestException as e:
+            return {
+                "code": 500,
+                "message": f"API请求失败: {str(e)}"
+            }
+
+
+    def _optimize_ddl_prompt(self):
         """返回优化后的提示词模板"""
         return """
 请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
@@ -150,3 +222,110 @@ class DDLParser:
 请仅返回JSON格式结果,不要包含任何其他解释文字。
 """
 
    def _optimize_connstr_parse_prompt(self):
        """Return the prompt template instructing the LLM to parse a DB
        connection string into the fixed JSON shape shown in the template.

        The template text is runtime data sent to the model and must not be
        reformatted.
        """
        return """
请解析以下数据库连接字符串,并按照指定的JSON格式返回结果:

规则说明:
1. 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
2. 根据连接串格式识别数据库类型,数据库类型请使用小写,如 mysql/postgresql/sqlserver/oracle/db2/sybase
3. data_source.en_name格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加
4. data_source.name留空
5. 无法确定数据库类型时,type设为"unknown"
6. 除了database,password,username,en_name,host,port,type,name 之外,连接串的其它字段放在param属性中

返回格式示例:
{
    "data_source": {
        "en_name": "mydatabase_10.52.31.104_5432_myuser",
        "name": "",
        "type": "postgresql",
        "host": "10.52.31.104",
        "port": 5432,
        "database": "mydatabase",
        "username": "myuser",
        "password": "mypassword",
        "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
    }
}

请仅返回JSON格式结果,不要包含任何其他解释文字。
"""
+
+
    def _optimize_connstr_valid_prompt(self):
        """Return the prompt template instructing the LLM to validate
        connection info and answer with exactly "success" or "failure".

        The template text is runtime data sent to the model and must not be
        reformatted.
        """
        return """
请验证以下数据库连接信息是否符合规则:

规则说明:
1. 必填字段检查:
   - database: 数据库名称,不能为空,符合数据库名称的命名规范。
   - en_name: 格式必须为 "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}"
   - host: 主机名或IP地址,不能为空
   - port: 端口号,必须为数字
   - type: 数据库类型,必须为以下之一:mysql/postgresql/sqlserver/oracle/db2/sybase
   - username: 用户名,不能为空,名称中间不能有空格。

2. 字段格式检查:
   - en_name中的各个部分必须与对应的字段值匹配
   - port必须是有效的端口号(1-65535)
   - type必须是小写的数据库类型名称
   - param中的参数格式必须正确(key=value格式)

3. 可选字段:
   - password: 密码(可选)
   - name: 中文名称(可选)
   - desc: 描述(可选)

请检查提供的连接信息是否符合以上规则,如果符合则返回"success",否则返回"failure"。

请仅返回"success"或"failure",不要包含任何其他解释文字。
"""
+
+    def valid_db_conn_str(self, conn_str):
+        """
+        验证数据库连接字符串是否符合规则
+        
+        参数:
+            conn_str: 要验证的数据库连接信息(JSON格式)
+            
+        返回:
+            "success" 或 "failure"
+        """
+        prompt = self._optimize_connstr_valid_prompt()
+        payload = {
+            "model": self.model_name,
+            "messages": [
+                {
+                    "role": "system",
+                    "content": "你是一个专业的数据库连接信息验证专家,擅长验证数据库连接信息的完整性和正确性。"
+                },
+                {
+                    "role": "user", 
+                    "content": f"{prompt}\n\n{json.dumps(conn_str, ensure_ascii=False)}"
+                }
+            ]
+        }
+        
+        try:
+            response = requests.post(
+                f"{self.base_url}/chat/completions",
+                headers=self.headers,
+                json=payload,
+                timeout=30
+            )
+            response.raise_for_status()
+            
+            result = response.json()
+            
+            if "choices" in result and len(result["choices"]) > 0:
+                content = result["choices"][0]["message"]["content"].strip().lower()
+                return "success" if content == "success" else "failure"
+            
+            return "failure"
+            
+        except Exception as e:
+            logger.error(f"LLM 验证数据库连接字符串失败: {str(e)}")
+            return "failure"

+ 88 - 0
migrate_meta_data_type.py

@@ -0,0 +1,88 @@
+"""
+元数据属性迁移脚本:将meta_data节点的type属性改为data_type属性
+运行方式:python migrate_meta_data_type.py
+"""
+
+import logging
+import sys
+import time
+from app.services.neo4j_driver import neo4j_driver
+
# Configure logging: INFO level with timestamps, writing to stdout so the
# migration's progress is visible when run from a terminal or CI pipeline.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)

logger = logging.getLogger("meta_data_migration")
+
def migrate_type_to_data_type():
    """
    Migrate the ``type`` property of meta_data nodes to ``data_type``.

    Counts affected nodes, copies type -> data_type and removes type, then
    verifies that no node still carries the old property.  Raises on any
    Neo4j error after logging it.
    """
    try:
        with neo4j_driver.get_session() as session:
            # BUGFIX: EXISTS(n.type) property syntax was removed in Neo4j 5;
            # "n.type IS NOT NULL" is the supported form and also works on 4.x.
            count_query = """
            MATCH (n:meta_data)
            WHERE n.type IS NOT NULL
            RETURN COUNT(n) as count
            """
            count_result = session.run(count_query)
            node_count = count_result.single()["count"]
            
            if node_count == 0:
                logger.info("没有找到带有type属性的meta_data节点,无需迁移")
                return
                
            logger.info(f"找到 {node_count} 个需要迁移的meta_data节点")
            
            # Copy type into data_type, then drop the old property in one pass.
            migrate_query = """
            MATCH (n:meta_data)
            WHERE n.type IS NOT NULL
            SET n.data_type = n.type
            REMOVE n.type
            RETURN COUNT(n) as migrated_count
            """
            
            migrate_result = session.run(migrate_query)
            migrated_count = migrate_result.single()["migrated_count"]
            
            logger.info(f"成功迁移 {migrated_count} 个meta_data节点的type属性到data_type属性")
            
            # Verify nothing was left behind.
            verify_query = """
            MATCH (n:meta_data)
            WHERE n.type IS NOT NULL
            RETURN COUNT(n) as remaining_count
            """
            verify_result = session.run(verify_query)
            remaining_count = verify_result.single()["remaining_count"]
            
            if remaining_count > 0:
                logger.warning(f"仍有 {remaining_count} 个meta_data节点保留了type属性")
            else:
                logger.info("所有meta_data节点的type属性已成功迁移到data_type属性")
                
    except Exception as e:
        logger.error(f"迁移过程中发生错误: {str(e)}")
        raise
+
if __name__ == "__main__":
    # Script entry point: run the migration and report wall-clock duration.
    try:
        logger.info("开始迁移meta_data节点的type属性到data_type属性...")
        started_at = time.time()

        migrate_type_to_data_type()

        elapsed = time.time() - started_at
        logger.info(f"迁移完成,耗时: {elapsed:.2f} 秒")

    except Exception as e:
        # Non-zero exit so calling scripts/CI can detect the failure.
        logger.error(f"迁移失败: {str(e)}")
        sys.exit(1)