from typing import List, Optional, Tuple

from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import (
    RunnableBranch,
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
)

from embedding.embedding import get_embedding
from qa_chain.get_vectordb import get_vectordb
from graph.graph_retrieval import connect, structured_retriever, text_structured_retriever
from llm.llm import LLM


class Chat_QA_chain_self:
    """
    Question-answering chain with chat history.
    - temperature: sampling temperature controlling generation randomness
    - top_k: number of most similar documents returned by retrieval
    - chat_history: history as a list of (human, ai) tuples; defaults to empty
    - file_path: path to the source files used to build the vector store
    - persist_path: persistence path of the vector store
    - embedding: name of the embedding model to use
    """
    def __init__(self, temperature: float = 0.0, top_k: int = 2,
                 chat_history: Optional[List[Tuple[str, str]]] = None,
                 file_path: Optional[str] = None, persist_path: Optional[str] = None,
                 embedding: str = "m3e"):
        self.temperature = temperature
        self.top_k = top_k
        # Avoid a mutable default argument: a shared default list would leak
        # history across instances.
        self.chat_history = chat_history if chat_history is not None else []
        self.file_path = file_path
        self.persist_path = persist_path
        self.embedding = get_embedding(embedding)
        self.llm_instance = LLM(model_name='qwen')
        self.llm = self.llm_instance.get_llm()
        self.vectordb = get_vectordb(self.file_path, self.persist_path, self.embedding)
        self.graph = connect()

    def clear_chat_history(self):
        """
        Clear the chat history.
        """
        self.chat_history = []

    def add_to_chat_history(self, human_message: str, ai_message: str):
        """
        Append one exchange to the chat history.
        :param human_message: the human user's message
        :param ai_message: the AI's reply
        """
        self.chat_history.append((human_message, ai_message))

    def get_chat_history(self):
        """
        Return the full chat history.
        :return: list of (human, ai) tuples
        """
        return self.chat_history

    def _format_chat_history(self, chat_history: List[Tuple[str, str]]) -> List:
        """Convert (human, ai) tuples into alternating message objects."""
        buffer = []
        for human, ai in chat_history:
            buffer.append(HumanMessage(content=human))
            buffer.append(AIMessage(content=ai))
        return buffer

    # Graph (structured) + vector-store (unstructured) retrieval.
    def retriever(self, question: str):
        structured_data = structured_retriever(self.llm, self.graph, question)
        unstructured_data = self.rag_retriever(question)
        final_data = f"""Unstructured data:{unstructured_data}\n
Structured data:{structured_data}
"""
        return final_data

    # Unstructured-text graph + RAG retrieval.
    def text_retriever(self, question: str):
        structured_data = text_structured_retriever(self.llm, self.graph, question)
        unstructured_data = self.rag_retriever(question)
        final_data = f"""Structured data:{structured_data}\n
Unstructured data:{unstructured_data}\n
"""
        print(f"final_data:{final_data}")
        return final_data

    # Plain RAG: vector-store retrieval only.
    def rag_retriever(self, question: str):
        # Retrieve the documents most relevant to the question with MMR,
        # which balances similarity against diversity of results.
        retriever = self.vectordb.as_retriever(search_type='mmr',
                                               search_kwargs={'k': self.top_k})
        docs = retriever.get_relevant_documents(question)
        # Join the page contents into a single context string.
        final_data = "\n".join(doc.page_content for doc in docs)
        return final_data

    def _build_chain(self, context_retriever):
        """Assemble an LCEL chain: condense the chat history and follow-up
        question into a standalone question, fetch context with the given
        retriever, then answer based only on that context."""
        llm = self.llm

        # Condense a chat history and follow-up question into a standalone question
        _template = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""  # noqa: E501
        CONDENSE_QUESTION_PROMPT = PromptTemplate.from_template(_template)

        _search_query = RunnableBranch(
            # If the input includes chat_history, condense it with the follow-up question
            (
                RunnableLambda(lambda x: bool(x.get("chat_history"))).with_config(
                    run_name="HasChatHistoryCheck"
                ),
                RunnablePassthrough.assign(
                    chat_history=lambda x: self._format_chat_history(x["chat_history"])
                )
                | CONDENSE_QUESTION_PROMPT
                | llm
                | StrOutputParser(),
            ),
            # Otherwise there is no chat history, so just pass the question through
            RunnableLambda(lambda x: x["question"]),
        )

        template = """Answer the question based only on the following context:
{context}

Question: {question}
Use natural language and be concise.
Answer:"""
        prompt = ChatPromptTemplate.from_template(template)

        chain = (
            RunnableParallel(
                {
                    "context": _search_query | context_retriever,
                    "question": RunnablePassthrough(),
                }
            )
            | prompt
            | llm
            | StrOutputParser()
        )
        return chain

    # Graph (structured) + RAG retrieval.
    def build_chain(self):
        return self._build_chain(self.retriever)

    # Plain RAG retrieval.
    def build_rag_chain(self):
        return self._build_chain(self.rag_retriever)

    # Unstructured-text graph + RAG retrieval.
    def build_text_chain(self):
        return self._build_chain(self.text_retriever)
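

# A minimal usage sketch, not part of the original module. Assumptions: a
# reachable Neo4j instance behind graph_retrieval.connect(), a vector store
# built by get_vectordb at the given paths, and credentials configured for
# the qwen model in llm.LLM. The paths and question below are placeholders.
if __name__ == "__main__":
    qa = Chat_QA_chain_self(top_k=2,
                            file_path="data/knowledge.txt",  # placeholder path
                            persist_path="data/vector_db",   # placeholder path
                            embedding="m3e")
    chain = qa.build_rag_chain()
    question = "What is this knowledge base about?"
    # The chain expects a dict with "question" and (optionally) "chat_history".
    answer = chain.invoke({"question": question,
                           "chat_history": qa.get_chat_history()})
    qa.add_to_chat_history(question, answer)
    print(answer)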