go-redis 管道和事务,

在Eino 中indexer 组件中, 会使用redis的pipeline功能, 批量将文档的内容(content), 向量对应的bytes(content_vactor)及元数据(matadata)信息写入的hset进行存储。

Redis对于命令的批量处理, 提供pipeline 和 TxPipeline 两种方式执行,

  • pipeline : 通过在一次通信中将多个命令一起发送到服务器执行,避免了网络和处理开销
  • TxPipeline : 在pipeline 的基础上, 保证所有包含的命令将完整执行,不会被其他客户端的命令中断。

pipeline代码示例 :

func TestRedisStackClient_Do2(t *testing.T) {
    cli := NewRedisStackClient("localhost:6379", "", 0)
    pipe := cli.Client.Pipeline()
    ctx := context.Background()
    for i := 0; i < 5; i++ {
        pipe.Set(ctx, fmt.Sprintf("seat:%v", i), fmt.Sprintf("#%v", i), 0)
    }
    cmds, err := pipe.Exec(ctx)
    if err != nil {
        panic(err)
    }
    for _, c := range cmds {
        fmt.Printf("%v;", c.(*redis.StatusCmd).Val())
    }
    fmt.Println("")
    pipe = cli.Client.Pipeline()
    get0Result := pipe.Get(ctx, "seat:0")
    get3Result := pipe.Get(ctx, "seat:3")
    get4Result := pipe.Get(ctx, "seat:4")
    cmds, err = pipe.Exec(ctx)
    // The results are available only after the pipeline
    // has finished executing.
    fmt.Println(get0Result.Val()) // >>> #0
    fmt.Println(get3Result.Val()) // >>> #3
    fmt.Println(get4Result.Val()) // >>> #4
}

TxPipeline 代码示例:

func TestRedisStackClient_Do3(t *testing.T) {
    cli := NewRedisStackClient("localhost:6379", "", 0)
    trans := cli.Client.TxPipeline()
    ctx := context.Background()
    trans.IncrBy(ctx, "count:test1", 1)
    trans.IncrBy(ctx, "count:test1", 2)
    trans.IncrBy(ctx, "count:test1", 3)
    cmds, err := trans.Exec(ctx)
    if err != nil {
        panic(err)
    }
    for _, c := range cmds {
        fmt.Println(c.(*redis.IntCmd).Val())
    }
    cmd := cli.Client.Do(ctx, "GET", "count:test1")
    fmt.Println(cmd.Val())
}

Redis 的所有单条命令(包括 INCR、INCRBY、DECR、DECRBY、HINCRBY 等)都是原子操作。
无论多少个客户端并发调用,Redis 都会以单线程方式依次执行每一条指令,因此不会出现竞态条件,天然线程安全。

Redis 不支持事务回滚,那么如何保证操作的原子性呢?

redis没有回滚机制,但可以通过以下 2 种手段在不同场景下保证“操作的完整性”——即 要么全部成功,要么全部不生效。

乐观锁 + 重试(WATCH)

使用 WATCH 监视关键 key,事务里一旦发现 key 被其他客户端修改,就让 EXEC 失败并 重新执行整个流程。
下面用 WATCH 实现“ZPOP”——即原子地弹出有序集合中的最小/最大元素

// zPopMin 原子弹出分值最小的成员
func zPopMin(ctx context.Context, rdb *redis.Client, key string) (member string, score float64, err error) {
    for {
        err = rdb.Watch(ctx, func(tx *redis.Tx) error {
            // 1. 先读最小成员
            res, err := tx.ZRangeWithScores(ctx, key, 0, 0).Result()
            if err != nil || len(res) == 0 {
                return redis.Nil // 集合为空
            }
            member, score = res[0].Member.(string), res[0].Score

            // 2. 事务里删除
            _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
                pipe.ZRem(ctx, key, member)
                return nil
            })
            return err
        }, key)

        switch err {
        case nil: // 成功
            return member, score, nil
        case redis.TxFailedErr: // 乐观锁冲突,重试
            continue
        default: // 其他错误
            return "", 0, err
        }
    }
}

Lua 脚本(EVAL)——真正原子性

把需要一次性完成的 所有读写逻辑 写成 Lua 脚本,Redis 会 单线程原子执行 整个脚本;脚本里一旦检测到异常,可通过 return redis.error_reply(...) 终止,不会留下中间状态。

如下:实现账户A转账100元到账户B:
lua:

-- atomic_transfer.lua
local bal = tonumber(redis.call('GET', KEYS[1]))
if bal < tonumber(ARGV[1]) then
    return redis.error_reply("insufficient")
end
redis.call('DECRBY', KEYS[1], ARGV[1])
redis.call('INCRBY', KEYS[2], ARGV[1])
return "OK"

bash :

EVAL "$(cat atomic_transfer.lua)" 2 accountA accountB 100
  • 2 → 脚本需要 2 个 key(accountA 和 accountB )
  • accountA accountB 100 → 依次为 KEYS[1], KEYS[2], ARGV[1]

golang:

func TestRedisStackClient_Do4(t *testing.T) {
    cli := NewRedisStackClient("localhost:6379", "", 0)
    ctx := context.Background()
    script := redis.NewScript(`
        -- atomic_transfer.lua
        local bal = tonumber(redis.call('GET', KEYS[1]))
        if bal < tonumber(ARGV[1]) then
            return redis.error_reply("insufficient")
        end
        redis.call('DECRBY', KEYS[1], ARGV[1])
        redis.call('INCRBY', KEYS[2], ARGV[1])
        return {"OK",  bal - tonumber(ARGV[1])}
    `)

    val, err := script.Run(ctx, cli.Client, []string{"accountB", "accountA"}, 100).Result()
    if err != nil {
        panic(err)
    }
    // 4. 解析结果
    arr := val.([]interface{})
    result := arr[0].(string)
    score, _ := arr[1].(int64)
    fmt.Printf("pop => result=%s, account balance=%d\n", result, score)
}

Redis Cluster 环境下, 如果事务跨多个redis节点如何保证操作的原子性?

  • 可以通过Lua 脚本 + Hash Tag的方式实现 跨节点的事务原子性, 仅对 Redis Cluster 生效;单机 / 哨兵无意义。

Hash Tag 是 Redis Cluster 为 把多个 key 强制映射到同一个 slot 而设计的 特殊语法标记,用来解决“跨 slot 操作”或“Lua 脚本多 key 必须落在同一 slot”的问题。
用一对花括号 {} 包住 任意子串,只要 {} 里的子串相同,key 就落在 同一 slot(进而同一节点,Cluster 通过 哈希槽分片 把 0-16383 共 16384 个槽分配给集群中的节点)

真实 key 参与 CRC16 的片段 slot 结果 是否同 slot
user:{100}:balance 100 slot 1234
user:{100}:ledger 100 slot 1234
user:100:balance user:100:balance slot 5678
func TestRedisStackClient_Do5(t *testing.T) {
    cli := NewRedisStackClient("localhost:6379", "", 0)
    ctx := context.Background()
    script := `
        -- atomic_transfer.lua
        local bal = tonumber(redis.call('GET', KEYS[1]))
        if bal < tonumber(ARGV[1]) then
            return redis.error_reply("insufficient")
        end
        redis.call('DECRBY', KEYS[1], ARGV[1])
        redis.call('INCRBY', KEYS[2], ARGV[1])
        return {"OK",  bal - tonumber(ARGV[1])}
    `
    sha, _ := cli.Client.ScriptLoad(ctx, script).Result()
    res, err := cli.Client.EvalSha(ctx, sha,
        []string{"user:{100}:balance", "user:{100}:ledger"},
        100,
    ).Result()

    if err != nil {
        panic(err)
    }
    // 4. 解析结果
    arr := res.([]interface{})
    result := arr[0].(string)
    score, _ := arr[1].(int64)
    fmt.Printf("pop => result=%s, account balance=%d\n", result, score)
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容