最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501
当前位置: 首页 - 科技 - 知识百科 - 正文

redis源代码分析16–阻塞式命令

来源:懂视网 责编:小采 时间:2020-11-09 13:32:43
文档

redis源代码分析16–阻塞式命令

redis源代码分析16–阻塞式命令:redis现在只支持对list的阻塞式操作,相关的两个命令是brpop和blpop。 这两个命令在list中有元素时,跟普通的pop没有区别,弹出list的一个元素,然后返回。但在list没有元素时,会为redisClient设置REDIS_BLOCKED标志,然后client阻塞(设置R
推荐度:
导读redis源代码分析16–阻塞式命令:redis现在只支持对list的阻塞式操作,相关的两个命令是brpop和blpop。 这两个命令在list中有元素时,跟普通的pop没有区别,弹出list的一个元素,然后返回。但在list没有元素时,会为redisClient设置REDIS_BLOCKED标志,然后client阻塞(设置R

redis现在只支持对list的阻塞式操作,相关的两个命令是brpop和blpop。 这两个命令在list中有元素时,跟普通的pop没有区别,弹出list的一个元素,然后返回。但在list没有元素时,会为redisClient设置REDIS_BLOCKED标志,然后client阻塞(设置REDIS_BLOCKED标

redis现在只支持对list的阻塞式操作,相关的两个命令是brpop和blpop。

这两个命令在list中有元素时,跟普通的pop没有区别,弹出list的一个元素,然后返回。但在list没有元素时,会为redisClient设置REDIS_BLOCKED标志,然后client阻塞(设置REDIS_BLOCKED标志的redisClient会一直阻塞,参考命令处理章节),一直到新元素加入时(push操作的处理函数pushGenericCommand),才会返回。

这两个命令设置的处理函数brpopCommand和blpopCommand都会调用blockingPopGenericCommand。该函数在检查list中有元素后,会调用非阻塞的popGenericCommand来弹出一个元素,否则调用blockForKeys来处理阻塞的情况。

/* Blocking RPOP/LPOP */
static void blockingPopGenericCommand(redisClient *c, int where) {
 robj *o;
 long long lltimeout;
 time_t timeout;
 int j;
 /* Make sure timeout is an integer value */
 if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout,
 "timeout is not an integer") != REDIS_OK) return;
 /* Make sure the timeout is not negative */
 if (lltimeout < 0) {
 addReplySds(c,sdsnew("-ERR timeout is negative\r\n"));
 return;
 }
 for (j = 1; j < c->argc-1; j++) {
 o = lookupKeyWrite(c->db,c->argv[j]);
 if (o != NULL) {
 if (o->type != REDIS_LIST) {
 addReply(c,shared.wrongtypeerr);
 return;
 } else {
 list *list = o->ptr;
 if (listLength(list) != 0) {
 /* If the list contains elements fall back to the usual
 * non-blocking POP operation */
 robj *argv[2], **orig_argv;
 int orig_argc;
 /* We need to alter the command arguments before to call
 * popGenericCommand() as the command takes a single key. */
 orig_argv = c->argv;
 orig_argc = c->argc;
 argv[1] = c->argv[j];
 c->argv = argv;
 c->argc = 2;
 /* Also the return value is different, we need to output
 * the multi bulk reply header and the key name. The
 * "real" command will add the last element (the value)
 * for us. If this souds like an hack to you it's just
 * because it is... */
 addReplySds(c,sdsnew("*2\r\n"));
 addReplyBulk(c,argv[1]);
 popGenericCommand(c,where);
 /* Fix the client structure with the original stuff */
 c->argv = orig_argv;
 c->argc = orig_argc;
 return;
 }
 }
 }
 }
 /* If we are inside a MULTI/EXEC and the list is empty the only thing
 * we can do is treating it as a timeout (even with timeout 0). */
 if (c->flags & REDIS_MULTI) {
 addReply(c,shared.nullmultibulk);
 return;
 }
 /* If the list is empty or the key does not exists we must block */
 timeout = lltimeout;
 if (timeout > 0) timeout += time(NULL);
 blockForKeys(c,c->argv+1,c->argc-2,timeout);
}

blockForKeys会在db->blockingkeys记下client和等待的key的对应关系,然后给client设置REDIS_BLOCKED标志,这样client就一直阻塞了。

static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
 dictEntry *de;
 list *l;
 int j;
 ---
 if (c->fd < 0) return;
 c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
 c->blockingkeysnum = numkeys;
 c->blockingto = timeout;
 for (j = 0; j < numkeys; j++) {
 /* Add the key in the client structure, to map clients -> keys */
 c->blockingkeys[j] = keys[j];
 incrRefCount(keys[j]);
 /* And in the other "side", to map keys -> clients */
 de = dictFind(c->db->blockingkeys,keys[j]);
 if (de == NULL) {
 int retval;
 /* For every key we take a list of clients blocked for it */
 l = listCreate();
 retval = dictAdd(c->db->blockingkeys,keys[j],l);
 incrRefCount(keys[j]);
 assert(retval == DICT_OK);
 } else {
 l = dictGetEntryVal(de);
 }
 listAddNodeTail(l,c);
 }
 /* Mark the client as a blocked client */
 c->flags |= REDIS_BLOCKED;
 server.blpop_blocked_clients++;
}

等待的client会一直阻塞,直到有push操作,此时会调用unblockClientWaitingData来解除client的阻塞。

/* Unblock a client that's waiting in a blocking operation such as BLPOP */
// 减少对所阻塞对象的引用
static void unblockClientWaitingData(redisClient *c) {
 dictEntry *de;
 list *l;
 int j;
 assert(c->blockingkeys != NULL);
 /* The client may wait for multiple keys, so unblock it for every key. */
 for (j = 0; j < c->blockingkeysnum; j++) {
 /* Remove this client from the list of clients waiting for this key. */
 de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
 assert(de != NULL);
 l = dictGetEntryVal(de);
 listDelNode(l,listSearchKey(l,c));
 /* If the list is empty we need to remove it to avoid wasting memory */
 if (listLength(l) == 0)
 dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
 decrRefCount(c->blockingkeys[j]);
 }
 /* Cleanup the client structure */
 zfree(c->blockingkeys);
 c->blockingkeys = NULL;
 c->flags &= (~REDIS_BLOCKED);
 server.blpop_blocked_clients--;
 /* We want to process data if there is some command waiting
 * in the input buffer. Note that this is safe even if
 * unblockClientWaitingData() gets called from freeClient() because
 * freeClient() will be smart enough to call this function
 * *after* c->querybuf was set to NULL. */
 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
}

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。TEL:177 7030 7066 E-MAIL:11247931@qq.com

文档

redis源代码分析16–阻塞式命令

redis源代码分析16–阻塞式命令:redis现在只支持对list的阻塞式操作,相关的两个命令是brpop和blpop。 这两个命令在list中有元素时,跟普通的pop没有区别,弹出list的一个元素,然后返回。但在list没有元素时,会为redisClient设置REDIS_BLOCKED标志,然后client阻塞(设置R
推荐度:
标签: 现在 支持 解析
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top