from __future__ import annotations import io import json import logging import re import time from typing import Any import requests from flask import current_app logger = logging.getLogger(__name__) class DDLParser: def __init__(self, api_key=None, timeout=60, max_retries=3): """ 初始化DDL解析器 参数: api_key: LLM API密钥,如果未提供,将从应用配置或环境变量中获取 timeout: API请求超时时间(秒),默认60秒 max_retries: 最大重试次数,默认3次 """ # 如果在Flask应用上下文中,则从应用配置获取参数 self.api_key = api_key or current_app.config.get("LLM_API_KEY") self.base_url = current_app.config.get("LLM_BASE_URL") self.model_name = current_app.config.get("LLM_MODEL_NAME") self.timeout = timeout self.max_retries = max_retries self.headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", } def _make_llm_request(self, payload, operation_name="LLM请求"): """ 发送LLM请求,支持自动重试 参数: payload: 请求payload operation_name: 操作名称,用于日志 返回: API响应结果 """ last_error = None for attempt in range(self.max_retries): try: if attempt > 0: wait_time = 2**attempt # 指数退避: 2, 4, 8秒 logger.info( f"{operation_name} 第{attempt + 1}次重试,等待{wait_time}秒..." ) time.sleep(wait_time) logger.info( f"{operation_name} 尝试 {attempt + 1}/{self.max_retries},超时时间: {self.timeout}秒" ) response = requests.post( f"{self.base_url}/chat/completions", headers=self.headers, json=payload, timeout=self.timeout, ) response.raise_for_status() result = response.json() logger.info(f"{operation_name} 成功") return result except requests.Timeout as e: last_error = f"请求超时(超过{self.timeout}秒): {str(e)}" logger.warning(f"{operation_name} 超时: {str(e)}") except requests.RequestException as e: last_error = f"API请求失败: {str(e)}" logger.warning(f"{operation_name} 失败: {str(e)}") except Exception as e: last_error = f"未知错误: {str(e)}" logger.error(f"{operation_name} 异常: {str(e)}") break # 对于非网络错误,不重试 # 所有重试都失败 logger.error(f"{operation_name} 在{self.max_retries}次尝试后失败: {last_error}") return None def parse_ddl(self, sql_content): """ 解析DDL语句,返回标准化的结构 参数: sql_content: 要解析的DDL语句 返回: 解析结果的JSON对象 """ prompt = self._optimize_ddl_prompt() payload = { "model": self.model_name, "messages": [ { "role": "system", "content": "你是一个专业的SQL DDL语句解析专家,擅长从DDL语句中提取表结构信息并转换为结构化的JSON格式。", }, {"role": "user", "content": f"{prompt}\n\n{sql_content}"}, ], } try: result = self._make_llm_request(payload, "DDL解析") if not result: return { "code": 500, "message": f"API请求失败: 在{self.max_retries}次尝试后仍然失败", } if "choices" in result and len(result["choices"]) > 0: content = result["choices"][0]["message"]["content"] try: json_match = re.search(r"```json\s*([\s\S]*?)\s*```", content) if json_match: json_content = json_match.group(1) else: json_content = content parsed_result = json.loads(json_content) return parsed_result except json.JSONDecodeError as e: return { "code": 500, "message": f"无法解析返回的JSON: {str(e)}", "original_response": content, } return { "code": 500, "message": "无法获取有效响应", "original_response": result, } except Exception as e: logger.error(f"DDL解析异常: {str(e)}") return {"code": 500, "message": f"解析失败: {str(e)}"} def parse_db_conn_str(self, conn_str): """ 解析数据库连接字符串 参数: conn_str: 要解析的数据库连接字符串 返回: 解析结果的JSON对象 """ prompt = self._optimize_connstr_parse_prompt() payload = { "model": self.model_name, "messages": [ { "role": "system", "content": "你是一个专业的数据库连接字符串解析专家,擅长解析各种数据库的连接字符串并提取关键信息。", }, {"role": "user", "content": f"{prompt}\n\n{conn_str}"}, ], } try: result = self._make_llm_request(payload, "连接字符串解析") if not result: return { "code": 500, "message": f"API请求失败: 在{self.max_retries}次尝试后仍然失败", } if "choices" in result and len(result["choices"]) > 0: content = result["choices"][0]["message"]["content"] try: json_match = re.search(r"```json\s*([\s\S]*?)\s*```", content) if json_match: json_content = json_match.group(1) else: json_content = content parsed_result = json.loads(json_content) return parsed_result except json.JSONDecodeError as e: return { "code": 500, "message": f"无法解析返回的JSON: {str(e)}", "original_response": content, } return { "code": 500, "message": "无法获取有效响应", "original_response": result, } except Exception as e: logger.error(f"连接字符串解析异常: {str(e)}") return {"code": 500, "message": f"解析失败: {str(e)}"} def _optimize_ddl_prompt(self): """返回优化后的提示词模板""" return """ 请解析以下DDL建表语句,并按照指定的JSON格式返回结果: 规则说明: 1. 从DDL语句中识别所有表,可能会有多个表。将所有表放在一个数组中返回。 2. 表的英文名称(name_en)使用原始大小写,不要转换为小写。 3. 表的中文名称(name_zh)提取规则: - 优先从COMMENT ON TABLE语句中提取 - 如果没有注释,则name_zh为空字符串 - 中文名称中不要出现标点符号、"主键"、"外键"、"索引"等字样 4. 对于每个表,提取所有字段信息到columns数组中,每个字段包含: - name_zh: 字段中文名称(从COMMENT ON COLUMN提取,如果没有注释则翻译英文名,如果是无意义缩写则为空) - name_en: 字段英文名称(保持原始大小写) - data_type: 数据类型(包含长度信息,如VARCHAR(22)) - is_primary: 是否主键("是"或"否",从PRIMARY KEY约束判断) - comment: 注释内容(从COMMENT ON COLUMN提取完整注释,如果没有则为空字符串) - nullable: 是否可为空("是"或"否",从NOT NULL约束判断,默认为"是") 5. 中文字段名不要出现逗号、"主键"、"外键"、"索引"等字样。 6. 返回格式(使用数组支持多表): [ { "table_info": { "name_zh": "科室对照表", "name_en": "TB_JC_KSDZB" }, "columns": [ { "name_zh": "医疗机构代码", "name_en": "YLJGDM", "data_type": "VARCHAR(22)", "is_primary": "是", "comment": "医疗机构代码,复合主键", "nullable": "否" }, { "name_zh": "HIS科室代码", "name_en": "HISKSDM", "data_type": "CHAR(20)", "is_primary": "是", "comment": "HIS科室代码,主键、唯一", "nullable": "否" }, { "name_zh": "HIS科室名称", "name_en": "HISKSMC", "data_type": "CHAR(20)", "is_primary": "否", "comment": "HIS科室名称", "nullable": "否" } ] } ] 注意: - 如果只有一个表,也要返回数组格式:[{table_info: {...}, columns: [...]}] - 如果有多个表,数组中包含多个元素:[{表1}, {表2}, {表3}] 请仅返回JSON格式结果,不要包含任何其他解释文字。 """ def _optimize_ddl_source_prompt(self): """返回优化后的提示词模板""" return """ 请解析以下DDL建表语句,并按照指定的JSON格式返回结果: 规则说明: 1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。 2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。 - 中文表名中不要出现标点符号 3. 字段中文名称(name_zh)的确定规则: - 如有COMMENT注释,直接使用注释内容 - 如无注释但字段名有明确含义,将英文名翻译为中文 - 如字段名是无意义的拼音缩写,则name_zh为空字符串 - 字段名中不要出现逗号,以及"主键"、"外键"、"索引"等字样 4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。 5. data_source对象,请放在data_source标签中,它与tables对象同级。 6. 数据库连接串处理: - 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。 - 根据连接串格式识别数据库类型,数据库类型请使用小写,参考例子,如 mysql/postgresql/sqlserver/oracle/db2/sybase - data_source.name_en格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加. - data_source.name_zh留空. - 无法确定数据库类型时,type设为"unknown" - 如果从ddl中没有识别到数据库连接串,则json不返回"data_source"标签 - 除了database,password,username,name_en,host,port,type,name_zh 之外,连接串的其它字段放在param属性中。 7. 参考格式如下: { "tables": { "users": { //表名 "name_zh": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name_zh为空字符串 "schema": "public", "meta": [{ "name_en": "id", "data_type": "integer", "name_zh": "用户ID" }, { "name_en": "username", "data_type": "varchar", "name_zh": "用户名" } ] } }, "data_source": [{ "name_en": "mydatabase_10.52.31.104_5432_myuser", //{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名} "name_zh": "", //如果没有注释,这里留空 "type": "postgresql", "host": "10.52.31.104", "port": 5432, "database": "mydatabase", "username": "myuser", "password": "mypassword", "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC" }] } 请仅返回JSON格式结果,不要包含任何其他解释文字。 """ def _optimize_connstr_parse_prompt(self): """返回优化后的连接字符串解析提示词模板""" return """ 请解析以下数据库连接字符串,并按照指定的JSON格式返回结果: 规则说明: 1. 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。 2. 根据连接串格式识别数据库类型,数据库类型请使用小写,如 mysql/postgresql/sqlserver/oracle/db2/sybase 3. data_source.name_en格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加 4. data_source.name_zh留空 5. 无法确定数据库类型时,type设为"unknown" 6. 除了database,password,username,name_en,host,port,type,name_zh 之外,连接串的其它字段放在param属性中 返回格式示例: { "data_source": { "name_en": "mydatabase_10.52.31.104_5432_myuser", "name_zh": "", "type": "postgresql", "host": "10.52.31.104", "port": 5432, "database": "mydatabase", "username": "myuser", "password": "mypassword", "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC" } } 请仅返回JSON格式结果,不要包含任何其他解释文字。 """ def _optimize_connstr_valid_prompt(self): """返回优化后的连接字符串验证提示词模板""" return """ 请验证以下数据库连接信息是否符合规则: 规则说明: 1. 必填字段检查: - database: 数据库名称,不能为空,符合数据库名称的命名规范。 - name_en: 格式必须为 "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}" - host: 主机名或IP地址,不能为空 - port: 端口号,必须为数字 - type: 数据库类型,必须为以下之一:mysql/postgresql/sqlserver/oracle/db2/sybase - username: 用户名,不能为空,名称中间不能有空格。 2. 字段格式检查: - en_name中的各个部分必须与对应的字段值匹配 - port必须是有效的端口号(1-65535) - type必须是小写的数据库类型名称 - param中的参数格式必须正确(key=value格式) 3. 可选字段: - password: 密码(可选) - name: 中文名称(可选) - desc: 描述(可选) 请检查提供的连接信息是否符合以上规则,如果符合则返回"success",否则返回"failure"。 请仅返回"success"或"failure",不要包含任何其他解释文字。 """ def valid_db_conn_str(self, conn_str): """ 验证数据库连接字符串是否符合规则 参数: conn_str: 要验证的数据库连接信息(JSON格式) 返回: "success" 或 "failure" """ prompt = self._optimize_connstr_valid_prompt() payload = { "model": self.model_name, "messages": [ { "role": "system", "content": "你是一个专业的数据库连接信息验证专家,擅长验证数据库连接信息的完整性和正确性。", }, { "role": "user", "content": f"{prompt}\n\n{json.dumps(conn_str, ensure_ascii=False)}", }, ], } try: result = self._make_llm_request(payload, "连接字符串验证") if not result: logger.error( f"连接字符串验证失败: 在{self.max_retries}次尝试后仍然失败" ) return "failure" if "choices" in result and len(result["choices"]) > 0: content = result["choices"][0]["message"]["content"].strip().lower() return "success" if content == "success" else "failure" return "failure" except Exception as e: logger.error(f"LLM 验证数据库连接字符串失败: {str(e)}") return "failure" def parse_excel_content(self, file_content: bytes) -> list[dict[str, Any]]: """ 解析 Excel 文件内容,提取数据表定义信息 Args: file_content: Excel 文件的二进制内容 Returns: 解析后的表结构列表 """ try: import pandas as pd # 读取 Excel 文件的所有 sheet excel_file = io.BytesIO(file_content) xl = pd.ExcelFile(excel_file) # 将所有 sheet 的内容转换为文本 all_content = [] for sheet_name in xl.sheet_names: df = pd.read_excel(xl, sheet_name=sheet_name) # 将 DataFrame 转换为 markdown 表格格式 sheet_content = f"## Sheet: {sheet_name}\n" sheet_content += df.to_markdown(index=False) all_content.append(sheet_content) combined_content = "\n\n".join(all_content) logger.info(f"Excel 文件解析完成,共 {len(xl.sheet_names)} 个 sheet") # 使用 LLM 解析表结构 return self._parse_document_content(combined_content, "Excel") except Exception as e: logger.error(f"Excel 文件解析失败: {str(e)}") raise ValueError(f"Excel 文件解析失败: {str(e)}") from e def parse_word_content(self, file_content: bytes) -> list[dict[str, Any]]: """ 解析 Word 文件内容,提取数据表定义信息 Args: file_content: Word 文件的二进制内容 Returns: 解析后的表结构列表 """ try: from docx import Document # 读取 Word 文件 doc = Document(io.BytesIO(file_content)) # 提取所有段落文本 paragraphs = [para.text for para in doc.paragraphs if para.text.strip()] # 提取所有表格 tables_content = [] for table_idx, table in enumerate(doc.tables): table_text = f"\n### 表格 {table_idx + 1}:\n" for row in table.rows: row_text = " | ".join(cell.text.strip() for cell in row.cells) table_text += row_text + "\n" tables_content.append(table_text) # 组合内容 combined_content = "\n".join(paragraphs) if tables_content: combined_content += "\n\n## 文档中的表格:\n" + "\n".join(tables_content) logger.info( f"Word 文件解析完成,共 {len(paragraphs)} 个段落,{len(doc.tables)} 个表格" ) # 使用 LLM 解析表结构 return self._parse_document_content(combined_content, "Word") except Exception as e: logger.error(f"Word 文件解析失败: {str(e)}") raise ValueError(f"Word 文件解析失败: {str(e)}") from e def parse_pdf_content(self, file_content: bytes) -> list[dict[str, Any]]: """ 解析 PDF 文件内容,提取数据表定义信息 Args: file_content: PDF 文件的二进制内容 Returns: 解析后的表结构列表 """ try: import pdfplumber # 读取 PDF 文件 pdf = pdfplumber.open(io.BytesIO(file_content)) all_content = [] for page_num, page in enumerate(pdf.pages): page_text = f"## 第 {page_num + 1} 页:\n" # 提取页面文本 text = page.extract_text() if text: page_text += text + "\n" # 提取页面中的表格 tables = page.extract_tables() for table_idx, table in enumerate(tables): page_text += f"\n### 表格 {table_idx + 1}:\n" for row in table: row_text = " | ".join(str(cell) if cell else "" for cell in row) page_text += row_text + "\n" all_content.append(page_text) pdf.close() combined_content = "\n\n".join(all_content) logger.info(f"PDF 文件解析完成,共 {len(pdf.pages)} 页") # 使用 LLM 解析表结构 return self._parse_document_content(combined_content, "PDF") except Exception as e: logger.error(f"PDF 文件解析失败: {str(e)}") raise ValueError(f"PDF 文件解析失败: {str(e)}") from e def _parse_document_content( self, content: str, file_type: str ) -> list[dict[str, Any]]: """ 使用 LLM 解析文档内容,提取数据表定义信息 Args: content: 文档的文本内容 file_type: 文件类型(用于日志记录) Returns: 解析后的表结构列表 """ prompt = self._get_document_parse_prompt() payload = { "model": self.model_name, "messages": [ { "role": "system", "content": "你是一个专业的数据表结构解析专家,擅长从各种文档中识别和提取数据表定义信息并转换为结构化的JSON格式。", }, {"role": "user", "content": f"{prompt}\n\n{content}"}, ], } try: result = self._make_llm_request(payload, f"{file_type}文档解析") if not result: raise ValueError(f"API请求失败: 在{self.max_retries}次尝试后仍然失败") if "choices" in result and len(result["choices"]) > 0: response_content = result["choices"][0]["message"]["content"] try: json_match = re.search( r"```json\s*([\s\S]*?)\s*```", response_content ) if json_match: json_content = json_match.group(1) else: json_content = response_content parsed_result = json.loads(json_content) # 确保返回的是列表格式 if isinstance(parsed_result, dict): parsed_result = [parsed_result] return parsed_result except json.JSONDecodeError as e: raise ValueError(f"无法解析返回的JSON: {str(e)}") from e raise ValueError("无法获取有效响应") except Exception as e: logger.error(f"{file_type}文档解析异常: {str(e)}") raise def _get_document_parse_prompt(self) -> str: """返回文档解析的提示词模板""" return """ 请从以下文档内容中识别并提取所有数据表的定义信息,按照指定的JSON格式返回结果。 规则说明: 1. 仔细阅读文档内容,识别所有描述数据表结构的部分。 2. 一个文档可能包含一个或多个数据表的定义,请将所有表放在一个JSON数组中返回。 3. 表的英文名称(name_en): - 如果文档中有英文表名,使用原始大小写 - 如果没有英文名,尝试根据中文名翻译或生成合适的英文名 4. 表的中文名称(name_zh): - 从文档中提取表的中文名称或描述 - 如果没有明确的中文名,根据内容推断 5. 对于每个表,提取所有字段信息到columns数组中,每个字段包含: - name_zh: 字段中文名称 - name_en: 字段英文名称(如果没有,根据中文名翻译) - data_type: 数据类型(如VARCHAR(255)、INTEGER、DATE等,如果文档未指定则根据字段用途推断) - is_primary: 是否主键("是"或"否") - comment: 字段说明或注释 - nullable: 是否可为空("是"或"否",如果文档未指定默认为"是") 6. 返回格式(必须是JSON数组): [ { "table_info": { "name_zh": "用户信息表", "name_en": "user_info" }, "columns": [ { "name_zh": "用户ID", "name_en": "user_id", "data_type": "INTEGER", "is_primary": "是", "comment": "用户唯一标识", "nullable": "否" }, { "name_zh": "用户名", "name_en": "username", "data_type": "VARCHAR(50)", "is_primary": "否", "comment": "用户登录名", "nullable": "否" } ] } ] 注意: - 即使只识别到一个表,也必须返回数组格式:[{table_info: {...}, columns: [...]}] - 如果文档中没有找到任何数据表定义,返回空数组:[] - 请仅返回JSON格式结果,不要包含任何其他解释文字。 """