Przeglądaj źródła

使用LLM处理ddl,初步调试完成,未处理data_source的exist的问题

wangxq 1 miesiąc temu
rodzic
commit
5db45d0917
3 zmienionych plików z 242 dodań i 134 usunięć
  1. 20 14
      app/__init__.py
  2. 83 120
      app/api/data_resource/routes.py
  3. 139 0
      app/core/llm/ddl_parser.py

+ 20 - 14
app/__init__.py

@@ -57,29 +57,35 @@ def configure_logging(app):
     log_encoding = app.config.get('LOG_ENCODING', 'UTF-8')
     log_to_console = app.config.get('LOG_TO_CONSOLE', True)
     
-    logger = logging.getLogger("app")
-    logger.setLevel(log_level)
+    # 配置根日志器
+    root_logger = logging.getLogger()
+    root_logger.setLevel(log_level)
     
-    # 清除现有处理器,避免重复
-    if logger.handlers:
-        logger.handlers.clear()
+    # 清除所有现有处理器
+    root_logger.handlers.clear()
     
-    # 文件处理器
-    handler = logging.FileHandler(log_file, encoding=log_encoding)
-    handler.setLevel(log_level)
+    # 文件处理器 - 只添加到根日志器
+    file_handler = logging.FileHandler(log_file, encoding=log_encoding)
+    file_handler.setLevel(log_level)
     logging_format = logging.Formatter(log_format)
-    handler.setFormatter(logging_format)
-    logger.addHandler(handler)
+    file_handler.setFormatter(logging_format)
+    root_logger.addHandler(file_handler)
     
-    # 控制台处理器
+    # 控制台处理器 - 只添加到根日志器
     if log_to_console:
         console = logging.StreamHandler()
         console.setLevel(log_level)
         console.setFormatter(logging_format)
-        logger.addHandler(console)
+        root_logger.addHandler(console)
+    
+    # 确保Flask内部日志器使用我们的配置
+    app.logger.handlers.clear()  # 移除Flask默认处理器
     
-    # 设置根日志级别,确保其他模块的日志也受控制
-    logging.basicConfig(level=log_level, format=log_format)
+    # 配置app日志器,但禁止传播到根日志器
+    logger = logging.getLogger("app")
+    logger.setLevel(log_level)
+    logger.handlers.clear()  # 清除现有处理器
+    logger.propagate = True  # 通过根日志器处理
     
     app.logger.info(f"日志配置完成: 级别={log_level_name}, 文件={log_file}")
     return logger

+ 83 - 120
app/api/data_resource/routes.py

