【Redis源码剖析】 您所在的位置:网站首页 微信零钱2万截图 【Redis源码剖析】

【Redis源码剖析】

#【Redis源码剖析】 | 来源: 网络整理| 查看: 265

原创作品,转载请标明:http://blog.csdn.net/Xiejingfa/article/details/51262268

Redis源码剖析系列文章汇总:传送门

今天为大家带来Redis中事务部分的源码分析。Redis的事务机制允许将多个命令当做一个独立的单元运行,主要包括multi、exec、watch、unwatch、discard五个相关命令。如果你还不熟悉这几个命令,可以先看看我的另一篇文章【Redis学习笔记(七)】 Redis中的事务

本文所讲述的内容主要涉及redis.h和multi.c两个源文件,依据惯例,文后会提供注释版的源码。

1、总流程

事务从开始到执行需要经历以下三个阶段:

声明事务 (multi命令)命令入队 执行事务(exec命令)

对于一个拥有不同状态的对象,我们通常会使用状态机的手段加以管理。Redis也使用了类似的方法来实现对事务中不同状态的管理。

我们先来看看与事务有关的几个状态,这在Redis中又称作flag,定义在redis.h中。

#define REDIS_MULTI (1watched_keys,如果某个程序要获得该客户端监视的所有key,那么它只要获得该链表即可。

接下来,我们看看watch命令的具体实现,该功能由watchForKey函数完成。

