首先可以学习一些使用langchain的项目:
- https://dagster.io/blog/chatgpt-langchain
- https://github.com/namuan/dr-doc-search
- https://simonwillison.net/2023/Jan/13/semantic-search-answers/
- https://github.com/slavingia/askmybook
用到的python库:
pydantic: 数据验证: 参考: https://www.cnblogs.com/fengqiang626/p/13307771.html
typing: 类型约束
manifest-ml: 多个大模型(不包含所有)的统一调用客户端
promptlayer: 记录对openapi的调用, 搜索历史, 性能追踪.
unstructured: 若用户在使用 0.4.9 以下版本, 读取到的信息没有metadata字段, 而如果大于等于0.4.9则有
tenacity: 重试
用到的python特性:
dataclass: 合于存储数据对象(data object)的Python类, 参考 https://zhuanlan.zhihu.com/p/59657729
LLM 模型层
BaseLanguageModel : 抽象基类, 和各个模型交互的通用行为:基于用户的输入生成prompt
BaseLLM: 通用的基础大模型基类, 增加了缓存选项, 回调选项, 有部分序列化能力, 持有各种参数.
LLM : 和大模型的交互抽象, 所有子类都有自己的交互实现. 对它的调用, 将直接获取完全的prompt, 配合大模型特有的参数, 如temperture, length, top_p等等, 组装后, 利用一个client(组装的逻辑,既可以让专有client,即sdk吃掉,也可以组装后, 给到httpclient后者本地进程)发送给背后的大模型.
BaseOpenAI 为了OpenAI设计的专有类, 因为OpenAI的模型有不同的调用平台, 如部署在azure的, 官网自己的. NOTICE: 如果模型是"gpt-3.5-turbo" 则, 直接返回OpenAIChat
. 否则说明是其他模型
贴一段基类的模板代码, 分析:
- 获取调用参数(组装openAI模型调用的请求体,请求参数)
- 获取子prompts
- 监测token的消耗
- 对每个prompt开启调用(重试机制)
- 生成统一的结果
def _generate(
self, prompts: List[str], stop: Optional[List[str]] = None
) -> LLMResult:
"""Call out to OpenAI's endpoint with k unique prompts.
Args:
prompts: The prompts to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
The full LLM output.
Example:
.. code-block:: python
response = openai.generate(["Tell me a joke."])
"""
# TODO: write a unit test for this
params = self._invocation_params
sub_prompts = self.get_sub_prompts(params, prompts, stop)
choices = []
token_usage: Dict[str, int] = {}
# Get the token usage from the response.
# Includes prompt, completion, and total tokens used.
_keys = {"completion_tokens", "prompt_tokens", "total_tokens"}
for _prompts in sub_prompts:
if self.streaming:
if len(_prompts) > 1:
raise ValueError("Cannot stream results with multiple prompts.")
params["stream"] = True
response = _streaming_response_template()
for stream_resp in completion_with_retry(
self, prompt=_prompts, **params
):
self.callback_manager.on_llm_new_token(
stream_resp["choices"][0]["text"],
verbose=self.verbose,
logprobs=stream_resp["choices"][0]["logprobs"],
)
_update_response(response, stream_resp)
choices.extend(response["choices"])
else:
response = completion_with_retry(self, prompt=_prompts, **params)
choices.extend(response["choices"])
if not self.streaming:
# Can't update token usage if streaming
update_token_usage(_keys, response, token_usage)
return self.create_llm_result(choices, prompts, token_usage)
PromptLayerOpenAI... : 只是增加了promptlayer
层
OpenAIChat : 专和OpenAI的3.5模型交互的类
贴一段参数准备的代码:
- 校验参数的个数, 只有一个prompt
- 预设参数, user, prefix, max_tokens
def _get_chat_params(
self, prompts: List[str], stop: Optional[List[str]] = None
) -> Tuple:
if len(prompts) > 1:
raise ValueError(
f"OpenAIChat currently only supports single prompt, got {prompts}"
)
messages = self.prefix_messages + [{"role": "user", "content": prompts[0]}]
params: Dict[str, Any] = {**{"model": self.model_name}, **self._default_params}
if stop is not None:
if "stop" in params:
raise ValueError("`stop` found in both the input and default params.")
params["stop"] = stop
if params.get("max_tokens") == -1:
# for ChatGPT api, omitting max_tokens is equivalent to having no limit
del params["max_tokens"]
return messages, params
Chains 模块
Chains是组合其他各层的胶水, 亦是用户程序的调用入口.
Chain: 基类, 是所有chain对象的基本入口. 与用户程序交互, 处理用户的输入, 准备其他模块的输入, 提供内存能力, chain的回调能力. 其他所有的 Chain 类都继承自这个基类,并根据需要实现特定的功能。chain通过传入string值, 来控制接受的输入和给出的输出. 如input_key为["abc", "def"], 那么它只会处理用户输入的dict里面的这两个参数, 如果output_key为["uvw"], 那么它在输出的时候会过滤掉其他的dict值.
继承Chain的子类主要有两种类型:
- 通用 generic chain: 不在乎chain的具体类型, 控制chain的调用顺序, 是否调用. 他们可以用来合并构造其他的chain.
- 具体chain: 和通用chain比较来说, 他们承担了具体的某项任务, 可以和通用的chain组合起来使用, 也可以直接使用.
LLMChain : 针对语言模型LLM的查询, 可以格式化prompt以及调用语言模型.
其他 Chain 类 :除了 LLMChain,还有其他继承自 Chain 的类,它们根据不同的需求实现了特定的功能。例如,有些 Chain 类可能用于处理文本数据,有些可能用于处理图像数据,有些可能用于处理音频数据等。这些类都继承自 Chain 基类,并根据需要实现特定的输入和输出处理方法。
具体的chain就像是大模型在某种业务场景下的应用模式总结, 如VectorDBQA
就是利用 vector存储和大模型, 在vectorstore中用某种相似算法找到和问题类似的doc, 之后利用大模型将doc和问题一起给到到模型, 让大模型解释给出结果. 杜宇这类合并文档的任务: BaseCombineDocumentsChain
有四种不同的模式.
generic chain
loading from hub
从hub里获取某个配置好的chain, 实际的chain类型不会超过库里已经定义的. 举例:
https://langchain.readthedocs.io/en/latest/modules/chains/generic/from_hub.html
从服务端拉下来一个VectorDBQA类型的chain, 本地根据拉下来的配置初始化chain.
LLM Chain
对大模型最直接的调用chain, 常常被用来组合成其他的chain.
如 APIChain中, SequentialChain中
template = """Write a {adjective} poem about {subject}."""
prompt = PromptTemplate(template=template, input_variables=["adjective", "subject"])
llm_chain = LLMChain(prompt=prompt, llm=OpenAI(temperature=0), verbose=True)
llm_chain.predict(adjective="sad", subject="ducks")
Sequentical Chain
顺序执行chains
- SimpleSequentialChain: 前者的输出就是后者的输入
- SequentialChain: 允许多个输入和输出
Serialization
所有的chain都可以持久化, 以及从持久化中恢复. 具体在
langchain.chains.load_chain
方法中, load_chain_from_file
从file(虚拟file,可能是网络) 中读取到配置后, load_chain_from_config
从配置中获取所有的chain的配置信息,
针对每一种chain都有对应的load方法
type_to_loader_dict = {
"api_chain": _load_api_chain,
"hyde_chain": _load_hyde_chain,
"llm_chain": _load_llm_chain,
"llm_bash_chain": _load_llm_bash_chain,
"llm_checker_chain": _load_llm_checker_chain,
"llm_math_chain": _load_llm_math_chain,
"llm_requests_chain": _load_llm_requests_chain,
"pal_chain": _load_pal_chain,
"qa_with_sources_chain": _load_qa_with_sources_chain,
"stuff_documents_chain": _load_stuff_documents_chain,
"map_reduce_documents_chain": _load_map_reduce_documents_chain,
"map_rerank_documents_chain": _load_map_rerank_documents_chain,
"refine_documents_chain": _load_refine_documents_chain,
"sql_database_chain": _load_sql_database_chain,
"vector_db_qa_with_sources_chain": _load_vector_db_qa_with_sources_chain,
"vector_db_qa": _load_vector_db_qa,
}
举例:
def _load_stuff_documents_chain(config: dict, **kwargs: Any) -> StuffDocumentsChain:
if "llm_chain" in config:
llm_chain_config = config.pop("llm_chain")
llm_chain = load_chain_from_config(llm_chain_config)
elif "llm_chain_path" in config:
llm_chain = load_chain(config.pop("llm_chain_path"))
else:
raise ValueError("One of `llm_chain` or `llm_chain_config` must be present.")
if not isinstance(llm_chain, LLMChain):
raise ValueError(f"Expected LLMChain, got {llm_chain}")
if "document_prompt" in config:
prompt_config = config.pop("document_prompt")
document_prompt = load_prompt_from_config(prompt_config)
elif "document_prompt_path" in config:
document_prompt = load_prompt(config.pop("document_prompt_path"))
else:
raise ValueError(
"One of `document_prompt` or `document_prompt_path` must be present."
)
return StuffDocumentsChain(
llm_chain=llm_chain, document_prompt=document_prompt, **config
)
更加细致的组件有:
llm的loader, prompt的loader, 等等, 分别在每个模块下的loading.py文件中
transformation chain
提供了一个机制, 对用户的输入进行修改. 举例:
def transform_func(inputs: dict) -> dict:
text = inputs["text"]
shortened_text = "\n\n".join(text.split("\n\n")[:3])
return {"output_text": shortened_text}
transform_chain = TransformChain(input_variables=["text"], output_variables=["output_text"], transform=transform_func)
输入层 prompt
prompt传递给大模型的消息模板和抽象. 一般会在prompt中放三类内容:
- 对大模型的指示, 会话的设置, 如: 你是一个技术精湛的程序员
- 一些example, 以帮助大模型更好的理解输入和给出输出
- 提出的问题
BasePromptTemplate
作为基类, 暴露格式化prompt模板的方法, 返回一个prompt.
参数:
- input_variables, promptTemplate内部需要接受的参数
- output_parser: 对于大模型的返回, 可以用output_parser来解析它的返回. output_parser 的解析侧重是: 持有大模型返回的消息的结构, 做信息的提取, 如返回是一个json, json有字段为realAns, resAns是prompt需要的答案, 再或者返回的数据需要以
;
切分为列表; 而不是侧重网络数据解析, 序列化反序列化.
@abstractmethod def format_prompt(self, **kwargs: Any) -> PromptValue:
格式化用户输入, 得到prompt
@root_validator() def validate_variable_names(cls, values: Dict) -> Dict:
验证输入, 用户输入是否覆盖了partial 输入
需要关注的是两个子类: PromptTemplate
, ChatPromptTemplate
前者是一次问答型业务常用的, 后者是问答聊天型业务设计的.
PromptTemplate 利用的语言自己的format能力 或者 其他库(如web开发常用的ninja引擎), 举例, 如从template中提取用户需要输入的变量, template如: i am a {someadj} student
其中someadj
就是用户要输入的变量.
input_variables = {
v for _, v, _, _ in Formatter().parse(template) if v is not None
}
ChatPromptTemplate :
专属chat的prompt设计, 存储着chat的messages/templates
@classmethod
def from_role_strings(
cls, string_messages: List[Tuple[str, str]]
) -> ChatPromptTemplate:
messages = [
ChatMessagePromptTemplate(
content=PromptTemplate.from_template(template), role=role
)
for role, template in string_messages
]
return cls.from_messages(messages)
BaseMessagePromptTemplate
是chat中的一条消息/模板, 对应消息有speaker, AI, Human,System => ChatMessage, 如下, 本质和StringPromptTemplate没太大区别, 只是它的含义是chat的一条message, 且不能(适合)像StringPromptTemplate一样放多个问题在里面.
数据导入- loader
document_loaders
中含有大量的不同数据源的loader, loader的基本逻辑是: 连接到数据源, 拉取数据, 按照指定的大小切块.
BaseLoader
接口
Document
统一的数据表示, 不同的数据源的数据都要表示成Document, 方便Splitter的设计和处理.
以UnstructuredBaseLoader
为例
def _get_elements(self) -> List:
"""Get elements."""
@abstractmethod
def _get_metadata(self) -> dict:
"""Get metadata."""
def load(self) -> List[Document]:
"""Load file."""
elements = self._get_elements()
if self.mode == "elements":
docs: List[Document] = list()
for element in elements:
metadata = self._get_metadata()
# NOTE(MthwRobinson) - the attribute check is for backward compatibility
# with unstructured<0.4.9. The metadata attributed was added in 0.4.9.
if hasattr(element, "metadata"):
metadata.update(element.metadata.to_dict())
if hasattr(element, "category"):
metadata["category"] = element.category
docs.append(Document(page_content=str(element), metadata=metadata))
elif self.mode == "single":
metadata = self._get_metadata()
text = "\n\n".join([str(el) for el in elements])
docs = [Document(page_content=text, metadata=metadata)]
else:
raise ValueError(f"mode of {self.mode} not supported.")
return docs
数据访问 - store
Docstore
访问存储了docs的接口, 任何的数据库,网络,甚至内存块都可以成为一个store, 只要实现了search
接口, 能够从中搜索doc.
举例: 把wiki当做是一个store
def search(self, search: str) -> Union[str, Document]:
"""Try to search for wiki page.
If page exists, return the page summary, and a PageWithLookups object.
If page does not exist, return similar entries.
"""
import wikipedia
try:
page_content = wikipedia.page(search).content
url = wikipedia.page(search).url
result: Union[str, Document] = Document(
page_content=page_content, metadata={"page": url}
)
except wikipedia.PageError:
result = f"Could not find [{search}]. Similar: {wikipedia.search(search)}"
except wikipedia.DisambiguationError:
result = f"Could not find [{search}]. Similar: {wikipedia.search(search)}"
return result
记忆模块 memory
大模型在会话的时候, 多数是无状态的, 需要调用者自己维持context. memory是通用的记忆模块, 帮助所有基于langchain做开发的应用维持会话的context, 或者是(a concept of state around through a user's interactions).
有两种使用机制: 一种是可以从memory中提取一定的信息, 如messages的序列. 还可以是 在chain中直接使用memory, memory和chain本身会将 context 以某种形式(可能是经过llm精炼过的, 不是原始的kv) 存储下来.
举例: ConversationBufferMemory
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""Save context from this conversation to buffer."""
if self.input_key is None:
prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
else:
prompt_input_key = self.input_key
if self.output_key is None:
if len(outputs) != 1:
raise ValueError(f"One output key expected, got {outputs.keys()}")
output_key = list(outputs.keys())[0]
else:
output_key = self.output_key
self.chat_memory.add_user_message(inputs[prompt_input_key])
self.chat_memory.add_ai_message(outputs[output_key])
举例: ConversationEntityMemory
先是使用大模型从历史对话的k条内容中提取名词实体, 之后作为参数
def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""Return history buffer."""
chain = LLMChain(llm=self.llm, prompt=self.entity_extraction_prompt)
if self.input_key is None:
prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
else:
prompt_input_key = self.input_key
buffer_string = get_buffer_string(
self.buffer[-self.k * 2 :],
human_prefix=self.human_prefix,
ai_prefix=self.ai_prefix,
)
output = chain.predict(
history=buffer_string,
input=inputs[prompt_input_key],
)
if output.strip() == "NONE":
entities = []
else:
entities = [w.strip() for w in output.split(",")] // 历史中提取到的实体
entity_summaries = {}
for entity in entities:
entity_summaries[entity] = self.store.get(entity, "") // 获取每个实体的值, 并且更新
self.entity_cache = entities // 将此次抽取的实体们保存下来
if self.return_messages:
buffer: Any = self.buffer[-self.k * 2 :]
else:
buffer = buffer_string
return {
self.chat_history_key: buffer,
"entities": entity_summaries,
}
将历史保存起来: 从上一次的实体抽取中获取每个抽取的实体, 之后利用大模型进行总结, 将每个实体的总结结果保存起来. 稍微留一下代码chain.predict : 在langchain项目中, 很多用到这样的传参方式, 虽然chain用不到那些参数, 但是这个chain实例内的prompt会用. 对于熟悉静态类型编程的同学,一般都不太习惯这种方式, 一般情况下更多的会用一个context类来向下传递参数吧. todo: 优化: 设计ParamContext来传递参数, 使用者使用ParamConsumer来从中提取关注的信息, 这样可以避免dict, kwargs满天飞的场景
def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
"""Save context from this conversation to buffer."""
super().save_context(inputs, outputs)
if self.input_key is None:
prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
else:
prompt_input_key = self.input_key
for entity in self.entity_cache:
chain = LLMChain(llm=self.llm, prompt=self.entity_summarization_prompt)
# key value store for entity
existing_summary = self.store.get(entity, "")
buffer_string = get_buffer_string(
self.buffer[-self.k * 2 :],
human_prefix=self.human_prefix,
ai_prefix=self.ai_prefix,
)
output = chain.predict(
summary=existing_summary,
history=buffer_string,
input=inputs[prompt_input_key],
entity=entity,
)
self.store[entity] = output.strip()
agent and tools 和其他项目的交互
agent
Agent
负责调用大模型, 并且决定接下来的动作. 调用大模型的结果是做决定的参考:
plan
-> _get_next_action
-> llm(专属prompt).predict
-> _extract_tool_and_input 获取下一个action以及要给它的输入, 得到 AgentAction
AgentAction
: agent要执行的动作封装.
AgentExecutor
: 从命名上理解是 Agent的执行环境, 执行器. Agent
只能决定接下来要执行的动作, 而AgentExecutor
才是具体发起执行, 进行执行的执行者. _take_next_step
-> Agent.plan
-> tool.run
tools
BaseTool
子类要重写 run
以赋予不同的运行逻辑
向量化 和 存储 vector
VectorStore
: 向量存储接口, 子类需要实现抽象方法(加粗的)以完成对应引擎的访问.
- add_texts
- add_documents
- similarity_search
- similarity_search_by_vector
- max_marginal_relevance_search
- max_marginal_relevance_search_by_vector
- from_documents
- from_texts
举例: OpenSearchVectorStore
调用 大模型的embedding 能力, 将其向量化, 并且写入索引. (需预先创建好索引, 自动创建的索引数据类型不对)
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
bulk_size: int = 500,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
bulk_size: Bulk API request count; Default: 500
Returns:
List of ids from adding the texts into the vectorstore.
"""
embeddings = [
self.embedding_function.embed_documents(list(text))[0] for text in texts
]
_validate_embeddings_and_bulk_size(len(embeddings), bulk_size)
return _bulk_ingest_embeddings(
self.client, self.index_name, embeddings, texts, metadatas
)
相似度搜索, 还是用大模型的embedding能力, 将查询转化为向量, 然后利用存储引擎自身的相似度搜索能力, 搜索出文档. 搜索完成后, 取k条结果, 返回给上层.
embedding = self.embedding_function.embed_query(query)
search_type = _get_kwargs_value(kwargs, "search_type", "approximate_search")
if search_type == "approximate_search":
size = _get_kwargs_value(kwargs, "size", 4)
search_query = _default_approximate_search_query(embedding, size, k)
elif search_type == SCRIPT_SCORING_SEARCH:
space_type = _get_kwargs_value(kwargs, "space_type", "l2")
pre_filter = _get_kwargs_value(kwargs, "pre_filter", MATCH_ALL_QUERY)
search_query = _default_script_query(embedding, space_type, pre_filter)
elif search_type == PAINLESS_SCRIPTING_SEARCH:
space_type = _get_kwargs_value(kwargs, "space_type", "l2Squared")
pre_filter = _get_kwargs_value(kwargs, "pre_filter", MATCH_ALL_QUERY)
search_query = _default_painless_scripting_query(
embedding, space_type, pre_filter
)
from_texts: 构造vectorstore, 创建新索引, 且将数据写入. 封装了这一系列过程.
embedding
向量接口, 每个子类需要实现两个方法, 分别向量化文档和查询. 在实现中要注意的细节点是, 和大模型交互, 控制每次发送的数据量. 以及增加重试机制.
def embed_documents(self, texts: List[str]) -> List[List[float]]:
def embed_query(self, text: str) -> List[float]: