References
- https://docs.weaviate.io/
- https://github.com/weaviate
- https://weaviate-python-client.readthedocs.io/
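Overview
A demo project needed a vector database for validation, so I had to stand up an environment quickly. An LLM suggested Weaviate, and I deployed it with Docker. These are my notes from that setup.
Environment: Ubuntu 22.04.5 LTS
Environment setup
- Install the prerequisites (docker, docker-compose)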
- docker
a.) Update the package index
apt-get update
apt-get upgrade
b.) Install the required dependencies
apt-get install ca-certificates curl gnupg lsb-release
c.) Add Docker's official GPG key
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
d.) Set up Docker's stable repository
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
e.) Install the latest Docker Engine and containerd
apt-get update
apt-get install docker-ce docker-ce-cli containerd.io
f.) Start Docker, enable it at boot, and verify
systemctl start docker
systemctl enable docker
systemctl status docker
docker run hello-world
g.) Set the local image storage path and configure registry mirrors for mainland China
## Create the file if it does not exist; otherwise edit it directly
[touch /etc/docker/daemon.json]
vim /etc/docker/daemon.json
## File contents:
{
  "registry-mirrors": [
    "https://docker.xuanyuan.me",
    "https://docker.m.daocloud.io",
    "https://docker.1ms.run",
    "https://docker.1panel.live",
    "https://hub.rat.dev",
    "https://docker-mirror.aigc2d.com"
  ],
  "data-root": "/data/opt/docker-images"
}
## Restart docker
systemctl stop docker
systemctl start docker
## Show docker info (check the Registry Mirrors and Docker Root Dir entries)
docker info
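A hand-edited daemon.json can easily end up with a repeated mirror or a stray comma, and Docker will not start with a malformed file. As a rough sketch, the file content could instead be generated with Python. The helper name `build_daemon_config` is made up for this note; the mirror URLs and data-root are the ones listed above.

```python
import json


def build_daemon_config(mirrors, data_root):
    """Return daemon.json text with duplicate mirrors removed (order preserved)."""
    unique_mirrors = list(dict.fromkeys(mirrors))  # dict keeps first-seen order
    config = {"registry-mirrors": unique_mirrors, "data-root": data_root}
    return json.dumps(config, indent=2)


if __name__ == "__main__":
    mirrors = [
        "https://docker.xuanyuan.me",
        "https://docker.m.daocloud.io",
        "https://docker.xuanyuan.me",  # repeated on purpose to show the de-duplication
        "https://docker.1ms.run",
    ]
    # Review the output first, then write it to /etc/docker/daemon.json.
    print(build_daemon_config(mirrors, "/data/opt/docker-images"))
```

Because the text comes from json.dumps, it is always valid JSON, which removes one common reason for the daemon failing to restart.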
- docker-compose
a.) Download the Docker Compose binary
## Pick the latest release (or the version you want) on GitHub: https://github.com/docker/compose/releases
curl -SL https://github.com/docker/compose/releases/download/v2.40.3/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose
## Alternatively, download through the following accelerated proxy address
curl -SL https://ghfast.top/https://github.com/docker/compose/releases/download/v2.40.3/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose
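b.) Make it executable and verify
chmod +x /usr/local/bin/docker-compose
## The standalone binary is invoked as docker-compose; "docker compose version" only works when the Compose CLI plugin is installed
docker-compose version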
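The download URL above follows a fixed pattern (release tag, then an asset named after the OS and CPU architecture). A small sketch of building it for the current machine is shown here. The function name and the `mirror_prefix` parameter are hypothetical, and the asset naming is inferred only from the Linux URL in this note, so other platforms may need adjusting.

```python
import platform


def compose_download_url(version, system=None, machine=None, mirror_prefix=""):
    """Build the release-asset URL for the standalone docker-compose binary."""
    system = (system or platform.system()).lower()     # e.g. "linux"
    machine = (machine or platform.machine()).lower()  # e.g. "x86_64"
    asset = f"docker-compose-{system}-{machine}"
    url = f"https://github.com/docker/compose/releases/download/{version}/{asset}"
    # mirror_prefix is prepended as-is, matching the proxy style used above.
    return mirror_prefix + url


if __name__ == "__main__":
    print(compose_download_url("v2.40.3"))
    print(compose_download_url("v2.40.3", mirror_prefix="https://ghfast.top/"))
```

The printed URL can then be passed to curl exactly as in step a.).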
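- Run Weaviate and the vectorization model
a.) Write docker-compose.yml with the weaviate and transformers images
services:
  weaviate:
    image: semitechnologies/weaviate:1.33.4
    ports:
      - "8080:8080"    # host:container mapping (change the host side to 8081 etc. to avoid conflicts)
      - "50051:50051"  # gRPC port used by the Python client
    environment:
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'  # data path inside the container (fixed value)
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'false'  # disable anonymous access
      AUTHENTICATION_APIKEY_ENABLED: 'true'  # enable API key authentication
      AUTHENTICATION_APIKEY_ALLOWED_KEYS: 'WVF5YTh*********S231gsX3tD5ngdN8pkih123'  # longer than 32 characters; separate multiple keys with ","
      AUTHENTICATION_APIKEY_USERS: 'test001'  # maps one-to-one onto AUTHENTICATION_APIKEY_ALLOWED_KEYS; separate multiple users with ","
      CLUSTER_HOSTNAME: 'weaviate-node-1'  # node name (fixed value for a single-node deployment)
      MEMORY_LIMIT: '4GB'  # memory limit (adjust to the machine; at least 2GB recommended)
      LOG_LEVEL: 'debug'  # log level (info/debug/error; use debug while troubleshooting)
      TRANSFORMERS_INFERENCE_API: 'http://transformers:8080'
      ENABLE_MODULES: 'text2vec-transformers'
      DEFAULT_VECTORIZER_MODULE: 'text2vec-transformers'  # vectorization module
    volumes:
      - /data/weaviate-data:/var/lib/weaviate  # replace with a real host directory (e.g. ./weaviate-data for a folder next to this file)
    restart: always  # start at boot and restart automatically on failure
    healthcheck:
      # Weaviate's readiness endpoint is /v1/.well-known/ready; wget is used because the image may not ship curl
      test: ["CMD", "wget", "--no-verbose", "--tries=3", "--spider", "http://localhost:8080/v1/.well-known/ready"]
      interval: 30s
      timeout: 10s
      retries: 3
    networks:
      - weaviate-network
  # transformers-inference service (generates the vectors)
  transformers:
    # Official image with a bundled model (all-MiniLM-L6-v2, lightweight and efficient)
    image: semitechnologies/transformers-inference:sentence-transformers-all-MiniLM-L6-v2
    environment:
      - ENABLE_CUDA=false  # set to true if the host has an NVIDIA GPU (requires Docker GPU support)
      - PORT=8080          # service port; must match the address configured in Weaviate
    networks:
      - weaviate-network
# Declare the network (lets the two services reach each other by service name)
networks:
  weaviate-network:
    driver: bridge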
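The two API key variables are positional: the first key belongs to the first user, and so on. A quick sanity check before starting the stack could look like the sketch below. `map_api_keys` is a hypothetical helper, and the 32-character minimum simply mirrors the comment in the compose file rather than a limit I have verified in Weaviate.

```python
def map_api_keys(allowed_keys, users, min_key_length=32):
    """Pair comma-separated keys with comma-separated users, one-to-one."""
    keys = [k.strip() for k in allowed_keys.split(",") if k.strip()]
    names = [u.strip() for u in users.split(",") if u.strip()]
    if len(keys) != len(names):
        raise ValueError(f"{len(keys)} key(s) but {len(names)} user(s)")
    # Length rule taken from the note in docker-compose.yml (assumption).
    too_short = [k for k in keys if len(k) <= min_key_length]
    if too_short:
        raise ValueError(f"{len(too_short)} key(s) are not longer than {min_key_length} characters")
    return dict(zip(keys, names))


if __name__ == "__main__":
    # Placeholder keys for illustration only.
    mapping = map_api_keys("a" * 40 + "," + "b" * 40, "test001,test002")
    for key, user in mapping.items():
        print(f"{user}: {key[:4]}... ({len(key)} chars)")
```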
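- Run the services and observe them
a.) Start the service containers
## Run this in the directory that contains docker-compose.yml
## (with only the standalone binary installed, use: docker-compose up -d)
docker compose up -d
b.) View logs
## List the running containers
docker ps
## Follow the logs of a specific container
docker logs -f CONTAINER_ID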
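The transformers container can take a while to load its model, so Weaviate may not answer immediately after the containers start. Instead of watching the logs, a script can poll the readiness endpoint. This is a minimal sketch using only the standard library; the function names are hypothetical, and it assumes the default port mapping from the compose file and Weaviate's /v1/.well-known/ready endpoint.

```python
import time
import urllib.error
import urllib.request


def http_probe(url, timeout=3.0):
    """Return True if a GET on url answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeouts and non-2xx answers all count as "not ready".
        return False


def wait_until_ready(probe, attempts=30, interval=2.0, sleep=time.sleep):
    """Call probe() up to `attempts` times; return True as soon as it succeeds."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            sleep(interval)
    return False


# Usage against the stack from this note:
#   url = "http://localhost:8080/v1/.well-known/ready"
#   print(wait_until_ready(lambda: http_probe(url)))
```

Passing the probe in as a callable keeps the retry loop independent of HTTP, so the same loop could wrap `client.is_ready()` from the Python client.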
Test code
- Python dependencies
pip install -U weaviate-client
- Connection test
import weaviate
from weaviate.classes.init import Auth

print(weaviate.__version__)
# Connect over both HTTP (8080) and gRPC (50051), as mapped in docker-compose.yml
client = weaviate.connect_to_custom(
    http_host="localhost", http_port=8080, http_secure=False,
    grpc_host="localhost", grpc_port=50051, grpc_secure=False,
    auth_credentials=Auth.api_key("WVF5YTha*********USmCRgsX3tD5ngdN8pkih"),
)
print(f'client is ready : {client.is_ready()}')
meta = client.get_meta()
print(f'meta :{meta}')
client.close()
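Every script in this note repeats the same connection call with the API key pasted inline. One possible way to avoid that is to read the settings from environment variables. The variable names (WEAVIATE_HOST, WEAVIATE_HTTP_PORT, WEAVIATE_GRPC_PORT, WEAVIATE_API_KEY) and both helper functions are assumptions made for this sketch, not something the client defines.

```python
import os


def connection_settings(env=None):
    """Collect connect_to_custom() arguments from environment-style variables."""
    env = os.environ if env is None else env
    host = env.get("WEAVIATE_HOST", "localhost")
    return {
        "http_host": host,
        "http_port": int(env.get("WEAVIATE_HTTP_PORT", "8080")),
        "http_secure": False,
        "grpc_host": host,
        "grpc_port": int(env.get("WEAVIATE_GRPC_PORT", "50051")),
        "grpc_secure": False,
        "api_key": env["WEAVIATE_API_KEY"],  # raises KeyError if the key is not set
    }


def connect(env=None):
    """Open a client using the settings above (needs weaviate-client installed)."""
    import weaviate
    from weaviate.classes.init import Auth

    settings = connection_settings(env)
    api_key = settings.pop("api_key")
    return weaviate.connect_to_custom(auth_credentials=Auth.api_key(api_key), **settings)
```

With WEAVIATE_API_KEY exported in the shell, `client = connect()` would replace the long call used in the scripts here.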
- Class listing test
import weaviate
from weaviate.classes.init import Auth
from weaviate.exceptions import (
    AuthenticationFailedError,
    WeaviateConnectionError,
    WeaviateGRPCUnavailableError,
)

print(weaviate.__version__)


def get_weaviate_classes(client):
    """
    List every class (collection) in Weaviate with its details.
    """
    try:
        # Get all collection names
        collection_names = client.collections.list_all()
        if not collection_names:
            print("No classes found in the database")
            return
        print(f"Found {len(collection_names)} class(es):")
        print("=" * 50)
        # Get the details of each class
        for i, collection_name in enumerate(collection_names, 1):
            print(f"\n{i}. Class name: {collection_name}")
            # Get the collection object
            collection = client.collections.get(collection_name)
            # Get the class configuration
            config = collection.config.get()
            print(f"   - Vectorizer: {config.vectorizer}")
            print(f"   - Vector index type: {config.vector_index_type}")
            # Get the object count
            count = collection.aggregate.over_all(total_count=True)
            print(f"   - Object count: {count.total_count}")
    except WeaviateConnectionError:
        print("Error: cannot connect to the Weaviate server")
        print("Please check:")
        print("1. The Weaviate service is running")
        print("2. The host and ports are correct")
        print("3. The credentials are correct (if authentication is enabled)")
    except WeaviateGRPCUnavailableError as e:
        print(f"gRPC error: {e}")
    except AuthenticationFailedError:
        print("Authentication failed: check that the API key is correct")
    except Exception as e:
        print(f"Unexpected error: {e}")


client = weaviate.connect_to_custom(
    http_host="localhost", http_port=8080, http_secure=False,
    grpc_host="localhost", grpc_port=50051, grpc_secure=False,
    auth_credentials=Auth.api_key("WVF5YTha*********USmCRgsX3tD5ngdN8pkih"),
)
print('=============== List all classes ====================')
get_weaviate_classes(client)
print('================= Close connection ==================')
client.close()
- Write test
a.) Single insert
import weaviate
from weaviate.classes.init import Auth
import uuid

print(weaviate.__version__)


def basic_insert_demo(client):
    """
    Basic single-object insert example.
    """
    try:
        collection_name = "Article"
        # Create the collection if it does not exist yet
        if collection_name not in client.collections.list_all():
            client.collections.create(
                name=collection_name,
                # Optional: configure a vectorizer
                # vectorizer_config=Configure.Vectorizer.text2vec_transformers()
            )
            print(f"Created new collection: {collection_name}")
        # Get the collection
        articles = client.collections.get(collection_name)
        # A single object (sample data)
        article_data = {
            "title": "人工智能的发展历程",
            "content": "人工智能从1956年达特茅斯会议诞生至今,经历了多次发展浪潮...",
            "author": "张三",
            "category": "科技",
            "published": True
        }
        # Insert the object
        result = articles.data.insert(
            properties=article_data,
            # uuid=uuid.uuid4()  # optional: set the UUID; one is generated if omitted
        )
        print(f"Inserted object, UUID: {result}")
    finally:
        client.close()


client = weaviate.connect_to_custom(
    http_host="localhost", http_port=8080, http_secure=False,
    grpc_host="localhost", grpc_port=50051, grpc_secure=False,
    auth_credentials=Auth.api_key("WVF5YTha*********USmCRgsX3tD5ngdN8pkih"),
)
basic_insert_demo(client)  # the function closes the client when it is done
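The insert above lets Weaviate generate a random UUID, so running the script twice stores the same article twice. If the optional `uuid` argument is derived from the content instead, a re-run produces the same ID, and I would expect the second insert to be rejected rather than stored again (not verified on this version). The sketch below uses only the standard library. `ARTICLE_NAMESPACE`, `article_uuid`, and the choice of key fields are made up for illustration; the client also ships `weaviate.util.generate_uuid5`, which serves a similar purpose.

```python
import json
import uuid

# Hypothetical namespace for this demo; any fixed UUID works as long as it never changes.
ARTICLE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "article.demo.example")


def article_uuid(properties, key_fields=("title", "author")):
    """Derive a stable UUID (version 5) from the fields that identify an article."""
    key = json.dumps(
        {field: properties.get(field) for field in key_fields},
        sort_keys=True,
        ensure_ascii=False,
    )
    return uuid.uuid5(ARTICLE_NAMESPACE, key)


if __name__ == "__main__":
    article = {"title": "人工智能的发展历程", "author": "张三", "category": "科技"}
    # Same input, same UUID on every run.
    print(article_uuid(article))
```

The result would be passed as `articles.data.insert(properties=article_data, uuid=article_uuid(article_data))`.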
b.) Batch insert
import weaviate
from weaviate.classes.init import Auth
import uuid

print(weaviate.__version__)


def batch_insert_demo(client):
    """
    Batch insert example.
    """
    try:
        collection_name = "Article"
        # Create the collection if it does not exist yet
        if collection_name not in client.collections.list_all():
            client.collections.create(
                name=collection_name,
                # Optional: configure a vectorizer
                # vectorizer_config=Configure.Vectorizer.text2vec_transformers()
            )
            print(f"Created new collection: {collection_name}")
        # Get the collection
        articles = client.collections.get(collection_name)
        # Prepare the batch data (sample data)
        articles_data = [
            {
                "title": "机器学习基础",
                "content": "机器学习是人工智能的重要分支,主要包括监督学习、无监督学习和强化学习...",
                "author": "李四",
                "category": "技术",
                "published": True,
                "wordCount": 1500
            },
            {
                "title": "深度学习应用",
                "content": "深度学习在计算机视觉、自然语言处理等领域取得了突破性进展...",
                "author": "王五",
                "category": "AI",
                "published": True,
                "wordCount": 2000
            },
            {
                "title": "大数据技术",
                "content": "随着互联网发展,大数据技术成为处理海量数据的关键工具...",
                "author": "赵六",
                "category": "数据",
                "published": False,
                "wordCount": 1800
            }
        ]
        # Batch insert; objects are sent when the with-block exits
        with articles.batch.dynamic() as batch:
            for i, article in enumerate(articles_data):
                batch.add_object(
                    properties=article,
                    uuid=uuid.uuid4()  # optional
                )
                print(f"Queued object {i+1} for the batch")
        # Queuing does not guarantee success, so check for rejected objects
        failed = articles.batch.failed_objects
        if failed:
            print(f"{len(failed)} object(s) failed; first error: {failed[0].message}")
        else:
            print("Batch insert finished!")
    finally:
        client.close()


client = weaviate.connect_to_custom(
    http_host="localhost", http_port=8080, http_secure=False,
    grpc_host="localhost", grpc_port=50051, grpc_secure=False,
    auth_credentials=Auth.api_key("WVF5YTha*********USmCRgsX3tD5ngdN8pkih"),
)
batch_insert_demo(client)  # the function closes the client when it is done
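The collection here is created without explicit properties, so the property types are inferred from the data that arrives (assuming auto-schema is left enabled). A record that omits a field, or sends a string where earlier records sent a number, is then easy to miss until a query misbehaves. A small pre-flight check on the Python side might look like this. `check_records` is a hypothetical helper, not part of the client.

```python
def check_records(records):
    """Return human-readable problems: missing keys and mixed value types."""
    problems = []
    all_keys = sorted({key for record in records for key in record})
    seen_types = {}  # first type observed for each key
    for index, record in enumerate(records):
        for key in all_keys:
            if key not in record:
                problems.append(f"record {index}: missing '{key}'")
                continue
            value_type = type(record[key])
            expected = seen_types.setdefault(key, value_type)
            if value_type is not expected:
                problems.append(
                    f"record {index}: '{key}' is {value_type.__name__}, expected {expected.__name__}"
                )
    return problems


if __name__ == "__main__":
    sample = [
        {"title": "机器学习基础", "wordCount": 1500},
        {"title": "深度学习应用", "wordCount": "2000"},  # string instead of int
        {"title": "大数据技术"},                          # wordCount missing
    ]
    for problem in check_records(sample):
        print(problem)
```

Running it on `articles_data` before the `with articles.batch.dynamic()` block gives a chance to fix the data before anything is sent.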
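c.) Batch insert with vectorization
import weaviate
from weaviate.classes.init import Auth
import uuid
from weaviate.classes.config import Configure

print(weaviate.__version__)


def batch_insert_demo(client):
    """
    Batch insert example for a collection with a vectorizer.
    """
    try:
        collection_name = "Article_vector"
        # Create the collection if it does not exist yet
        if collection_name not in client.collections.list_all():
            client.collections.create(
                name=collection_name,
                # Vectorize with the text2vec-transformers module.
                # Configure.Vectorizer.* pairs with vectorizer_config; recent clients
                # also offer vector_config with Configure.Vectors.* (named vectors).
                vectorizer_config=Configure.Vectorizer.text2vec_transformers()
            )
            print(f"Created new collection: {collection_name}")
        # Get the collection
        articles = client.collections.get(collection_name)
        # Prepare the batch data (sample data)
        articles_data = [
            {
                "title": "机器学习基础",
                "content": "机器学习是人工智能的重要分支,主要包括监督学习、无监督学习和强化学习...",
                "author": "李四",
                "category": "技术",
                "published": True,
                "wordCount": 1500
            },
            {
                "title": "深度学习应用",
                "content": "深度学习在计算机视觉、自然语言处理等领域取得了突破性进展...",
                "author": "王五",
                "category": "AI",
                "published": True,
                "wordCount": 2000
            },
            {
                "title": "大数据技术",
                "content": "随着互联网发展,大数据技术成为处理海量数据的关键工具...",
                "author": "赵六",
                "category": "数据",
                "published": False,
                "wordCount": 1800
            }
        ]
        # Batch insert; objects are sent (and vectorized server-side) when the with-block exits
        with articles.batch.dynamic() as batch:
            for i, article in enumerate(articles_data):
                batch.add_object(
                    properties=article,
                    uuid=uuid.uuid4()  # optional
                )
                print(f"Queued object {i+1} for the batch")
        # Queuing does not guarantee success, so check for rejected objects
        failed = articles.batch.failed_objects
        if failed:
            print(f"{len(failed)} object(s) failed; first error: {failed[0].message}")
        else:
            print("Batch insert finished!")
    finally:
        client.close()


client = weaviate.connect_to_custom(
    http_host="localhost", http_port=8080, http_secure=False,
    grpc_host="localhost", grpc_port=50051, grpc_secure=False,
    auth_credentials=Auth.api_key("WVF5YTha*********USmCRgsX3tD5ngdN8pkih"),
)
batch_insert_demo(client)  # the function closes the client when it is done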
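Once objects are vectorized, a vector search ranks them by the distance between the query vector and each stored vector. Assuming the collection keeps Weaviate's default cosine metric (nothing in this note changes it), that distance is one minus the cosine similarity. The sketch below computes it in plain Python on toy vectors, just to make the numbers returned by a search easier to read; it is not how Weaviate computes it internally.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cosine similarity: 0 = same direction, 1 = orthogonal, 2 = opposite."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine distance is undefined for a zero vector")
    return 1.0 - dot / (norm_a * norm_b)


if __name__ == "__main__":
    query = [0.2, 0.9, 0.1]    # toy 3-dimensional "embeddings"
    close = [0.25, 0.8, 0.05]
    far = [0.9, -0.1, 0.4]
    print(f"close: {cosine_distance(query, close):.4f}")
    print(f"far:   {cosine_distance(query, far):.4f}")
```

Smaller values mean more similar text, which is why search results come back ordered by ascending distance.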
- Simple query test
import weaviate
from weaviate.classes.init import Auth

print(weaviate.__version__)


def basic_query_demo(client):
    """
    Basic query example.
    """
    try:
        collection_name = "Article"
        articles = client.collections.get(collection_name)
        # Fetch all objects (with a limit)
        print("=== All articles ===")
        response = articles.query.fetch_objects(
            limit=10,
            return_properties=["title", "author", "category", "published", "wordCount"]
        )
        print(f"Found {len(response.objects)} article(s):")
        for i, obj in enumerate(response.objects, 1):
            print(f"{i}. Title: {obj.properties['title']}")
            print(f"   Author: {obj.properties.get('author', 'unknown')}")
            print(f"   Category: {obj.properties.get('category', 'unknown')}")
            print(f"   Published: {obj.properties.get('published', 'unknown')}")
            print(f"   Word count: {obj.properties.get('wordCount', 'unknown')}")
            print(f"   UUID: {obj.uuid}")
            print()
    finally:
        print('======== Basic query done ============')


client = weaviate.connect_to_custom(
    http_host="localhost", http_port=8080, http_secure=False,
    grpc_host="localhost", grpc_port=50051, grpc_secure=False,
    auth_credentials=Auth.api_key("WVF5YTha*********USmCRgsX3tD5ngdN8pkih"),
)
try:
    basic_query_demo(client)
finally:
    client.close()
- Advanced usage test
import weaviate
from weaviate.classes.init import Auth
from weaviate.classes.query import Filter
from weaviate.classes.aggregate import GroupByAggregate

print(weaviate.__version__)
def basic_query_demo(client):
    """
    Basic query example.
    """
    try:
        collection_name = "Article"
        articles = client.collections.get(collection_name)
        # Fetch all objects (with a limit)
        print("=== All articles ===")
        response = articles.query.fetch_objects(
            limit=10,
            return_properties=["title", "author", "category", "published"]
        )
        print(f"Found {len(response.objects)} article(s):")
        for i, obj in enumerate(response.objects, 1):
            print(f"{i}. Title: {obj.properties['title']}")
            print(f"   Author: {obj.properties.get('author', 'unknown')}")
            print(f"   Category: {obj.properties.get('category', 'unknown')}")
            print(f"   Published: {obj.properties.get('published', 'unknown')}")
            print(f"   UUID: {obj.uuid}")
            print()
    finally:
        print('======== Basic query done ============')


def filter_query_demo(client):
    """
    Filtered query example.
    """
    try:
        collection_name = "Article"
        articles = client.collections.get(collection_name)
        # Published articles only
        print("=== Published articles ===")
        response = articles.query.fetch_objects(
            limit=5,
            filters=Filter.by_property("published").equal(True),
            return_properties=["title", "author", "category"]
        )
        print(f"Found {len(response.objects)} published article(s):")
        for obj in response.objects:
            print(f"- {obj.properties['title']} (author: {obj.properties['author']})")
        print()
        # Articles in one category (the value matches the sample data, category "科技")
        print("=== Articles in category 科技 ===")
        response = articles.query.fetch_objects(
            limit=5,
            filters=Filter.by_property("category").equal("科技"),
            return_properties=["title", "author"]
        )
        print(f"Found {len(response.objects)} article(s) in this category:")
        for obj in response.objects:
            print(f"- {obj.properties['title']}")
        print()
    finally:
        print('======== Filtered query done ============')


def advanced_query_demo(client):
    """
    Advanced query example: sorting, pagination, and multi-condition filters.
    """
    try:
        collection_name = "Article"
        articles = client.collections.get(collection_name)
        # Multiple conditions: published AND more than 1000 words
        print("=== Multi-condition query ===")
        response = articles.query.fetch_objects(
            limit=5,
            filters=(
                Filter.by_property("published").equal(True) &
                Filter.by_property("wordCount").greater_than(1000)
            ),
            return_properties=["title", "author", "wordCount"]
        )
        print(f"Found {len(response.objects)} matching article(s):")
        for obj in response.objects:
            print(f"- {obj.properties['title']} (words: {obj.properties.get('wordCount', 'unknown')})")
        print()
        # Sorted query
        print("=== Sorted by word count, descending ===")
        response = articles.query.fetch_objects(
            limit=5,
            sort=weaviate.classes.query.Sort.by_property("wordCount", ascending=False),
            return_properties=["title", "wordCount"]
        )
        print("Top 5 articles by word count:")
        for obj in response.objects:
            print(f"- {obj.properties['title']} (words: {obj.properties.get('wordCount', 'unknown')})")
        print()
        # Cursor-based pagination
        print("=== Paginated query ===")
        page_size = 3
        after_uuid = None
        for page in range(2):  # first 2 pages
            response = articles.query.fetch_objects(
                limit=page_size,
                after=after_uuid,
                return_properties=["title", "author"]
            )
            print(f"Page {page + 1}:")
            for obj in response.objects:
                print(f"- {obj.properties['title']}")
                after_uuid = obj.uuid  # remember the last UUID as the cursor for the next page
            if len(response.objects) < page_size:
                break
            print()
    finally:
        print('======== Advanced query done ============')


def vector_search_demo(client):
    """
    Vector search example.
    """
    try:
        collection_name = "Article_vector"
        articles = client.collections.get(collection_name)
        # Inspect the collection configuration
        config = articles.config.get()
        print(f'config : {config}')
        print(f'vectorizer config : {config.vectorizer}')
        if config.vectorizer is None:
            print(f"Collection({collection_name}) has no vectorizer; skipping the search")
        else:
            # Text-based vector search (the query text matches the sample data language)
            print("=== Vector search: articles related to '人工智能' ===")
            response = articles.query.near_text(
                query="人工智能",
                limit=5,
                return_properties=["title", "content", "author"],
                return_metadata=weaviate.classes.query.MetadataQuery(distance=True)
            )
            print(f"Found {len(response.objects)} related article(s):")
            for obj in response.objects:
                print(f"- Title: {obj.properties['title']}")
                print(f"  Author: {obj.properties['author']}")
                print(f"  Distance: {obj.metadata.distance:.4f}")
                # Show a snippet of the content
                content = obj.properties.get('content', '')
                if len(content) > 100:
                    content = content[:100] + "..."
                print(f"  Content: {content}")
                print()
    finally:
        print('======== Vector search done ============')


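def aggregate_query_demo(client):
    """
    Aggregation example.
    """
    try:
        collection_name = "Article"
        articles = client.collections.get(collection_name)
        # Total count
        print("=== Statistics ===")
        count_result = articles.aggregate.over_all(total_count=True)
        print(f"Total articles: {count_result.total_count}")
        # Count grouped by category
        print("\n=== Count by category ===")
        response = articles.aggregate.over_all(
            group_by=GroupByAggregate(prop="category"),
            total_count=True
        )
        for group in response.groups:
            print(f"Category '{group.grouped_by.value}': {group.total_count} article(s)")
        # Metrics on a numeric property
        print("\n=== Word count statistics ===")
        numeric_result = articles.aggregate.over_all(
            return_metrics=weaviate.classes.query.Metrics('wordCount').number(
                count=True, maximum=True, mean=True, minimum=True, sum_=True
            )
        )
        for key, val in numeric_result.properties.items():
            if key == 'wordCount':
                print(f"Mean word count: {val.mean or 'no data'}")
                print(f"Max word count: {val.maximum or 'no data'}")
                print(f"Min word count: {val.minimum or 'no data'}")
                print(f"Total word count: {val.sum_ or 'no data'}")
    finally:
        print('======== Aggregation done ============')


# Run every demo above against the local instance, then close the connection
client = weaviate.connect_to_custom(
    http_host="localhost", http_port=8080, http_secure=False,
    grpc_host="localhost", grpc_port=50051, grpc_secure=False,
    auth_credentials=Auth.api_key("WVF5YTha*********USmCRgsX3tD5ngdN8pkih"),
)
try:
    basic_query_demo(client)
    filter_query_demo(client)
    advanced_query_demo(client)
    vector_search_demo(client)
    aggregate_query_demo(client)
finally:
    client.close()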
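The pagination part of the advanced query stops after two pages. The same `after` cursor idea extends to walking a whole collection: keep passing the last UUID of each page until a page comes back short. The sketch below shows that loop against an in-memory stand-in so it runs without a server. `iterate_all`, `make_fake_fetch`, and `FakeObject` are hypothetical names introduced only for this illustration.

```python
from dataclasses import dataclass


@dataclass
class FakeObject:
    """Stand-in for a query result object; only the fields the loop needs."""
    uuid: str
    title: str


def iterate_all(fetch_page, page_size=3):
    """Yield every object by following the `after` cursor until a short page."""
    after = None
    while True:
        page = fetch_page(limit=page_size, after=after)
        yield from page
        if len(page) < page_size:
            return  # a short (or empty) page means there is nothing left
        after = page[-1].uuid


def make_fake_fetch(objects):
    """Build a fetch_page(limit, after) function over an in-memory list."""
    def fetch_page(limit, after):
        start = 0
        if after is not None:
            start = next(i for i, o in enumerate(objects) if o.uuid == after) + 1
        return objects[start:start + limit]
    return fetch_page


if __name__ == "__main__":
    data = [FakeObject(uuid=f"id-{i}", title=f"article {i}") for i in range(7)]
    for obj in iterate_all(make_fake_fetch(data), page_size=3):
        print(obj.uuid, obj.title)
```

Against a real collection, the fetch function would presumably be `lambda limit, after: articles.query.fetch_objects(limit=limit, after=after).objects`.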