void watchForKey(redisClient *c, robj *key) { list *clients = NULL; listIter li; listNode *ln; watchedKey *wk; /* Check if we are already watching for this key */ // 检查该key是否已经保存在client->watched_keys列表中 // listRewind获取list的迭代器 listRewind(c->watched_keys,&li); // 遍历查找,如果发现给定key已经存在直接返回 while((ln = listNext(&li))) { wk = listNodeValue(ln); if (wk->db == c->db && equalStringObjects(key,wk->key)) return; /* Key already watched */ } /* This key is not already watched in this DB. Let's add it */ // 检查redisDB->watched_keys是否保存了该key和客户端的映射关系,如果没有则添加之 // 获取监控给定key的客户端列表 clients = dictFetchValue(c->db->watched_keys,key); // 如果该列表为空,则创建一个 if (!clients) { clients = listCreate(); dictAdd(c->db->watched_keys,key,clients); incrRefCount(key); } // 并加入当前客户端 listAddNodeTail(clients,c); /* Add the new key to the list of keys watched by this client */ // 将一个新的watchedKey结构添加到client->watched_keys列表中 wk = zmalloc(sizeof(*wk)); wk->key = key; wk->db = c->db; incrRefCount(key); listAddNodeTail(c->watched_keys,wk); }

unwatch命令执行相反的操作,由unwatchAllKeys函数实现。

void unwatchAllKeys(redisClient *c) { listIter li; listNode *ln; // 如果没有key被监控,直接返回 if (listLength(c->watched_keys) == 0) return; // 获得c->watched_keys列表的迭代器 listRewind(c->watched_keys,&li); // 遍历c->watched_keys列表,逐一删除被该客户端监视的key while((ln = listNext(&li))) { list *clients; watchedKey *wk; /* Lookup the watched key -> clients list and remove the client * from the list */ wk = listNodeValue(ln); // 将当前客户端从db->watched_keys中删除 clients = dictFetchValue(wk->db->watched_keys, wk->key); redisAssertWithInfo(c,NULL,clients != NULL); listDelNode(clients,listSearchKey(clients,c)); /* Kill the entry at all if this was the only client */ // 如果没有任何客户端监控该key,则将该key从db->watched_keys中删除 if (listLength(clients) == 0) dictDelete(wk->db->watched_keys, wk->key); /* Remove this watched key from the client->watched list */ // 将c->watched_keys删除该key listDelNode(c->watched_keys,ln); // 释放资源 decrRefCount(wk->key); zfree(wk); } }

对于上面这两段代码,我已经详细注释过了,这里就不展开讲解。接下来我们看看multi/exec命令实现原理。

3、multi/exec命令实现

我们从 “声明事务 ” => “命令入队” => “执行事务”这三个阶段来分别介绍其原理。

3.1、声明事务

声明事务通过multi命令实现,从下面源码中我们可以看到:

Redis不支持嵌套事务。声明事务其实就是简单地将flags设置为REDIS_MULTI标识。随后redisClient进入事务状态,等待命令入队。 /* 执行MULTI命令 */ void multiCommand(redisClient *c) { // 不支持嵌套事务,否则直接报错 if (c->flags & REDIS_MULTI) { addReplyError(c,"MULTI calls can not be nested"); return; } // 设置事务标识 c->flags |= REDIS_MULTI; addReply(c,shared.ok); } 3.2、命令入队

这里我们先来介绍一下与命令队列相关的数据结构。

在redisClient中存在multiState mstate字段用来保存一个事务中的所有命令和其它相关信息,multiState结构定义在redis.h头文件中。

/* 事务状态结构体 */ typedef struct multiState { // 命令数组,保存着该事务中的所有命令并按输入顺序排列 multiCmd *commands; /* Array of MULTI commands */ // 命令数组长度,即命令的数量 int count; /* Total number of MULTI commands */ int minreplicas; /* MINREPLICAS for synchronous replication */ time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */ } multiState;

multiState结构中的multiCmd *commands正是真正存放命令的命令队列,其实质是一个数组。multiCmd表示一条完整的输入命令,包含“要执行的命令”、“命令参数”、“参数个数”三个属性。定义如下:

/* 事务命令结构体 */ typedef struct multiCmd { // 命令参数 robj **argv; // 参数个数 int argc; // 要执行的命令 struct redisCommand *cmd; } multiCmd;

命令入队由queueMultiCommand函数实现。

/* 将一个新命令添加到multi命令队列中 */ void queueMultiCommand(redisClient *c) { multiCmd *mc; int j; // 在原commands后面配置空间以存放新命令 c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.count+1)); // 执行新配置的空间 mc = c->mstate.commands+c->mstate.count; // 设置各个属性(命令、命令参数个数以及具体的命令参数) mc->cmd = c->cmd; mc->argc = c->argc; // 分配空间以存放命令参数 mc->argv = zmalloc(sizeof(robj*)*c->argc); memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); for (j = 0; j argc; j++) incrRefCount(mc->argv[j]); // 命令队列中保存的命令个数加1 c->mstate.count++; } 3.3、执行事务

Redis通过exec命令执行事务,该命令将会检查redisClient的flags标识,如果该标识为REDIS_DIRTY_CAS或REDIS_DIRTY_EXEC,则事务执行失败返回。如果客户端仍然处于事务状态, 那么当 exec 命令执行时,Redis会根据客户端所保存的事务队列, 以“先近先出”的策略执行事务队列中的命令,即最先入队的命令最先执行, 而最后入队的命令最后执行。

exec命令由execCommand执行。

/* 执行exec命令 */ void execCommand(redisClient *c) { int j; robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd; // 是否需要将MULTI/EXEC命令传播到slave节点/AOF int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */ // 如果客户端当前不处于事务状态,直接返回 if (!(c->flags & REDIS_MULTI)) { addReplyError(c,"EXEC without MULTI"); return; } /* Check if we need to abort the EXEC because: * 1) Some WATCHed key was touched. * 2) There was a previous error while queueing commands. * A failed EXEC in the first case returns a multi bulk nil object * (technically it is not an error but a special behavior), while * in the second an EXECABORT error is returned. */ // 检查是否需要中断事务执行,因为: // (1)、有被监控的key被修改 // (2)、命令入队的时候发生错误 // 对于第一种情况,Redis返回多个nil空对象(准确地说这种情况并不是错误,应视为一种特殊的行为) // 对于第二种情况则返回一个EXECABORT错误 if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) { addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr : shared.nullmultibulk); // 取消事务 discardTransaction(c); goto handle_monitor; } /* Exec all the queued commands */ // 现在可以执行该事务的所有命令了 // 取消对所有key的监控,否则会浪费CPU资源 unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ // 先备份一次命令队列中的命令 orig_argv = c->argv; orig_argc = c->argc; orig_cmd = c->cmd; addReplyMultiBulkLen(c,c->mstate.count); // 逐一将事务中的命令交给客户端redisClient执行 for (j = 0; j < c->mstate.count; j++) { // 将事务命令队列中的命令设置给客户端 c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; c->cmd = c->mstate.commands[j].cmd; /* Propagate a MULTI request once we encounter the first write op. * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ // 当我们第一次遇到写命令时,传播MULTI命令。如果是读命令则无需传播 // 这里我们MULTI/..../EXEC当做一个整体传输,保证服务器和AOF以及附属节点的一致性 if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) { execCommandPropagateMulti(c); // 只需要传播一次MULTI命令即可 must_propagate = 1; } // 真正执行命令 call(c,REDIS_CALL_FULL); /* Commands may alter argc/argv, restore mstate. */ // 命令执行后可能会被修改,需要更新操作 c->mstate.commands[j].argc = c->argc; c->mstate.commands[j].argv = c->argv; c->mstate.commands[j].cmd = c->cmd; } // 恢复原命令 c->argv = orig_argv; c->argc = orig_argc; c->cmd = orig_cmd; // 清除事务状态 discardTransaction(c); /* Make sure the EXEC command will be propagated as well if MULTI * was already propagated. */ if (must_propagate) server.dirty++; handle_monitor: /* Send EXEC to clients waiting data from MONITOR. We do it here * since the natural order of commands execution is actually: * MUTLI, EXEC, ... commands inside transaction ... * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command * table, and we do it here with correct ordering. */ if (listLength(server.monitors) && !server.loading) replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); }

Redis事务的实现原理就介绍这么多。很多人一听到“事务”这个词就会潜意识的认为这是一个很复杂的东西。而实际上Redis中使用很轻巧的办法提供事务操作,代码只有300来行,并不是很复杂。

注释版源码请移步:https://github.com/xiejingfa/the-annotated-redis-2.8.24



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有