Indexer 负责将文档（连同其向量化结果）写入向量数据库，如 Redis、VikingDB 等。Indexer 只负责写入，查询由 Retriever 负责。
接口定义
// Indexer is the interface for the indexer.
// Indexer is the write side of the pipeline: it stores documents (for example into a
// vector store). Querying stored documents is not part of this interface.
//
//go:generate mockgen -destination ../../internal/mock/components/indexer/indexer_mock.go --package indexer -source interface.go
type Indexer interface {
// Store stores docs and returns the IDs of the stored documents.
// opts are per-call options; which ones are honored is up to each implementation
// (NOTE(review): confirm per implementation).
Store(ctx context.Context, docs []*schema.Document, opts ...Option) (ids []string, err error) // invoke
}
官方RedisIndexer 实现
IndexerConfig：主要依赖用于文本向量化的 Embedding（Embedder）以及用于存储的 Redis 客户端。
// IndexerConfig configures the Redis indexer: the Redis client it writes with, how a
// document is mapped to a Redis hash, and the embedder used to produce vector fields.
type IndexerConfig struct {
// Client is a Redis client representing a pool of zero or more underlying connections.
// It's safe for concurrent use by multiple goroutines, so it is fine to pass
// an existing Client when creating a new Indexer component.
Client *redis.Client
// KeyPrefix is prepended to every hash key: the HSET key is KeyPrefix+Hashes.Key.
// If it is not set, make sure every key returned by DocumentToHashes shares the same
// prefix, because the vector index created with FT.CREATE selects hashes by key prefix.
// see: https://redis.io/docs/latest/develop/interact/search-and-query/advanced-concepts/vectors/#create-a-vector-index
KeyPrefix string
// DocumentToHashes maps a document to the key, fields and values of one Redis hash.
// Hashes.Key is the hash key; using the document ID is fine if it is unique.
// Hashes.Field2Value holds field -> value pairs written with HSET. When a value has a
// non-empty EmbedKey, its text (produced by Stringify, or the value itself if it is a
// string) is also embedded, and the vector is stored under the field named EmbedKey.
// Eventually, the command looks like: hset $(KeyPrefix+key) field_1 val_1 field_2 val_2 ...
// Default defaultDocumentToFields.
DocumentToHashes func(ctx context.Context, doc *schema.Document) (*Hashes, error)
// BatchSize is the maximum number of texts sent to the embedder in one call.
// All embedded fields of a single document must fit in one batch.
// Default 10.
BatchSize int `json:"batch_size"`
// Embedding is the embedder used to vectorize values that have an EmbedKey.
// Store uses it as the default embedder; per-call indexer options may override it.
Embedding embedding.Embedder
}
Store 方法实现：
// Store writes docs into Redis as hashes and returns the ID of every stored document,
// in input order.
//
// The embedder defaults to IndexerConfig.Embedding and may be overridden through opts.
// Callbacks: OnStart fires before anything is written, OnEnd after a successful write,
// and OnError whenever Store returns a non-nil error.
func (i *Indexer) Store(ctx context.Context, docs []*schema.Document, opts ...indexer.Option) (ids []string, err error) {
	defer func() {
		if err == nil {
			return
		}
		callbacks.OnError(ctx, err)
	}()

	common := indexer.GetCommonOptions(&indexer.Options{Embedding: i.config.Embedding}, opts...)

	ctx = callbacks.OnStart(ctx, &indexer.CallbackInput{Docs: docs})

	err = i.pipelineHSet(ctx, docs, common)
	if err != nil {
		return nil, err
	}

	// IDs come from doc.ID. DocumentToHashes may assign doc.ID itself; do so there if the
	// caller needs the hash key back.
	ids = make([]string, len(docs))
	for pos, doc := range docs {
		ids[pos] = doc.ID
	}

	callbacks.OnEnd(ctx, &indexer.CallbackOutput{IDs: ids})
	return ids, nil
}
核心方法，即通过 go-redis 的 pipeline，批量将文档的内容（content）、内容向量对应的字节（content_vector）及元数据（metadata）以 HSET 写入 Redis 的 hash：

