Refactor code for BRPOPLPUSH.

This commit is contained in:
Damian Janowski & Michel Martens 2010-11-08 20:47:46 -03:00 committed by Michel Martens
parent 357a841714
commit ba3b474111
2 changed files with 88 additions and 43 deletions

View File

@ -689,7 +689,7 @@ void rpoplpushCommand(redisClient *c) {
/* Set a client in blocking mode for the specified key, with the specified /* Set a client in blocking mode for the specified key, with the specified
* timeout */ * timeout */
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
dictEntry *de; dictEntry *de;
list *l; list *l;
int j; int j;
@ -697,6 +697,12 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
c->bstate.keys = zmalloc(sizeof(robj*)*numkeys); c->bstate.keys = zmalloc(sizeof(robj*)*numkeys);
c->bstate.count = numkeys; c->bstate.count = numkeys;
c->bstate.timeout = timeout; c->bstate.timeout = timeout;
c->bstate.target = target;
if (target != NULL) {
incrRefCount(target);
}
for (j = 0; j < numkeys; j++) { for (j = 0; j < numkeys; j++) {
/* Add the key in the client structure, to map clients -> keys */ /* Add the key in the client structure, to map clients -> keys */
c->bstate.keys[j] = keys[j]; c->bstate.keys[j] = keys[j];
@ -741,9 +747,15 @@ void unblockClientWaitingData(redisClient *c) {
dictDelete(c->db->blocking_keys,c->bstate.keys[j]); dictDelete(c->db->blocking_keys,c->bstate.keys[j]);
decrRefCount(c->bstate.keys[j]); decrRefCount(c->bstate.keys[j]);
} }
if (c->bstate.target != NULL) {
decrRefCount(c->bstate.target);
}
/* Cleanup the client structure */ /* Cleanup the client structure */
zfree(c->bstate.keys); zfree(c->bstate.keys);
c->bstate.keys = NULL; c->bstate.keys = NULL;
c->bstate.target = NULL;
c->flags &= (~REDIS_BLOCKED); c->flags &= (~REDIS_BLOCKED);
server.blpop_blocked_clients--; server.blpop_blocked_clients--;
/* We want to process data if there is some command waiting /* We want to process data if there is some command waiting
@ -783,8 +795,6 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
addReplyBulk(receiver,ele); addReplyBulk(receiver,ele);
} }
else { else {
receiver->argc++;
robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target);
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
@ -806,17 +816,10 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
/* Blocking RPOP/LPOP */ /* Blocking RPOP/LPOP */
void blockingPopGenericCommand(redisClient *c, int where) { void blockingPopGenericCommand(redisClient *c, int where) {
robj *o; robj *o;
long long lltimeout;
time_t timeout; time_t timeout;
int j; int j;
/* Make sure timeout is an integer value */ if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) {
if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout,
"timeout is not an integer") != REDIS_OK) return;
/* Make sure the timeout is not negative */
if (lltimeout < 0) {
addReplyError(c,"timeout is negative");
return; return;
} }
@ -833,7 +836,6 @@ void blockingPopGenericCommand(redisClient *c, int where) {
robj *argv[2], **orig_argv; robj *argv[2], **orig_argv;
int orig_argc; int orig_argc;
if (c->bstate.target == NULL) {
/* We need to alter the command arguments before to call /* We need to alter the command arguments before to call
* popGenericCommand() as the command takes a single key. */ * popGenericCommand() as the command takes a single key. */
orig_argv = c->argv; orig_argv = c->argv;
@ -855,13 +857,6 @@ void blockingPopGenericCommand(redisClient *c, int where) {
/* Fix the client structure with the original stuff */ /* Fix the client structure with the original stuff */
c->argv = orig_argv; c->argv = orig_argv;
c->argc = orig_argc; c->argc = orig_argc;
}
else {
c->argv[2] = c->bstate.target;
c->bstate.target = NULL;
rpoplpushCommand(c);
}
return; return;
} }
@ -877,9 +872,26 @@ void blockingPopGenericCommand(redisClient *c, int where) {
} }
/* If the list is empty or the key does not exists we must block */ /* If the list is empty or the key does not exists we must block */
timeout = lltimeout;
if (timeout > 0) timeout += time(NULL); if (timeout > 0) timeout += time(NULL);
blockForKeys(c,c->argv+1,c->argc-2,timeout); blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}
int checkTimeout(redisClient *c, robj *object, time_t *timeout) {
long long lltimeout;
if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) {
addReplyError(c, "timeout is not an integer");
return REDIS_ERR;
}
if (lltimeout < 0) {
addReplyError(c, "timeout is negative");
return REDIS_ERR;
}
*timeout = lltimeout;
return REDIS_OK;
} }
void blpopCommand(redisClient *c) { void blpopCommand(redisClient *c) {
@ -891,9 +903,28 @@ void brpopCommand(redisClient *c) {
} }
void brpoplpushCommand(redisClient *c) { void brpoplpushCommand(redisClient *c) {
c->bstate.target = c->argv[2]; time_t timeout;
c->argv[2] = c->argv[3];
c->argc--;
blockingPopGenericCommand(c,REDIS_TAIL); if (checkTimeout(c, c->argv[3], &timeout) != REDIS_OK) {
return;
}
robj *key = lookupKeyWrite(c->db, c->argv[1]);
if (key == NULL) {
// block
if (c->flags & REDIS_MULTI) {
addReply(c,shared.nullmultibulk);
} else {
if (timeout > 0) timeout += time(NULL);
blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
}
} else if (key->type != REDIS_LIST) {
addReply(c, shared.wrongtypeerr);
} else {
// The list exists and has elements.
redisAssert(listTypeLength(key) > 0);
rpoplpushCommand(c);
}
} }

View File

@ -178,6 +178,20 @@ start_server {
assert_equal {foo} [r lrange blist 0 -1] assert_equal {foo} [r lrange blist 0 -1]
} }
test {BRPOPLPUSH inside a transaction} {
r del xlist target
r lpush xlist foo
r lpush xlist bar
r multi
r brpoplpush xlist target 0
r brpoplpush xlist target 0
r brpoplpush xlist target 0
r lrange xlist 0 -1
r lrange target 0 -1
r exec
} {foo bar {} {} {bar foo}}
foreach {pop} {BLPOP BRPOP} { foreach {pop} {BLPOP BRPOP} {
test "$pop: with single empty list argument" { test "$pop: with single empty list argument" {
set rd [redis_deferring_client] set rd [redis_deferring_client]