ddl_parser.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. import os
  2. import requests
  3. import re
  4. import json
  5. import logging
  6. from flask import current_app
  7. logger = logging.getLogger(__name__)
  8. class DDLParser:
  9. def __init__(self, api_key=None):
  10. """
  11. 初始化DDL解析器
  12. 参数:
  13. api_key: LLM API密钥,如果未提供,将从应用配置或环境变量中获取
  14. """
  15. # 如果在Flask应用上下文中,则从应用配置获取参数
  16. self.api_key = api_key or current_app.config.get('LLM_API_KEY')
  17. self.base_url = current_app.config.get('LLM_BASE_URL')
  18. self.model_name = current_app.config.get('LLM_MODEL_NAME')
  19. self.headers = {
  20. "Authorization": f"Bearer {self.api_key}",
  21. "Content-Type": "application/json"
  22. }
  23. def parse_ddl(self, sql_content):
  24. """
  25. 解析DDL语句,返回标准化的结构
  26. 参数:
  27. sql_content: 要解析的DDL语句
  28. 返回:
  29. 解析结果的JSON对象
  30. """
  31. prompt = self._optimize_ddl_prompt()
  32. payload = {
  33. "model": self.model_name,
  34. "messages": [
  35. {
  36. "role": "system",
  37. "content": "你是一个专业的数据库分析专家,擅长解析SQL DDL语句并提取表结构信息。"
  38. },
  39. {
  40. "role": "user",
  41. "content": f"{prompt}\n\n{sql_content}"
  42. }
  43. ]
  44. }
  45. try:
  46. response = requests.post(
  47. f"{self.base_url}/chat/completions",
  48. headers=self.headers,
  49. json=payload,
  50. timeout=30
  51. )
  52. response.raise_for_status()
  53. result = response.json()
  54. if "choices" in result and len(result["choices"]) > 0:
  55. content = result["choices"][0]["message"]["content"]
  56. try:
  57. json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
  58. if json_match:
  59. json_content = json_match.group(1)
  60. else:
  61. json_content = content
  62. parsed_result = json.loads(json_content)
  63. return parsed_result
  64. except json.JSONDecodeError as e:
  65. return {
  66. "code": 500,
  67. "message": f"无法解析返回的JSON: {str(e)}",
  68. "original_response": content
  69. }
  70. return {
  71. "code": 500,
  72. "message": "无法获取有效响应",
  73. "original_response": result
  74. }
  75. except requests.RequestException as e:
  76. return {
  77. "code": 500,
  78. "message": f"API请求失败: {str(e)}"
  79. }
  80. def parse_db_conn_str(self, conn_str):
  81. """
  82. 解析数据库连接字符串
  83. 参数:
  84. conn_str: 要解析的数据库连接字符串
  85. 返回:
  86. 解析结果的JSON对象
  87. """
  88. prompt = self._optimize_connstr_parse_prompt()
  89. payload = {
  90. "model": self.model_name,
  91. "messages": [
  92. {
  93. "role": "system",
  94. "content": "你是一个专业的数据库连接字符串解析专家,擅长解析各种数据库的连接字符串并提取关键信息。"
  95. },
  96. {
  97. "role": "user",
  98. "content": f"{prompt}\n\n{conn_str}"
  99. }
  100. ]
  101. }
  102. try:
  103. response = requests.post(
  104. f"{self.base_url}/chat/completions",
  105. headers=self.headers,
  106. json=payload,
  107. timeout=30
  108. )
  109. response.raise_for_status()
  110. result = response.json()
  111. if "choices" in result and len(result["choices"]) > 0:
  112. content = result["choices"][0]["message"]["content"]
  113. try:
  114. json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content)
  115. if json_match:
  116. json_content = json_match.group(1)
  117. else:
  118. json_content = content
  119. parsed_result = json.loads(json_content)
  120. return parsed_result
  121. except json.JSONDecodeError as e:
  122. return {
  123. "code": 500,
  124. "message": f"无法解析返回的JSON: {str(e)}",
  125. "original_response": content
  126. }
  127. return {
  128. "code": 500,
  129. "message": "无法获取有效响应",
  130. "original_response": result
  131. }
  132. except requests.RequestException as e:
  133. return {
  134. "code": 500,
  135. "message": f"API请求失败: {str(e)}"
  136. }
  137. def _optimize_ddl_prompt(self):
  138. """返回优化后的提示词模板"""
  139. return """
  140. 请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
  141. 规则说明:
  142. 1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。
  143. 2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。
  144. - 中文表名中不要出现标点符号
  145. - 表中的字段对应输出json中的meta对象,en_name对应表的字段名,data_type对应表的字段类型.
  146. 3. 返回结果的中文名称(name)的确定规则:
  147. - 对于COMMENT注释,直接使用注释内容作为name
  148. - 如sql中无注释但字段名en_name有明确含义,将英文名en_name翻译为中文
  149. - 如字段名en_name是无意义的拼音缩写,则name为空字符串
  150. - 中文字段名name中不要出现逗号,以及"主键"、"外键"、"索引"等字样
  151. 4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。
  152. - 对于每个表的字段都要检查它的en_name和name,name不能为空,首选字段的注释,如果没有注释,则尝试翻译en_name作为name。
  153. 5. 忽略sql文件中除了表的定义和注释信息COMMIT以外的内容。比如,忽略sql中的数据库的连接字符串。
  154. 6. 参考格式如下:
  155. {
  156. "users_table": { //表名
  157. "name": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串
  158. "schema": "public",
  159. "meta": [{
  160. "en_name": "id", //表的字段名
  161. "data_type": "integer", //表的字段类型
  162. "name": "用户ID" //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串
  163. },
  164. {
  165. "en_name": "username",
  166. "data_type": "varchar",
  167. "name": "用户名"
  168. }
  169. ]
  170. }
  171. }
  172. 请仅返回JSON格式结果,不要包含任何其他解释文字。
  173. """
  174. def _optimize_ddl_source_prompt(self):
  175. """返回优化后的提示词模板"""
  176. return """
  177. 请解析以下DDL建表语句,并按照指定的JSON格式返回结果:
  178. 规则说明:
  179. 1. 从DDL语句中识别所有表名,并在data对象中为每个表创建条目,表名请使用小写,可能会有多个表。
  180. 2. 对于每个表,提取所有字段信息,包括名称、数据类型和注释。
  181. - 中文表名中不要出现标点符号
  182. 3. 字段中文名称(name)的确定规则:
  183. - 如有COMMENT注释,直接使用注释内容
  184. - 如无注释但字段名有明确含义,将英文名翻译为中文
  185. - 如字段名是无意义的拼音缩写,则name为空字符串
  186. - 字段名中不要出现逗号,以及"主键"、"外键"、"索引"等字样
  187. 4. 所有的表的定义信息,请放在tables对象中, tables对象的key为表名,value为表的定义信息。这里可能会有多个表,请一一识别。
  188. 5. data_source对象,请放在data_source标签中,它与tables对象同级。
  189. 6. 数据库连接串处理:
  190. - 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
  191. - 根据连接串格式识别数据库类型,数据库类型请使用小写,参考例子,如 mysql/postgresql/sqlserver/oracle/db2/sybase
  192. - data_source.en_name格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加.
  193. - data_source.name留空.
  194. - 无法确定数据库类型时,type设为"unknown"
  195. - 如果从ddl中没有识别到数据库连接串,则json不返回"data_source"标签
  196. - 除了database,password,username,en_name,host,port,type,name 之外,连接串的其它字段放在param属性中。
  197. 7. 参考格式如下:
  198. {
  199. "tables": {
  200. "users": { //表名
  201. "name": "用户表", //表的中文名,来自于COMMENT注释或LLM翻译,如果无法确定,则name为空字符串
  202. "schema": "public",
  203. "meta": [{
  204. "en_name": "id",
  205. "data_type": "integer",
  206. "name": "用户ID"
  207. },
  208. {
  209. "en_name": "username",
  210. "data_type": "varchar",
  211. "name": "用户名"
  212. }
  213. ]
  214. }
  215. },
  216. "data_source": [{
  217. "en_name": "mydatabase_10.52.31.104_5432_myuser", //{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}
  218. "name": "", //如果没有注释,这里留空
  219. "type": "postgresql",
  220. "host": "10.52.31.104",
  221. "port": 5432,
  222. "database": "mydatabase",
  223. "username": "myuser",
  224. "password": "mypassword",
  225. "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
  226. }]
  227. }
  228. 请仅返回JSON格式结果,不要包含任何其他解释文字。
  229. """
  230. def _optimize_connstr_parse_prompt(self):
  231. """返回优化后的连接字符串解析提示词模板"""
  232. return """
  233. 请解析以下数据库连接字符串,并按照指定的JSON格式返回结果:
  234. 规则说明:
  235. 1. 将连接串识别后并拆解为:主机名/IP地址、端口、数据库名称、用户名、密码。
  236. 2. 根据连接串格式识别数据库类型,数据库类型请使用小写,如 mysql/postgresql/sqlserver/oracle/db2/sybase
  237. 3. data_source.en_name格式为: "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}",如某个元素无法识别,则跳过不添加
  238. 4. data_source.name留空
  239. 5. 无法确定数据库类型时,type设为"unknown"
  240. 6. 除了database,password,username,en_name,host,port,type,name 之外,连接串的其它字段放在param属性中
  241. 返回格式示例:
  242. {
  243. "data_source": {
  244. "en_name": "mydatabase_10.52.31.104_5432_myuser",
  245. "name": "",
  246. "type": "postgresql",
  247. "host": "10.52.31.104",
  248. "port": 5432,
  249. "database": "mydatabase",
  250. "username": "myuser",
  251. "password": "mypassword",
  252. "param": "useUnicode=true&characterEncoding=utf8&serverTimezone=UTC"
  253. }
  254. }
  255. 请仅返回JSON格式结果,不要包含任何其他解释文字。
  256. """
  257. def _optimize_connstr_valid_prompt(self):
  258. """返回优化后的连接字符串验证提示词模板"""
  259. return """
  260. 请验证以下数据库连接信息是否符合规则:
  261. 规则说明:
  262. 1. 必填字段检查:
  263. - database: 数据库名称,不能为空,符合数据库名称的命名规范。
  264. - en_name: 格式必须为 "{数据库名称}_{hostname或ip地址}_{端口}_{数据库用户名}"
  265. - host: 主机名或IP地址,不能为空
  266. - port: 端口号,必须为数字
  267. - type: 数据库类型,必须为以下之一:mysql/postgresql/sqlserver/oracle/db2/sybase
  268. - username: 用户名,不能为空,名称中间不能有空格。
  269. 2. 字段格式检查:
  270. - en_name中的各个部分必须与对应的字段值匹配
  271. - port必须是有效的端口号(1-65535)
  272. - type必须是小写的数据库类型名称
  273. - param中的参数格式必须正确(key=value格式)
  274. 3. 可选字段:
  275. - password: 密码(可选)
  276. - name: 中文名称(可选)
  277. - desc: 描述(可选)
  278. 请检查提供的连接信息是否符合以上规则,如果符合则返回"success",否则返回"failure"。
  279. 请仅返回"success"或"failure",不要包含任何其他解释文字。
  280. """
  281. def valid_db_conn_str(self, conn_str):
  282. """
  283. 验证数据库连接字符串是否符合规则
  284. 参数:
  285. conn_str: 要验证的数据库连接信息(JSON格式)
  286. 返回:
  287. "success" 或 "failure"
  288. """
  289. prompt = self._optimize_connstr_valid_prompt()
  290. payload = {
  291. "model": self.model_name,
  292. "messages": [
  293. {
  294. "role": "system",
  295. "content": "你是一个专业的数据库连接信息验证专家,擅长验证数据库连接信息的完整性和正确性。"
  296. },
  297. {
  298. "role": "user",
  299. "content": f"{prompt}\n\n{json.dumps(conn_str, ensure_ascii=False)}"
  300. }
  301. ]
  302. }
  303. try:
  304. response = requests.post(
  305. f"{self.base_url}/chat/completions",
  306. headers=self.headers,
  307. json=payload,
  308. timeout=30
  309. )
  310. response.raise_for_status()
  311. result = response.json()
  312. if "choices" in result and len(result["choices"]) > 0:
  313. content = result["choices"][0]["message"]["content"].strip().lower()
  314. return "success" if content == "success" else "failure"
  315. return "failure"
  316. except Exception as e:
  317. logger.error(f"LLM 验证数据库连接字符串失败: {str(e)}")
  318. return "failure"