image.png
pipelineHSet方法实现:
// pipelineHSet converts docs to Redis hashes via IndexerConfig.DocumentToHashes, embeds
// every field that has an EmbedKey, and queues one HSET per document on a single go-redis
// pipeline that is executed once at the end.
//
// Texts to embed are buffered across documents and sent to the embedder in batches of at
// most IndexerConfig.BatchSize texts. A document's texts are never split across two
// embedding calls, so a document with more embedded fields than BatchSize is rejected.
// Each vector is encoded with vector2Bytes and stored under the field named by EmbedKey.
//
// Commands are only queued until the final Exec, so an error before that point sends
// nothing to Redis. The pipeline comes from Pipeline (not TxPipeline), so an Exec error
// may still leave some hashes written.
//
// An EmbedKey must not collide with a plain value field, and must not be shared by two
// embedded fields; otherwise which vector ends up stored would depend on map iteration order.
func (i *Indexer) pipelineHSet(ctx context.Context, docs []*schema.Document, options *indexer.Options) (err error) {
	emb := options.Embedding
	pipeline := i.config.Client.Pipeline()

	var (
		pending []tuple  // hashes still waiting for their vectors
		texts   []string // texts to embed for pending; key2Idx values index into this
	)

	// flush embeds the buffered texts, fills each pending hash with its vectors, queues an
	// HSET per hash on the pipeline, and resets both buffers.
	flush := func() error {
		var vectors [][]float64
		if len(texts) > 0 {
			if emb == nil {
				return fmt.Errorf("[pipelineHSet] embedding method not provided")
			}
			var embErr error
			vectors, embErr = emb.EmbedStrings(i.makeEmbeddingCtx(ctx, emb), texts)
			if embErr != nil {
				return fmt.Errorf("[pipelineHSet] embedding failed, %w", embErr)
			}
			if len(vectors) != len(texts) {
				return fmt.Errorf("[pipelineHSet] invalid vector length, expected=%d, got=%d", len(texts), len(vectors))
			}
		}
		for _, t := range pending {
			for k, idx := range t.key2Idx {
				t.fields[k] = vector2Bytes(vectors[idx])
			}
			pipeline.HSet(ctx, i.config.KeyPrefix+t.key, flatten(t.fields)...)
		}
		pending = pending[:0]
		texts = texts[:0]
		return nil
	}

	for _, doc := range docs {
		hashes, err := i.config.DocumentToHashes(ctx, doc)
		if err != nil {
			return fmt.Errorf("[pipelineHSet] convert document to hashes failed, %w", err)
		}
		if hashes == nil {
			return fmt.Errorf("[pipelineHSet] DocumentToHashes returned nil hashes")
		}

		fields := make(map[string]any, len(hashes.Field2Value))
		embSize := 0
		for k, v := range hashes.Field2Value {
			fields[k] = v.Value
			if v.EmbedKey != "" {
				embSize++
			}
		}
		if embSize > i.config.BatchSize {
			return fmt.Errorf("[pipelineHSet] embedding size over batch size, batch size=%d, got size=%d",
				i.config.BatchSize, embSize)
		}
		// Flush before buffering so that all texts of this document share one embedding call.
		if len(texts)+embSize > i.config.BatchSize {
			if err = flush(); err != nil {
				return err
			}
		}

		key2Idx := make(map[string]int, embSize)
		for k, v := range hashes.Field2Value {
			if v.EmbedKey == "" {
				continue
			}
			if _, found := fields[v.EmbedKey]; found {
				return fmt.Errorf("[pipelineHSet] duplicate key for value and vector, field=%s", k)
			}
			if _, found := key2Idx[v.EmbedKey]; found {
				return fmt.Errorf("[pipelineHSet] duplicate embed key, field=%s, emb_key=%s", k, v.EmbedKey)
			}

			var text string
			if v.Stringify != nil {
				s, serr := v.Stringify(v.Value)
				if serr != nil {
					return fmt.Errorf("[pipelineHSet] stringify value failed, key=%s, %w", k, serr)
				}
				text = s
			} else {
				s, ok := v.Value.(string)
				if !ok {
					return fmt.Errorf("[pipelineHSet] assert value as string failed, key=%s, emb_key=%s", k, v.EmbedKey)
				}
				text = s
			}
			key2Idx[v.EmbedKey] = len(texts)
			texts = append(texts, text)
		}

		pending = append(pending, tuple{
			key:     hashes.Key,
			fields:  fields,
			key2Idx: key2Idx,
		})
	}

	if len(pending) > 0 {
		if err = flush(); err != nil {
			return err
		}
	}
	if _, err = pipeline.Exec(ctx); err != nil {
		return fmt.Errorf("[pipelineHSet] exec pipeline failed, %w", err)
	}
	return nil
}
完整的运行示例:
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/cloudwego/eino-ext/components/embedding/ark"
"github.com/cloudwego/eino-ext/components/indexer/redis"
"github.com/cloudwego/eino/components/embedding"
"github.com/cloudwego/eino/schema"
"github.com/google/uuid"
redisCli "github.com/redis/go-redis/v9"
"os"
)
// RedisPrefix is the IndexerConfig.KeyPrefix of the example: it is prepended to every
// document key written by the indexer.
const RedisPrefix = "eino:test:"

