Eino中的组件-indexer

Indexer 负责把文档写入向量数据库（如 Redis、VikingDB 等）；像 Redis Indexer 这样的实现会先调用 Embedding 将文档内容向量化，再把内容和向量一起写入。Indexer 只负责写入，查询由 Retriever 负责。

接口定义

// Indexer is the interface for the indexer.
// Indexer is used to store the documents: it only writes documents to a
// backing store (e.g. a vector database); querying them is the job of a
// Retriever, not of the Indexer.
//
//go:generate  mockgen -destination ../../internal/mock/components/indexer/indexer_mock.go --package indexer -source interface.go
type Indexer interface {
    // Store stores the documents and returns the IDs of the stored documents.
    // opts carries per-call options; for example, the Redis implementation
    // accepts an Embedding override through them.
    // NOTE(review): the trailing "invoke" marker presumably tags this as the
    // component's invoke-style entry point — confirm.
    Store(ctx context.Context, docs []*schema.Document, opts ...Option) (ids []string, err error) // invoke
}

官方 Redis Indexer 实现

IndexerConfig 主要依赖两部分：用于文本向量化的 Embedding，以及用于存储的 Redis 客户端。

// IndexerConfig configures the Redis Indexer: where documents are written
// (Client, KeyPrefix), how each document is mapped to a Redis hash
// (DocumentToHashes), and how field values are embedded (Embedding, BatchSize).
type IndexerConfig struct {
    // Client is the go-redis client used to write the hashes.
    // A go-redis Client represents a pool of zero or more underlying connections
    // and is safe for concurrent use by multiple goroutines, so it is fine to
    // pass an existing Client when creating a new Indexer.
    Client *redis.Client
    // KeyPrefix is prepended to every hash key; the key written is KeyPrefix+Hashes.Key.
    // If it is empty, make sure every key returned by DocumentToHashes already
    // shares a common prefix, because the vector index created with FT.CREATE
    // selects hashes by key prefix.
    // See: https://redis.io/docs/latest/develop/interact/search-and-query/advanced-concepts/vectors/#create-a-vector-index
    KeyPrefix string
    // DocumentToHashes maps a document to the Redis hash that stores it.
    //   - Hashes.Key is the hash key (before KeyPrefix is prepended); the
    //     document ID is fine to use if it is unique.
    //   - Hashes.Field2Value maps each hash field to a FieldValue. Value is
    //     written under that field. If EmbedKey is non-empty, a text derived
    //     from the value (Value itself, which must then be a string, or
    //     Stringify(Value) when Stringify is set) is also embedded, and the
    //     vector is written under the EmbedKey field. EmbedKey must not collide
    //     with another field name.
    // Eventually the command looks like: HSET <KeyPrefix+Key> field_1 val_1 field_2 val_2 ...
    // Default: defaultDocumentToFields.
    DocumentToHashes func(ctx context.Context, doc *schema.Document) (*Hashes, error)
    // BatchSize is the maximum number of texts sent to Embedding in one call.
    // A single document that needs more embeddings than BatchSize is rejected.
    // Default: 10.
    BatchSize int `json:"batch_size"`
    // Embedding vectorizes the texts of fields that have an EmbedKey.
    // It is the default embedder; Store's options may override it per call.
    Embedding embedding.Embedder
}

Store 方法实现：

// Store embeds docs and writes them to Redis as hashes, returning one ID per
// input document, in input order.
//
// opts are applied over default options whose Embedding is i.config.Embedding.
// The returned IDs are the doc.ID values read after the write, so a
// DocumentToHashes that assigns doc.ID is reflected here. If the Redis hash key
// should be returned, DocumentToHashes must set doc.ID to that key itself.
//
// Callbacks: OnStart before writing, OnEnd with the IDs on success, and
// OnError whenever a non-nil error is returned.
func (i *Indexer) Store(ctx context.Context, docs []*schema.Document, opts ...indexer.Option) (ids []string, err error) {
    // The closure reads ctx when Store returns, so it observes the ctx
    // produced by OnStart below.
    defer func() {
        if err == nil {
            return
        }
        callbacks.OnError(ctx, err)
    }()

    defaults := &indexer.Options{Embedding: i.config.Embedding}
    options := indexer.GetCommonOptions(defaults, opts...)

    ctx = callbacks.OnStart(ctx, &indexer.CallbackInput{Docs: docs})

    if err = i.pipelineHSet(ctx, docs, options); err != nil {
        return nil, err
    }

    ids = make([]string, len(docs))
    for pos, doc := range docs {
        ids[pos] = doc.ID
    }

    callbacks.OnEnd(ctx, &indexer.CallbackOutput{IDs: ids})
    return ids, nil
}

