OpenResty上使用的redis工具类

最近在看360团队的openresty最佳实践,一边学lua这门语言,一边熟悉openresty技术栈。里边提供了一个封装的redis客户端工具类,这里稍微改了一下。

-- file name: resty/redis_iresty.lua
local redis_c = require "resty.redis"

--当前版本是否支持table.new,做下兼容处理
local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
    new_tab = function (narr, nrec) return {} end
end

--[[
table.new(narray,nhash)
预分配new一个table,指定array长度或者hash长度。
一般一个table推荐同时只用一种类型,所以一般是table.new(0,n)或table.new(n,0)
]]
local _M = new_tab(0, 155) -- 新建一个155长度的hash table
_M._VERSION = '0.01'

--redis命令
local commands = {
    "append",            "auth",              "bgrewriteaof",
    "bgsave",            "bitcount",          "bitop",
    "blpop",             "brpop",
    "brpoplpush",        "client",            "config",
    "dbsize",
    "debug",             "decr",              "decrby",
    "del",               "discard",           "dump",
    "echo",
    "eval",              "exec",              "exists",
    "expire",            "expireat",          "flushall",
    "flushdb",           "get",               "getbit",
    "getrange",          "getset",            "hdel",
    "hexists",           "hget",              "hgetall",
    "hincrby",           "hincrbyfloat",      "hkeys",
    "hlen",
    "hmget",              "hmset",      "hscan",
    "hset",
    "hsetnx",            "hvals",             "incr",
    "incrby",            "incrbyfloat",       "info",
    "keys",
    "lastsave",          "lindex",            "linsert",
    "llen",              "lpop",              "lpush",
    "lpushx",            "lrange",            "lrem",
    "lset",              "ltrim",             "mget",
    "migrate",
    "monitor",           "move",              "mset",
    "msetnx",            "multi",             "object",
    "persist",           "pexpire",           "pexpireat",
    "ping",              "psetex",            "psubscribe",
    "pttl",
    "publish",      --[[ "punsubscribe", ]]   "pubsub",
    "quit",
    "randomkey",         "rename",            "renamenx",
    "restore",
    "rpop",              "rpoplpush",         "rpush",
    "rpushx",            "sadd",              "save",
    "scan",              "scard",             "script",
    "sdiff",             "sdiffstore",
    "select",            "set",               "setbit",
    "setex",             "setnx",             "setrange",
    "shutdown",          "sinter",            "sinterstore",
    "sismember",         "slaveof",           "slowlog",
    "smembers",          "smove",             "sort",
    "spop",              "srandmember",       "srem",
    "sscan",
    "strlen",       --[[ "subscribe",  ]]     "sunion",
    "sunionstore",       "sync",              "time",
    "ttl",
    "type",         --[[ "unsubscribe", ]]    "unwatch",
    "watch",             "zadd",              "zcard",
    "zcount",            "zincrby",           "zinterstore",
    "zrange",            "zrangebyscore",     "zrank",
    "zrem",              "zremrangebyrank",   "zremrangebyscore",
    "zrevrange",         "zrevrangebyscore",  "zrevrank",
    "zscan",
    "zscore",            "zunionstore",       "evalsha"
}

--元表,用在返回的模块对象上
local mt = { __index = _M }


local function is_redis_null( res )
    if type(res) == "table" then
        for k,v in pairs(res) do
            if v ~= ngx.null then
                return false
            end
        end
        return true
    elseif res == ngx.null then
        return true
    elseif res == nil then
        return true
    end

    return false
end


-- change connect address as you need
function _M.connect_mod( self, redis )
    redis:set_timeout(self.timeout)
    --return redis:connect("127.0.0.1", 6379)
    --return redis:connect(self.ip, self.port)
    local ok ,err = redis:connect(self.ip, self.port)
    if not ok then
        return {}, err
    end

    local count
    count, err = redis:get_reused_times()
    if 0 == count then
        --local ok, err = redis:auth("123456")
        ok, err = redis:auth(self.password)
        if not ok then
            --ngx.say("failed to auth: ", err)
            return {},  err
        end
    elseif err then
        --ngx.say("failed to get reused times: ", err)
        return {}, err
    end

    return ok, err
end


function _M.set_keepalive_mod( self, redis )
    -- put it into the connection pool of size 100, with 60 seconds max idle time
    --return redis:set_keepalive(60000, 100)
    return redis:set_keepalive(self.max_idle_ms, self.pool_size)
end


function _M.init_pipeline( self )
    self._reqs = {}
end


function _M.commit_pipeline( self )
    local reqs = self._reqs

    if nil == reqs or 0 == #reqs then
        return {}, "no pipeline"
    else
        self._reqs = nil
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok then
        return {}, err
    end

    redis:init_pipeline()
    for _, vals in ipairs(reqs) do
        local fun = redis[vals[1]]
        table.remove(vals , 1)

        fun(redis, unpack(vals))
    end

    local results, err = redis:commit_pipeline()
    if not results or err then
        return {}, err
    end

    if is_redis_null(results) then
        results = {}
        ngx.log(ngx.WARN, "is null")
    end
    -- table.remove (results , 1)

    self.set_keepalive_mod(redis)

    for i,value in ipairs(results) do
        if is_redis_null(value) then
            results[i] = nil
        end
    end

    return results, err
end


function _M.subscribe( self, channel )
    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local res, err = redis:subscribe(channel)
    if not res then
        return nil, err
    end

    res, err = redis:read_reply()
    if not res then
        return nil, err
    end

    redis:unsubscribe(channel)
    self.set_keepalive_mod(redis)

    return res, err
end