@@ -22,7 +22,8 @@ from app.core.data_resource.resource import (
     id_data_search_list,
     table_sql,
     select_sql,
-    id_resource_graph
+    id_resource_graph,
+    status_query
 )
 from app.core.meta_data import (
     translate_and_parse,
@@ -33,6 +34,7 @@ from app.core.meta_data import (
 )
 import traceback
 from app.core.system.auth import require_auth
+from app.core.llm.ddl_parser import DDLParser
 
 logger = logging.getLogger("app")
 
@@ -217,6 +219,8 @@ def data_resource_update():
         logger.error(f"更新数据资源失败: {str(e)}")
         return jsonify(failed(str(e)))
 
+# 解析ddl,使用正则表达式匹配,但没有进行翻译,也没有对注释进行识别
+# 使用ddl创建数据资源时,调用该API
 @bp.route('/ddl', methods=['POST'])
 def id_data_ddl():
     """解析数据资源的DDL"""
@@ -468,6 +472,83 @@ def sql_test():
         logger.error(f"测试SQL查询失败: {str(e)}")
         return jsonify(failed(str(e)))
 
+# 使用LLM识别DDL语句,用来代替原来的正则的方式
+# 用于在数据资源创建时,识别DDL语句 /api/resource/ddl/parse
+@bp.route('/ddl/parse', methods=['POST'])
+def ddl_identify():
+    """识别DDL语句"""
+    try:
+        # 获取参数
+        sql_content = request.json.get('sql', '')
+        
+        if not sql_content:
+            return jsonify(failed("SQL内容不能为空"))
+        
+        parser = DDLParser()        
+        # 提取创建表的DDL语句
+        ddl_list = parser.parse_ddl(sql_content)
+            
+        if not ddl_list:
+            return jsonify(failed("未找到有效的CREATE TABLE语句"))
+        
+        # 为每个表名添加exist字段
+        if isinstance(ddl_list, dict):
+            # 检查是否有data_source键
+            data_source = None
+            if "data_source" in ddl_list:
+                # 临时移除data_source,以便只遍历表
+                data_source = ddl_list.pop("data_source", None)
+            
+            # 获取所有表名 - 过滤掉可能的非表结构键
+            table_names = []
+            for key, value in list(ddl_list.items()):
+                # 检查值是否是字典且包含meta键,这表明它是一个表结构
+                if isinstance(value, dict) and "meta" in value:
+                    table_names.append(key)
+                # 如果不是表结构,则不处理
+            
+            # 只有在有表名时才调用status_query
+            if table_names:
+                try:
+                    # 调用status_query获取表的存在状态
+                    status_results = status_query(table_names)
+                    
+                    # status_query返回的可能是单个值或嵌套列表,需要平展处理
+                    flat_results = []
+                    if status_results:
+                        # 如果是嵌套列表(通常只有一层嵌套),则拍平
+                        if isinstance(status_results, list):
+                            if len(status_results) == 1 and isinstance(status_results[0], list):
+                                flat_results = status_results[0]  # 拍平一层嵌套
+                            else:
+                                flat_results = status_results  # 已经是平的列表
+                    
+                    # 将状态添加到每个表
+                    for i, table_name in enumerate(table_names):
+                        if i < len(flat_results):
+                            ddl_list[table_name]["exist"] = flat_results[i]
+                        else:
+                            ddl_list[table_name]["exist"] = False
+                            
+                except Exception as e:
+                    logger.error(f"检查表存在状态失败: {str(e)}")
+                    # 如果status_query失败,所有表默认为不存在
+                    for table_name in table_names:
+                        ddl_list[table_name]["exist"] = False
+            
+            # 恢复data_source
+            if data_source:
+                ddl_list["data_source"] = data_source
+        
+        logger.debug(f"识别到的DDL语句: {ddl_list}")
+            
+        return jsonify(success(ddl_list))
+    except Exception as e:
+        logger.error(f"识别DDL语句失败: {str(e)}")
+        logger.error(traceback.format_exc())  # 添加详细错误堆栈
+        return jsonify(failed(str(e)))
+        
+
 # 废弃的识别DDL语句方法,该API 与 ddl API 功能类似,但功能简化了
 @bp.route('/ddl/identify', methods=['POST'])
 def sql_ddl_identify():
@@ -543,122 +624,4 @@ def get_resource_config():
         'allowed_extensions': list(config['allowed_extensions']),
         'bucket_name': config['bucket_name'],
         'prefix': config['prefix']
-    })
-
-    """解析表定义SQL,支持带schema和不带schema两种格式"""
-    try:
-        # 支持以下格式:
-        # 1. CREATE TABLE tablename
-        # 2. CREATE TABLE "tablename"
-        # 3. CREATE TABLE schema.tablename
-        # 4. CREATE TABLE "schema"."tablename"
-        table_name_pattern = r'CREATE\s+TABLE\s+(?:(?:"([^"]+)"|([^"\s\.]+))\.)?(?:"([^"]+)"|([^"\s\(]+))'
-        table_name_match = re.search(table_name_pattern, sql, re.IGNORECASE)
-        
-        if not table_name_match:
-            return None
-            
-        # 获取表名,优先使用带引号的名称,如果没有则使用不带引号的
-        schema = table_name_match.group(1) or table_name_match.group(2)  # schema是可选的
-        table_name = table_name_match.group(3) or table_name_match.group(4)  # 实际表名
-        
-        # 提取字段定义
-        fields_pattern = r'CREATE\s+TABLE[^(]*\(\s*(.*?)\s*\)'
-        fields_match = re.search(fields_pattern, sql, re.DOTALL | re.IGNORECASE)
-        
-        if not fields_match:
-            return None
-            
-        fields_text = fields_match.group(1)
-        
-        # 分割字段定义
-        field_definitions = []
-        in_parenthesis = 0
-        current_field = ""
-        
-        for char in fields_text:
-            if char == '(':
-                in_parenthesis += 1
-                current_field += char
-            elif char == ')':
-                in_parenthesis -= 1
-                current_field += char
-            elif char == ',' and in_parenthesis == 0:
-                field_definitions.append(current_field.strip())
-                current_field = ""
-            else:
-                current_field += char
-                
-        if current_field.strip():
-            field_definitions.append(current_field.strip())
-        
-        # 解析每个字段
-        fields = []
-        primary_keys = []
-        
-        for field_def in field_definitions:
-            # 忽略PRIMARY KEY等约束定义
-            if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
-                # 提取主键字段
-                pk_pattern = r'PRIMARY\s+KEY\s*\(\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s*\)'
-                pk_match = re.search(pk_pattern, field_def, re.IGNORECASE)
-                
-                if pk_match:
-                    pk = next((g for g in pk_match.groups() if g is not None), "")
-                    primary_keys.append(pk)
-                continue
-                
-            # 解析常规字段定义
-            field_pattern = r'^\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|([a-zA-Z0-9_]+))\s+([A-Za-z0-9_]+(?:\s*\([^)]*\))?)'
-            field_match = re.search(field_pattern, field_def)
-            
-            if field_match:
-                # 提取字段名和类型
-                field_name = next((g for g in field_match.groups()[:4] if g is not None), "")
-                field_type = field_match.group(5)
-                
-                # 检查是否为主键
-                is_primary = "PRIMARY KEY" in field_def.upper()
-                if is_primary:
-                    primary_keys.append(field_name)
-                
-                # 检查是否为非空
-                not_null = "NOT NULL" in field_def.upper()
-                
-                # 检查默认值
-                default_match = re.search(r'DEFAULT\s+([^,\s]+)', field_def, re.IGNORECASE)
-                default_value = default_match.group(1) if default_match else None
-                
-                # 添加字段信息
-                field_info = {
-                    "name": field_name,
-                    "type": clean_type(field_type),
-                    "is_primary": is_primary,
-                    "not_null": not_null
-                }
-                
-                if default_value:
-                    field_info["default"] = default_value
-                    
-                fields.append(field_info)
-        
-        # 更新主键标记
-        for field in fields:
-            if field["name"] in primary_keys and not field["is_primary"]:
-                field["is_primary"] = True
-        
-        # 返回结果,包含schema信息
-        result = {
-            "table_name": table_name,
-            "fields": fields
-        }
-        
-        # 如果有schema,添加到结果中
-        if schema:
-            result["schema"] = schema
-            
-        return result
-        
-    except Exception as e:
-        logger.error(f"解析表定义SQL失败: {str(e)}")
-        return None 
+    })

