背景
通过Milvus的样例数据存储案例,结合AI模型,完成embedding向量化入库,向量查询,大模型总结和提示词工程的演示,熟悉相关组件和流程。
执行前提
- 本文的案例基于Milvus官方的案例改造而来,相关代码运行环境为python,确保安装完善相关运行环境,建议使用Python虚拟环境进行,采用Juptyer运行。
- 需要部署好待使用的Milvus向量数据库。
- 部分功能可能需要配置代理访问额外资源。
执行过程
- 原始案例使用的OpenAI的embedding,这里替换为智谱。
- 这个案例基于Milvus官方样例,通过将一个电影相关数据集写入Milvus进行搜索。在此基础上进行扩展,应用基础大模型实现RAG功能。
- 安装智谱AI组件
%pip install zhipuai
- Milvus依赖
%pip install pymilvus==2.4.1
- 主要代码
常量
COLLECTION_NAME = "movie_search"
DIMENSION = 1024
BATCH_SIZE = 50
智谱向量化
from zhipuai import ZhipuAI
import os
# 必要时配置代理
#DEFAULT_PROXY = 'http://proxy'
#os.environ['HTTPS_PROXY'] = DEFAULT_PROXY
#os.environ['HTTP_PROXY'] = DEFAULT_PROXY
#os.environ['NO_PROXY'] = '127.0.0.1'
MY_API_KEY = '53ca18d6d84d08ff2e4feb7748fa8d82.VVVVVV'
def my_embedding(texts):
client = ZhipuAI(api_key=MY_API_KEY)
response = client.embeddings.create(
model="embedding-2", #填写需要调用的模型编码
input=texts,
dimensions=DIMENSION
)
return response
texts = []
for i in range (1, 1000):
texts.append('你是谁')
texts.append('hello')
res = my_embedding(texts)
print([res_data.embedding for res_data in res.data])
Milvus连接和写入
# 连接Milvus
from pymilvus import MilvusClient
client = MilvusClient('http://127.0.0.1:19530')
client.using_database('test')
# 安装数据集和进度显示工具
%pip install datasets tqdm
# 创建collection
if client.has_collection(COLLECTION_NAME):
client.drop_collection(COLLECTION_NAME)
# 创建schema
from pymilvus import DataType
schema = MilvusClient.create_schema(
auto_id=True,
enable_dynamic_field=False,
)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=64000)
schema.add_field(field_name="type", datatype=DataType.VARCHAR, max_length=64000)
schema.add_field(field_name="release_year", datatype=DataType.INT64)
schema.add_field(field_name="rating", datatype=DataType.VARCHAR, max_length=64000)
schema.add_field(field_name="description", datatype=DataType.VARCHAR, max_length=64000)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=DIMENSION)
client.create_collection(collection_name=COLLECTION_NAME, schema=schema)
# 增加索引
index_params = client.prepare_index_params()
#IP Inner Product 内积
index_params.add_index(
field_name="embedding", metric_type="IP", index_type="AUTOINDEX", params={}
)
client.create_index(collection_name=COLLECTION_NAME, index_params=index_params)
client.load_collection(collection_name=COLLECTION_NAME, replica_number=1)
# 加载样例数据
from datasets import load_dataset
dataset = load_dataset("hugginglearners/netflix-shows", split="train")
# 采用智谱的Embedding
def emb_texts(texts):
res = my_embedding(texts)
return [res_data.embedding for res_data in res.data]
全量Embedding后的数据写入Milvus
# 注意一批次如果超过1000,会报错,这里使用了50
from tqdm import tqdm
import time
batch = []
print(len(dataset))
time.sleep(5)
print(BATCH_SIZE)
for i in tqdm(range(0, len(dataset))):
batch.append(
{
"title": dataset[i]["title"] or "",
"type": dataset[i]["type"] or "",
"release_year": dataset[i]["release_year"] or -1,
"rating": dataset[i]["rating"] or "",
"description": dataset[i]["description"] or "",
}
)
if len(batch) % BATCH_SIZE == 0 or i == len(dataset) - 1:
embeddings = emb_texts([item["description"] for item in batch])
for item, emb in zip(batch, embeddings):
item["embedding"] = emb
client.insert(collection_name=COLLECTION_NAME, data=batch)
batch = []
执行进度:
progress-of-embedding
数据写入结果
result-of-embedding
知识库读取结果
import textwrap
def retrieve(query, top_k=5):
# text, expr = query""" """
text = query
res = client.search(
collection_name=COLLECTION_NAME,
data=emb_texts(text),
# filter=expr,
limit=top_k,
output_fields=["title", "type", "release_year", "rating", "description"],
search_params={
"metric_type": "IP",
"params": {},
},
)
# print("Description:", text, "Expression:", expr)
res_text = 'Null'
for hit_group in res:
# print("Results:")
if res_text == 'Null':
res_text = ''
for rank, hit in enumerate(hit_group, start=1):
entity = hit["entity"]
res_text += f"\tRank: {rank} Score: {hit['distance']:} Title: {entity.get('title', '')}\n"
res_text += f"\t\tType: {entity.get('type', '')} "
res_text += f"Release Year: {entity.get('release_year', '')} "
res_text += f"Rating: {entity.get('rating', '')}"
description = entity.get("description", "")
# res_text = textwrap.fill(description, width=88)
# print()
res_text += description
return res_text
# my_query = ("movie about china", 'release_year < 2019 and rating like "PG%"')
# retrieve(my_query)
稍微复杂的形式,采用自定义的提示词(PE)
translate_prompt = '''
你是一个专业的翻译,有用户的提问如下:
"{query}"
将其翻译为英文,只返回英文结果
'''
def translate(query):
client = ZhipuAI(api_key=MY_API_KEY) # 请填写您自己的APIKey
response = client.chat.completions.create(
model="glm-4-plus", # 请填写您要调用的模型名称
messages=[
{"role": "user", "content": translate_prompt.format(query=query)},
],
stream=False
)
# print(response.choices[0].message)
return response.choices[0].message.content
# for chunk in response:
# print(chunk.choices[0].delta)
summary_prompt = '''
你是一个专业的知识库助理,有用户的原始提问如下:
"{query}",
通过该提问获得的知识内容如下:
#####
"{knowledge}"
#####
对知识进行总结,如果知识是英文,翻译为中文,最终只输出结果。
'''
def summary(query):
translated = translate(query)
knowledge = retrieve(translated, top_k=10)
content = summary_prompt.format(query=query, knowledge=knowledge)
# print("====", content)
client = ZhipuAI(api_key=MY_API_KEY) # 请填写您自己的APIKey
response = client.chat.completions.create(
model="glm-4-plus", # 请填写您要调用的模型名称
messages=[
{"role": "user", "content": content},
],
stream=False
)
# print(response.choices[0].message)
return response.choices[0].message.content
测试案例和结果
query = '科幻电影'
print(translate(query))
result = summary(query)
print(result)
result-of-llm-query
最后关闭客户端
#close milvus client
client.close()
扩展应用,让应用直接输出json
prompt
result-of-prompt.png