import os import requests import re import json import logging from flask import current_app logger = logging.getLogger(__name__) class DDLParser: def __init__(self, api_key=None): """ 初始化DDL解析器 参数: api_key: LLM API密钥,如果未提供,将从应用配置或环境变量中获取 """ # 如果在Flask应用上下文中,则从应用配置获取参数 self.api_key = api_key or current_app.config.get('LLM_API_KEY') self.base_url = current_app.config.get('LLM_BASE_URL') self.model_name = current_app.config.get('LLM_MODEL_NAME') self.headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } def parse_ddl(self, sql_content): """ 解析DDL语句,返回标准化的结构 参数: sql_content: 要解析的DDL语句 返回: 解析结果的JSON对象 """ prompt = self._optimize_ddl_prompt() payload = { "model": self.model_name, "messages": [ { "role": "system", "content": "你是一个专业的数据库分析专家,擅长解析SQL DDL语句并提取表结构信息。" }, { "role": "user", "content": f"{prompt}\n\n{sql_content}" } ] } try: response = requests.post( f"{self.base_url}/chat/completions", headers=self.headers, json=payload, timeout=30 ) response.raise_for_status() result = response.json() if "choices" in result and len(result["choices"]) > 0: content = result["choices"][0]["message"]["content"] try: json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content) if json_match: json_content = json_match.group(1) else: json_content = content parsed_result = json.loads(json_content) return parsed_result except json.JSONDecodeError as e: return { "code": 500, "message": f"无法解析返回的JSON: {str(e)}", "original_response": content } return { "code": 500, "message": "无法获取有效响应", "original_response": result } except requests.RequestException as e: return { "code": 500, "message": f"API请求失败: {str(e)}" } def parse_db_conn_str(self, conn_str): """ 解析数据库连接字符串 参数: conn_str: 要解析的数据库连接字符串 返回: 解析结果的JSON对象 """ prompt = self._optimize_connstr_parse_prompt() payload = { "model": self.model_name, "messages": [ { "role": "system", "content": "你是一个专业的数据库连接字符串解析专家,擅长解析各种数据库的连接字符串并提取关键信息。" }, { "role": "user", "content": f"{prompt}\n\n{conn_str}" } ] } try: response = requests.post( f"{self.base_url}/chat/completions", headers=self.headers, json=payload, timeout=30 ) response.raise_for_status() result = response.json() if "choices" in result and len(result["choices"]) > 0: content = result["choices"][0]["message"]["content"] try: json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content) if json_match: json_content = json_match.group(1) else: json_content = content parsed_result = json.loads(json_content) return parsed_result except json.JSONDecodeError as e: return { "code": 500, "message": f"无法解析返回的JSON: {str(e)}", "original_response": content } return { "code": 500, "message": "无法获取有效响应", "original_response": result } except requests.RequestException as e: return { "code": 500, "message": f"API请求失败: {str(e)}" } def _optimize_ddl_prompt(self): """返回优化后的提示词模板""" return """ 请解析以下DDL建表语句,并按照指定的JSON格式返回结果: 规则说明: 1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。 2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。 - 中文表名中不要出现标点符号 - 表中的字段对应输出json中的meta对象,en_name对应表的字段名,data_type对应表的字段类型. 3. 返回结果的中文名称(name)的确定规则: - 对于COMMENT注释,直接使用注释内容作为name - 如sql中无注释但字段名en_name有明确含义,将英文名en_name翻译为中文 - 如字段名en_name是无意义的拼音缩写,则name为空字符串 - 中文字段名name中不要出现逗号,以及"主键"、"外键"、"索引"等字样 4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。 - 对于每个表的字段都要检查它的en_name和name,name不能为空,首选字段的注释,如果没有注释,则尝试翻译en_name作为name。 5. 忽略sql文件中除了表的定义和注释信息COMMIT以外的内容。比如,忽略sql中的数据库的连接字符串。 6. 参考格式如下: { "users_table": { //表名 "name": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串 "schema": "public", "meta": [{ "en_name": "id", //表的字段名 "data_type": "integer", //表的字段类型 "name": "用户ID" //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串 }, { "en_name": "username", "data_type": "varchar", "name": "用户名" } ] } } 请仅返回JSON格式结果,不要包含任何其他解释文字。 """ def _optimize_ddl_source_prompt(self): """返回优化后的提示词模板""" return """ 请解析以下DDL建表语句,并按照指定的JSON格式返回结果: 规则说明: 1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。 2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。 - 中文表名中不要出现标点符号 3. 字段中文名称(name)的确定规则: - 如有COMMENT注释,直接使用注释内容 - 如无注释但字段名有明确含义,将英文名翻译为中文 - 如字段名是无意义的拼音缩写,则name为空字符串 - 字段名中不要出现逗号,以及"主键"、"外键"、"索引"等字样 4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。 5. data_source对象,请放在data_source标签中,它与tables对象同级。 6. 数据库连接串处理: - 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。 - 根据连接串格式识别数据库类型,数据库类型请使用小写,参考例子,如 mysql/postgresql/sqlserver/oracle/db2/sybase - data_source.en_name格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加. - data_source.name留空. - 无法确定数据库类型时,type设为"unknown" - 如果从ddl中没有识别到数据库连接串,则json不返回"data_source"标签 - 除了database,password,username,en_name,host,port,type,name 之外,连接串的其它字段放在param属性中。 7. 参考格式如下: { "tables": { "users": { //表名 "name": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串 "schema": "public", "meta": [{ "en_name": "id", "data_type": "integer", "name": "用户ID" }, { "en_name": "username", "data_type": "varchar", "name": "用户名" } ] } }, "data_source": [{ "en_name": "mydatabase_10.52.31.104_5432_myuser", //{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名} "name": "", //如果没有注释,这里留空 "type": "postgresql", "host": "10.52.31.104", "port": 5432, "database": "mydatabase", "username": "myuser", "password": "mypassword", "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC" }] } 请仅返回JSON格式结果,不要包含任何其他解释文字。 """ def _optimize_connstr_parse_prompt(self): """返回优化后的连接字符串解析提示词模板""" return """ 请解析以下数据库连接字符串,并按照指定的JSON格式返回结果: 规则说明: 1. 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。 2. 根据连接串格式识别数据库类型,数据库类型请使用小写,如 mysql/postgresql/sqlserver/oracle/db2/sybase 3. data_source.en_name格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加 4. data_source.name留空 5. 无法确定数据库类型时,type设为"unknown" 6. 除了database,password,username,en_name,host,port,type,name 之外,连接串的其它字段放在param属性中 返回格式示例: { "data_source": { "en_name": "mydatabase_10.52.31.104_5432_myuser", "name": "", "type": "postgresql", "host": "10.52.31.104", "port": 5432, "database": "mydatabase", "username": "myuser", "password": "mypassword", "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC" } } 请仅返回JSON格式结果,不要包含任何其他解释文字。 """ def _optimize_connstr_valid_prompt(self): """返回优化后的连接字符串验证提示词模板""" return """ 请验证以下数据库连接信息是否符合规则: 规则说明: 1. 必填字段检查: - database: 数据库名称,不能为空,符合数据库名称的命名规范。 - en_name: 格式必须为 "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}" - host: 主机名或IP地址,不能为空 - port: 端口号,必须为数字 - type: 数据库类型,必须为以下之一:mysql/postgresql/sqlserver/oracle/db2/sybase - username: 用户名,不能为空,名称中间不能有空格。 2. 字段格式检查: - en_name中的各个部分必须与对应的字段值匹配 - port必须是有效的端口号(1-65535) - type必须是小写的数据库类型名称 - param中的参数格式必须正确(key=value格式) 3. 可选字段: - password: 密码(可选) - name: 中文名称(可选) - desc: 描述(可选) 请检查提供的连接信息是否符合以上规则,如果符合则返回"success",否则返回"failure"。 请仅返回"success"或"failure",不要包含任何其他解释文字。 """ def valid_db_conn_str(self, conn_str): """ 验证数据库连接字符串是否符合规则 参数: conn_str: 要验证的数据库连接信息(JSON格式) 返回: "success" 或 "failure" """ prompt = self._optimize_connstr_valid_prompt() payload = { "model": self.model_name, "messages": [ { "role": "system", "content": "你是一个专业的数据库连接信息验证专家,擅长验证数据库连接信息的完整性和正确性。" }, { "role": "user", "content": f"{prompt}\n\n{json.dumps(conn_str, ensure_ascii=False)}" } ] } try: response = requests.post( f"{self.base_url}/chat/completions", headers=self.headers, json=payload, timeout=30 ) response.raise_for_status() result = response.json() if "choices" in result and len(result["choices"]) > 0: content = result["choices"][0]["message"]["content"].strip().lower() return "success" if content == "success" else "failure" return "failure" except Exception as e: logger.error(f"LLM 验证数据库连接字符串失败: {str(e)}") return "failure"