local function do_command(self, cmd, ... )
    if self._reqs then
        table.insert(self._reqs, {cmd, ...})
        return
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local fun = redis[cmd]
    local result, err = fun(redis, ...)
    if not result or err then
        -- ngx.log(ngx.ERR, "pipeline result:", result, " err:", err)
        return nil, err
    end

    if is_redis_null(result) then
        result = nil
    end

    self:set_keepalive_mod(redis)  -- self.set_keepalive_mod(self, redis)

    return result, err
end


for i = 1, #commands do
    local cmd = commands[i]
    _M[cmd] =
            function (self, ...)
                return do_command(self, cmd, ...)
            end
end


function _M.new(self, opts)
    opts = opts or {}
    --local timeout = (opts.timeout and opts.timeout * 1000) or 1000
    local timeout = opts.timeout or 1000
    local db_index= opts.db_index or 0

    --补齐其他可配置项
    local ip = opts.ip or "127.0.0.1"   --默认去连本地redis,目前不支持哨兵和集群模式
    local port = opts.port or 6379
    local max_idle_ms = opts.max_idle_ms or 60000
    local pool_size = opts.pool_size or 64
    local password = opts.password or "password"
    
    --ngx.log(ngx.INFO, "redis timeout: ", timeout, " , db_index: ", db_index)
    --ngx.log(ngx.INFO, "redis ip: ", ip, " , port: ", port)
    --ngx.log(ngx.INFO, "pool_size: ", pool_size, " , max_idle_ms: ", max_idle_ms)

    return setmetatable({
            timeout = timeout,
            db_index = db_index,
            ip = ip,
            port = port,
            max_idle_ms = max_idle_ms,
            pool_size = pool_size,
            password = password,
            _reqs = nil }, mt)
end


return _M

代码使用:

local redis_client = require "wangan.common.redis_iresty"
local red = redis_client:new({
    ip = "127.0.0.1",
    port = 6379,
    password = "123456",
    timeout = 2000,
    db_index = 0,
    max_idle_ms = 60000,
    pool_size = 32
})

local ok, err = red:get("my-account")

if not ok then
    ngx.say("failed to get my-account: ", err)
    return
end

ngx.say("my-account: ", ok)

然后如果不使用封装的工具类,代码是这样的:

--[[
    https://github.com/openresty/lua-resty-redis
    openresty连接redis的例子
    官方还不支持redis哨兵和集群模式,第三方库倒是有一个https://github.com/steve0511/resty-redis-cluster
--]]

local redis = require "resty.redis"
local red = redis:new()
--red:set_timeouts(connect_timeout, send_timeout, read_timeout)
red:set_timeouts(1000, 1000, 1000)

--red:connect是新建连接、还有可能是去内建连接池里取一个连接
local ok, err = red:connect("127.0.0.1", 6379)
if not ok then
    ngx.say("failed to connect: ", err)
    return
end


--每个连接只需要auth一次,这里get_reused_times判定是否是新连接

local count
count, err = red:get_reused_times()
if 0 == count then
    ok, err = red:auth("123456")
    if not ok then
        ngx.say("failed to auth: ", err)
        return
    end
elseif err then
    ngx.say("failed to get reused times: ", err)
    return
end


local res, err = red:get("my-account")

if not res then
    ngx.say("failed to get my-account: ", err)
    return
end

if res == ngx.null then
    ngx.say("my-account not found.")
    return
end

ngx.say("my-account: ", res)

--连接正常使用之后,将连接放入连接池
--允许空闲10s,最大连接数100
local ok , err = red:set_keepalive(10000, 100)
if not ok then
    ngx.say("failed to set_keepalive: ", err)
    return
end

--[[
    关于将连接放入连接池,一定要确保连接使用过程中未发生异常、即顺利的使用完以后在放入。
    并且要注意连接的状态,因为放入连接池的连接将会带着这个状态,会被其他的业务模块使用到,可能会引发预期之外的问题。
    例如:
    red:connect(ip, 6379)
    red:select(1)
    red:set_keepalive(10000, 100)
    高并发下上面代码将连接放到池里后,后面其他模块如果拿到这个连接还是会取db1操作的,如果默认是想去db0操作那么就会引发bug
    所以正确的代码应该是在set_keepalvie之前,复位默认,red:select(0)
--]]

不使用封装工具类也没啥问题,但是会有很多重复冗余的代码。

关于哨兵和集群模式

参看OpenResty连接redis的例子 https://github.com/openresty/lua-resty-redis
目前官方还不支持redis哨兵和集群模式,第三方库和实现思路可以参考:
https://github.com/steve0511/resty-redis-cluster

https://www.sohu.com/a/300661862_355142

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容

  • 一个简单实用的函数式缓存工具类:封装了基本的缓存增删查操作,提供了热点数据集中失效和缓存穿透的统一解决方案,以及在...
    javacoo阅读 1,159评论 2 13
  • 目标 基于openresty 和 lua 参考apisix的后台实现一个自己的网关那首先要搭建一个apisix 啊...
    dozenx阅读 945评论 0 1
  • 我是黑夜里大雨纷飞的人啊 1 “又到一年六月,有人笑有人哭,有人欢乐有人忧愁,有人惊喜有人失落,有的觉得收获满满有...
    陌忘宇阅读 8,532评论 28 53
  • 首先介绍下自己的背景: 我11年左右入市到现在,也差不多有4年时间,看过一些关于股票投资的书籍,对于巴菲特等股神的...
    瞎投资阅读 5,718评论 3 8
  • ![Flask](data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAW...
    极客学院Wiki阅读 7,240评论 0 3