from typing import List, Tuple

from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import (
    RunnableBranch,
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)

from embedding.embedding import get_embedding
from graph.graph_retrieval import connect, structured_retriever, text_structured_retriever
from llm.llm import LLM
from qa_chain.get_vectordb import get_vectordb


class Chat_QA_chain_self:
    """
    Question-answering chain with chat history.
    - temperature: temperature coefficient controlling generation randomness
    - top_k: number of most similar documents returned by retrieval
    - chat_history: chat history, a list of (human, ai) tuples; defaults to an empty list
    - file_path: path of the source files used to build the vector store
    - persist_path: persistence path of the vector database
    - embedding: name of the embedding model to use
    """

    def __init__(self, temperature: float = 0.0, top_k: int = 2,
                 chat_history: List[Tuple[str, str]] = None,
                 file_path: str = None, persist_path: str = None, embedding: str = "m3e"):
        self.temperature = temperature
        self.top_k = top_k
        # Avoid the shared-mutable-default pitfall: each instance gets its own history list.
        self.chat_history = chat_history if chat_history is not None else []
        self.file_path = file_path
        self.persist_path = persist_path
        self.embedding = get_embedding(embedding)
        self.llm_instance = LLM(model_name='qwen')
        self.llm = self.llm_instance.get_llm()
        self.vectordb = get_vectordb(self.file_path, self.persist_path, self.embedding)
        self.graph = connect()

    def clear_chat_history(self):
        """Clear the chat history."""
        self.chat_history = []
        # print("Chat history has been cleared.")

    def add_to_chat_history(self, human_message: str, ai_message: str):
        """
        Append one exchange to the chat history.
        :param human_message: the human user's message
        :param ai_message: the AI's reply message
        """
        self.chat_history.append((human_message, ai_message))

    def get_chat_history(self):
        """
        Get the full chat history.
        :return: list of chat history records
        """
        return self.chat_history

    def _format_chat_history(self, chat_history: List[Tuple[str, str]]) -> List:
        buffer = []
        for human, ai in chat_history:
            buffer.append(HumanMessage(content=human))
            buffer.append(AIMessage(content=ai))
        return buffer

    # Knowledge-graph (structured) retrieval combined with vector-store (RAG) retrieval.
    def retriever(self, question: str):
        # print(f"Search query: {question}")
        structured_data = structured_retriever(self.llm, self.graph, question)
        unstructured_data = self.rag_retriever(question)
        final_data = f"""Unstructured data: {unstructured_data}
Structured data: {structured_data}
"""
        # print(f"unstructured_data: {unstructured_data}")
        return final_data
    # Unstructured-text knowledge-graph retrieval combined with vector-store (RAG) retrieval.
    def text_retriever(self, question: str):
        # print(f"Search query: {question}")
        structured_data = text_structured_retriever(self.llm, self.graph, question)
        unstructured_data = self.rag_retriever(question)
        final_data = f"""Structured data: {structured_data}
Unstructured data: {unstructured_data}
"""
        print(f"final_data: {final_data}")
        return final_data

    # Plain vector-store (RAG) retrieval.
    def rag_retriever(self, question: str):
        # Fetch the top-k documents with maximal-marginal-relevance search and
        # join their contents into a single context string.
        retriever = self.vectordb.as_retriever(search_type="mmr", search_kwargs={"k": self.top_k})
        docs = retriever.get_relevant_documents(question)
        final_data = "\n".join([doc.page_content for doc in docs])
        return final_data

    # Graph-augmented chain: answers from structured + unstructured context.
    def build_chain(self):
        llm = self.llm

        # Condense a chat history and follow-up question into a standalone question
        _template = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question,
in its original language.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""  # noqa: E501
        CONDENSE_QUESTION_PROMPT = PromptTemplate.from_template(_template)

        _search_query = RunnableBranch(
            # If input includes chat_history, we condense it with the follow-up question
            (
                RunnableLambda(lambda x: bool(x.get("chat_history"))).with_config(
                    run_name="HasChatHistoryCheck"
                ),  # Condense follow-up question and chat into a standalone_question
                RunnablePassthrough.assign(
                    chat_history=lambda x: self._format_chat_history(x["chat_history"])
                )
                | CONDENSE_QUESTION_PROMPT
                | llm
                | StrOutputParser(),
            ),
            # Else, we have no chat history, so just pass through the question
            RunnableLambda(lambda x: x["question"]),
        )

        template = """Answer the question based only on the following context:
{context}

Question: {question}
Use natural language and be concise.
Answer:"""
        prompt = ChatPromptTemplate.from_template(template)

        chain = (
            RunnableParallel(
                {
                    "context": _search_query | self.retriever,
                    "question": RunnablePassthrough(),
                }
            )
            | prompt
            | llm
            | StrOutputParser()
        )
        return chain

    # Pure RAG chain: answers from vector-store retrieval only.
    def build_rag_chain(self):
        llm = self.llm

        # Condense a chat history and follow-up question into a standalone question
        _template = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question,
in its original language.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""  # noqa: E501
        CONDENSE_QUESTION_PROMPT = PromptTemplate.from_template(_template)

        _search_query = RunnableBranch(
            # If input includes chat_history, we condense it with the follow-up question
            (
                RunnableLambda(lambda x: bool(x.get("chat_history"))).with_config(
                    run_name="HasChatHistoryCheck"
                ),  # Condense follow-up question and chat into a standalone_question
                RunnablePassthrough.assign(
                    chat_history=lambda x: self._format_chat_history(x["chat_history"])
                )
                | CONDENSE_QUESTION_PROMPT
                | llm
                | StrOutputParser(),
            ),
            # Else, we have no chat history, so just pass through the question
            RunnableLambda(lambda x: x["question"]),
        )

        template = """Answer the question based only on the following context:
{context}

Question: {question}
Use natural language and be concise.
Answer:"""
        prompt = ChatPromptTemplate.from_template(template)

        chain = (
            RunnableParallel(
                {
                    "context": _search_query | self.rag_retriever,
                    "question": RunnablePassthrough(),
                }
            )
            | prompt
            | llm
            | StrOutputParser()
        )
        return chain
    # Unstructured-text graph + RAG chain.
    def build_text_chain(self):
        llm = self.llm

        # Condense a chat history and follow-up question into a standalone question
        _template = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question,
in its original language.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""  # noqa: E501
        CONDENSE_QUESTION_PROMPT = PromptTemplate.from_template(_template)

        _search_query = RunnableBranch(
            # If input includes chat_history, we condense it with the follow-up question
            (
                RunnableLambda(lambda x: bool(x.get("chat_history"))).with_config(
                    run_name="HasChatHistoryCheck"
                ),  # Condense follow-up question and chat into a standalone_question
                RunnablePassthrough.assign(
                    chat_history=lambda x: self._format_chat_history(x["chat_history"])
                )
                | CONDENSE_QUESTION_PROMPT
                | llm
                | StrOutputParser(),
            ),
            # Else, we have no chat history, so just pass through the question
            RunnableLambda(lambda x: x["question"]),
        )

        template = """Answer the question based only on the following context:
{context}

Question: {question}
Use natural language and be concise.
Answer:"""
        prompt = ChatPromptTemplate.from_template(template)

        chain = (
            RunnableParallel(
                {
                    "context": _search_query | self.text_retriever,
                    "question": RunnablePassthrough(),
                }
            )
            | prompt
            | llm
            | StrOutputParser()
        )
        return chain
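
# ---------------------------------------------------------------------------
# Minimal usage sketch: how the class above is expected to be wired together.
# Assumptions: the file/persist paths and the question are placeholders, and
# connect(), get_embedding(), get_vectordb() and the LLM backend are already
# configured (e.g. a reachable Neo4j instance and an "m3e" embedding model).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    qa = Chat_QA_chain_self(
        temperature=0.0,
        top_k=2,
        file_path="./data/knowledge.txt",   # placeholder source document
        persist_path="./vector_db",         # placeholder vector-store directory
        embedding="m3e",
    )

    chain = qa.build_rag_chain()            # or build_chain() / build_text_chain()
    question = "What does this knowledge base cover?"  # placeholder question
    answer = chain.invoke(
        {"question": question, "chat_history": qa.get_chat_history()}
    )
    qa.add_to_chat_history(question, answer)
    print(answer)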