Redis学习之事务

介绍

   Redis 事务可以一次执行多个命令, 并且带有以下两个重要的保证:

  • 批量操作在发送 EXEC 命令前被放入队列缓存。

  • 收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。

事务过程不同的客户端是没有事务之间的影响的,一个事务从开始到执行会经历以下三个阶段:

  • 开始事务

  • 命令入队

  • 执行事务

一、相关命令

  • Exec 执行所有事务块内的命令。

  • Watch 监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

  • Discard 取消事务,放弃执行事务块内的所有命令。

  • Unwatch 取消 WATCH 命令对所有 key 的监视。

  • Multi 标记一个事务块的开始。

二、数据结构

typedef struct client {
    ...
    list *watched_keys; /* 事务操作 监控的keys链表 值为watchedKey结构*/
    multiState mstate;  /* 事务状态 */
    ...
}client;
typedef struct multiState {
    multiCmd *commands;     /*事务命令队列 */
    int count;              /*命令总个数*/
    ...
} multiState;
其中、client.watched_keys里面放的Node都是数据时如下结构体
//监视key
typedef struct watchedKey {
    robj *key;//监视的key
    redisDb *db;//哪个db里面的
} watchedKey;

//redisDb结构
typedef struct redisDb {
    ...
    dict *watched_keys;         /*数据库的监控key字典用来实现事务的 key -> client链表*/
    ...
} redisDb;

三、API

void multiCommand(client *c);//事务开始
void execCommand(client *c);//事务执行
void discardCommand(client *c);//取消事务
void watchCommand(client *c);//监控客户端的key
void unwatchCommand(client *c);//取消监控客户端正在监控的key

四、重要API解析

  • multiCommand

    void multiCommand(client *c) {//设置事务标志
        if (c->flags & CLIENT_MULTI) {
            addReplyError(c,"MULTI calls can not be nested");
            return;
        }
        c->flags |= CLIENT_MULTI;//设置事务标志位
        addReply(c,shared.ok);
    }
    
  • watchCommand

    /* 监视特定的key */
    void watchForKey(client *c, robj *key) {
        list *clients = NULL;
        listIter li;
        listNode *ln;
        watchedKey *wk;
        listRewind(c->watched_keys,&li);
        while((ln = listNext(&li))) {
            wk = listNodeValue(ln);
            if (wk->db == c->db && equalStringObjects(key,wk->key))
                return; /* 如果这个key已经被监控了 直接返回*/
        }
        /* 否则加入监视 */
        clients = dictFetchValue(c->db->watched_keys,key);
        if (!clients) {
            clients = listCreate();
            dictAdd(c->db->watched_keys,key,clients);
            incrRefCount(key);
        }
        listAddNodeTail(clients,c);//加入到监视keys的链表
        wk = zmalloc(sizeof(*wk));
        wk->key = key;
        wk->db = c->db;
        incrRefCount(key);
        listAddNodeTail(c->watched_keys,wk);//加入到watched_keys中
    }
    
    /* 键如果被改变 事务执行会失败*/
    void touchWatchedKey(redisDb *db, robj *key) {
        list *clients;
        listIter li;
        listNode *ln;
    
        if (dictSize(db->watched_keys) == 0) return;
        clients = dictFetchValue(db->watched_keys, key);
        if (!clients) return;
    
        //如果key被改变了,那么监视这个key的所有client都要被设置CLIENT_DIRTY_CAS
        listRewind(clients,&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
    
            c->flags |= CLIENT_DIRTY_CAS;
        }
    }
    
  • execCommand

    void execCommand(client *c) {//执行事务
        int j;
        robj **orig_argv;
        int orig_argc;
        struct redisCommand *orig_cmd;
        int must_propagate = 0; /* 需要将 MULTI/EXEC 同步到 AOF / slaves? */
        int was_master = server.masterhost == NULL;
    
        if (!(c->flags & CLIENT_MULTI)) {//如果并没有设置MULTI标志,无操作
            addReplyError(c,"EXEC without MULTI");
            return;
        }
        //如果事务期间被监视的key被修改了 停止事务,返回nil
        //入队的命令如果有误的话,执行事务error
        if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
            addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                       shared.nullarray[c->resp]);
            discardTransaction(c);
            goto handle_monitor;
        }
        //如果对于从库有修改key的操作 取消事务
        if (!server.loading && server.masterhost && server.repl_slave_ro &&
            !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
        {
            addReplyError(c,
                "Transaction contains write commands but instance "
                "is now a read-only replica. EXEC aborted.");
            discardTransaction(c);
            goto handle_monitor;
        }
    
        /* 执行所有命令*/
        unwatchAllKeys(c); /* 取消此用户所有监视key 提升cpu效率 */
        orig_argv = c->argv;
        orig_argc = c->argc;
        orig_cmd = c->cmd;
        addReplyArrayLen(c,c->mstate.count);
        for (j = 0; j < c->mstate.count; j++) {
            c->argc = c->mstate.commands[j].argc;
            c->argv = c->mstate.commands[j].argv;
            c->cmd = c->mstate.commands[j].cmd;
            //如果有修改key的操作的话 需要同步到 aof 和 从库
            if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
                execCommandPropagateMulti(c);
                must_propagate = 1;
            }
            //执行命令
            call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
            /* 以上函数可能会改变argc argv. */
            c->mstate.commands[j].argc = c->argc;
            c->mstate.commands[j].argv = c->argv;
            c->mstate.commands[j].cmd = c->cmd;
        }
        c->argv = orig_argv;
        c->argc = orig_argc;
        c->cmd = orig_cmd;
        discardTransaction(c);
        /* 需要将 MULTI/EXEC 同步到 AOF / slaves */
        if (must_propagate) {
            int is_master = server.masterhost == NULL;
            server.dirty++;
            if (server.repl_backlog && was_master && !is_master) {
                char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
                feedReplicationBacklog(execcmd,strlen(execcmd));
            }
        }
    handle_monitor:
        if (listLength(server.monitors) && !server.loading)
            replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }
    

参考

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容