Weaviate Usage Notes

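Overview

A demo project needed a vector database for validation, so I had to stand up an environment quickly. An LLM suggested Weaviate, and I deployed it with Docker. These notes record how the environment was built.
Environment: Ubuntu 22.04.5 LTS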

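Environment setup

  1. Install the dependencies (docker, docker-compose)
  • docker
    a.) Update the package index (commands shown without sudo assume a root shell)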
apt-get update
apt-get upgrade

    b.) Install the required dependencies

apt-get install ca-certificates curl gnupg lsb-release

    c.) Add Docker's official GPG key

curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg

    d.) Set up Docker's stable repository

echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

    e.) Install the latest Docker Engine and containerd

apt-get update
apt-get install docker-ce docker-ce-cli containerd.io

    f.) Start Docker, enable it at boot, and verify

systemctl start docker
systemctl enable docker
systemctl status docker
docker run hello-world

    g.) Set the local image storage path and configure registry mirrors (for use in mainland China)

## Create the file if it does not exist yet (touch is harmless if it already does), then edit it
touch /etc/docker/daemon.json
vim /etc/docker/daemon.json
## File contents:
{
    "registry-mirrors": [
        "https://docker.xuanyuan.me",
        "https://docker.m.daocloud.io",
        "https://docker.1ms.run",
        "https://docker.1panel.live",
        "https://hub.rat.dev",
        "https://docker-mirror.aigc2d.com"
    ],
    "data-root": "/data/opt/docker-images"
}
## Restart docker
systemctl restart docker
## Show docker info and confirm that the mirrors and the data root took effect
docker info
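Hand-edited mirror lists tend to pick up duplicate entries or JSON syntax slips, and a malformed daemon.json can keep the daemon from starting. As a minimal sketch, the file content could be generated in Python before it is written to /etc/docker/daemon.json. The helper name `build_daemon_config` is hypothetical and the https-only check is my own assumption, not a Docker requirement.

```python
import json


def build_daemon_config(mirrors, data_root):
    """Return daemon.json text with duplicate mirrors removed (order preserved)."""
    # dict.fromkeys keeps the first occurrence of each URL and drops repeats
    unique = list(dict.fromkeys(m.rstrip("/") for m in mirrors))
    for url in unique:
        # Assumption for this sketch: only https mirrors are accepted
        if not url.startswith("https://"):
            raise ValueError(f"mirror must be an https URL: {url}")
    config = {"registry-mirrors": unique, "data-root": data_root}
    # json.dumps guarantees syntactically valid JSON
    return json.dumps(config, indent=4)


# Example: the repeated mirror is emitted only once
example = build_daemon_config(
    [
        "https://docker.xuanyuan.me",
        "https://docker.m.daocloud.io",
        "https://docker.xuanyuan.me",
    ],
    "/data/opt/docker-images",
)
print(example)
```

The printed text can be redirected into the config file, after which the restart and `docker info` steps apply unchanged.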
  • docker-compose
    a.) Download the Docker Compose binary
## Pick the latest (or the desired) release from https://github.com/docker/compose/releases
curl -SL https://github.com/docker/compose/releases/download/v2.40.3/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose
## Alternatively, download through this accelerator
curl -SL https://ghfast.top/https://github.com/docker/compose/releases/download/v2.40.3/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose

    b.) Make it executable and verify

chmod +x /usr/local/bin/docker-compose
## The standalone binary is invoked as docker-compose; "docker compose" only works when the Compose CLI plugin is installed
docker-compose version
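The download URL hard-codes both the release tag and the CPU architecture, so it has to be edited by hand on ARM hosts or when the version changes. A small sketch of how the URL could be assembled follows. The function name `compose_download_url` and its parameters are hypothetical, and it assumes the release assets keep the `docker-compose-<system>-<machine>` naming seen in the commands above.

```python
import platform

COMPOSE_RELEASES = "https://github.com/docker/compose/releases/download"


def compose_download_url(version, system="linux", machine=None, mirror_prefix=""):
    """Build the asset URL for a Docker Compose release.

    machine defaults to the local architecture (e.g. x86_64 or aarch64);
    mirror_prefix is prepended verbatim, e.g. "https://ghfast.top/".
    """
    machine = machine or platform.machine()
    return f"{mirror_prefix}{COMPOSE_RELEASES}/{version}/docker-compose-{system}-{machine}"


# Reproduces the two URLs used in the curl commands
print(compose_download_url("v2.40.3", machine="x86_64"))
print(compose_download_url("v2.40.3", machine="x86_64", mirror_prefix="https://ghfast.top/"))
```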
  2. Run Weaviate and the vectorization model
    a.) Write docker-compose.yml with the weaviate and transformers images
services:
  weaviate:
    image: semitechnologies/weaviate:1.33.4
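    ports:
      - "8080:8080"  # host:container port mapping (change the host side, e.g. to 8081, to avoid conflicts)
      - "50051:50051"  # gRPC port used by the Python client
    environment:
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'  # data path inside the container (fixed value)
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'false'  # disable anonymous access
      AUTHENTICATION_APIKEY_ENABLED: 'true'  # enable API key authentication
      AUTHENTICATION_APIKEY_ALLOWED_KEYS: 'WVF5YTh*********S231gsX3tD5ngdN8pkih123'   # longer than 32 characters; separate multiple keys with ","
      AUTHENTICATION_APIKEY_USERS: 'test001' # maps one-to-one to AUTHENTICATION_APIKEY_ALLOWED_KEYS; separate multiple users with ","
      CLUSTER_HOSTNAME: 'weaviate-node-1'  # node name (fixed value for a single-node deployment)
      MEMORY_LIMIT: '4GB'  # memory limit (adjust to the machine; at least 2GB recommended)
      LOG_LEVEL: 'debug'  # log level (info/debug/error; use debug while troubleshooting)
      TRANSFORMERS_INFERENCE_API: 'http://transformers:8080'
      ENABLE_MODULES: 'text2vec-transformers'
      DEFAULT_VECTORIZER_MODULE: 'text2vec-transformers'  # default vectorizer module
    volumes:
      - /data/weaviate-data:/var/lib/weaviate  # replace with a real host directory (e.g. ./weaviate-data for a folder next to this file)
    restart: always  # start on boot and restart automatically on failure
    healthcheck:
      # Weaviate serves readiness at /v1/.well-known/ready (there is no /v1/health endpoint);
      # wget is used on the assumption that the image does not bundle curl
      test: ["CMD", "wget", "--no-verbose", "--tries=3", "--spider", "http://localhost:8080/v1/.well-known/ready"]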
      interval: 30s
      timeout: 10s
      retries: 3
    networks:
      - weaviate-network

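  # transformers-inference service (generates the vectors)
  transformers:
    # official image with a bundled model (all-MiniLM-L6-v2: lightweight and efficient)
    image: semitechnologies/transformers-inference:sentence-transformers-all-MiniLM-L6-v2
    environment:
      - ENABLE_CUDA=false  # set to true if the host has an NVIDIA GPU (requires Docker GPU support)
      - PORT=8080  # service port; must match the address configured for Weaviate
    networks:
      - weaviate-network

# Declare the network (lets the two services reach each other by service name)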
networks:
  weaviate-network:
    driver: bridge
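The comments on AUTHENTICATION_APIKEY_ALLOWED_KEYS and AUTHENTICATION_APIKEY_USERS say the two comma-separated lists are paired position by position. The sketch below shows one way to sanity-check that pairing before starting the container. The function name `map_keys_to_users` is hypothetical, and the rule that a single user name may be shared by every key is my understanding of Weaviate's behaviour and is labeled as an assumption in the code.

```python
def map_keys_to_users(allowed_keys, users):
    """Pair comma-separated API keys with comma-separated user names by position."""
    keys = [k.strip() for k in allowed_keys.split(",") if k.strip()]
    names = [u.strip() for u in users.split(",") if u.strip()]
    if not keys:
        raise ValueError("at least one API key is required")
    if len(names) == 1:
        # Assumption: one user name is applied to every key
        names = names * len(keys)
    if len(names) != len(keys):
        raise ValueError(
            f"{len(keys)} key(s) but {len(names)} user(s): the lists must line up"
        )
    return dict(zip(keys, names))


# Placeholder values, not real credentials
print(map_keys_to_users("key-one,key-two", "alice,bob"))
```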
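  3. Run the services and observe them
    a.) Start the services
## Run in the directory that contains docker-compose.yml
## (use "docker compose up -d" instead if the Compose CLI plugin is installed)
docker-compose up -d

    b.) View the logs

## List the running containers
docker ps
## Follow the logs of a specific container
docker logs -f CONTAINER_ID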
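Watching the logs works, but a script usually wants to block until Weaviate actually answers. A minimal polling sketch follows, assuming the 8080 port mapping from the compose file and Weaviate's readiness endpoint `/v1/.well-known/ready`. The helper names `http_ready` and `wait_until_ready` are hypothetical.

```python
import time
import urllib.error
import urllib.request


def http_ready(url, timeout=2.0):
    """Return True if a GET on url answers with HTTP 200, False on any failure."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        return False


def wait_until_ready(probe, attempts=30, delay=1.0, sleep=time.sleep):
    """Call probe() until it returns True.

    Returns the 1-based attempt number that succeeded, or None if all failed.
    """
    for attempt in range(1, attempts + 1):
        if probe():
            return attempt
        if attempt < attempts:
            sleep(delay)
    return None


# Usage against the deployment described above:
# wait_until_ready(lambda: http_ready("http://localhost:8080/v1/.well-known/ready"))
```

Passing the probe in as a callable keeps the retry loop testable without a running server.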

Test code

  1. Python dependency
pip install -U weaviate-client
  2. Connection test
import weaviate
from weaviate.classes.init import Auth

print(weaviate.__version__)

# The API key must be one of the values in AUTHENTICATION_APIKEY_ALLOWED_KEYS
client = weaviate.connect_to_custom(
    http_host="localhost",
    http_port=8080,
    http_secure=False,
    grpc_host="localhost",
    grpc_port=50051,
    grpc_secure=False,
    auth_credentials=Auth.api_key("WVF5YTha*********USmCRgsX3tD5ngdN8pkih"),
)
try:
    print(f'client is ready : {client.is_ready()}')
    meta = client.get_meta()
    print(f'meta :{meta}')
finally:
    # Close the connection even if one of the calls above fails
    client.close()
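Every script in these notes repeats the host, the ports, and the API key inline. One option is to read them from environment variables so the key stays out of the source. The sketch below is a stdlib-only helper; the variable names (`WEAVIATE_API_KEY`, `WEAVIATE_HOST`, `WEAVIATE_HTTP_PORT`, `WEAVIATE_GRPC_PORT`) and the function name `connection_settings` are my own choices, not something the client defines.

```python
import os


def connection_settings(env=None):
    """Return (kwargs for weaviate.connect_to_custom, api_key) read from the environment."""
    env = os.environ if env is None else env
    api_key = env.get("WEAVIATE_API_KEY")
    if not api_key:
        raise RuntimeError("WEAVIATE_API_KEY is not set")
    host = env.get("WEAVIATE_HOST", "localhost")
    params = {
        "http_host": host,
        "http_port": int(env.get("WEAVIATE_HTTP_PORT", "8080")),
        "http_secure": False,
        "grpc_host": host,
        "grpc_port": int(env.get("WEAVIATE_GRPC_PORT", "50051")),
        "grpc_secure": False,
    }
    return params, api_key
```

With this in place the connection call would become `params, key = connection_settings()` followed by `weaviate.connect_to_custom(**params, auth_credentials=Auth.api_key(key))`.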
  3. Collection (class) listing test
import weaviate
from weaviate.classes.init import Auth

print(weaviate.__version__)
def get_weaviate_classes(client):
    """
    Print every collection (class) in the Weaviate instance with its details.
    """
    try:
        # list_all() returns a dict keyed by collection name
        collection_names = client.collections.list_all()

        if not collection_names:
            print("No collections found")
            return

        print(f"Found {len(collection_names)} collection(s):")
        print("=" * 50)

        # Print the details of each collection
        for i, collection_name in enumerate(collection_names, 1):
            print(f"\n{i}. Collection name: {collection_name}")

            # Get a handle to the collection
            collection = client.collections.get(collection_name)

            # Read its configuration
            config = collection.config.get()

            print(f"   - Vectorizer: {config.vectorizer}")
            print(f"   - Vector index type: {config.vector_index_type}")

            # Count the stored objects
            count = collection.aggregate.over_all(total_count=True)
            print(f"   - Object count: {count.total_count}")

    except weaviate.exceptions.WeaviateConnectionError:
        print("Error: cannot connect to the Weaviate server")
        print("Please check:")
        print("1. The Weaviate service is running")
        print("2. The host and ports are correct")
        print("3. The credentials are correct (if authentication is enabled)")

    except weaviate.exceptions.AuthenticationFailedError:
        print("Authentication failed: check the API key")

    except weaviate.exceptions.WeaviateQueryError as e:
        print(f"Query (gRPC) error: {e}")

    except Exception as e:
        print(f"Unexpected error: {e}")

client = weaviate.connect_to_custom(
    http_host="localhost",
    http_port=8080,
    http_secure=False,
    grpc_host="localhost",
    grpc_port=50051,
    grpc_secure=False,
    auth_credentials=Auth.api_key("WVF5YTha*********USmCRgsX3tD5ngdN8pkih"),
)
try:
    print('=============== List all collections ====================')
    get_weaviate_classes(client)
finally:
    # The caller owns the client, so it is closed exactly once here
    print('==================== Close connection =======================')
    client.close()

  4. Write test
    a.) Single insert
import weaviate
from weaviate.classes.init import Auth
import uuid
print(weaviate.__version__)

def basic_insert_demo(client):
    """
    Basic single-object insert example.
    """
    try:
        collection_name = "Article"
        # Create the collection if it does not exist yet
        if collection_name not in client.collections.list_all():
            client.collections.create(
                name=collection_name,
                # Optional: configure a vectorizer (needs `from weaviate.classes.config import Configure`)
                # vectorizer_config=Configure.Vectorizer.text2vec_transformers()
            )
            print(f"Created new collection: {collection_name}")
        # Get a handle to the collection
        articles = client.collections.get(collection_name)
        # Data for a single object
        article_data = {
            "title": "人工智能的发展历程",
            "content": "人工智能从1956年达特茅斯会议诞生至今,经历了多次发展浪潮...",
            "author": "张三",
            "category": "科技",
            "published": True
        }

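        # Insert the object
        result = articles.data.insert(
            properties=article_data,
            # uuid=uuid.uuid4()  # optional: set the UUID explicitly; one is generated if omitted
        )
        print(f"Inserted object, UUID: {result}")

    finally:
        client.close()

# Usage: create `client` as in the connection test, then call basic_insert_demo(client).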
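Because the UUID is generated randomly when it is omitted, running the insert twice stores the same article twice. One common remedy is to derive the UUID from the object's content, so a repeated insert targets the same ID and can be detected. The sketch below uses only the standard library; the helper name `deterministic_uuid` and the choice of namespace are assumptions of this example (the client also ships a comparable helper, `weaviate.util.generate_uuid5`).

```python
import json
import uuid


def deterministic_uuid(collection, properties):
    """Derive a stable UUIDv5 from the collection name and the object's properties."""
    # sort_keys makes the result independent of dict ordering
    payload = json.dumps(properties, sort_keys=True, ensure_ascii=False)
    return uuid.uuid5(uuid.NAMESPACE_DNS, f"{collection}:{payload}")


# The same properties in a different key order give the same UUID
first = deterministic_uuid("Article", {"title": "demo", "author": "someone"})
second = deterministic_uuid("Article", {"author": "someone", "title": "demo"})
print(first == second)
```

The value would then be passed as `uuid=deterministic_uuid(collection_name, article_data)` in the insert call.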

    b.) Batch insert

import weaviate
from weaviate.classes.init import Auth
import uuid

print(weaviate.__version__)

def batch_insert_demo(client):
    """
    Batch insert example.
    """
    try:
        collection_name = "Article"
        # Create the collection if it does not exist yet
        if collection_name not in client.collections.list_all():
            client.collections.create(
                name=collection_name,
                # Optional: configure a vectorizer (needs `from weaviate.classes.config import Configure`)
                # vectorizer_config=Configure.Vectorizer.text2vec_transformers()
            )
            print(f"Created new collection: {collection_name}")

        # Get a handle to the collection
        articles = client.collections.get(collection_name)
        # Prepare the batch data
        articles_data = [
            {
                "title": "机器学习基础",
                "content": "机器学习是人工智能的重要分支,主要包括监督学习、无监督学习和强化学习...",
                "author": "李四",
                "category": "技术",
                "published": True,
                "wordCount": 1500
            },
            {
                "title": "深度学习应用",
                "content": "深度学习在计算机视觉、自然语言处理等领域取得了突破性进展...",
                "author": "王五",
                "category": "AI",
                "published": True,
                "wordCount": 2000
            },
            {
                "title": "大数据技术",
                "content": "随着互联网发展,大数据技术成为处理海量数据的关键工具...",
                "author": "赵六",
                "category": "数据",
                "published": False,
                "wordCount": 1800
            }
        ]

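        # Batch insert
        with articles.batch.dynamic() as batch:
            for i, article in enumerate(articles_data):
                batch.add_object(
                    properties=article,
                    uuid=uuid.uuid4()  # optional
                )
                print(f"Queued object {i+1} for the batch")

        # Objects rejected by the server are collected here instead of raising
        failed = articles.batch.failed_objects
        if failed:
            print(f"Batch finished with {len(failed)} failed object(s); first error: {failed[0].message}")
        else:
            print("Batch insert complete!")

    finally:
        client.close()

# Usage: create `client` as in the connection test, then call batch_insert_demo(client).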
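The collection is created without explicit properties, so Weaviate's auto-schema infers each property's type from the data it receives. A record that later sends a different type for the same property can then be rejected. As a rough local pre-check, the sketch below verifies that every record uses one Python type per property. The function name `infer_property_types` is hypothetical, and comparing Python type names is only an approximation of the server-side rules.

```python
def infer_property_types(records):
    """Return {property: python type name}; raise TypeError on conflicting types."""
    types = {}
    for index, record in enumerate(records):
        for name, value in record.items():
            kind = type(value).__name__
            # The first record that mentions a property fixes its expected type
            seen = types.setdefault(name, kind)
            if seen != kind:
                raise TypeError(
                    f"record {index}: property {name!r} is {kind}, expected {seen}"
                )
    return types


sample = [
    {"title": "a", "published": True, "wordCount": 1500},
    {"title": "b", "published": False, "wordCount": 2000},
]
print(infer_property_types(sample))
```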

    c.) Batch insert with vectorization

import weaviate
from weaviate.classes.init import Auth
import uuid
from weaviate.classes.config import Configure

print(weaviate.__version__)

def batch_insert_demo(client):
    """
    Batch insert into a collection whose objects are vectorized by text2vec-transformers.
    """
    try:
        collection_name = "Article_vector"

        # Create the collection if it does not exist yet
        if collection_name not in client.collections.list_all():
            client.collections.create(
                name=collection_name,
                # vector_config pairs with Configure.Vectors (as far as I know, weaviate-client 4.16+);
                # on older clients use vectorizer_config=Configure.Vectorizer.text2vec_transformers()
                vector_config=Configure.Vectors.text2vec_transformers()
            )
            print(f"Created new collection: {collection_name}")

        # Get a handle to the collection
        articles = client.collections.get(collection_name)
        # Prepare the batch data
        articles_data = [
            {
                "title": "机器学习基础",
                "content": "机器学习是人工智能的重要分支,主要包括监督学习、无监督学习和强化学习...",
                "author": "李四",
                "category": "技术",
                "published": True,
                "wordCount": 1500
            },
            {
                "title": "深度学习应用",
                "content": "深度学习在计算机视觉、自然语言处理等领域取得了突破性进展...",
                "author": "王五",
                "category": "AI",
                "published": True,
                "wordCount": 2000
            },
            {
                "title": "大数据技术",
                "content": "随着互联网发展,大数据技术成为处理海量数据的关键工具...",
                "author": "赵六",
                "category": "数据",
                "published": False,
                "wordCount": 1800
            }
        ]

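        # Batch insert (objects are vectorized server-side on import)
        with articles.batch.dynamic() as batch:
            for i, article in enumerate(articles_data):
                batch.add_object(
                    properties=article,
                    uuid=uuid.uuid4()  # optional
                )
                print(f"Queued object {i+1} for the batch")

        print("Batch insert complete!")

    finally:
        client.close()

# Usage: create `client` as in the connection test, then call batch_insert_demo(client).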
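Once objects carry vectors, similarity search ranks them by the distance between the query vector and each stored vector. Weaviate's default metric is cosine distance. The toy sketch below shows how that number is computed. It uses hand-written 2-dimensional vectors rather than real embeddings, and the function name `cosine_distance` is my own.

```python
import math


def cosine_distance(a, b):
    """Cosine distance = 1 - cosine similarity; 0 means same direction, 2 means opposite."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    if norm == 0:
        raise ValueError("cosine distance is undefined for a zero vector")
    return 1.0 - dot / norm


print(cosine_distance([1.0, 0.0], [1.0, 0.0]))   # identical direction
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))   # orthogonal
print(cosine_distance([1.0, 0.0], [-1.0, 0.0]))  # opposite direction
```

Smaller distances therefore mean closer matches.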

  5. Simple query test
import weaviate
from weaviate.classes.init import Auth

print(weaviate.__version__)

def basic_query_demo(client):
    """
    Basic query example.
    """
    try:
        collection_name = "Article"
        articles = client.collections.get(collection_name)

        # Fetch objects (limited in number)
        print("=== Fetch all articles ===")
        response = articles.query.fetch_objects(
            limit=10,
            return_properties=["title", "author", "category", "published","wordCount"]
        )

        print(f"Found {len(response.objects)} article(s):")
        for i, obj in enumerate(response.objects, 1):
            print(f"{i}. Title: {obj.properties['title']}")
            print(f"   Author: {obj.properties.get('author', 'unknown')}")
            print(f"   Category: {obj.properties.get('category', 'unknown')}")
            print(f"   Published: {obj.properties.get('published', 'unknown')}")
            print(f"   Word count: {obj.properties.get('wordCount', 'unknown')}")
            print(f"   UUID: {obj.uuid}")
            print()

    finally:
        print('======== Basic query done ============')

# Usage: create `client` as in the connection test, call basic_query_demo(client), then client.close().
  6. Advanced usage test
import weaviate
from weaviate.classes.init import Auth
from weaviate.classes.query import Filter
from weaviate.classes.aggregate import GroupByAggregate

print(weaviate.__version__)

def basic_query_demo(client):
    """
    Basic query example.
    """
    try:
        collection_name = "Article"
        articles = client.collections.get(collection_name)

        # Fetch objects (limited in number)
        print("=== Fetch all articles ===")
        response = articles.query.fetch_objects(
            limit=10,
            return_properties=["title", "author", "category", "published"]
        )

        print(f"Found {len(response.objects)} article(s):")
        for i, obj in enumerate(response.objects, 1):
            print(f"{i}. Title: {obj.properties['title']}")
            print(f"   Author: {obj.properties.get('author', 'unknown')}")
            print(f"   Category: {obj.properties.get('category', 'unknown')}")
            print(f"   Published: {obj.properties.get('published', 'unknown')}")
            print(f"   UUID: {obj.uuid}")
            print()

    finally:
        print('======== Basic query done ============')

def filter_query_demo(client):
    """
    Filtered query example.
    """
    try:
        collection_name = "Article"
        articles = client.collections.get(collection_name)

        # Fetch published articles only
        print("=== Query published articles ===")
        response = articles.query.fetch_objects(
            limit=5,
            filters=Filter.by_property("published").equal(True),
            return_properties=["title", "author", "category"]
        )

        print(f"Found {len(response.objects)} published article(s):")
        for obj in response.objects:
            print(f"- {obj.properties['title']} (author: {obj.properties['author']})")
        print()

        # Fetch articles in one category (the value matches the sample data from the single insert)
        print("=== Query articles in category '科技' ===")
        response = articles.query.fetch_objects(
            limit=5,
            filters=Filter.by_property("category").equal("科技"),
            return_properties=["title", "author"]
        )

        print(f"Found {len(response.objects)} article(s) in category '科技':")
        for obj in response.objects:
            print(f"- {obj.properties['title']}")
        print()

    finally:
        print('======== Filtered query done ============')

def advanced_query_demo(client):
    """
    Advanced query example: multiple conditions, sorting, and pagination.
    """
    try:
        collection_name = "Article"
        articles = client.collections.get(collection_name)

        # Multiple conditions: published AND more than 1000 words
        print("=== Multi-condition query ===")
        response = articles.query.fetch_objects(
            limit=5,
            filters=(
                Filter.by_property("published").equal(True) &
                Filter.by_property("wordCount").greater_than(1000)
            ),
            return_properties=["title", "author", "wordCount"]
        )

        print(f"Found {len(response.objects)} matching article(s):")
        for obj in response.objects:
            print(f"- {obj.properties['title']} (word count: {obj.properties.get('wordCount', 'unknown')})")
        print()

        # Sorted query
        print("=== Sort by word count, descending ===")
        response = articles.query.fetch_objects(
            limit=5,
            sort=weaviate.classes.query.Sort.by_property("wordCount", ascending=False),
            return_properties=["title", "wordCount"]
        )

        print("Top 5 articles by word count:")
        for obj in response.objects:
            print(f"- {obj.properties['title']} (word count: {obj.properties.get('wordCount', 'unknown')})")
        print()

        # Cursor-based pagination
        print("=== Paginated query ===")
        page_size = 3
        after_uuid = None

        for page in range(2):  # fetch the first 2 pages
            response = articles.query.fetch_objects(
                limit=page_size,
                after=after_uuid,
                return_properties=["title", "author"]
            )

            print(f"Page {page + 1}:")
            for obj in response.objects:
                print(f"- {obj.properties['title']}")
                after_uuid = obj.uuid  # the last object's UUID becomes the cursor for the next page

            if len(response.objects) < page_size:
                break
            print()

    finally:
        print('======== Advanced query done ============')

def vector_search_demo(client):
    """
    Vector search example.
    """
    try:
        collection_name = "Article_vector"
        articles = client.collections.get(collection_name)

        # Inspect the collection configuration
        config = articles.config.get()
        print(f'config : {config}')
        print(f'vectorizer config : {config.vectorizer}')

        # Collections created with vector_config report their vectorizer under
        # config.vector_config instead of config.vectorizer, so check both
        has_vectorizer = config.vectorizer is not None or bool(getattr(config, "vector_config", None))

        if not has_vectorizer:
            print(f"Collection({collection_name}) has no vectorizer configured; skipping the search")
        else:
            # Text-based vector search
            print("=== Vector search: articles related to '人工智能' ===")
            response = articles.query.near_text(
                query="人工智能",
                limit=5,
                return_properties=["title", "content", "author"],
                return_metadata=weaviate.classes.query.MetadataQuery(distance=True)
            )

            print(f"Found {len(response.objects)} related article(s):")
            for obj in response.objects:
                print(f"- Title: {obj.properties['title']}")
                print(f"  Author: {obj.properties['author']}")
                print(f"  Distance: {obj.metadata.distance:.4f}")
                # Show a snippet of the content
                content = obj.properties.get('content', '')
                if len(content) > 100:
                    content = content[:100] + "..."
                print(f"  Content: {content}")
                print()

    finally:
        print('======== Vector search done ============')

def aggregate_query_demo(client):
    """
    Aggregation example.
    """
    try:
        collection_name = "Article"
        articles = client.collections.get(collection_name)

        # Total count
        print("=== Statistics ===")
        count_result = articles.aggregate.over_all(total_count=True)
        print(f"Total articles: {count_result.total_count}")

        # Count per category
        print("\n=== Count by category ===")
        response = articles.aggregate.over_all(
                group_by=GroupByAggregate(prop="category"),
                total_count=True
        )

        for group in response.groups:
            print(f"Category '{group.grouped_by.value}': {group.total_count} article(s)")

        # Statistics on a numeric property
        print("\n=== Word count statistics ===")
        numeric_result = articles.aggregate.over_all(
                return_metrics=weaviate.classes.query.Metrics('wordCount').number(
                    count=True, maximum=True, mean=True, minimum=True, sum_=True
                )
            )

        for key,val in numeric_result.properties.items():
            if key == 'wordCount':
                print(f"Mean word count: {val.mean or 'no data'}")
                print(f"Max word count: {val.maximum or 'no data'}")
                print(f"Min word count: {val.minimum or 'no data'}")
                print(f"Total word count: {val.sum_ or 'no data'}")

    finally:
        print('======== Aggregation done ============')

# Usage: create `client` as in the connection test, call the demo functions, then client.close().
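To check that the aggregation results look plausible, the same statistics can be recomputed locally from the records that were inserted. The sketch below is plain Python with no Weaviate calls. The function name `summarize` is hypothetical, and the sample values simply reuse the three word counts from the batch data (1500, 2000, 1800) with made-up category labels.

```python
from collections import Counter
from statistics import mean


def summarize(records, group_key="category", metric_key="wordCount"):
    """Return (objects per group, numeric stats) for a list of property dicts."""
    groups = Counter(r[group_key] for r in records if group_key in r)
    values = [r[metric_key] for r in records if metric_key in r]
    stats = None
    if values:
        stats = {
            "count": len(values),
            "minimum": min(values),
            "maximum": max(values),
            "mean": mean(values),
            "sum": sum(values),
        }
    return dict(groups), stats


records = [
    {"category": "tech", "wordCount": 1500},
    {"category": "ai", "wordCount": 2000},
    {"category": "tech", "wordCount": 1800},
    {"category": "ai"},  # no wordCount: counted in its group, skipped in the stats
]
groups, stats = summarize(records)
print(groups)
print(stats)
```

Objects without the numeric property still count toward their group but are left out of the metric, which is worth keeping in mind when the single-insert article (no wordCount) sits in the same collection.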
