1、golang语言Redis客户端简介
通常我们在选择某一组件的客户端包时,优先选择官方提供的包。redis 本身虽然并没有提供 go 语言的 client 包,但是提供了一份 client 包列表,并对部分包做了推荐标识,具体参考:https://redis.io/clients#go
接下来的全部使用示例,都是在 "github.com/gomodule/redigo/redis" 包的基础上实现。选择这个包的原因在于,这个包只有一个 Do 函数执行 Redis 命令,使用方法更接近Redis的原生命令,这无疑会降低我们的学习成本,同时该包对Print-alike API, Pipelining (including transactions), Pub/Sub, Connection pooling, scripting 等我们常用的功能也有良好的支持。
在开始之前,首先下载该第三方包
go get "github.com/gomodule/redigo/redis"
2、Redis连接池对象的构建和关闭
通常在初始化阶段创建 Redis 客户端连接池
var RedisClientPool *redis.Pool
func InitJimDb() {
RedisClientPool = &redis.Pool{
MaxIdle: 100,
MaxActive: 12000,
IdleTimeout: time.Duration(180),
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("xxxxxx"),redis.DialReadTimeout(time.Second),redis.DialWriteTimeout(time.Second))
if err != nil {
logger.Errorf("redisClient dial host: %s, auth: %s err: %s", "127.0.0.1:6379", "xxxxxx", err.Error())
return nil, err
}
return c, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
if err != nil {
logger.Errorf("redisClient ping err: %s", err.Error())
}
return err
},
}
logger.Infof("init jimDb ok")
}
func CloseJimDb() {
if RedisClientPool != nil {
err := RedisClientPool.Close()
if err != nil {
logger.Errorf("do CloseJimDb error:%s", err.Error())
}
}
}
客户端初始化参数含义:
- MaxIdle:连接池中最大的空闲连接数
- MaxActive:允许的最大连接 Redis 的连接数,设置为0则没有限制
- IdleTimeout:空闲超时时间,超过此时间后,则会关闭连接。若此值设置为0,则不会关闭连接,应用应设置一个小于服务超时的值
- Wait:若为 true,则当连接数达到 MaxActive 时,使用 Get() 获取新的连接时将会等待,直到有连接释放连接
- MaxConnLifetime:最大连接生命时长,当连接存活时间超过改值,则会被关闭,若设置为0,则不会因为存活时间关闭连接
- Dial:用于创建和配置连接的支持方法,通常用于 DB 选择/连接超时时间/读超时时间/写超时时间/密码认证等的初始化
在执行 Redis 数据库操作时,需要首先获取连接,同时记得在操作结束后释放连接
rc := g.RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
logger.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
注:如果不执行 rc.Close() 释放连接,服务将很快打满内存,通过 pprof 将发现连接数非常大。
系统退出时,记得手动 Close() 客户端连接。
3、Redis客户端基本用法
(1)、redis基本数据类型
数据类型 | 可以存储的值 | 操作 |
---|---|---|
STRING | 字符串、整数或者浮点数 | 对整个字符串或者字符串的其中一部分执行操作 对整数和浮点数执行自增或者自减操作 |
LIST | 列表 | 从两端压入或者弹出元素 对单个或者多个元素进行修剪, 只保留一个范围内的元素 |
SET | 无序集合 | 添加、获取、移除单个元素 检查一个元素是否存在于集合中 计算交集、并集、差集 从集合里面随机获取元素 |
HASH | 包含键值对的无序散列表 | 添加、获取、移除单个键值对 获取所有键值对 检查某个键是否存在 |
ZSET | 有序集合 | 添加、获取、删除元素 根据分值范围或者成员来获取元素 计算一个键的排名 |
各类型的基本操作预发可以参考Redis,本文不做过多介绍。
注:对于 Redis 键值对的 Key 值来说,key 只有唯一的类型 String。
示例:
127.0.0.1:6379> set 1 "value1"
OK
127.0.0.1:6379> set "1" "value2"
OK
127.0.0.1:6379> get 1
"value2"
127.0.0.1:6379> get "1"
"value2"
(2)、Redigo客户端支持的操作函数
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value when the connection is not usable.
Err() error
// Do sends a command to the server and returns the received reply.
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Send writes the command to the client's output buffer.
Send(commandName string, args ...interface{}) error
// Flush flushes the output buffer to the Redis server.
Flush() error
// Receive receives a single reply from the Redis server
Receive() (reply interface{}, err error)
}
Redigo 客户端支持的与数据操作相关的方法主要有4个:Do()、Send()、Flush() 和 Receive()。
其中,最多用到的方法是 Do(),其功能是单次执行某一命令,并返回执行结果。Send()、Flush() 和 Receive() 是一组操作,通常同时出现,主要实现 pipline 方式的数据写入,也可以实现消息的订阅通知功能。
(3)、Do()方法示例
Do(commandName string, args ...interface{}) (reply interface{}, err error)
根据源码可知,Do() 的入参,第一个为 commandName,也就是Redis 本身支持指令的大写字符串,arg 根据命令本身的参数按顺序填入即可。由于该客户端支持的 commandName 与 Redis cli 本身的指令名一致,因此学习成本较低。具体我们以 SET 命令为例,其余的操作一依次类推,只要命令和所需参数一致即可:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
reply, err := rc.Do("SET", "1", "value1")
if err != nil {
t.Errorf("do set %s,%s error:%s", "1", "valuea", err.Error())
return
}
reply, err := rc.Do("MSET", "1", "value1","2","value2")
if err != nil {
t.Errorf("do set %s,%s error:%s", "1", "valuea", err.Error())
return
}
上面示例在操作参数很少时还是十分方便的,但是当我们想批量写入大量数据时,就会显得异常繁琐,那么有没有简单的方式执行数据的批量写入呢?
我们可以通过自己构建 []interface{} 类型的参数数组实现,具体入如下所示:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
var values = []string{"1", "2", "3", "4", "5", "6", "7", "8", "8", "10"}
args := []interface{}{"redis-list"}
for _, v := range values {
args = append(args, v)
}
reply, err := rc.Do("RPUSH", args...)
if err != nil {
t.Errorf("do set %s,%s error:%s", GetKey("test"), "valuea", err.Error())
return
}
此外,redigo 客户端其实已经给我们提供了一个参数转化的方法,而无需我们自己去手动将参数转化为 []interface{}
具体的源码如下:
// Args is a helper for constructing command arguments from structured values.
type Args []interface{}
// Add returns the result of appending value to args.
func (args Args) Add(value ...interface{}) Args {
return append(args, value...)
}
// AddFlat returns the result of appending the flattened value of v to args.
//
// Maps are flattened by appending the alternating keys and map values to args.
//
// Slices are flattened by appending the slice elements to args.
//
// Structs are flattened by appending the alternating names and values of
// exported fields to args. If v is a nil struct pointer, then nothing is
// appended. The 'redis' field tag overrides struct field names. See ScanStruct
// for more information on the use of the 'redis' field tag.
//
// Other types are appended to args as is.
func (args Args) AddFlat(v interface{}) Args {
rv := reflect.ValueOf(v)
switch rv.Kind() {
case reflect.Struct:
args = flattenStruct(args, rv)
case reflect.Slice:
for i := 0; i < rv.Len(); i++ {
args = append(args, rv.Index(i).Interface())
}
case reflect.Map:
for _, k := range rv.MapKeys() {
args = append(args, k.Interface(), rv.MapIndex(k).Interface())
}
case reflect.Ptr:
if rv.Type().Elem().Kind() == reflect.Struct {
if !rv.IsNil() {
args = flattenStruct(args, rv.Elem())
}
} else {
args = append(args, v)
}
default:
args = append(args, v)
}
return args
}
Add() 方法用于添加基本类型参数,例如 string、integer、float、interface 等。
AddFlat() 用于添加复杂类型参数,例如 Struce、slice、map、str 等。
根据源码可知,其基本原理都是帮助我们构建一个 []interface{}类型的参数组。只是简化了我们的代码量。具体实例如下:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
fmt.Printf("Cache init redis client close err: %s", rcErr.Error())
}
}()
var tempMap = map[int64][]byte{
1:bytes,
2:bytes,
3:bytes,
4:bytes,
5:bytes,
6:bytes,
}
_,err := rc.Do("HMSET",redis.Args{}.Add("redis-hash").AddFlat(tempMap)...)
if err != nil{
fmt.Println("hmset error:",err.Error())
}
(4)、pipline数据写入方式
pipline 的数据写入方式与 Do() 这种单次写入方式最大的区别在于批量数据处理的写入速度。由于 Do() 这种单次写入需要等返回结果之后才能进行第二次操作,而 pipline 可以在第一次操作结果返回之前,继续发送后续的 Request,直到把所有的 Request 都发送完毕。因此,当操作很多时,pipeline 更高效。
Redigo客户端的pipline处理方式依赖于三个方法:Send()、Flush()和Receive()。
Send() 发送命令到输出缓冲区。
Flush() 写入命令并刷新输出缓冲区。
Receive()接收服务器的返回值。
调用 Receive() 的次数必须对应使用 Send() 发送命令的次数。具体示例如大下:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
rc.Send("SET", "1","1")
rc.Send("SET", "2","2")
rc.Flush()
res,_ :=rc.Receive()
fmt.Println(res)
res,_ = rc.Receive()
fmt.Println(res)
注:虽然多个 Send() 的指令是在执行 Flush() 时才真正将输出缓冲区数据刷新到 Redis 服务器,但是 send()中的多个指令并不是事务。如果需要保证多个操作的原子性,还是需要使用事务来实现。
(5)、redis 客户端的并发
redigo 支持 Receive 方法的一个并发调用者和 Send 和 Flush 方法的一个并发调用者,但是不支持 Do() 方法的并发调用。若需要完全并发访问 redis,需要参考第2节内容,创建redis 客户端连接池,通过多个连接实现真正意义上的并发。Redis-Doc
理解起来也很容易,Do()、Send()、Flush() 和 Receive(),都必须依赖某一个连接实现,具体的:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
reply, err := rc.Do("SET", "1", "value1")
无论拥有几个并发协程,由于所有操作都基于同一个连接,那么 Do() 操作一定需要执行完一个完整的 Request-Response才可以执行下一个请求,因此 Do() 无法支持并发调用。而 Send()、Flush() 和 Receive() 方法是异步操作,无需等待Rsponse返回,因此可以支持并发调用。具体的:一个协程执行 Send() 和 Flush() 方法用于写入操作,另一个协程执行 Receive() 方法用于异步接收操作结果。具体如下:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
var wg sync.WaitGroup
wg.Add(1)
go func(){
defer wg.Done()
for i:=0;i<4;i++{
rc.Send("SET", i+1,i+1)
}
rc.Flush()
}()
wg.Add(1)
go func(){
defer wg.Done()
for i:=0;i<4;i++{
res,_ := rc.Receive()
fmt.Println(res)
}
}()
wg.Wait()
fmt.Println("exec success")
如果要实现真正意义上对 Redis 的并发访问,只能依赖连接池实现。
(6)、事务操作
单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。
Redis 事务
命令 | 描述 |
---|---|
MULTI | 标记一个事务块的开始 |
EXEC | 执行所有事务块内的命令 |
DISCARD | 取消事务,放弃执行事务块内的所有命令 |
WATCH | 监视一个(或多个)key,如果在事务执行之前这个(或多个)key被其他命令所改动,那么事务将被打断 |
UNWATCH | 取消 WATCH 命令对所有 keys 的监视 |
其中我们常用的命令是前三个,例如,我们需要对 userCount 和 productCount同时进行自增处理,为保证两个字段一致,基于事务来实现,具体示例如下:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
rc.Send("MULTI")
rc.Send("INCR", "userCount")
rc.Send("INCR", "produceCount")
rc.Send("EXEC")
rc.Flush()
r,_ := rc.Receive()
fmt.Println(r)
r,_ = rc.Receive()
fmt.Println(r)
r,_ = rc.Receive()
fmt.Println(r)
r,_ = rc.Receive()
fmt.Println(r)
对一个的结果如下:
OK //事务开启的返回
QUEUED //操作入队列
QUEUED //操作入队列
[5 5] //执行操作并返回结果
实际上,大多数情况下,我们并不关心中间过程的返回结果,只关注最终的结果,这种情况下,我们只需将最后一条 Send() 指令替换成 Do()即可。
Do方法结合了Send,Flush和Receive方法的功能。
Do方法首先写入命令并刷新输出缓冲区。接下来,
Do方法接收所有待处理的回复,包括Do执行的命令的回复。
如果收到的任何回复都是错误,则Do返回错误。 如果没有错误,则Do返回最后一个返回值。
如果Do方法的命令参数是“”,则Do方法将刷新输出缓冲区并接收挂起的回复而不发送命令。
具体示例如下:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
rc.Send("MULTI")
rc.Send("INCR", "userCount")
rc.Send("INCR", "produceCount")
r, _ := rc.Do("EXEC")
fmt.Println(r) // [1,1]
返回结果为:
[6 6]
通过 Do() 命令完成最终事务的执行,将为我们忽略中间结果,更好的关注事务整体的运行结果。