+ 139 - 0
app/core/llm/ddl_parser.py

@@ -0,0 +1,139 @@
+import os
+import requests
+import re
+import json
+from flask import current_app
+
+class DDLParser:
+    def __init__(self, api_key=None):
+        """
+        初始化DDL解析器
+        
+        参数:
+            api_key: LLM API密钥,如果未提供,将从应用配置或环境变量中获取
+        """
+        # 如果在Flask应用上下文中,则从应用配置获取参数
+       
+        self.api_key = api_key or current_app.config.get('LLM_API_KEY')
+        self.base_url = current_app.config.get('LLM_BASE_URL')
+        self.model_name = current_app.config.get('LLM_MODEL_NAME')
+        
+        
+        self.headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "Content-Type": "application/json"
+        }
+
+    def parse_ddl(self, sql_content):
+        """
+        解析DDL语句,返回标准化的结构
+        
+        参数:
+            sql_content: 要解析的DDL语句
+            
+        返回:
+            解析结果的JSON对象
+        """
+        prompt = self._optimize_prompt()
+        payload = {
+            "model": self.model_name,
+            "messages": [
+                {
+                    "role": "system",
+                    "content": "你是一个专业的数据库分析专家,擅长解析SQL DDL语句并提取表结构信息。"
+                },
+                {
+                    "role": "user", 
+                    "content": f"{prompt}\n\n{sql_content}"
+                }
+            ]
+        }
+        
+        try:
+            response = requests.post(
+                f"{self.base_url}/chat/completions",
+                headers=self.headers,
+                json=payload,
+                timeout=30
+            )
+            response.raise_for_status()
+            
+            result = response.json()
+            
+            if "choices" in result and len(result["choices"]) > 0:
+                content = result["choices"][0]["message"]["content"]
+                
+                try:
+                    json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
+                    if json_match:
+                        json_content = json_match.group(1)
+                    else:
+                        json_content = content
+                        
+                    parsed_result = json.loads(json_content)
+                    return parsed_result
+                except json.JSONDecodeError as e:
+                    return {
+                        "code": 500,
+                        "message": f"无法解析返回的JSON: {str(e)}",
+                        "original_response": content
+                    }
+            
+            return {
+                "code": 500,
+                "message": "无法获取有效响应",
+                "original_response": result
+            }
+            
+        except requests.RequestException as e:
+            return {
+                "code": 500,
+                "message": f"API请求失败: {str(e)}"
+            }
+
+    def _optimize_prompt(self):
+        """返回优化后的提示词模板"""
+        return """
+请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
+
+规则说明:
+1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。
+2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。
+3. 字段中文名称(name)的确定规则:
+   - 如有COMMENT注释,直接使用注释内容
+   - 如无注释但字段名有明确含义,将英文名翻译为中文
+   - 如字段名是无意义的拼音缩写,则name为空字符串
+4. 数据库连接串处理:
+   - 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
+   - 根据连接串格式识别数据库类型,数据库类型请使用小写,参考例子,如 mysql/postgresql/sqlserver/oracle/db2
+   - data_source.name格式为: "{hostname或ip地址(点替换为中划线)}_{端口}_{数据库名称}",如某个元素无法识别,则跳过不添加到data_source.name
+   - 无法确定数据库类型时,type设为"unknown"
+   - 如果从ddl中没有识别到数据库连接串,则json不返回"data_source"标签
+5. 参考格式如下:
+{
+    "users": { //表名
+        "name": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串
+        "schema": "public",
+        "meta": [{
+                "en_name": "id",
+                "data_type": "integer",
+                "name": "用户ID"
+            },
+            {
+                "en_name": "username",
+                "data_type": "varchar",
+                "name": "用户名"
+            }
+        ]
+    },
+    "data_source": [{
+        "name": "10-52-31-104_5432_inventory_management", //hostname或ip地址(点替换为中划线)_端口_数据库名称
+        "type": "postgresql",
+        "host": "10.52.31.104",
+        "port": 5432 "database": "inventory_management"
+    }]
+}
+
+请仅返回JSON格式结果,不要包含任何其他解释文字。
+"""
+