核心方法 pipelineHSet，即通过 go-redis 的 pipeline，批量将文档的内容（content）、内容向量序列化后的 bytes（content_vector）以及元数据（metadata）写入 Redis 的 Hash（HSET）：


image.png

pipelineHSet方法实现:

// pipelineHSet maps every document to a Redis hash via i.config.DocumentToHashes,
// embeds the fields that request it, and writes all hashes through a single
// go-redis pipeline.
//
// For each Field2Value entry, Value is written under its own field. An entry
// with a non-empty EmbedKey also contributes one text to embed: Value itself
// (which must then be a string) or Stringify(Value) when Stringify is set. The
// resulting vector is serialized by vector2Bytes and written under the
// EmbedKey field.
//
// Texts are sent to options.Embedding in batches of at most i.config.BatchSize.
// A document that alone needs more embeddings than BatchSize is rejected. HSET
// commands are queued on the pipeline and only sent by pipeline.Exec at the
// end, so an error returned before that point sends no HSET.
//
// Errors are returned for any of the following:
//   - no embedder is configured;
//   - embedding fails or returns the wrong number of vectors;
//   - DocumentToHashes fails or returns nil;
//   - an EmbedKey collides with a value field or with another entry's EmbedKey;
//   - a value is not a string and has no Stringify;
//   - pipeline execution fails.
func (i *Indexer) pipelineHSet(ctx context.Context, docs []*schema.Document, options *indexer.Options) (err error) {
    emb := options.Embedding
    pipeline := i.config.Client.Pipeline()

    // Pending work for the current embedding batch: one tuple per document, and
    // the texts to embed, which tuples reference by index through key2Idx.
    var (
        tuples []tuple
        texts  []string
    )

    // flush embeds the pending texts, stores each vector in its tuple's fields,
    // queues one HSET per pending tuple, and resets the batch.
    flush := func() error {
        var vectors [][]float64

        if len(texts) > 0 {
            if emb == nil {
                return fmt.Errorf("[pipelineHSet] embedding method not provided")
            }

            var embErr error
            vectors, embErr = emb.EmbedStrings(i.makeEmbeddingCtx(ctx, emb), texts)
            if embErr != nil {
                return fmt.Errorf("[pipelineHSet] embedding failed, %w", embErr)
            }

            if len(vectors) != len(texts) {
                return fmt.Errorf("[pipelineHSet] invalid vector length, expected=%d, got=%d", len(texts), len(vectors))
            }
        }

        for _, t := range tuples {
            fields := t.fields
            for embedKey, idx := range t.key2Idx {
                fields[embedKey] = vector2Bytes(vectors[idx])
            }

            pipeline.HSet(ctx, i.config.KeyPrefix+t.key, flatten(fields)...)
        }

        tuples = tuples[:0]
        texts = texts[:0]

        return nil
    }

    for docIdx, doc := range docs {
        // Use a distinct name so the named result err is not shadowed.
        hashes, convErr := i.config.DocumentToHashes(ctx, doc)
        if convErr != nil {
            return convErr
        }
        if hashes == nil {
            return fmt.Errorf("[pipelineHSet] DocumentToHashes returned nil hashes, doc_index=%d", docIdx)
        }

        key := hashes.Key
        field2Value := hashes.Field2Value
        fields := make(map[string]any, len(field2Value))
        embSize := 0
        for k, v := range field2Value {
            fields[k] = v.Value
            if v.EmbedKey != "" {
                embSize++
            }
        }

        if embSize > i.config.BatchSize {
            return fmt.Errorf("[pipelineHSet] embedding size over batch size, batch size=%d, got size=%d",
                i.config.BatchSize, embSize)
        }

        // Flush first if this document's texts would overflow the current
        // batch, so one document's texts never span two embedding calls.
        if len(texts)+embSize > i.config.BatchSize {
            if err = flush(); err != nil {
                return err
            }
        }

        key2Idx := make(map[string]int, embSize)
        for k, v := range field2Value {
            if v.EmbedKey == "" {
                continue
            }

            if _, found := fields[v.EmbedKey]; found {
                return fmt.Errorf("[pipelineHSet] duplicate key for value and vector, field=%s", k)
            }
            // Two entries sharing one EmbedKey would both be embedded, while only
            // one vector (picked by random map iteration order) would be kept.
            if _, found := key2Idx[v.EmbedKey]; found {
                return fmt.Errorf("[pipelineHSet] duplicate embed key, field=%s, emb_key=%s", k, v.EmbedKey)
            }

            var text string
            if v.Stringify != nil {
                var strErr error
                if text, strErr = v.Stringify(v.Value); strErr != nil {
                    return strErr
                }
            } else {
                s, ok := v.Value.(string)
                if !ok {
                    return fmt.Errorf("[pipelineHSet] assert value as string failed, key=%s, emb_key=%s", k, v.EmbedKey)
                }
                text = s
            }

            key2Idx[v.EmbedKey] = len(texts)
            texts = append(texts, text)
        }

        tuples = append(tuples, tuple{
            key:     key,
            fields:  fields,
            key2Idx: key2Idx,
        })
    }

    if len(tuples) > 0 {
        if err = flush(); err != nil {
            return err
        }
    }

    if _, err = pipeline.Exec(ctx); err != nil {
        return err
    }

    return nil
}

