在Eino 中indexer 组件中, 会使用redis的pipeline功能, 批量将文档的内容(content), 向量对应的bytes(content_vactor)及元数据(matadata)信息写入的hset进行存储。
Redis对于命令的批量处理, 提供pipeline 和 TxPipeline 两种方式执行,
- pipeline : 通过在一次通信中将多个命令一起发送到服务器执行,避免了网络和处理开销
- TxPipeline : 在pipeline 的基础上, 保证所有包含的命令将完整执行,不会被其他客户端的命令中断。
pipeline代码示例 :
func TestRedisStackClient_Do2(t *testing.T) {
cli := NewRedisStackClient("localhost:6379", "", 0)
pipe := cli.Client.Pipeline()
ctx := context.Background()
for i := 0; i < 5; i++ {
pipe.Set(ctx, fmt.Sprintf("seat:%v", i), fmt.Sprintf("#%v", i), 0)
}
cmds, err := pipe.Exec(ctx)
if err != nil {
panic(err)
}
for _, c := range cmds {
fmt.Printf("%v;", c.(*redis.StatusCmd).Val())
}
fmt.Println("")
pipe = cli.Client.Pipeline()
get0Result := pipe.Get(ctx, "seat:0")
get3Result := pipe.Get(ctx, "seat:3")
get4Result := pipe.Get(ctx, "seat:4")
cmds, err = pipe.Exec(ctx)
// The results are available only after the pipeline
// has finished executing.
fmt.Println(get0Result.Val()) // >>> #0
fmt.Println(get3Result.Val()) // >>> #3
fmt.Println(get4Result.Val()) // >>> #4
}
TxPipeline 代码示例:
func TestRedisStackClient_Do3(t *testing.T) {
cli := NewRedisStackClient("localhost:6379", "", 0)
trans := cli.Client.TxPipeline()
ctx := context.Background()
trans.IncrBy(ctx, "count:test1", 1)
trans.IncrBy(ctx, "count:test1", 2)
trans.IncrBy(ctx, "count:test1", 3)
cmds, err := trans.Exec(ctx)
if err != nil {
panic(err)
}
for _, c := range cmds {
fmt.Println(c.(*redis.IntCmd).Val())
}
cmd := cli.Client.Do(ctx, "GET", "count:test1")
fmt.Println(cmd.Val())
}
Redis 的所有单条命令(包括 INCR、INCRBY、DECR、DECRBY、HINCRBY 等)都是原子操作。
无论多少个客户端并发调用,Redis 都会以单线程方式依次执行每一条指令,因此不会出现竞态条件,天然线程安全。
Redis 不支持事务回滚,那么如何保证操作的原子性呢?
redis没有回滚机制,但可以通过以下 2 种手段在不同场景下保证“操作的完整性”——即 要么全部成功,要么全部不生效。
乐观锁 + 重试(WATCH)
使用 WATCH 监视关键 key,事务里一旦发现 key 被其他客户端修改,就让 EXEC 失败并 重新执行整个流程。
下面用 WATCH 实现“ZPOP”——即原子地弹出有序集合中的最小/最大元素
// zPopMin 原子弹出分值最小的成员
func zPopMin(ctx context.Context, rdb *redis.Client, key string) (member string, score float64, err error) {
for {
err = rdb.Watch(ctx, func(tx *redis.Tx) error {
// 1. 先读最小成员
res, err := tx.ZRangeWithScores(ctx, key, 0, 0).Result()
if err != nil || len(res) == 0 {
return redis.Nil // 集合为空
}
member, score = res[0].Member.(string), res[0].Score
// 2. 事务里删除
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.ZRem(ctx, key, member)
return nil
})
return err
}, key)
switch err {
case nil: // 成功
return member, score, nil
case redis.TxFailedErr: // 乐观锁冲突,重试
continue
default: // 其他错误
return "", 0, err
}
}
}
Lua 脚本(EVAL)——真正原子性
把需要一次性完成的 所有读写逻辑 写成 Lua 脚本,Redis 会 单线程原子执行 整个脚本;脚本里一旦检测到异常,可通过 return redis.error_reply(...) 终止,不会留下中间状态。
如下:实现账户A转账100元到账户B:
lua:
-- atomic_transfer.lua
local bal = tonumber(redis.call('GET', KEYS[1]))
if bal < tonumber(ARGV[1]) then
return redis.error_reply("insufficient")
end
redis.call('DECRBY', KEYS[1], ARGV[1])
redis.call('INCRBY', KEYS[2], ARGV[1])
return "OK"
bash :
EVAL "$(cat atomic_transfer.lua)" 2 accountA accountB 100
- 2 → 脚本需要 2 个 key(accountA 和 accountB )
- accountA accountB 100 → 依次为 KEYS[1], KEYS[2], ARGV[1]
golang:
func TestRedisStackClient_Do4(t *testing.T) {
cli := NewRedisStackClient("localhost:6379", "", 0)
ctx := context.Background()
script := redis.NewScript(`
-- atomic_transfer.lua
local bal = tonumber(redis.call('GET', KEYS[1]))
if bal < tonumber(ARGV[1]) then
return redis.error_reply("insufficient")
end
redis.call('DECRBY', KEYS[1], ARGV[1])
redis.call('INCRBY', KEYS[2], ARGV[1])
return {"OK", bal - tonumber(ARGV[1])}
`)
val, err := script.Run(ctx, cli.Client, []string{"accountB", "accountA"}, 100).Result()
if err != nil {
panic(err)
}
// 4. 解析结果
arr := val.([]interface{})
result := arr[0].(string)
score, _ := arr[1].(int64)
fmt.Printf("pop => result=%s, account balance=%d\n", result, score)
}
Redis Cluster 环境下, 如果事务跨多个redis节点如何保证操作的原子性?
- 可以通过Lua 脚本 + Hash Tag的方式实现 跨节点的事务原子性, 仅对 Redis Cluster 生效;单机 / 哨兵无意义。
Hash Tag 是 Redis Cluster 为 把多个 key 强制映射到同一个 slot 而设计的 特殊语法标记,用来解决“跨 slot 操作”或“Lua 脚本多 key 必须落在同一 slot”的问题。
用一对花括号 {} 包住 任意子串,只要 {} 里的子串相同,key 就落在 同一 slot(进而同一节点,Cluster 通过 哈希槽分片 把 0-16383 共 16384 个槽分配给集群中的节点)
| 真实 key | 参与 CRC16 的片段 | slot 结果 | 是否同 slot |
|---|---|---|---|
user:{100}:balance |
100 |
slot 1234 | ✅ |
user:{100}:ledger |
100 |
slot 1234 | ✅ |
user:100:balance |
user:100:balance |
slot 5678 | ❌ |
func TestRedisStackClient_Do5(t *testing.T) {
cli := NewRedisStackClient("localhost:6379", "", 0)
ctx := context.Background()
script := `
-- atomic_transfer.lua
local bal = tonumber(redis.call('GET', KEYS[1]))
if bal < tonumber(ARGV[1]) then
return redis.error_reply("insufficient")
end
redis.call('DECRBY', KEYS[1], ARGV[1])
redis.call('INCRBY', KEYS[2], ARGV[1])
return {"OK", bal - tonumber(ARGV[1])}
`
sha, _ := cli.Client.ScriptLoad(ctx, script).Result()
res, err := cli.Client.EvalSha(ctx, sha,
[]string{"user:{100}:balance", "user:{100}:ledger"},
100,
).Result()
if err != nil {
panic(err)
}
// 4. 解析结果
arr := res.([]interface{})
result := arr[0].(string)
score, _ := arr[1].(int64)
fmt.Printf("pop => result=%s, account balance=%d\n", result, score)
}