ddl_parser.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. import os
  2. import requests
  3. import re
  4. import json
  5. import logging
  6. import time
  7. from flask import current_app
  8. logger = logging.getLogger(__name__)
  9. class DDLParser:
  10. def __init__(self, api_key=None, timeout=60, max_retries=3):
  11. """
  12. 初始化DDL解析器
  13. 参数:
  14. api_key: LLM API密钥,如果未提供,将从应用配置或环境变量中获取
  15. timeout: API请求超时时间(秒),默认60秒
  16. max_retries: 最大重试次数,默认3次
  17. """
  18. # 如果在Flask应用上下文中,则从应用配置获取参数
  19. self.api_key = api_key or current_app.config.get('LLM_API_KEY')
  20. self.base_url = current_app.config.get('LLM_BASE_URL')
  21. self.model_name = current_app.config.get('LLM_MODEL_NAME')
  22. self.timeout = timeout
  23. self.max_retries = max_retries
  24. self.headers = {
  25. "Authorization": f"Bearer {self.api_key}",
  26. "Content-Type": "application/json"
  27. }
  28. def _make_llm_request(self, payload, operation_name="LLM请求"):
  29. """
  30. 发送LLM请求,支持自动重试
  31. 参数:
  32. payload: 请求payload
  33. operation_name: 操作名称,用于日志
  34. 返回:
  35. API响应结果
  36. """
  37. last_error = None
  38. for attempt in range(self.max_retries):
  39. try:
  40. if attempt > 0:
  41. wait_time = 2 ** attempt # 指数退避: 2, 4, 8秒
  42. logger.info(f"{operation_name} 第{attempt + 1}次重试,等待{wait_time}秒...")
  43. time.sleep(wait_time)
  44. logger.info(f"{operation_name} 尝试 {attempt + 1}/{self.max_retries},超时时间: {self.timeout}秒")
  45. response = requests.post(
  46. f"{self.base_url}/chat/completions",
  47. headers=self.headers,
  48. json=payload,
  49. timeout=self.timeout
  50. )
  51. response.raise_for_status()
  52. result = response.json()
  53. logger.info(f"{operation_name} 成功")
  54. return result
  55. except requests.Timeout as e:
  56. last_error = f"请求超时(超过{self.timeout}秒): {str(e)}"
  57. logger.warning(f"{operation_name} 超时: {str(e)}")
  58. except requests.RequestException as e:
  59. last_error = f"API请求失败: {str(e)}"
  60. logger.warning(f"{operation_name} 失败: {str(e)}")
  61. except Exception as e:
  62. last_error = f"未知错误: {str(e)}"
  63. logger.error(f"{operation_name} 异常: {str(e)}")
  64. break # 对于非网络错误,不重试
  65. # 所有重试都失败
  66. logger.error(f"{operation_name} 在{self.max_retries}次尝试后失败: {last_error}")
  67. return None
  68. def parse_ddl(self, sql_content):
  69. """
  70. 解析DDL语句,返回标准化的结构
  71. 参数:
  72. sql_content: 要解析的DDL语句
  73. 返回:
  74. 解析结果的JSON对象
  75. """
  76. prompt = self._optimize_ddl_prompt()
  77. payload = {
  78. "model": self.model_name,
  79. "messages": [
  80. {
  81. "role": "system",
  82. "content": "你是一个专业的数据库分析专家,擅长解析SQL DDL语句并提取表结构信息。"
  83. },
  84. {
  85. "role": "user",
  86. "content": f"{prompt}\n\n{sql_content}"
  87. }
  88. ]
  89. }
  90. try:
  91. result = self._make_llm_request(payload, "DDL解析")
  92. if not result:
  93. return {
  94. "code": 500,
  95. "message": f"API请求失败: 在{self.max_retries}次尝试后仍然失败"
  96. }
  97. if "choices" in result and len(result["choices"]) > 0:
  98. content = result["choices"][0]["message"]["content"]
  99. try:
  100. json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
  101. if json_match:
  102. json_content = json_match.group(1)
  103. else:
  104. json_content = content
  105. parsed_result = json.loads(json_content)
  106. return parsed_result
  107. except json.JSONDecodeError as e:
  108. return {
  109. "code": 500,
  110. "message": f"无法解析返回的JSON: {str(e)}",
  111. "original_response": content
  112. }
  113. return {
  114. "code": 500,
  115. "message": "无法获取有效响应",
  116. "original_response": result
  117. }
  118. except Exception as e:
  119. logger.error(f"DDL解析异常: {str(e)}")
  120. return {
  121. "code": 500,
  122. "message": f"解析失败: {str(e)}"
  123. }
  124. def parse_db_conn_str(self, conn_str):
  125. """
  126. 解析数据库连接字符串
  127. 参数:
  128. conn_str: 要解析的数据库连接字符串
  129. 返回:
  130. 解析结果的JSON对象
  131. """
  132. prompt = self._optimize_connstr_parse_prompt()
  133. payload = {
  134. "model": self.model_name,
  135. "messages": [
  136. {
  137. "role": "system",
  138. "content": "你是一个专业的数据库连接字符串解析专家,擅长解析各种数据库的连接字符串并提取关键信息。"
  139. },
  140. {
  141. "role": "user",
  142. "content": f"{prompt}\n\n{conn_str}"
  143. }
  144. ]
  145. }
  146. try:
  147. result = self._make_llm_request(payload, "连接字符串解析")
  148. if not result:
  149. return {
  150. "code": 500,
  151. "message": f"API请求失败: 在{self.max_retries}次尝试后仍然失败"
  152. }
  153. if "choices" in result and len(result["choices"]) > 0:
  154. content = result["choices"][0]["message"]["content"]
  155. try:
  156. json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
  157. if json_match:
  158. json_content = json_match.group(1)
  159. else:
  160. json_content = content
  161. parsed_result = json.loads(json_content)
  162. return parsed_result
  163. except json.JSONDecodeError as e:
  164. return {
  165. "code": 500,
  166. "message": f"无法解析返回的JSON: {str(e)}",
  167. "original_response": content
  168. }
  169. return {
  170. "code": 500,
  171. "message": "无法获取有效响应",
  172. "original_response": result
  173. }
  174. except Exception as e:
  175. logger.error(f"连接字符串解析异常: {str(e)}")
  176. return {
  177. "code": 500,
  178. "message": f"解析失败: {str(e)}"
  179. }
  180. def _optimize_ddl_prompt(self):
  181. """返回优化后的提示词模板"""
  182. return """
  183. 请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
  184. 规则说明:
  185. 1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。
  186. 2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。
  187. - 中文表名中不要出现标点符号
  188. - 表中的字段对应输出json中的meta对象,en_name对应表的字段名,data_type对应表的字段类型.
  189. 3. 返回结果的中文名称(name)的确定规则:
  190. - 对于COMMENT注释,直接使用注释内容作为name
  191. - 如sql中无注释但字段名en_name有明确含义,将英文名en_name翻译为中文
  192. - 如字段名en_name是无意义的拼音缩写,则name为空字符串
  193. - 中文字段名name中不要出现逗号,以及"主键"、"外键"、"索引"等字样
  194. 4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。
  195. - 对于每个表的字段都要检查它的en_name和name,name不能为空,首选字段的注释,如果没有注释,则尝试翻译en_name作为name。
  196. 5. 忽略sql文件中除了表的定义和注释信息COMMIT以外的内容。比如,忽略sql中的数据库的连接字符串。
  197. 6. 参考格式如下:
  198. {
  199. "users_table": { //表名
  200. "name_zh": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name_zh为空字符串
  201. "schema": "public",
  202. "meta": [{
  203. "name_en": "id", //表的字段名
  204. "data_type": "integer", //表的字段类型
  205. "name_zh": "用户ID" //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name_zh为空字符串
  206. },
  207. {
  208. "name_en": "username",
  209. "data_type": "varchar",
  210. "name_zh": "用户名"
  211. }
  212. ]
  213. }
  214. }
  215. 请仅返回JSON格式结果,不要包含任何其他解释文字。
  216. """
  217. def _optimize_ddl_source_prompt(self):
  218. """返回优化后的提示词模板"""
  219. return """
  220. 请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
  221. 规则说明:
  222. 1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。
  223. 2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。
  224. - 中文表名中不要出现标点符号
  225. 3. 字段中文名称(name_zh)的确定规则:
  226. - 如有COMMENT注释,直接使用注释内容
  227. - 如无注释但字段名有明确含义,将英文名翻译为中文
  228. - 如字段名是无意义的拼音缩写,则name_zh为空字符串
  229. - 字段名中不要出现逗号,以及"主键"、"外键"、"索引"等字样
  230. 4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。
  231. 5. data_source对象,请放在data_source标签中,它与tables对象同级。
  232. 6. 数据库连接串处理:
  233. - 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
  234. - 根据连接串格式识别数据库类型,数据库类型请使用小写,参考例子,如 mysql/postgresql/sqlserver/oracle/db2/sybase
  235. - data_source.name_en格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加.
  236. - data_source.name_zh留空.
  237. - 无法确定数据库类型时,type设为"unknown"
  238. - 如果从ddl中没有识别到数据库连接串,则json不返回"data_source"标签
  239. - 除了database,password,username,name_en,host,port,type,name_zh 之外,连接串的其它字段放在param属性中。
  240. 7. 参考格式如下:
  241. {
  242. "tables": {
  243. "users": { //表名
  244. "name_zh": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name_zh为空字符串
  245. "schema": "public",
  246. "meta": [{
  247. "name_en": "id",
  248. "data_type": "integer",
  249. "name_zh": "用户ID"
  250. },
  251. {
  252. "name_en": "username",
  253. "data_type": "varchar",
  254. "name_zh": "用户名"
  255. }
  256. ]
  257. }
  258. },
  259. "data_source": [{
  260. "name_en": "mydatabase_10.52.31.104_5432_myuser", //{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}
  261. "name_zh": "", //如果没有注释,这里留空
  262. "type": "postgresql",
  263. "host": "10.52.31.104",
  264. "port": 5432,
  265. "database": "mydatabase",
  266. "username": "myuser",
  267. "password": "mypassword",
  268. "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
  269. }]
  270. }
  271. 请仅返回JSON格式结果,不要包含任何其他解释文字。
  272. """
  273. def _optimize_connstr_parse_prompt(self):
  274. """返回优化后的连接字符串解析提示词模板"""
  275. return """
  276. 请解析以下数据库连接字符串,并按照指定的JSON格式返回结果:
  277. 规则说明:
  278. 1. 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
  279. 2. 根据连接串格式识别数据库类型,数据库类型请使用小写,如 mysql/postgresql/sqlserver/oracle/db2/sybase
  280. 3. data_source.name_en格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加
  281. 4. data_source.name_zh留空
  282. 5. 无法确定数据库类型时,type设为"unknown"
  283. 6. 除了database,password,username,name_en,host,port,type,name_zh 之外,连接串的其它字段放在param属性中
  284. 返回格式示例:
  285. {
  286. "data_source": {
  287. "name_en": "mydatabase_10.52.31.104_5432_myuser",
  288. "name_zh": "",
  289. "type": "postgresql",
  290. "host": "10.52.31.104",
  291. "port": 5432,
  292. "database": "mydatabase",
  293. "username": "myuser",
  294. "password": "mypassword",
  295. "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
  296. }
  297. }
  298. 请仅返回JSON格式结果,不要包含任何其他解释文字。
  299. """
  300. def _optimize_connstr_valid_prompt(self):
  301. """返回优化后的连接字符串验证提示词模板"""
  302. return """
  303. 请验证以下数据库连接信息是否符合规则:
  304. 规则说明:
  305. 1. 必填字段检查:
  306. - database: 数据库名称,不能为空,符合数据库名称的命名规范。
  307. - name_en: 格式必须为 "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}"
  308. - host: 主机名或IP地址,不能为空
  309. - port: 端口号,必须为数字
  310. - type: 数据库类型,必须为以下之一:mysql/postgresql/sqlserver/oracle/db2/sybase
  311. - username: 用户名,不能为空,名称中间不能有空格。
  312. 2. 字段格式检查:
  313. - en_name中的各个部分必须与对应的字段值匹配
  314. - port必须是有效的端口号(1-65535)
  315. - type必须是小写的数据库类型名称
  316. - param中的参数格式必须正确(key=value格式)
  317. 3. 可选字段:
  318. - password: 密码(可选)
  319. - name: 中文名称(可选)
  320. - desc: 描述(可选)
  321. 请检查提供的连接信息是否符合以上规则,如果符合则返回"success",否则返回"failure"。
  322. 请仅返回"success"或"failure",不要包含任何其他解释文字。
  323. """
  324. def valid_db_conn_str(self, conn_str):
  325. """
  326. 验证数据库连接字符串是否符合规则
  327. 参数:
  328. conn_str: 要验证的数据库连接信息(JSON格式)
  329. 返回:
  330. "success" 或 "failure"
  331. """
  332. prompt = self._optimize_connstr_valid_prompt()
  333. payload = {
  334. "model": self.model_name,
  335. "messages": [
  336. {
  337. "role": "system",
  338. "content": "你是一个专业的数据库连接信息验证专家,擅长验证数据库连接信息的完整性和正确性。"
  339. },
  340. {
  341. "role": "user",
  342. "content": f"{prompt}\n\n{json.dumps(conn_str, ensure_ascii=False)}"
  343. }
  344. ]
  345. }
  346. try:
  347. result = self._make_llm_request(payload, "连接字符串验证")
  348. if not result:
  349. logger.error(f"连接字符串验证失败: 在{self.max_retries}次尝试后仍然失败")
  350. return "failure"
  351. if "choices" in result and len(result["choices"]) > 0:
  352. content = result["choices"][0]["message"]["content"].strip().lower()
  353. return "success" if content == "success" else "failure"
  354. return "failure"
  355. except Exception as e:
  356. logger.error(f"LLM 验证数据库连接字符串失败: {str(e)}")
  357. return "failure"