ddl_parser.py 17 KB


  1. import os
  2. import requests
  3. import re
  4. import json
  5. import logging
  6. import time
  7. from flask import current_app
  8. logger = logging.getLogger(__name__)
  9. class DDLParser:
  10. def __init__(self, api_key=None, timeout=60, max_retries=3):
  11. """
  12. 初始化DDL解析器
  13. 参数:
  14. api_key: LLM API密钥,如果未提供,将从应用配置或环境变量中获取
  15. timeout: API请求超时时间(秒),默认60秒
  16. max_retries: 最大重试次数,默认3次
  17. """
  18. # 如果在Flask应用上下文中,则从应用配置获取参数
  19. self.api_key = api_key or current_app.config.get('LLM_API_KEY')
  20. self.base_url = current_app.config.get('LLM_BASE_URL')
  21. self.model_name = current_app.config.get('LLM_MODEL_NAME')
  22. self.timeout = timeout
  23. self.max_retries = max_retries
  24. self.headers = {
  25. "Authorization": f"Bearer {self.api_key}",
  26. "Content-Type": "application/json"
  27. }
  28. def _make_llm_request(self, payload, operation_name="LLM请求"):
  29. """
  30. 发送LLM请求,支持自动重试
  31. 参数:
  32. payload: 请求payload
  33. operation_name: 操作名称,用于日志
  34. 返回:
  35. API响应结果
  36. """
  37. last_error = None
  38. for attempt in range(self.max_retries):
  39. try:
  40. if attempt > 0:
  41. wait_time = 2 ** attempt # 指数退避: 2, 4, 8秒
  42. logger.info(f"{operation_name} 第{attempt + 1}次重试,等待{wait_time}秒...")
  43. time.sleep(wait_time)
  44. logger.info(f"{operation_name} 尝试 {attempt + 1}/{self.max_retries},超时时间: {self.timeout}秒")
  45. response = requests.post(
  46. f"{self.base_url}/chat/completions",
  47. headers=self.headers,
  48. json=payload,
  49. timeout=self.timeout
  50. )
  51. response.raise_for_status()
  52. result = response.json()
  53. logger.info(f"{operation_name} 成功")
  54. return result
  55. except requests.Timeout as e:
  56. last_error = f"请求超时(超过{self.timeout}秒): {str(e)}"
  57. logger.warning(f"{operation_name} 超时: {str(e)}")
  58. except requests.RequestException as e:
  59. last_error = f"API请求失败: {str(e)}"
  60. logger.warning(f"{operation_name} 失败: {str(e)}")
  61. except Exception as e:
  62. last_error = f"未知错误: {str(e)}"
  63. logger.error(f"{operation_name} 异常: {str(e)}")
  64. break # 对于非网络错误,不重试
  65. # 所有重试都失败
  66. logger.error(f"{operation_name} 在{self.max_retries}次尝试后失败: {last_error}")
  67. return None
  68. def parse_ddl(self, sql_content):
  69. """
  70. 解析DDL语句,返回标准化的结构
  71. 参数:
  72. sql_content: 要解析的DDL语句
  73. 返回:
  74. 解析结果的JSON对象
  75. """
  76. prompt = self._optimize_ddl_prompt()
  77. payload = {
  78. "model": self.model_name,
  79. "messages": [
  80. {
  81. "role": "system",
  82. "content": "你是一个专业的SQL DDL语句解析专家,擅长从DDL语句中提取表结构信息并转换为结构化的JSON格式。"
  83. },
  84. {
  85. "role": "user",
  86. "content": f"{prompt}\n\n{sql_content}"
  87. }
  88. ]
  89. }
  90. try:
  91. result = self._make_llm_request(payload, "DDL解析")
  92. if not result:
  93. return {
  94. "code": 500,
  95. "message": f"API请求失败: 在{self.max_retries}次尝试后仍然失败"
  96. }
  97. if "choices" in result and len(result["choices"]) > 0:
  98. content = result["choices"][0]["message"]["content"]
  99. try:
  100. json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
  101. if json_match:
  102. json_content = json_match.group(1)
  103. else:
  104. json_content = content
  105. parsed_result = json.loads(json_content)
  106. return parsed_result
  107. except json.JSONDecodeError as e:
  108. return {
  109. "code": 500,
  110. "message": f"无法解析返回的JSON: {str(e)}",
  111. "original_response": content
  112. }
  113. return {
  114. "code": 500,
  115. "message": "无法获取有效响应",
  116. "original_response": result
  117. }
  118. except Exception as e:
  119. logger.error(f"DDL解析异常: {str(e)}")
  120. return {
  121. "code": 500,
  122. "message": f"解析失败: {str(e)}"
  123. }
  124. def parse_db_conn_str(self, conn_str):
  125. """
  126. 解析数据库连接字符串
  127. 参数:
  128. conn_str: 要解析的数据库连接字符串
  129. 返回:
  130. 解析结果的JSON对象
  131. """
  132. prompt = self._optimize_connstr_parse_prompt()
  133. payload = {
  134. "model": self.model_name,
  135. "messages": [
  136. {
  137. "role": "system",
  138. "content": "你是一个专业的数据库连接字符串解析专家,擅长解析各种数据库的连接字符串并提取关键信息。"
  139. },
  140. {
  141. "role": "user",
  142. "content": f"{prompt}\n\n{conn_str}"
  143. }
  144. ]
  145. }
  146. try:
  147. result = self._make_llm_request(payload, "连接字符串解析")
  148. if not result:
  149. return {
  150. "code": 500,
  151. "message": f"API请求失败: 在{self.max_retries}次尝试后仍然失败"
  152. }
  153. if "choices" in result and len(result["choices"]) > 0:
  154. content = result["choices"][0]["message"]["content"]
  155. try:
  156. json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
  157. if json_match:
  158. json_content = json_match.group(1)
  159. else:
  160. json_content = content
  161. parsed_result = json.loads(json_content)
  162. return parsed_result
  163. except json.JSONDecodeError as e:
  164. return {
  165. "code": 500,
  166. "message": f"无法解析返回的JSON: {str(e)}",
  167. "original_response": content
  168. }
  169. return {
  170. "code": 500,
  171. "message": "无法获取有效响应",
  172. "original_response": result
  173. }
  174. except Exception as e:
  175. logger.error(f"连接字符串解析异常: {str(e)}")
  176. return {
  177. "code": 500,
  178. "message": f"解析失败: {str(e)}"
  179. }
  180. def _optimize_ddl_prompt(self):
  181. """返回优化后的提示词模板"""
  182. return """
  183. 请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
  184. 规则说明:
  185. 1. 从DDL语句中识别所有表,可能会有多个表。将所有表放在一个数组中返回。
  186. 2. 表的英文名称(name_en)使用原始大小写,不要转换为小写。
  187. 3. 表的中文名称(name_zh)提取规则:
  188. - 优先从COMMENT ON TABLE语句中提取
  189. - 如果没有注释,则name_zh为空字符串
  190. - 中文名称中不要出现标点符号、"主键"、"外键"、"索引"等字样
  191. 4. 对于每个表,提取所有字段信息到columns数组中,每个字段包含:
  192. - name_zh: 字段中文名称(从COMMENT ON COLUMN提取,如果没有注释则翻译英文名,如果是无意义缩写则为空)
  193. - name_en: 字段英文名称(保持原始大小写)
  194. - data_type: 数据类型(包含长度信息,如VARCHAR(22))
  195. - is_primary: 是否主键("是"或"否",从PRIMARY KEY约束判断)
  196. - comment: 注释内容(从COMMENT ON COLUMN提取完整注释,如果没有则为空字符串)
  197. - nullable: 是否可为空("是"或"否",从NOT NULL约束判断,默认为"是")
  198. 5. 中文字段名不要出现逗号、"主键"、"外键"、"索引"等字样。
  199. 6. 返回格式(使用数组支持多表):
  200. [
  201. {
  202. "table_info": {
  203. "name_zh": "科室对照表",
  204. "name_en": "TB_JC_KSDZB"
  205. },
  206. "columns": [
  207. {
  208. "name_zh": "医疗机构代码",
  209. "name_en": "YLJGDM",
  210. "data_type": "VARCHAR(22)",
  211. "is_primary": "是",
  212. "comment": "医疗机构代码,复合主键",
  213. "nullable": "否"
  214. },
  215. {
  216. "name_zh": "HIS科室代码",
  217. "name_en": "HISKSDM",
  218. "data_type": "CHAR(20)",
  219. "is_primary": "是",
  220. "comment": "HIS科室代码,主键、唯一",
  221. "nullable": "否"
  222. },
  223. {
  224. "name_zh": "HIS科室名称",
  225. "name_en": "HISKSMC",
  226. "data_type": "CHAR(20)",
  227. "is_primary": "否",
  228. "comment": "HIS科室名称",
  229. "nullable": "否"
  230. }
  231. ]
  232. }
  233. ]
  234. 注意:
  235. - 如果只有一个表,也要返回数组格式:[{table_info: {...}, columns: [...]}]
  236. - 如果有多个表,数组中包含多个元素:[{表1}, {表2}, {表3}]
  237. 请仅返回JSON格式结果,不要包含任何其他解释文字。
  238. """
  239. def _optimize_ddl_source_prompt(self):
  240. """返回优化后的提示词模板"""
  241. return """
  242. 请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
  243. 规则说明:
  244. 1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。
  245. 2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。
  246. - 中文表名中不要出现标点符号
  247. 3. 字段中文名称(name_zh)的确定规则:
  248. - 如有COMMENT注释,直接使用注释内容
  249. - 如无注释但字段名有明确含义,将英文名翻译为中文
  250. - 如字段名是无意义的拼音缩写,则name_zh为空字符串
  251. - 字段名中不要出现逗号,以及"主键"、"外键"、"索引"等字样
  252. 4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。
  253. 5. data_source对象,请放在data_source标签中,它与tables对象同级。
  254. 6. 数据库连接串处理:
  255. - 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
  256. - 根据连接串格式识别数据库类型,数据库类型请使用小写,参考例子,如 mysql/postgresql/sqlserver/oracle/db2/sybase
  257. - data_source.name_en格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加.
  258. - data_source.name_zh留空.
  259. - 无法确定数据库类型时,type设为"unknown"
  260. - 如果从ddl中没有识别到数据库连接串,则json不返回"data_source"标签
  261. - 除了database,password,username,name_en,host,port,type,name_zh 之外,连接串的其它字段放在param属性中。
  262. 7. 参考格式如下:
  263. {
  264. "tables": {
  265. "users": { //表名
  266. "name_zh": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name_zh为空字符串
  267. "schema": "public",
  268. "meta": [{
  269. "name_en": "id",
  270. "data_type": "integer",
  271. "name_zh": "用户ID"
  272. },
  273. {
  274. "name_en": "username",
  275. "data_type": "varchar",
  276. "name_zh": "用户名"
  277. }
  278. ]
  279. }
  280. },
  281. "data_source": [{
  282. "name_en": "mydatabase_10.52.31.104_5432_myuser", //{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}
  283. "name_zh": "", //如果没有注释,这里留空
  284. "type": "postgresql",
  285. "host": "10.52.31.104",
  286. "port": 5432,
  287. "database": "mydatabase",
  288. "username": "myuser",
  289. "password": "mypassword",
  290. "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
  291. }]
  292. }
  293. 请仅返回JSON格式结果,不要包含任何其他解释文字。
  294. """
  295. def _optimize_connstr_parse_prompt(self):
  296. """返回优化后的连接字符串解析提示词模板"""
  297. return """
  298. 请解析以下数据库连接字符串,并按照指定的JSON格式返回结果:
  299. 规则说明:
  300. 1. 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
  301. 2. 根据连接串格式识别数据库类型,数据库类型请使用小写,如 mysql/postgresql/sqlserver/oracle/db2/sybase
  302. 3. data_source.name_en格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加
  303. 4. data_source.name_zh留空
  304. 5. 无法确定数据库类型时,type设为"unknown"
  305. 6. 除了database,password,username,name_en,host,port,type,name_zh 之外,连接串的其它字段放在param属性中
  306. 返回格式示例:
  307. {
  308. "data_source": {
  309. "name_en": "mydatabase_10.52.31.104_5432_myuser",
  310. "name_zh": "",
  311. "type": "postgresql",
  312. "host": "10.52.31.104",
  313. "port": 5432,
  314. "database": "mydatabase",
  315. "username": "myuser",
  316. "password": "mypassword",
  317. "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
  318. }
  319. }
  320. 请仅返回JSON格式结果,不要包含任何其他解释文字。
  321. """
  322. def _optimize_connstr_valid_prompt(self):
  323. """返回优化后的连接字符串验证提示词模板"""
  324. return """
  325. 请验证以下数据库连接信息是否符合规则:
  326. 规则说明:
  327. 1. 必填字段检查:
  328. - database: 数据库名称,不能为空,符合数据库名称的命名规范。
  329. - name_en: 格式必须为 "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}"
  330. - host: 主机名或IP地址,不能为空
  331. - port: 端口号,必须为数字
  332. - type: 数据库类型,必须为以下之一:mysql/postgresql/sqlserver/oracle/db2/sybase
  333. - username: 用户名,不能为空,名称中间不能有空格。
  334. 2. 字段格式检查:
  335. - en_name中的各个部分必须与对应的字段值匹配
  336. - port必须是有效的端口号(1-65535)
  337. - type必须是小写的数据库类型名称
  338. - param中的参数格式必须正确(key=value格式)
  339. 3. 可选字段:
  340. - password: 密码(可选)
  341. - name: 中文名称(可选)
  342. - desc: 描述(可选)
  343. 请检查提供的连接信息是否符合以上规则,如果符合则返回"success",否则返回"failure"。
  344. 请仅返回"success"或"failure",不要包含任何其他解释文字。
  345. """
  346. def valid_db_conn_str(self, conn_str):
  347. """
  348. 验证数据库连接字符串是否符合规则
  349. 参数:
  350. conn_str: 要验证的数据库连接信息(JSON格式)
  351. 返回:
  352. "success" 或 "failure"
  353. """
  354. prompt = self._optimize_connstr_valid_prompt()
  355. payload = {
  356. "model": self.model_name,
  357. "messages": [
  358. {
  359. "role": "system",
  360. "content": "你是一个专业的数据库连接信息验证专家,擅长验证数据库连接信息的完整性和正确性。"
  361. },
  362. {
  363. "role": "user",
  364. "content": f"{prompt}\n\n{json.dumps(conn_str, ensure_ascii=False)}"
  365. }
  366. ]
  367. }
  368. try:
  369. result = self._make_llm_request(payload, "连接字符串验证")
  370. if not result:
  371. logger.error(f"连接字符串验证失败: 在{self.max_retries}次尝试后仍然失败")
  372. return "failure"
  373. if "choices" in result and len(result["choices"]) > 0:
  374. content = result["choices"][0]["message"]["content"].strip().lower()
  375. return "success" if content == "success" else "failure"
  376. return "failure"
  377. except Exception as e:
  378. logger.error(f"LLM 验证数据库连接字符串失败: {str(e)}")
  379. return "failure"