// Hash field names written for each document by the example's DocumentToHashes.
const (
	ContentField  = "content"        // raw document text
	MetadataField = "metadata"       // JSON-encoded document metadata
	VectorField   = "content_vector" // embedding of ContentField
)
// main stores one sample Markdown document into Redis with the Redis indexer and prints
// the ID of each stored document.
//
// Environment variables:
//   - REDIS_ADDR: Redis server address (NOTE(review): when empty, go-redis falls back to
//     its default address — confirm for the go-redis version in use).
//   - ARK_API_KEY, ARK_EMBEDDING_MODEL: read by newEmbedding to build the Ark embedder.
func main() {
	ctx := context.Background()

	redisClient := redisCli.NewClient(&redisCli.Options{
		Addr: os.Getenv("REDIS_ADDR"),
		// NOTE(review): RESP2 is selected explicitly, presumably so search-related
		// replies are parsed in the RESP2 form the eino Redis components expect — confirm.
		Protocol: 2,
	})
	// Release the connection pool when main returns (deferred calls also run on panic).
	defer func() {
		if err := redisClient.Close(); err != nil {
			fmt.Fprintln(os.Stderr, "close redis client:", err)
		}
	}()

	emb, err := newEmbedding(ctx)
	if err != nil {
		panic(err)
	}

	config := &redis.IndexerConfig{
		Client:    redisClient,
		KeyPrefix: RedisPrefix,
		// Each document has exactly one embedded field, so every document gets its own
		// embedding call.
		BatchSize: 1,
		DocumentToHashes: func(ctx context.Context, doc *schema.Document) (*redis.Hashes, error) {
			// Store returns doc.ID as the stored ID, so assigning a generated ID here makes
			// Store report the key actually used.
			if doc.ID == "" {
				doc.ID = uuid.New().String()
			}
			metadataBytes, err := json.Marshal(doc.MetaData)
			if err != nil {
				return nil, fmt.Errorf("failed to marshal metadata: %w", err)
			}
			return &redis.Hashes{
				Key: doc.ID,
				Field2Value: map[string]redis.FieldValue{
					// content is stored as-is and also embedded into content_vector.
					ContentField:  {Value: doc.Content, EmbedKey: VectorField},
					MetadataField: {Value: metadataBytes},
				},
			}, nil
		},
		Embedding: emb,
	}

	idr, err := redis.NewIndexer(ctx, config)
	if err != nil {
		panic(err)
	}

	markdownDoc := &schema.Document{
		Content: " #Title 0\n 测试内容 \n ## Title 1\nHello Word\n## Title 2\nWord Hello \n ",
	}
	ids, err := idr.Store(ctx, []*schema.Document{markdownDoc})
	if err != nil {
		panic(err)
	}
	for _, id := range ids {
		fmt.Println(id)
	}
}
// newEmbedding builds the Ark embedder used to vectorize document content.
// The API key and model name are read from ARK_API_KEY and ARK_EMBEDDING_MODEL.
// On failure it returns a literal nil Embedder (never a typed nil wrapped in the
// interface) together with the error.
func newEmbedding(ctx context.Context) (eb embedding.Embedder, err error) {
	// TODO Modify component configuration here.
	arkEmbedder, err := ark.NewEmbedder(ctx, &ark.EmbeddingConfig{
		BaseURL: "https://ark.cn-beijing.volces.com/api/v3",
		APIKey:  os.Getenv("ARK_API_KEY"),
		Model:   os.Getenv("ARK_EMBEDDING_MODEL"),
	})
	if err != nil {
		return nil, err
	}
	return arkEmbedder, nil
}
写入redis的内容:

image.png