完整的运行示例:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/cloudwego/eino-ext/components/embedding/ark"
    "github.com/cloudwego/eino-ext/components/indexer/redis"
    "github.com/cloudwego/eino/components/embedding"
    "github.com/cloudwego/eino/schema"
    "github.com/google/uuid"
    redisCli "github.com/redis/go-redis/v9"
    "os"
)

// Demo settings. Every hash key gets RedisPrefix. Each document is stored with:
//   - its text under ContentField;
//   - its JSON-encoded metadata under MetadataField;
//   - the embedding of its text under VectorField.
const (
    RedisPrefix   = "eino:test:"
    ContentField  = "content"
    MetadataField = "metadata"
    VectorField   = "content_vector"
)

// main stores one sample Markdown document in Redis through the Eino Redis
// indexer and prints the returned document IDs.
//
// Environment:
//   - REDIS_ADDR: Redis address passed to go-redis.
//   - ARK_API_KEY, ARK_EMBEDDING_MODEL: read by newEmbedding.
func main() {
    ctx := context.Background()

    redisClient := redisCli.NewClient(&redisCli.Options{
        Addr: os.Getenv("REDIS_ADDR"),
        // RESP2; presumably chosen for compatibility with RediSearch (FT.*) replies — confirm.
        Protocol: 2,
    })
    // Release the connection pool on exit, including when a step below panics.
    defer func() {
        if err := redisClient.Close(); err != nil {
            fmt.Fprintf(os.Stderr, "close redis client: %v\n", err)
        }
    }()

    emb, err := newEmbedding(ctx)
    if err != nil {
        panic(err)
    }

    config := &redis.IndexerConfig{
        Client:    redisClient,
        KeyPrefix: RedisPrefix,
        // Each document has exactly one embedded field (ContentField), so this
        // embeds one document per Embedding call.
        BatchSize: 1,
        Embedding: emb,
        DocumentToHashes: func(ctx context.Context, doc *schema.Document) (*redis.Hashes, error) {
            // Assign the generated ID in place: Store returns doc.ID values,
            // so the caller gets the hash key (without KeyPrefix) that was used.
            if doc.ID == "" {
                doc.ID = uuid.New().String()
            }

            metadataBytes, err := json.Marshal(doc.MetaData)
            if err != nil {
                return nil, fmt.Errorf("failed to marshal metadata: %w", err)
            }

            return &redis.Hashes{
                Key: doc.ID,
                Field2Value: map[string]redis.FieldValue{
                    // The content is stored as text and also embedded into VectorField.
                    ContentField:  {Value: doc.Content, EmbedKey: VectorField},
                    MetadataField: {Value: metadataBytes},
                },
            }, nil
        },
    }

    idr, err := redis.NewIndexer(ctx, config)
    if err != nil {
        panic(err)
    }

    markdownDoc := &schema.Document{
        Content: " #Title 0\n   测试内容 \n ## Title 1\nHello Word\n## Title 2\nWord Hello  \n ",
    }

    ids, err := idr.Store(ctx, []*schema.Document{markdownDoc})
    if err != nil {
        panic(err)
    }
    for _, id := range ids {
        fmt.Println(id)
    }
}

// newEmbedding builds the Ark embedder used by the demo.
// The API key and model name come from the ARK_API_KEY and ARK_EMBEDDING_MODEL
// environment variables; the endpoint is the fixed cn-beijing Ark URL below.
// Adjust this config for other deployments.
func newEmbedding(ctx context.Context) (eb embedding.Embedder, err error) {
    embedder, createErr := ark.NewEmbedder(ctx, &ark.EmbeddingConfig{
        BaseURL: "https://ark.cn-beijing.volces.com/api/v3",
        APIKey:  os.Getenv("ARK_API_KEY"),
        Model:   os.Getenv("ARK_EMBEDDING_MODEL"),
    })
    if createErr != nil {
        // Return a literal nil rather than the (possibly typed-nil) embedder,
        // so callers never see a non-nil interface on failure.
        return nil, createErr
    }
    return embedder, nil
}


写入redis的内容:

image.png
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容