embedding_function.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. import requests
  2. import time
  3. import numpy as np
  4. from typing import List, Callable
  5. from core.logging import get_vanna_logger
  6. class EmbeddingFunction:
  7. def __init__(self, model_name: str, api_key: str, base_url: str, embedding_dimension: int):
  8. self.model_name = model_name
  9. self.api_key = api_key
  10. self.base_url = base_url
  11. self.embedding_dimension = embedding_dimension
  12. self.headers = {
  13. "Authorization": f"Bearer {api_key}",
  14. "Content-Type": "application/json"
  15. }
  16. self.max_retries = 3 # 设置默认的最大重试次数
  17. self.retry_interval = 2 # 设置默认的重试间隔秒数
  18. self.normalize_embeddings = True # 设置默认是否归一化
  19. # 初始化日志
  20. self.logger = get_vanna_logger("EmbeddingFunction")
  21. def _normalize_vector(self, vector: List[float]) -> List[float]:
  22. """
  23. 对向量进行L2归一化
  24. Args:
  25. vector: 输入向量
  26. Returns:
  27. List[float]: 归一化后的向量
  28. """
  29. if not vector:
  30. return []
  31. norm = np.linalg.norm(vector)
  32. if norm == 0:
  33. return vector
  34. return (np.array(vector) / norm).tolist()
  35. def __call__(self, input) -> List[List[float]]:
  36. """
  37. 为文本列表生成嵌入向量
  38. Args:
  39. input: 要嵌入的文本或文本列表
  40. Returns:
  41. List[List[float]]: 嵌入向量列表
  42. """
  43. if not isinstance(input, list):
  44. input = [input]
  45. embeddings = []
  46. for text in input:
  47. # 直接调用generate_embedding,让它处理异常
  48. try:
  49. vector = self.generate_embedding(text)
  50. embeddings.append(vector)
  51. except Exception as e:
  52. self.logger.error(f"为文本 '{text}' 生成embedding失败: {e}")
  53. # 重新抛出异常,不返回零向量
  54. raise e
  55. return embeddings
  56. def embed_documents(self, texts: List[str]) -> List[List[float]]:
  57. """
  58. 为文档列表生成嵌入向量 (LangChain 接口)
  59. Args:
  60. texts: 要嵌入的文档列表
  61. Returns:
  62. List[List[float]]: 嵌入向量列表
  63. """
  64. return self.__call__(texts)
  65. def embed_query(self, text: str) -> List[float]:
  66. """
  67. 为查询文本生成嵌入向量 (LangChain 接口)
  68. Args:
  69. text: 要嵌入的查询文本
  70. Returns:
  71. List[float]: 嵌入向量
  72. """
  73. return self.generate_embedding(text)
  74. def generate_embedding(self, text: str) -> List[float]:
  75. """
  76. 为单个文本生成嵌入向量
  77. Args:
  78. text (str): 要嵌入的文本
  79. Returns:
  80. List[float]: 嵌入向量
  81. """
  82. # 处理空文本
  83. if not text or len(text.strip()) == 0:
  84. # 空文本返回零向量是合理的行为
  85. if self.embedding_dimension is None:
  86. raise ValueError("Embedding dimension (self.embedding_dimension) 未被正确初始化。")
  87. return [0.0] * self.embedding_dimension
  88. # 准备请求体
  89. payload = {
  90. "model": self.model_name,
  91. "input": text,
  92. "encoding_format": "float"
  93. }
  94. # 添加重试机制
  95. retries = 0
  96. while retries <= self.max_retries:
  97. try:
  98. # 发送API请求
  99. url = self.base_url
  100. if not url.endswith("/embeddings"):
  101. url = url.rstrip("/") # 移除尾部斜杠,避免双斜杠
  102. if not url.endswith("/v1/embeddings"):
  103. url = f"{url}/embeddings"
  104. response = requests.post(
  105. url,
  106. json=payload,
  107. headers=self.headers,
  108. timeout=30 # 设置超时时间
  109. )
  110. # 检查响应状态
  111. if response.status_code != 200:
  112. error_msg = f"API请求错误: {response.status_code}, {response.text}"
  113. # 根据错误码判断是否需要重试
  114. if response.status_code in (429, 500, 502, 503, 504):
  115. retries += 1
  116. if retries <= self.max_retries:
  117. wait_time = self.retry_interval * (2 ** (retries - 1)) # 指数退避
  118. self.logger.warning(f"API请求失败,等待 {wait_time} 秒后重试 ({retries}/{self.max_retries})")
  119. time.sleep(wait_time)
  120. continue
  121. raise ValueError(error_msg)
  122. # 解析响应
  123. result = response.json()
  124. # 提取embedding向量
  125. if "data" in result and len(result["data"]) > 0 and "embedding" in result["data"][0]:
  126. vector = result["data"][0]["embedding"]
  127. # 如果是首次调用且未提供维度,则自动设置
  128. if self.embedding_dimension is None:
  129. self.embedding_dimension = len(vector)
  130. else:
  131. # 验证向量维度
  132. actual_dim = len(vector)
  133. if actual_dim != self.embedding_dimension:
  134. self.logger.warning(f"向量维度不匹配: 期望 {self.embedding_dimension}, 实际 {actual_dim}")
  135. # 如果需要归一化
  136. if self.normalize_embeddings:
  137. vector = self._normalize_vector(vector)
  138. # 添加成功生成embedding的debug日志
  139. self.logger.debug(f"成功生成embedding向量,维度: {len(vector)}")
  140. return vector
  141. else:
  142. error_msg = f"API返回格式异常: {result}"
  143. raise ValueError(error_msg)
  144. except Exception as e:
  145. retries += 1
  146. if retries <= self.max_retries:
  147. wait_time = self.retry_interval * (2 ** (retries - 1)) # 指数退避
  148. self.logger.warning(f"生成embedding时出错: {str(e)}, 等待 {wait_time} 秒后重试 ({retries}/{self.max_retries})")
  149. time.sleep(wait_time)
  150. else:
  151. # 抛出异常而不是返回零向量,确保问题不被掩盖
  152. raise RuntimeError(f"生成embedding失败,已重试{self.max_retries}次: {str(e)}")
  153. # 这里不应该到达,但为了完整性添加
  154. raise RuntimeError("生成embedding失败")
  155. def test_connection(self, test_text="测试文本") -> dict:
  156. """
  157. 测试嵌入模型的连接和功能
  158. Args:
  159. test_text (str): 用于测试的文本
  160. Returns:
  161. dict: 包含测试结果的字典,包括是否成功、维度信息等
  162. """
  163. result = {
  164. "success": False,
  165. "model": self.model_name,
  166. "base_url": self.base_url,
  167. "message": "",
  168. "actual_dimension": None,
  169. "expected_dimension": self.embedding_dimension
  170. }
  171. try:
  172. self.logger.info(f"测试嵌入模型连接 - 模型: {self.model_name}")
  173. self.logger.info(f"API服务地址: {self.base_url}")
  174. # 验证配置
  175. if not self.api_key:
  176. result["message"] = "API密钥未设置或为空"
  177. return result
  178. if not self.base_url:
  179. result["message"] = "API服务地址未设置或为空"
  180. return result
  181. # 测试生成向量
  182. vector = self.generate_embedding(test_text)
  183. actual_dimension = len(vector)
  184. result["success"] = True
  185. result["actual_dimension"] = actual_dimension
  186. # 检查维度是否一致
  187. if actual_dimension != self.embedding_dimension:
  188. result["message"] = f"警告: 模型实际生成的向量维度({actual_dimension})与配置维度({self.embedding_dimension})不一致"
  189. else:
  190. result["message"] = f"连接测试成功,向量维度: {actual_dimension}"
  191. return result
  192. except Exception as e:
  193. result["message"] = f"连接测试失败: {str(e)}"
  194. return result
  195. def test_embedding_connection() -> dict:
  196. """
  197. 测试嵌入模型连接和配置是否正确
  198. Returns:
  199. dict: 测试结果,包括成功/失败状态、错误消息等
  200. """
  201. logger = get_vanna_logger("EmbeddingTest")
  202. try:
  203. # 获取嵌入函数实例
  204. embedding_function = get_embedding_function()
  205. # 测试连接
  206. test_result = embedding_function.test_connection()
  207. if test_result["success"]:
  208. logger.info(f"嵌入模型连接测试成功!")
  209. if "警告" in test_result["message"]:
  210. logger.warning(test_result["message"])
  211. logger.warning(f"建议将app_config.py中的EMBEDDING_CONFIG['embedding_dimension']修改为{test_result['actual_dimension']}")
  212. else:
  213. logger.error(f"嵌入模型连接测试失败: {test_result['message']}")
  214. return test_result
  215. except Exception as e:
  216. error_message = f"无法测试嵌入模型连接: {str(e)}"
  217. logger.error(error_message)
  218. return {
  219. "success": False,
  220. "message": error_message
  221. }
  222. def get_embedding_function():
  223. """
  224. 根据当前配置创建合适的EmbeddingFunction实例
  225. 支持API和Ollama两种提供商
  226. Returns:
  227. EmbeddingFunction或OllamaEmbeddingFunction: 根据配置类型返回相应的实例
  228. Raises:
  229. ImportError: 如果无法导入必要的模块
  230. ValueError: 如果配置无效
  231. """
  232. try:
  233. from common.utils import get_current_embedding_config, is_using_ollama_embedding
  234. except ImportError:
  235. raise ImportError("无法导入 common.utils,请确保该文件存在")
  236. # 获取当前embedding配置
  237. embedding_config = get_current_embedding_config()
  238. if is_using_ollama_embedding():
  239. # 使用Ollama Embedding
  240. try:
  241. from customembedding.ollama_embedding import OllamaEmbeddingFunction
  242. except ImportError:
  243. raise ImportError("无法导入 OllamaEmbeddingFunction,请确保 customembedding 包存在")
  244. return OllamaEmbeddingFunction(
  245. model_name=embedding_config["model_name"],
  246. base_url=embedding_config["base_url"],
  247. embedding_dimension=embedding_config["embedding_dimension"]
  248. )
  249. else:
  250. # 使用API Embedding
  251. try:
  252. api_key = embedding_config["api_key"]
  253. model_name = embedding_config["model_name"]
  254. base_url = embedding_config["base_url"]
  255. embedding_dimension = embedding_config["embedding_dimension"]
  256. if api_key is None:
  257. raise KeyError("API模式下 'api_key' 未设置 (可能环境变量 EMBEDDING_API_KEY 未定义)")
  258. except KeyError as e:
  259. raise KeyError(f"API Embedding配置中缺少必要的键或值无效:{e}")
  260. return EmbeddingFunction(
  261. model_name=model_name,
  262. api_key=api_key,
  263. base_url=base_url,
  264. embedding_dimension=embedding_dimension
  265. )