Milvus结合LLM大模型应用实例

背景

通过Milvus的样例数据存储案例,结合AI模型,完成embedding向量化入库,向量查询,大模型总结和提示词工程的演示,熟悉相关组件和流程。

执行前提

  • 本文的案例基于Milvus官方的案例改造而来,相关代码运行环境为python,确保安装完善相关运行环境,建议使用Python虚拟环境进行,采用Juptyer运行。
  • 需要部署好待使用的Milvus向量数据库。
  • 部分功能可能需要配置代理访问额外资源。

执行过程

  • 原始案例使用的OpenAI的embedding,这里替换为智谱。
  • 这个案例基于Milvus官方样例,通过将一个电影相关数据集写入Milvus进行搜索。在此基础上进行扩展,应用基础大模型实现RAG功能。
  1. 安装智谱AI组件
%pip install zhipuai
  1. Milvus依赖
%pip install pymilvus==2.4.1
  1. 主要代码

常量

COLLECTION_NAME = "movie_search"
DIMENSION = 1024

BATCH_SIZE = 50

智谱向量化

from zhipuai import ZhipuAI
import os

# 必要时配置代理
#DEFAULT_PROXY = 'http://proxy'
#os.environ['HTTPS_PROXY'] = DEFAULT_PROXY
#os.environ['HTTP_PROXY'] = DEFAULT_PROXY
#os.environ['NO_PROXY'] = '127.0.0.1'

MY_API_KEY = '53ca18d6d84d08ff2e4feb7748fa8d82.VVVVVV'

def my_embedding(texts):
    client = ZhipuAI(api_key=MY_API_KEY) 
    response = client.embeddings.create(
        model="embedding-2", #填写需要调用的模型编码
        input=texts,
        dimensions=DIMENSION
    )

    return response

texts = []
for i in range (1, 1000):
    texts.append('你是谁')
    texts.append('hello')
res = my_embedding(texts)

print([res_data.embedding for res_data in res.data])

Milvus连接和写入

# 连接Milvus
from pymilvus import MilvusClient

client = MilvusClient('http://127.0.0.1:19530')

client.using_database('test')

# 安装数据集和进度显示工具

%pip install datasets tqdm 

# 创建collection
if client.has_collection(COLLECTION_NAME):
    client.drop_collection(COLLECTION_NAME)

# 创建schema
from pymilvus import DataType

schema = MilvusClient.create_schema(
    auto_id=True,
    enable_dynamic_field=False,
)

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=64000)
schema.add_field(field_name="type", datatype=DataType.VARCHAR, max_length=64000)
schema.add_field(field_name="release_year", datatype=DataType.INT64)
schema.add_field(field_name="rating", datatype=DataType.VARCHAR, max_length=64000)
schema.add_field(field_name="description", datatype=DataType.VARCHAR, max_length=64000)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=DIMENSION)

client.create_collection(collection_name=COLLECTION_NAME, schema=schema)


# 增加索引

index_params = client.prepare_index_params()

#IP Inner Product 内积
index_params.add_index(
    field_name="embedding", metric_type="IP", index_type="AUTOINDEX", params={}
)

client.create_index(collection_name=COLLECTION_NAME, index_params=index_params)

client.load_collection(collection_name=COLLECTION_NAME, replica_number=1)


# 加载样例数据
from datasets import load_dataset

dataset = load_dataset("hugginglearners/netflix-shows", split="train")


# 采用智谱的Embedding
def emb_texts(texts):
    res = my_embedding(texts)
    return [res_data.embedding for res_data in res.data]

全量Embedding后的数据写入Milvus

# 注意一批次如果超过1000,会报错,这里使用了50
from tqdm import tqdm
import time

batch = []

print(len(dataset))
time.sleep(5)
print(BATCH_SIZE)

for i in tqdm(range(0, len(dataset))):
    batch.append(
        {
            "title": dataset[i]["title"] or "",
            "type": dataset[i]["type"] or "",
            "release_year": dataset[i]["release_year"] or -1,
            "rating": dataset[i]["rating"] or "",
            "description": dataset[i]["description"] or "",
        }
    )

    if len(batch) % BATCH_SIZE == 0 or i == len(dataset) - 1:
        embeddings = emb_texts([item["description"] for item in batch])

        for item, emb in zip(batch, embeddings):
            item["embedding"] = emb

        client.insert(collection_name=COLLECTION_NAME, data=batch)
        batch = []

执行进度:


progress-of-embedding

数据写入结果

result-of-embedding

知识库读取结果

import textwrap

def retrieve(query, top_k=5):
    # text, expr = query"""  """
    text = query
    
    res = client.search(
        collection_name=COLLECTION_NAME,
        data=emb_texts(text),
        # filter=expr,
        limit=top_k,
        output_fields=["title", "type", "release_year", "rating", "description"],
        search_params={
            "metric_type": "IP",
            "params": {},
        },
    )

    # print("Description:", text, "Expression:", expr)

    res_text = 'Null'
    
    for hit_group in res:
        # print("Results:")
        if res_text == 'Null':
            res_text = ''
        for rank, hit in enumerate(hit_group, start=1):
            entity = hit["entity"]

            res_text += f"\tRank: {rank} Score: {hit['distance']:} Title: {entity.get('title', '')}\n"
          
            res_text += f"\t\tType: {entity.get('type', '')} "
            res_text += f"Release Year: {entity.get('release_year', '')} "
            res_text += f"Rating: {entity.get('rating', '')}"
            description = entity.get("description", "")
            # res_text = textwrap.fill(description, width=88)
            # print()
            res_text += description
    
    return res_text

# my_query = ("movie about china", 'release_year < 2019 and rating like "PG%"')

# retrieve(my_query)

稍微复杂的形式,采用自定义的提示词(PE)

translate_prompt = ''' 
你是一个专业的翻译,有用户的提问如下:

"{query}"

将其翻译为英文,只返回英文结果

'''

def translate(query):
    client = ZhipuAI(api_key=MY_API_KEY)  # 请填写您自己的APIKey
    response = client.chat.completions.create(
        model="glm-4-plus",  # 请填写您要调用的模型名称
        messages=[
            {"role": "user", "content": translate_prompt.format(query=query)},
        ],
        stream=False
    )
    # print(response.choices[0].message)
    return response.choices[0].message.content
    # for chunk in response:
        # print(chunk.choices[0].delta)
        

summary_prompt = '''
你是一个专业的知识库助理,有用户的原始提问如下:
"{query}", 

通过该提问获得的知识内容如下:

#####
"{knowledge}"
#####
 
对知识进行总结,如果知识是英文,翻译为中文,最终只输出结果。
'''

def summary(query):
    translated = translate(query)
    knowledge = retrieve(translated, top_k=10)
    
    content = summary_prompt.format(query=query, knowledge=knowledge)
    # print("====", content)
    
    client = ZhipuAI(api_key=MY_API_KEY)  # 请填写您自己的APIKey
    response = client.chat.completions.create(
        model="glm-4-plus",  # 请填写您要调用的模型名称
        messages=[
            {"role": "user", "content": content},
        ],
        stream=False
    )
    # print(response.choices[0].message)
    return response.choices[0].message.content
    

测试案例和结果


query = '科幻电影'

print(translate(query))

result = summary(query)
print(result)
result-of-llm-query

最后关闭客户端

#close milvus client
client.close()

扩展应用,让应用直接输出json

prompt
result-of-prompt.png

参考资料

  1. Movie recommendation With Milvus
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容