Add LMPOP/BLMPOP commands. (#9373)
We want to add COUNT option for BLPOP. But we can't do it without breaking compatibility due to the command arguments syntax. So this commit introduce two new commands. Syntax for the new LMPOP command: `LMPOP numkeys [<key> ...] LEFT|RIGHT [COUNT count]` Syntax for the new BLMPOP command: `BLMPOP timeout numkeys [<key> ...] LEFT|RIGHT [COUNT count]` Some background: - LPOP takes one key, and can return multiple elements. - BLPOP takes multiple keys, but returns one element from just one key. - LMPOP can take multiple keys and return multiple elements from just one key. Note that LMPOP/BLMPOP can take multiple keys, it eventually operates on just one key. And it will propagate as LPOP or RPOP with the COUNT option. As a new command, it still return NIL if we can't pop any elements. For the normal response is nested arrays in RESP2 and RESP3, like: ``` LMPOP/BLMPOP 1) keyname 2) 1) element1 2) element2 ``` I.e. unlike BLPOP that returns a key name and one element so it uses a flat array, and LPOP that returns multiple elements with no key name, and again uses a flat array, this one has to return a nested array, and it does for for both RESP2 and RESP3 (like SCAN does) Some discuss can see: #766 #8824
This commit is contained in:
parent
216f168b2b
commit
c50af0aeba
@ -65,7 +65,7 @@
|
||||
#include "latency.h"
|
||||
#include "monotonic.h"
|
||||
|
||||
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto);
|
||||
void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted);
|
||||
int getListPositionFromObjectOrReply(client *c, robj *arg, int *position);
|
||||
|
||||
/* This structure represents the blocked key information that we store
|
||||
@ -271,6 +271,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
||||
if (de) {
|
||||
list *clients = dictGetVal(de);
|
||||
int numclients = listLength(clients);
|
||||
int deleted = 0;
|
||||
|
||||
while(numclients--) {
|
||||
listNode *clientnode = listFirst(clients);
|
||||
@ -286,41 +287,27 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
||||
robj *dstkey = receiver->bpop.target;
|
||||
int wherefrom = receiver->bpop.listpos.wherefrom;
|
||||
int whereto = receiver->bpop.listpos.whereto;
|
||||
robj *value = listTypePop(o, wherefrom);
|
||||
|
||||
if (value) {
|
||||
/* Protect receiver->bpop.target, that will be
|
||||
* freed by the next unblockClient()
|
||||
* call. */
|
||||
if (dstkey) incrRefCount(dstkey);
|
||||
/* Protect receiver->bpop.target, that will be
|
||||
* freed by the next unblockClient()
|
||||
* call. */
|
||||
if (dstkey) incrRefCount(dstkey);
|
||||
|
||||
monotime replyTimer;
|
||||
elapsedStart(&replyTimer);
|
||||
if (serveClientBlockedOnList(receiver,
|
||||
rl->key,dstkey,rl->db,value,
|
||||
wherefrom, whereto) == C_ERR)
|
||||
{
|
||||
/* If we failed serving the client we need
|
||||
* to also undo the POP operation. */
|
||||
listTypePush(o,value,wherefrom);
|
||||
}
|
||||
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
|
||||
unblockClient(receiver);
|
||||
monotime replyTimer;
|
||||
elapsedStart(&replyTimer);
|
||||
serveClientBlockedOnList(receiver, o,
|
||||
rl->key, dstkey, rl->db,
|
||||
wherefrom, whereto,
|
||||
&deleted);
|
||||
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
|
||||
unblockClient(receiver);
|
||||
|
||||
if (dstkey) decrRefCount(dstkey);
|
||||
decrRefCount(value);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
if (dstkey) decrRefCount(dstkey);
|
||||
|
||||
/* The list is empty and has been deleted. */
|
||||
if (deleted) break;
|
||||
}
|
||||
}
|
||||
|
||||
if (listTypeLength(o) == 0) {
|
||||
dbDelete(rl->db,rl->key);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",rl->key,rl->db->id);
|
||||
}
|
||||
/* We don't call signalModifiedKey() as it was already called
|
||||
* when an element was pushed on the list. */
|
||||
}
|
||||
|
||||
/* Helper function for handleClientsBlockedOnKeys(). This function is called
|
||||
@ -627,12 +614,16 @@ void handleClientsBlockedOnKeys(void) {
|
||||
* for all the 'numkeys' keys as in the 'keys' argument. When we block for
|
||||
* stream keys, we also provide an array of streamID structures: clients will
|
||||
* be unblocked only when items with an ID greater or equal to the specified
|
||||
* one is appended to the stream. */
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) {
|
||||
* one is appended to the stream.
|
||||
*
|
||||
* 'count' for those commands that support the optional count argument.
|
||||
* Otherwise the value is 0. */
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) {
|
||||
dictEntry *de;
|
||||
list *l;
|
||||
int j;
|
||||
|
||||
c->bpop.count = count;
|
||||
c->bpop.timeout = timeout;
|
||||
c->bpop.target = target;
|
||||
|
||||
|
10
src/db.c
10
src/db.c
@ -1703,6 +1703,16 @@ int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *
|
||||
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
|
||||
}
|
||||
|
||||
int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
|
||||
UNUSED(cmd);
|
||||
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
|
||||
}
|
||||
|
||||
int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
|
||||
UNUSED(cmd);
|
||||
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
|
||||
}
|
||||
|
||||
/* Helper function to extract keys from the SORT command.
|
||||
*
|
||||
* SORT <sort-key> ... STORE <store-key> ...
|
||||
|
@ -5504,7 +5504,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
|
||||
"Blocking module command called from transaction");
|
||||
} else {
|
||||
if (keys) {
|
||||
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL,NULL);
|
||||
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,0,timeout,NULL,NULL,NULL);
|
||||
} else {
|
||||
blockClient(c,BLOCKED_MODULE);
|
||||
}
|
||||
|
@ -316,6 +316,10 @@ struct redisCommand redisCommandTable[] = {
|
||||
"write fast @list",
|
||||
0,NULL,1,1,1,0,0,0},
|
||||
|
||||
{"lmpop",lmpopCommand,-4,
|
||||
"write @list",
|
||||
0,lmpopGetKeys,0,0,0,0,0,0},
|
||||
|
||||
{"brpop",brpopCommand,-3,
|
||||
"write no-script @list @blocking",
|
||||
0,NULL,1,-2,1,0,0,0},
|
||||
@ -332,6 +336,10 @@ struct redisCommand redisCommandTable[] = {
|
||||
"write no-script @list @blocking",
|
||||
0,NULL,1,-2,1,0,0,0},
|
||||
|
||||
{"blmpop",blmpopCommand,-5,
|
||||
"write @list @blocking",
|
||||
0,blmpopGetKeys,0,0,0,0,0,0},
|
||||
|
||||
{"llen",llenCommand,2,
|
||||
"read-only fast @list",
|
||||
0,NULL,1,1,1,0,0,0},
|
||||
|
11
src/server.h
11
src/server.h
@ -793,6 +793,7 @@ typedef struct multiState {
|
||||
* The fields used depend on client->btype. */
|
||||
typedef struct blockingState {
|
||||
/* Generic fields. */
|
||||
long count; /* Elements to pop if count was specified (BLMPOP), 0 otherwise. */
|
||||
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
|
||||
* is > timeout then the operation timed out. */
|
||||
|
||||
@ -1910,6 +1911,7 @@ void addReplyPushLen(client *c, long length);
|
||||
void addReplyHelp(client *c, const char **help);
|
||||
void addReplySubcommandSyntaxError(client *c);
|
||||
void addReplyLoadedModules(client *c);
|
||||
void addListRangeReply(client *c, robj *o, long start, long end, int reverse);
|
||||
void copyClientOutputBuffer(client *dst, client *src);
|
||||
size_t sdsZmallocSize(sds s);
|
||||
size_t getStringObjectSdsUsedMemory(robj *o);
|
||||
@ -1991,9 +1993,10 @@ int listTypeEqual(listTypeEntry *entry, robj *o);
|
||||
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry);
|
||||
void listTypeConvert(robj *subject, int enc);
|
||||
robj *listTypeDup(robj *o);
|
||||
int listTypeDelRange(robj *o, long start, long stop);
|
||||
void unblockClientWaitingData(client *c);
|
||||
void popGenericCommand(client *c, int where);
|
||||
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count);
|
||||
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int *deleted);
|
||||
|
||||
/* MULTI/EXEC/WATCH... */
|
||||
void unwatchAllKeys(client *c);
|
||||
@ -2451,6 +2454,8 @@ int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysRes
|
||||
int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int lcsGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
|
||||
unsigned short crc16(const char *buf, int len);
|
||||
|
||||
@ -2487,7 +2492,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
|
||||
void disconnectAllBlockedClients(void);
|
||||
void handleClientsBlockedOnKeys(void);
|
||||
void signalKeyAsReady(redisDb *db, robj *key, int type);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids);
|
||||
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us);
|
||||
|
||||
/* timeout.c -- Blocked clients timeout and connections timeout. */
|
||||
@ -2576,6 +2581,7 @@ void rpushxCommand(client *c);
|
||||
void linsertCommand(client *c);
|
||||
void lpopCommand(client *c);
|
||||
void rpopCommand(client *c);
|
||||
void lmpopCommand(client *c);
|
||||
void llenCommand(client *c);
|
||||
void lindexCommand(client *c);
|
||||
void lrangeCommand(client *c);
|
||||
@ -2652,6 +2658,7 @@ void execCommand(client *c);
|
||||
void discardCommand(client *c);
|
||||
void blpopCommand(client *c);
|
||||
void brpopCommand(client *c);
|
||||
void blmpopCommand(client *c);
|
||||
void brpoplpushCommand(client *c);
|
||||
void blmoveCommand(client *c);
|
||||
void appendCommand(client *c);
|
||||
|
304
src/t_list.c
304
src/t_list.c
@ -215,6 +215,15 @@ robj *listTypeDup(robj *o) {
|
||||
return lobj;
|
||||
}
|
||||
|
||||
/* Delete a range of elements from the list. */
|
||||
int listTypeDelRange(robj *subject, long start, long count) {
|
||||
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
|
||||
return quicklistDelRange(subject->ptr, start, count);
|
||||
} else {
|
||||
serverPanic("Unknown list encoding");
|
||||
}
|
||||
}
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
* List Commands
|
||||
*----------------------------------------------------------------------------*/
|
||||
@ -374,6 +383,35 @@ void lsetCommand(client *c) {
|
||||
}
|
||||
}
|
||||
|
||||
/* A helper function like addListRangeReply, more details see below.
|
||||
* The difference is that here we are returning nested arrays, like:
|
||||
* 1) keyname
|
||||
* 2) 1) element1
|
||||
* 2) element2
|
||||
*
|
||||
* And also actually pop out from the list by calling listElementsRemoved.
|
||||
* We maintain the server.dirty and notifications there.
|
||||
*
|
||||
* 'deleted' is an optional output argument to get an indication
|
||||
* if the key got deleted by this function. */
|
||||
void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long count, int *deleted) {
|
||||
long llen = listTypeLength(o);
|
||||
long rangelen = (count > llen) ? llen : count;
|
||||
long rangestart = (where == LIST_HEAD) ? 0 : -rangelen;
|
||||
long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1;
|
||||
int reverse = (where == LIST_HEAD) ? 0 : 1;
|
||||
|
||||
/* We return key-name just once, and an array of elements */
|
||||
addReplyArrayLen(c, 2);
|
||||
addReplyBulk(c, key);
|
||||
addListRangeReply(c, o, rangestart, rangeend, reverse);
|
||||
|
||||
/* Pop these elements. */
|
||||
listTypeDelRange(o, rangestart, rangelen);
|
||||
/* Maintain the notifications and dirty. */
|
||||
listElementsRemoved(c, key, where, o, rangelen, deleted);
|
||||
}
|
||||
|
||||
/* A helper for replying with a list's range between the inclusive start and end
|
||||
* indexes as multi-bulk, with support for negative indexes. Note that start
|
||||
* must be less than end or an empty array is returned. When the reverse
|
||||
@ -419,14 +457,21 @@ void addListRangeReply(client *c, robj *o, long start, long end, int reverse) {
|
||||
}
|
||||
}
|
||||
|
||||
/* A housekeeping helper for list elements popping tasks. */
|
||||
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count) {
|
||||
/* A housekeeping helper for list elements popping tasks.
|
||||
*
|
||||
* 'deleted' is an optional output argument to get an indication
|
||||
* if the key got deleted by this function. */
|
||||
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int *deleted) {
|
||||
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
|
||||
|
||||
notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id);
|
||||
if (listTypeLength(o) == 0) {
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
|
||||
if (deleted) *deleted = 1;
|
||||
|
||||
dbDelete(c->db, key);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
|
||||
} else {
|
||||
if (deleted) *deleted = 0;
|
||||
}
|
||||
signalModifiedKey(c, c->db, key);
|
||||
server.dirty += count;
|
||||
@ -466,7 +511,7 @@ void popGenericCommand(client *c, int where) {
|
||||
serverAssert(value != NULL);
|
||||
addReplyBulk(c,value);
|
||||
decrRefCount(value);
|
||||
listElementsRemoved(c,c->argv[1],where,o,1);
|
||||
listElementsRemoved(c,c->argv[1],where,o,1,NULL);
|
||||
} else {
|
||||
/* Pop a range of elements. An addition to the original POP command,
|
||||
* which replies with a multi-bulk. */
|
||||
@ -477,11 +522,52 @@ void popGenericCommand(client *c, int where) {
|
||||
int reverse = (where == LIST_HEAD) ? 0 : 1;
|
||||
|
||||
addListRangeReply(c,o,rangestart,rangeend,reverse);
|
||||
quicklistDelRange(o->ptr,rangestart,rangelen);
|
||||
listElementsRemoved(c,c->argv[1],where,o,rangelen);
|
||||
listTypeDelRange(o,rangestart,rangelen);
|
||||
listElementsRemoved(c,c->argv[1],where,o,rangelen,NULL);
|
||||
}
|
||||
}
|
||||
|
||||
/* Like popGenericCommand but work with multiple keys.
|
||||
* Take multiple keys and return multiple elements from just one key.
|
||||
*
|
||||
* 'numkeys' the number of keys.
|
||||
* 'count' is the number of elements requested to pop.
|
||||
*
|
||||
* Always reply with array. */
|
||||
void mpopGenericCommand(client *c, robj **keys, int numkeys, int where, long count) {
|
||||
int j;
|
||||
robj *o;
|
||||
robj *key;
|
||||
|
||||
for (j = 0; j < numkeys; j++) {
|
||||
key = keys[j];
|
||||
o = lookupKeyWrite(c->db, key);
|
||||
|
||||
/* Non-existing key, move to next key. */
|
||||
if (o == NULL) continue;
|
||||
|
||||
if (checkType(c, o, OBJ_LIST)) return;
|
||||
|
||||
long llen = listTypeLength(o);
|
||||
/* Empty list, move to next key. */
|
||||
if (llen == 0) continue;
|
||||
|
||||
/* Pop a range of elements in a nested arrays way. */
|
||||
listPopRangeAndReplyWithKey(c, o, key, where, count, NULL);
|
||||
|
||||
/* Replicate it as [LR]POP COUNT. */
|
||||
robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
|
||||
rewriteClientCommandVector(c, 3,
|
||||
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
|
||||
key, count_obj);
|
||||
decrRefCount(count_obj);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Look like we are not able to pop up any elements. */
|
||||
addReplyNullArray(c);
|
||||
}
|
||||
|
||||
/* LPOP <key> [count] */
|
||||
void lpopCommand(client *c) {
|
||||
popGenericCommand(c,LIST_HEAD);
|
||||
@ -829,10 +915,11 @@ void rpoplpushCommand(client *c) {
|
||||
* is to serve a specific client (receiver) that is blocked on 'key'
|
||||
* in the context of the specified 'db', doing the following:
|
||||
*
|
||||
* 1) Provide the client with the 'value' element.
|
||||
* 1) Provide the client with the 'value' element or a range of elements.
|
||||
* We will do the pop in here and caller does not need to bother the return.
|
||||
* 2) If the dstkey is not NULL (we are serving a BLMOVE) also push the
|
||||
* 'value' element on the destination list (the "push" side of the command).
|
||||
* 3) Propagate the resulting BRPOP, BLPOP and additional xPUSH if any into
|
||||
* 3) Propagate the resulting BRPOP, BLPOP, BLMPOP and additional xPUSH if any into
|
||||
* the AOF and replication channel.
|
||||
*
|
||||
* The argument 'wherefrom' is LIST_TAIL or LIST_HEAD, and indicates if the
|
||||
@ -843,25 +930,45 @@ void rpoplpushCommand(client *c) {
|
||||
* 'value' element is to be pushed to the head or tail so that we can
|
||||
* propagate the command properly.
|
||||
*
|
||||
* The function returns C_OK if we are able to serve the client, otherwise
|
||||
* C_ERR is returned to signal the caller that the list POP operation
|
||||
* should be undone as the client was not served: This only happens for
|
||||
* BLMOVE that fails to push the value to the destination key as it is
|
||||
* of the wrong type. */
|
||||
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto)
|
||||
* 'deleted' is an optional output argument to get an indication
|
||||
* if the key got deleted by this function. */
|
||||
void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey, redisDb *db, int wherefrom, int whereto, int *deleted)
|
||||
{
|
||||
robj *argv[5];
|
||||
robj *value = NULL;
|
||||
|
||||
if (deleted) *deleted = 0;
|
||||
|
||||
if (dstkey == NULL) {
|
||||
/* Propagate the [LR]POP operation. */
|
||||
struct redisCommand *cmd = (wherefrom == LIST_HEAD) ?
|
||||
server.lpopCommand : server.rpopCommand;
|
||||
argv[0] = (wherefrom == LIST_HEAD) ? shared.lpop :
|
||||
shared.rpop;
|
||||
argv[1] = key;
|
||||
propagate((wherefrom == LIST_HEAD) ?
|
||||
server.lpopCommand : server.rpopCommand,
|
||||
db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
|
||||
if (receiver->lastcmd->proc == blmpopCommand) {
|
||||
/* Propagate the [LR]POP COUNT operation. */
|
||||
long count = receiver->bpop.count;
|
||||
serverAssert(count > 0);
|
||||
long llen = listTypeLength(o);
|
||||
serverAssert(llen > 0);
|
||||
|
||||
argv[2] = createStringObjectFromLongLong((count > llen) ? llen : count);
|
||||
propagate(cmd, db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
decrRefCount(argv[2]);
|
||||
|
||||
/* Pop a range of elements in a nested arrays way. */
|
||||
listPopRangeAndReplyWithKey(receiver, o, key, wherefrom, count, deleted);
|
||||
return;
|
||||
}
|
||||
|
||||
propagate(cmd, db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
|
||||
/* BRPOP/BLPOP */
|
||||
value = listTypePop(o, wherefrom);
|
||||
serverAssert(value != NULL);
|
||||
|
||||
addReplyArrayLen(receiver,2);
|
||||
addReplyBulk(receiver,key);
|
||||
addReplyBulk(receiver,value);
|
||||
@ -876,6 +983,9 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
|
||||
if (!(dstobj &&
|
||||
checkType(receiver,dstobj,OBJ_LIST)))
|
||||
{
|
||||
value = listTypePop(o, wherefrom);
|
||||
serverAssert(value != NULL);
|
||||
|
||||
lmoveHandlePush(receiver,dstkey,dstobj,value,whereto);
|
||||
/* Propagate the LMOVE/RPOPLPUSH operation. */
|
||||
int isbrpoplpush = (receiver->lastcmd->proc == brpoplpushCommand);
|
||||
@ -892,49 +1002,82 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
|
||||
/* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop",
|
||||
key,receiver->db->id);
|
||||
} else {
|
||||
/* BLMOVE failed because of wrong
|
||||
* destination type. */
|
||||
return C_ERR;
|
||||
}
|
||||
}
|
||||
return C_OK;
|
||||
|
||||
if (value) decrRefCount(value);
|
||||
|
||||
if (listTypeLength(o) == 0) {
|
||||
if (deleted) *deleted = 1;
|
||||
|
||||
dbDelete(receiver->db, key);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, receiver->db->id);
|
||||
}
|
||||
/* We don't call signalModifiedKey() as it was already called
|
||||
* when an element was pushed on the list. */
|
||||
}
|
||||
|
||||
/* Blocking RPOP/LPOP */
|
||||
void blockingPopGenericCommand(client *c, int where) {
|
||||
/* Blocking RPOP/LPOP/LMPOP
|
||||
*
|
||||
* 'numkeys' is the number of keys.
|
||||
* 'timeout_idx' parameter position of block timeout.
|
||||
* 'where' LIST_HEAD for LEFT, LIST_TAIL for RIGHT.
|
||||
* 'count' is the number of elements requested to pop, or 0 for plain single pop.
|
||||
*
|
||||
* When count is 0, a reply of a single bulk-string will be used.
|
||||
* When count > 0, an array reply will be used. */
|
||||
void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, int timeout_idx, long count) {
|
||||
robj *o;
|
||||
robj *key;
|
||||
mstime_t timeout;
|
||||
int j;
|
||||
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[timeout_idx],&timeout,UNIT_SECONDS)
|
||||
!= C_OK) return;
|
||||
|
||||
for (j = 1; j < c->argc-1; j++) {
|
||||
o = lookupKeyWrite(c->db,c->argv[j]);
|
||||
if (o != NULL) {
|
||||
if (checkType(c,o,OBJ_LIST)) {
|
||||
return;
|
||||
} else {
|
||||
if (listTypeLength(o) != 0) {
|
||||
/* Non empty list, this is like a normal [LR]POP. */
|
||||
robj *value = listTypePop(o,where);
|
||||
serverAssert(value != NULL);
|
||||
/* Traverse all input keys, we take action only based on one key. */
|
||||
for (j = 0; j < numkeys; j++) {
|
||||
key = keys[j];
|
||||
o = lookupKeyWrite(c->db, key);
|
||||
|
||||
addReplyArrayLen(c,2);
|
||||
addReplyBulk(c,c->argv[j]);
|
||||
addReplyBulk(c,value);
|
||||
decrRefCount(value);
|
||||
listElementsRemoved(c,c->argv[j],where,o,1);
|
||||
/* Non-existing key, move to next key. */
|
||||
if (o == NULL) continue;
|
||||
|
||||
/* Replicate it as an [LR]POP instead of B[LR]POP. */
|
||||
rewriteClientCommandVector(c,2,
|
||||
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
|
||||
c->argv[j]);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (checkType(c, o, OBJ_LIST)) return;
|
||||
|
||||
long llen = listTypeLength(o);
|
||||
/* Empty list, move to next key. */
|
||||
if (llen == 0) continue;
|
||||
|
||||
if (count != 0) {
|
||||
/* BLMPOP, non empty list, like a normal [LR]POP with count option.
|
||||
* The difference here we pop a range of elements in a nested arrays way. */
|
||||
listPopRangeAndReplyWithKey(c, o, key, where, count, NULL);
|
||||
|
||||
/* Replicate it as [LR]POP COUNT. */
|
||||
robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
|
||||
rewriteClientCommandVector(c, 3,
|
||||
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
|
||||
key, count_obj);
|
||||
decrRefCount(count_obj);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Non empty list, this is like a normal [LR]POP. */
|
||||
robj *value = listTypePop(o,where);
|
||||
serverAssert(value != NULL);
|
||||
|
||||
addReplyArrayLen(c,2);
|
||||
addReplyBulk(c,key);
|
||||
addReplyBulk(c,value);
|
||||
decrRefCount(value);
|
||||
listElementsRemoved(c,key,where,o,1,NULL);
|
||||
|
||||
/* Replicate it as an [LR]POP instead of B[LR]POP. */
|
||||
rewriteClientCommandVector(c,2,
|
||||
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
|
||||
key);
|
||||
return;
|
||||
}
|
||||
|
||||
/* If we are not allowed to block the client, the only thing
|
||||
@ -946,17 +1089,17 @@ void blockingPopGenericCommand(client *c, int where) {
|
||||
|
||||
/* If the keys do not exist we must block */
|
||||
struct listPos pos = {where};
|
||||
blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL);
|
||||
blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL);
|
||||
}
|
||||
|
||||
/* BLPOP <key> [<key> ...] <timeout> */
|
||||
void blpopCommand(client *c) {
|
||||
blockingPopGenericCommand(c,LIST_HEAD);
|
||||
blockingPopGenericCommand(c,c->argv+1,c->argc-2,LIST_HEAD,c->argc-1,0);
|
||||
}
|
||||
|
||||
/* BRPOP <key> [<key> ...] <timeout> */
|
||||
void brpopCommand(client *c) {
|
||||
blockingPopGenericCommand(c,LIST_TAIL);
|
||||
blockingPopGenericCommand(c,c->argv+1,c->argc-2,LIST_TAIL,c->argc-1,0);
|
||||
}
|
||||
|
||||
void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeout) {
|
||||
@ -971,7 +1114,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou
|
||||
} else {
|
||||
/* The list is empty and the client blocks. */
|
||||
struct listPos pos = {wherefrom, whereto};
|
||||
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],&pos,NULL);
|
||||
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,0,timeout,c->argv[2],&pos,NULL);
|
||||
}
|
||||
} else {
|
||||
/* The list exists and has elements, so
|
||||
@ -1001,3 +1144,62 @@ void brpoplpushCommand(client *c) {
|
||||
!= C_OK) return;
|
||||
blmoveGenericCommand(c, LIST_TAIL, LIST_HEAD, timeout);
|
||||
}
|
||||
|
||||
/* LMPOP/BLMPOP
|
||||
*
|
||||
* 'numkeys_idx' parameter position of key number.
|
||||
* 'is_block' this indicates whether it is a blocking variant. */
|
||||
void lmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
|
||||
long j;
|
||||
long numkeys = 0; /* Number of keys. */
|
||||
int where = 0; /* HEAD for LEFT, TAIL for RIGHT. */
|
||||
long count = 1; /* Reply will consist of up to count elements, depending on the list's length. */
|
||||
|
||||
/* Parse the numkeys. */
|
||||
if (getRangeLongFromObjectOrReply(c, c->argv[numkeys_idx], 1, LONG_MAX,
|
||||
&numkeys, "numkeys should be greater than 0") != C_OK)
|
||||
return;
|
||||
|
||||
/* Parse the where. where_idx: the index of where in the c->argv. */
|
||||
long where_idx = numkeys_idx + numkeys + 1;
|
||||
if (where_idx >= c->argc) {
|
||||
addReplyErrorObject(c, shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
if (getListPositionFromObjectOrReply(c, c->argv[where_idx], &where) != C_OK)
|
||||
return;
|
||||
|
||||
/* Parse the optional arguments. */
|
||||
for (j = where_idx + 1; j < c->argc; j++) {
|
||||
char *opt = c->argv[j]->ptr;
|
||||
int moreargs = (c->argc - 1) - j;
|
||||
|
||||
if (!strcasecmp(opt, "COUNT") && moreargs) {
|
||||
j++;
|
||||
if (getRangeLongFromObjectOrReply(c, c->argv[j], 1, LONG_MAX,
|
||||
&count,"count should be greater than 0") != C_OK)
|
||||
return;
|
||||
} else {
|
||||
addReplyErrorObject(c, shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (is_block) {
|
||||
/* BLOCK. We will handle CLIENT_DENY_BLOCKING flag in blockingPopGenericCommand. */
|
||||
blockingPopGenericCommand(c, c->argv+numkeys_idx+1, numkeys, where, 1, count);
|
||||
} else {
|
||||
/* NON-BLOCK */
|
||||
mpopGenericCommand(c, c->argv+numkeys_idx+1, numkeys, where, count);
|
||||
}
|
||||
}
|
||||
|
||||
/* LMPOP numkeys [<key> ...] LEFT|RIGHT [COUNT count] */
|
||||
void lmpopCommand(client *c) {
|
||||
lmpopGenericCommand(c, 1, 0);
|
||||
}
|
||||
|
||||
/* BLMPOP timeout numkeys [<key> ...] LEFT|RIGHT [COUNT count] */
|
||||
void blmpopCommand(client *c) {
|
||||
lmpopGenericCommand(c, 2, 1);
|
||||
}
|
||||
|
@ -2152,7 +2152,7 @@ void xreadCommand(client *c) {
|
||||
goto cleanup;
|
||||
}
|
||||
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
|
||||
timeout, NULL, NULL, ids);
|
||||
0, timeout, NULL, NULL, ids);
|
||||
/* If no COUNT is given and we block, set a relatively small count:
|
||||
* in case the ID provided is too low, we do not want the server to
|
||||
* block just to serve this client a huge stream of messages. */
|
||||
|
@ -3967,7 +3967,7 @@ void blockingGenericZpopCommand(client *c, int where) {
|
||||
}
|
||||
|
||||
/* If the keys do not exist we must block */
|
||||
blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL,NULL);
|
||||
blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,0,timeout,NULL,NULL,NULL);
|
||||
}
|
||||
|
||||
// BZPOPMIN key [key ...] timeout
|
||||
|
@ -320,4 +320,52 @@ tags {"aof external:skip"} {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Test that LMPOP/BLMPOP work fine with AOF.
|
||||
create_aof {
|
||||
append_to_aof [formatCommand lpush mylist a b c]
|
||||
append_to_aof [formatCommand rpush mylist2 1 2 3]
|
||||
append_to_aof [formatCommand lpush mylist3 a b c d e]
|
||||
}
|
||||
|
||||
start_server_aof [list dir $server_path aof-load-truncated no] {
|
||||
test "AOF+LMPOP/BLMPOP: pop elements from the list" {
|
||||
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
set client2 [redis [dict get $srv host] [dict get $srv port] 1 $::tls]
|
||||
wait_done_loading $client
|
||||
wait_done_loading $client2
|
||||
|
||||
# Pop all elements from mylist, should be blmpop delete mylist.
|
||||
$client lmpop 1 mylist left count 1
|
||||
$client blmpop 0 1 mylist left count 10
|
||||
|
||||
# Pop all elements from mylist2, should be lmpop delete mylist2.
|
||||
$client blmpop 0 2 mylist mylist2 right count 10
|
||||
$client lmpop 2 mylist mylist2 right count 2
|
||||
|
||||
# Blocking path, be blocked and then released.
|
||||
$client2 blmpop 0 2 mylist mylist2 left count 2
|
||||
after 100
|
||||
$client lpush mylist2 a b c
|
||||
|
||||
# Pop up the last element in mylist2
|
||||
$client blmpop 0 3 mylist mylist2 mylist3 left count 1
|
||||
|
||||
# Leave two elements in mylist3.
|
||||
$client blmpop 0 3 mylist mylist2 mylist3 right count 3
|
||||
}
|
||||
}
|
||||
|
||||
start_server_aof [list dir $server_path aof-load-truncated no] {
|
||||
test "AOF+LMPOP/BLMPOP: after pop elements from the list" {
|
||||
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
wait_done_loading $client
|
||||
|
||||
# mylist and mylist2 no longer exist.
|
||||
assert_equal 0 [$client exists mylist mylist2]
|
||||
|
||||
# Length of mylist3 is two.
|
||||
assert_equal 2 [$client llen mylist3]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,28 @@ start_server {
|
||||
} {
|
||||
source "tests/unit/type/list-common.tcl"
|
||||
|
||||
# A helper function for BPOP/BLMPOP with one input key.
|
||||
proc bpop_command {rd pop key timeout} {
|
||||
if {$pop == "BLMPOP_LEFT"} {
|
||||
$rd blmpop $timeout 1 $key left count 1
|
||||
} elseif {$pop == "BLMPOP_RIGHT"} {
|
||||
$rd blmpop $timeout 1 $key right count 1
|
||||
} else {
|
||||
$rd $pop $key $timeout
|
||||
}
|
||||
}
|
||||
|
||||
# A helper function for BPOP/BLMPOP with two input keys.
|
||||
proc bpop_command_two_key {rd pop key key2 timeout} {
|
||||
if {$pop == "BLMPOP_LEFT"} {
|
||||
$rd blmpop $timeout 2 $key $key2 left count 1
|
||||
} elseif {$pop == "BLMPOP_RIGHT"} {
|
||||
$rd blmpop $timeout 2 $key $key2 right count 1
|
||||
} else {
|
||||
$rd $pop $key $key2 $timeout
|
||||
}
|
||||
}
|
||||
|
||||
test {LPOS basic usage} {
|
||||
r DEL mylist
|
||||
r RPUSH mylist a b c 1 2 3 c c
|
||||
@ -111,10 +133,6 @@ start_server {
|
||||
assert_equal $largevalue(linkedlist) [r rpop mylist2]
|
||||
assert_equal c [r lpop mylist2]
|
||||
}
|
||||
|
||||
test {R/LPOP against empty list} {
|
||||
r lpop non-existing-list
|
||||
} {}
|
||||
|
||||
test {R/LPOP with the optional count argument} {
|
||||
assert_equal 7 [r lpush listcount aa bb cc dd ee ff gg]
|
||||
@ -147,53 +165,77 @@ start_server {
|
||||
}
|
||||
|
||||
foreach {type large} [array get largevalue] {
|
||||
test "BLPOP, BRPOP: single existing list - $type" {
|
||||
foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
test "$pop: single existing list - $type" {
|
||||
set rd [redis_deferring_client]
|
||||
create_list blist "a b $large c d"
|
||||
|
||||
$rd blpop blist 1
|
||||
bpop_command $rd $pop blist 1
|
||||
assert_equal {blist a} [$rd read]
|
||||
$rd brpop blist 1
|
||||
if {$pop == "BLPOP"} {
|
||||
bpop_command $rd BRPOP blist 1
|
||||
} else {
|
||||
bpop_command $rd BLMPOP_RIGHT blist 1
|
||||
}
|
||||
assert_equal {blist d} [$rd read]
|
||||
|
||||
$rd blpop blist 1
|
||||
bpop_command $rd $pop blist 1
|
||||
assert_equal {blist b} [$rd read]
|
||||
$rd brpop blist 1
|
||||
if {$pop == "BLPOP"} {
|
||||
bpop_command $rd BRPOP blist 1
|
||||
} else {
|
||||
bpop_command $rd BLMPOP_RIGHT blist 1
|
||||
}
|
||||
assert_equal {blist c} [$rd read]
|
||||
|
||||
assert_equal 1 [r llen blist]
|
||||
}
|
||||
|
||||
test "BLPOP, BRPOP: multiple existing lists - $type" {
|
||||
test "$pop: multiple existing lists - $type" {
|
||||
set rd [redis_deferring_client]
|
||||
create_list blist1{t} "a $large c"
|
||||
create_list blist2{t} "d $large f"
|
||||
|
||||
$rd blpop blist1{t} blist2{t} 1
|
||||
bpop_command_two_key $rd $pop blist1{t} blist2{t} 1
|
||||
assert_equal {blist1{t} a} [$rd read]
|
||||
$rd brpop blist1{t} blist2{t} 1
|
||||
if {$pop == "BLPOP"} {
|
||||
bpop_command_two_key $rd BRPOP blist1{t} blist2{t} 1
|
||||
} else {
|
||||
bpop_command_two_key $rd BLMPOP_RIGHT blist1{t} blist2{t} 1
|
||||
}
|
||||
assert_equal {blist1{t} c} [$rd read]
|
||||
assert_equal 1 [r llen blist1{t}]
|
||||
assert_equal 3 [r llen blist2{t}]
|
||||
|
||||
$rd blpop blist2{t} blist1{t} 1
|
||||
bpop_command_two_key $rd $pop blist2{t} blist1{t} 1
|
||||
assert_equal {blist2{t} d} [$rd read]
|
||||
$rd brpop blist2{t} blist1{t} 1
|
||||
if {$pop == "BLPOP"} {
|
||||
bpop_command_two_key $rd BRPOP blist2{t} blist1{t} 1
|
||||
} else {
|
||||
bpop_command_two_key $rd BLMPOP_RIGHT blist2{t} blist1{t} 1
|
||||
}
|
||||
assert_equal {blist2{t} f} [$rd read]
|
||||
assert_equal 1 [r llen blist1{t}]
|
||||
assert_equal 1 [r llen blist2{t}]
|
||||
}
|
||||
|
||||
test "BLPOP, BRPOP: second list has an entry - $type" {
|
||||
test "$pop: second list has an entry - $type" {
|
||||
set rd [redis_deferring_client]
|
||||
r del blist1{t}
|
||||
create_list blist2{t} "d $large f"
|
||||
|
||||
$rd blpop blist1{t} blist2{t} 1
|
||||
bpop_command_two_key $rd $pop blist1{t} blist2{t} 1
|
||||
assert_equal {blist2{t} d} [$rd read]
|
||||
$rd brpop blist1{t} blist2{t} 1
|
||||
if {$pop == "BLPOP"} {
|
||||
bpop_command_two_key $rd BRPOP blist1{t} blist2{t} 1
|
||||
} else {
|
||||
bpop_command_two_key $rd BLMPOP_RIGHT blist1{t} blist2{t} 1
|
||||
}
|
||||
assert_equal {blist2{t} f} [$rd read]
|
||||
assert_equal 0 [r llen blist1{t}]
|
||||
assert_equal 1 [r llen blist2{t}]
|
||||
}
|
||||
}
|
||||
|
||||
test "BRPOPLPUSH - $type" {
|
||||
r del target{t}
|
||||
@ -239,26 +281,31 @@ start_server {
|
||||
}
|
||||
}
|
||||
|
||||
test "BLPOP, LPUSH + DEL should not awake blocked client" {
|
||||
foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
test "$pop, LPUSH + DEL should not awake blocked client" {
|
||||
set rd [redis_deferring_client]
|
||||
r del list
|
||||
|
||||
$rd blpop list 0
|
||||
bpop_command $rd $pop list 0
|
||||
after 100 ;# Make sure rd is blocked before MULTI
|
||||
wait_for_blocked_client
|
||||
|
||||
r multi
|
||||
r lpush list a
|
||||
r del list
|
||||
r exec
|
||||
r del list
|
||||
r lpush list b
|
||||
$rd read
|
||||
} {list b}
|
||||
assert_equal {list b} [$rd read]
|
||||
}
|
||||
|
||||
test "BLPOP, LPUSH + DEL + SET should not awake blocked client" {
|
||||
test "$pop, LPUSH + DEL + SET should not awake blocked client" {
|
||||
set rd [redis_deferring_client]
|
||||
r del list
|
||||
|
||||
$rd blpop list 0
|
||||
bpop_command $rd $pop list 0
|
||||
after 100 ;# Make sure rd is blocked before MULTI
|
||||
wait_for_blocked_client
|
||||
|
||||
r multi
|
||||
r lpush list a
|
||||
@ -267,8 +314,9 @@ start_server {
|
||||
r exec
|
||||
r del list
|
||||
r lpush list b
|
||||
$rd read
|
||||
} {list b}
|
||||
assert_equal {list b} [$rd read]
|
||||
}
|
||||
}
|
||||
|
||||
test "BLPOP with same key multiple times should work (issue #801)" {
|
||||
set rd [redis_deferring_client]
|
||||
@ -291,29 +339,36 @@ start_server {
|
||||
assert_equal [$rd read] {list2{t} b}
|
||||
}
|
||||
|
||||
test "MULTI/EXEC is isolated from the point of view of BLPOP" {
|
||||
foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
test "MULTI/EXEC is isolated from the point of view of $pop" {
|
||||
set rd [redis_deferring_client]
|
||||
r del list
|
||||
$rd blpop list 0
|
||||
|
||||
bpop_command $rd $pop list 0
|
||||
after 100 ;# Make sure rd is blocked before MULTI
|
||||
wait_for_blocked_client
|
||||
|
||||
r multi
|
||||
r lpush list a
|
||||
r lpush list b
|
||||
r lpush list c
|
||||
r exec
|
||||
$rd read
|
||||
} {list c}
|
||||
assert_equal {list c} [$rd read]
|
||||
}
|
||||
|
||||
test "BLPOP with variadic LPUSH" {
|
||||
test "$pop with variadic LPUSH" {
|
||||
set rd [redis_deferring_client]
|
||||
r del blist
|
||||
if {$::valgrind} {after 100}
|
||||
$rd blpop blist 0
|
||||
bpop_command $rd $pop blist 0
|
||||
if {$::valgrind} {after 100}
|
||||
wait_for_blocked_client
|
||||
assert_equal 2 [r lpush blist foo bar]
|
||||
if {$::valgrind} {after 100}
|
||||
assert_equal {blist bar} [$rd read]
|
||||
assert_equal foo [lindex [r lrange blist 0 -1] 0]
|
||||
}
|
||||
}
|
||||
|
||||
test "BRPOPLPUSH with zero timeout should block indefinitely" {
|
||||
set rd [redis_deferring_client]
|
||||
@ -412,6 +467,32 @@ start_server {
|
||||
assert_equal {foo} [r lrange target2{t} 0 -1]
|
||||
}
|
||||
|
||||
test "BLMPOP with multiple blocked clients" {
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
set rd3 [redis_deferring_client]
|
||||
set rd4 [redis_deferring_client]
|
||||
r del blist{t} blist2{t}
|
||||
|
||||
$rd1 blmpop 0 2 blist{t} blist2{t} left count 1
|
||||
$rd2 blmpop 0 2 blist{t} blist2{t} right count 10
|
||||
$rd3 blmpop 0 2 blist{t} blist2{t} left count 10
|
||||
$rd4 blmpop 0 2 blist{t} blist2{t} right count 1
|
||||
wait_for_blocked_clients_count 4
|
||||
|
||||
r multi
|
||||
r lpush blist{t} a b c d e
|
||||
r lpush blist2{t} 1 2 3 4 5
|
||||
r exec
|
||||
|
||||
assert_equal {blist{t} e} [$rd1 read]
|
||||
assert_equal {blist{t} {a b c d}} [$rd2 read]
|
||||
assert_equal {blist2{t} {5 4 3 2 1}} [$rd3 read]
|
||||
|
||||
r lpush blist2{t} 1 2 3
|
||||
assert_equal {blist2{t} 1} [$rd4 read]
|
||||
}
|
||||
|
||||
test "Linked LMOVEs" {
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
@ -420,6 +501,7 @@ start_server {
|
||||
|
||||
$rd1 blmove list1{t} list2{t} right left 0
|
||||
$rd2 blmove list2{t} list3{t} left right 0
|
||||
wait_for_blocked_clients_count 2
|
||||
|
||||
r rpush list1{t} foo
|
||||
|
||||
@ -436,6 +518,7 @@ start_server {
|
||||
|
||||
$rd1 brpoplpush list1{t} list2{t} 0
|
||||
$rd2 brpoplpush list2{t} list1{t} 0
|
||||
wait_for_blocked_clients_count 2
|
||||
|
||||
r rpush list1{t} foo
|
||||
|
||||
@ -449,6 +532,7 @@ start_server {
|
||||
r del blist{t}
|
||||
|
||||
$rd brpoplpush blist{t} blist{t} 0
|
||||
wait_for_blocked_client
|
||||
|
||||
r rpush blist{t} foo
|
||||
|
||||
@ -475,6 +559,7 @@ start_server {
|
||||
r del srclist{t} dstlist{t} somekey{t}
|
||||
r set somekey{t} somevalue
|
||||
$blocked_client brpoplpush srclist{t} dstlist{t} 0
|
||||
wait_for_blocked_client
|
||||
$watching_client watch dstlist{t}
|
||||
$watching_client read
|
||||
$watching_client multi
|
||||
@ -492,6 +577,7 @@ start_server {
|
||||
r del srclist{t} dstlist{t} somekey{t}
|
||||
r set somekey{t} somevalue
|
||||
$blocked_client brpoplpush srclist{t} dstlist{t} 0
|
||||
wait_for_blocked_client
|
||||
$watching_client watch dstlist{t}
|
||||
$watching_client read
|
||||
$watching_client multi
|
||||
@ -513,16 +599,19 @@ start_server {
|
||||
$rd read
|
||||
} {}
|
||||
|
||||
test "BLPOP when new key is moved into place" {
|
||||
foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
test "$pop when new key is moved into place" {
|
||||
set rd [redis_deferring_client]
|
||||
r del foo{t}
|
||||
|
||||
$rd blpop foo{t} 5
|
||||
bpop_command $rd $pop foo{t} 0
|
||||
wait_for_blocked_client
|
||||
r lpush bob{t} abc def hij
|
||||
r rename bob{t} foo{t}
|
||||
$rd read
|
||||
} {foo{t} hij}
|
||||
|
||||
test "BLPOP when result key is created by SORT..STORE" {
|
||||
test "$pop when result key is created by SORT..STORE" {
|
||||
set rd [redis_deferring_client]
|
||||
|
||||
# zero out list from previous test without explicit delete
|
||||
@ -530,17 +619,20 @@ start_server {
|
||||
r lpop foo{t}
|
||||
r lpop foo{t}
|
||||
|
||||
$rd blpop foo{t} 5
|
||||
bpop_command $rd $pop foo{t} 5
|
||||
wait_for_blocked_client
|
||||
r lpush notfoo{t} hello hola aguacate konichiwa zanzibar
|
||||
r sort notfoo{t} ALPHA store foo{t}
|
||||
$rd read
|
||||
} {foo{t} aguacate}
|
||||
}
|
||||
|
||||
foreach {pop} {BLPOP BRPOP} {
|
||||
foreach {pop} {BLPOP BRPOP BLMPOP_LEFT BLMPOP_RIGHT} {
|
||||
test "$pop: with single empty list argument" {
|
||||
set rd [redis_deferring_client]
|
||||
r del blist1
|
||||
$rd $pop blist1 1
|
||||
bpop_command $rd $pop blist1 1
|
||||
wait_for_blocked_client
|
||||
r rpush blist1 foo
|
||||
assert_equal {blist1 foo} [$rd read]
|
||||
assert_equal 0 [r exists blist1]
|
||||
@ -548,14 +640,14 @@ start_server {
|
||||
|
||||
test "$pop: with negative timeout" {
|
||||
set rd [redis_deferring_client]
|
||||
$rd $pop blist1 -1
|
||||
bpop_command $rd $pop blist1 -1
|
||||
assert_error "ERR*is negative*" {$rd read}
|
||||
}
|
||||
|
||||
test "$pop: with non-integer timeout" {
|
||||
set rd [redis_deferring_client]
|
||||
r del blist1
|
||||
$rd $pop blist1 0.1
|
||||
bpop_command $rd $pop blist1 0.1
|
||||
r rpush blist1 foo
|
||||
assert_equal {blist1 foo} [$rd read]
|
||||
assert_equal 0 [r exists blist1]
|
||||
@ -565,7 +657,8 @@ start_server {
|
||||
# To test this, use a timeout of 0 and wait a second.
|
||||
# The blocking pop should still be waiting for a push.
|
||||
set rd [redis_deferring_client]
|
||||
$rd $pop blist1 0
|
||||
bpop_command $rd $pop blist1 0
|
||||
wait_for_blocked_client
|
||||
after 1000
|
||||
r rpush blist1 foo
|
||||
assert_equal {blist1 foo} [$rd read]
|
||||
@ -575,6 +668,7 @@ start_server {
|
||||
set rd [redis_deferring_client]
|
||||
r del blist1{t} blist2{t}
|
||||
r set blist2{t} nolist{t}
|
||||
bpop_command_two_key $rd $pop blist1{t} blist2{t} 1
|
||||
$rd $pop blist1{t} blist2{t} 1
|
||||
assert_error "WRONGTYPE*" {$rd read}
|
||||
}
|
||||
@ -582,7 +676,8 @@ start_server {
|
||||
test "$pop: timeout" {
|
||||
set rd [redis_deferring_client]
|
||||
r del blist1{t} blist2{t}
|
||||
$rd $pop blist1{t} blist2{t} 1
|
||||
bpop_command_two_key $rd $pop blist1{t} blist2{t} 1
|
||||
wait_for_blocked_client
|
||||
assert_equal {} [$rd read]
|
||||
}
|
||||
|
||||
@ -590,13 +685,15 @@ start_server {
|
||||
set rd [redis_deferring_client]
|
||||
r del blist1{t} blist2{t}
|
||||
|
||||
$rd $pop blist1{t} blist2{t} 1
|
||||
bpop_command_two_key $rd $pop blist1{t} blist2{t} 1
|
||||
wait_for_blocked_client
|
||||
r rpush blist1{t} foo
|
||||
assert_equal {blist1{t} foo} [$rd read]
|
||||
assert_equal 0 [r exists blist1{t}]
|
||||
assert_equal 0 [r exists blist2{t}]
|
||||
|
||||
$rd $pop blist1{t} blist2{t} 1
|
||||
bpop_command_two_key $rd $pop blist1{t} blist2{t} 1
|
||||
wait_for_blocked_client
|
||||
r rpush blist2{t} foo
|
||||
assert_equal {blist2{t} foo} [$rd read]
|
||||
assert_equal 0 [r exists blist1{t}]
|
||||
@ -604,16 +701,57 @@ start_server {
|
||||
}
|
||||
}
|
||||
|
||||
test {BLPOP inside a transaction} {
|
||||
foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
test "$pop inside a transaction" {
|
||||
r del xlist
|
||||
r lpush xlist foo
|
||||
r lpush xlist bar
|
||||
r multi
|
||||
r blpop xlist 0
|
||||
r blpop xlist 0
|
||||
r blpop xlist 0
|
||||
|
||||
bpop_command r $pop xlist 0
|
||||
bpop_command r $pop xlist 0
|
||||
bpop_command r $pop xlist 0
|
||||
r exec
|
||||
} {{xlist bar} {xlist foo} {}}
|
||||
}
|
||||
|
||||
test {BLMPOP propagate as pop with count command to replica} {
|
||||
set rd [redis_deferring_client]
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# BLMPOP without block.
|
||||
r lpush mylist{t} a b c
|
||||
r rpush mylist2{t} 1 2 3
|
||||
r blmpop 0 1 mylist{t} left count 1
|
||||
r blmpop 0 2 mylist{t} mylist2{t} right count 10
|
||||
r blmpop 0 2 mylist{t} mylist2{t} right count 10
|
||||
|
||||
# BLMPOP with block.
|
||||
$rd blmpop 0 1 mylist{t} left count 1
|
||||
wait_for_blocked_client
|
||||
r lpush mylist{t} a
|
||||
$rd blmpop 0 2 mylist{t} mylist2{t} left count 5
|
||||
wait_for_blocked_client
|
||||
r lpush mylist{t} a b c
|
||||
$rd blmpop 0 2 mylist{t} mylist2{t} right count 10
|
||||
wait_for_blocked_client
|
||||
r rpush mylist2{t} a b c
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{lpush mylist{t} a b c}
|
||||
{rpush mylist2{t} 1 2 3}
|
||||
{lpop mylist{t} 1}
|
||||
{rpop mylist{t} 2}
|
||||
{rpop mylist2{t} 3}
|
||||
{lpush mylist{t} a}
|
||||
{lpop mylist{t} 1}
|
||||
{lpush mylist{t} a b c}
|
||||
{lpop mylist{t} 3}
|
||||
{rpush mylist2{t} a b c}
|
||||
{rpop mylist2{t} 3}
|
||||
}
|
||||
} {} {needs:repl}
|
||||
|
||||
test {LPUSHX, RPUSHX - generic} {
|
||||
r del xlist
|
||||
@ -860,23 +998,46 @@ start_server {
|
||||
} {}
|
||||
|
||||
foreach {type large} [array get largevalue] {
|
||||
test "Basic LPOP/RPOP - $type" {
|
||||
test "Basic LPOP/RPOP/LMPOP - $type" {
|
||||
create_list mylist "$large 1 2"
|
||||
assert_equal $large [r lpop mylist]
|
||||
assert_equal 2 [r rpop mylist]
|
||||
assert_equal 1 [r lpop mylist]
|
||||
assert_equal 0 [r llen mylist]
|
||||
|
||||
# pop on empty list
|
||||
assert_equal {} [r lpop mylist]
|
||||
assert_equal {} [r rpop mylist]
|
||||
create_list mylist "$large 1 2"
|
||||
assert_equal "mylist $large" [r lmpop 1 mylist left count 1]
|
||||
assert_equal {mylist {2 1}} [r lmpop 2 mylist mylist right count 2]
|
||||
}
|
||||
}
|
||||
|
||||
test {LPOP/RPOP against non list value} {
|
||||
r set notalist foo
|
||||
assert_error WRONGTYPE* {r lpop notalist}
|
||||
assert_error WRONGTYPE* {r rpop notalist}
|
||||
test {LPOP/RPOP/LMPOP against empty list} {
|
||||
r del non-existing-list{t} non-existing-list2{t}
|
||||
|
||||
assert_equal {} [r lpop non-existing-list{t}]
|
||||
assert_equal {} [r rpop non-existing-list2{t}]
|
||||
|
||||
assert_equal {} [r lmpop 1 non-existing-list{t} left count 1]
|
||||
assert_equal {} [r lmpop 1 non-existing-list{t} left count 10]
|
||||
assert_equal {} [r lmpop 2 non-existing-list{t} non-existing-list2{t} right count 1]
|
||||
assert_equal {} [r lmpop 2 non-existing-list{t} non-existing-list2{t} right count 10]
|
||||
}
|
||||
|
||||
test {LPOP/RPOP/LMPOP NON-BLOCK or BLOCK against non list value} {
|
||||
r set notalist{t} foo
|
||||
assert_error WRONGTYPE* {r lpop notalist{t}}
|
||||
assert_error WRONGTYPE* {r blpop notalist{t} 0}
|
||||
assert_error WRONGTYPE* {r rpop notalist{t}}
|
||||
assert_error WRONGTYPE* {r brpop notalist{t} 0}
|
||||
|
||||
r del notalist2{t}
|
||||
assert_error "WRONGTYPE*" {r lmpop 2 notalist{t} notalist2{t} left count 1}
|
||||
assert_error "WRONGTYPE*" {r blmpop 0 2 notalist{t} notalist2{t} left count 1}
|
||||
|
||||
r del notalist{t}
|
||||
r set notalist2{t} nolist
|
||||
assert_error "WRONGTYPE*" {r lmpop 2 notalist{t} notalist2{t} right count 10}
|
||||
assert_error "WRONGTYPE*" {r blmpop 0 2 notalist{t} notalist2{t} left count 1}
|
||||
}
|
||||
|
||||
foreach {type num} {quicklist 250 quicklist 500} {
|
||||
@ -897,6 +1058,121 @@ start_server {
|
||||
}
|
||||
}
|
||||
|
||||
test {LMPOP with illegal argument} {
|
||||
assert_error "ERR wrong number of arguments*" {r lmpop}
|
||||
assert_error "ERR wrong number of arguments*" {r lmpop 1}
|
||||
assert_error "ERR wrong number of arguments*" {r lmpop 1 mylist{t}}
|
||||
|
||||
assert_error "ERR numkeys*" {r lmpop 0 mylist{t} LEFT}
|
||||
assert_error "ERR numkeys*" {r lmpop a mylist{t} LEFT}
|
||||
assert_error "ERR numkeys*" {r lmpop -1 mylist{t} RIGHT}
|
||||
|
||||
assert_error "ERR syntax error*" {r lmpop 1 mylist{t} bad_where}
|
||||
assert_error "ERR syntax error*" {r lmpop 1 mylist{t} LEFT bar_arg}
|
||||
assert_error "ERR syntax error*" {r lmpop 1 mylist{t} RIGHT LEFT}
|
||||
assert_error "ERR syntax error*" {r lmpop 1 mylist{t} COUNT}
|
||||
assert_error "ERR syntax error*" {r lmpop 2 mylist{t} mylist2{t} bad_arg}
|
||||
|
||||
assert_error "ERR count*" {r lmpop 1 mylist{t} LEFT COUNT 0}
|
||||
assert_error "ERR count*" {r lmpop 1 mylist{t} RIGHT COUNT a}
|
||||
assert_error "ERR count*" {r lmpop 1 mylist{t} LEFT COUNT -1}
|
||||
assert_error "ERR count*" {r lmpop 2 mylist{t} mylist2{t} RIGHT COUNT -1}
|
||||
}
|
||||
|
||||
test {LMPOP single existing list} {
|
||||
# Same key multiple times.
|
||||
create_list mylist{t} "a b c d e f"
|
||||
assert_equal {mylist{t} {a b}} [r lmpop 2 mylist{t} mylist{t} left count 2]
|
||||
assert_equal {mylist{t} {f e}} [r lmpop 2 mylist{t} mylist{t} right count 2]
|
||||
assert_equal 2 [r llen mylist{t}]
|
||||
|
||||
# First one exists, second one does not exist.
|
||||
create_list mylist{t} "a b c d e"
|
||||
r del mylist2{t}
|
||||
assert_equal {mylist{t} a} [r lmpop 2 mylist{t} mylist2{t} left count 1]
|
||||
assert_equal 4 [r llen mylist{t}]
|
||||
assert_equal {mylist{t} {e d c b}} [r lmpop 2 mylist{t} mylist2{t} right count 10]
|
||||
assert_equal {} [r lmpop 2 mylist{t} mylist2{t} right count 1]
|
||||
|
||||
# First one does not exist, second one exists.
|
||||
r del mylist{t}
|
||||
create_list mylist2{t} "1 2 3 4 5"
|
||||
assert_equal {mylist2{t} 5} [r lmpop 2 mylist{t} mylist2{t} right count 1]
|
||||
assert_equal 4 [r llen mylist2{t}]
|
||||
assert_equal {mylist2{t} {1 2 3 4}} [r lmpop 2 mylist{t} mylist2{t} left count 10]
|
||||
|
||||
assert_equal 0 [r exists mylist{t} mylist2{t}]
|
||||
}
|
||||
|
||||
test {LMPOP multiple existing lists} {
|
||||
create_list mylist{t} "a b c d e"
|
||||
create_list mylist2{t} "1 2 3 4 5"
|
||||
|
||||
# Pop up from the first key.
|
||||
assert_equal {mylist{t} {a b}} [r lmpop 2 mylist{t} mylist2{t} left count 2]
|
||||
assert_equal 3 [r llen mylist{t}]
|
||||
assert_equal {mylist{t} {e d c}} [r lmpop 2 mylist{t} mylist2{t} right count 3]
|
||||
assert_equal 0 [r exists mylist{t}]
|
||||
|
||||
# Pop up from the second key.
|
||||
assert_equal {mylist2{t} {1 2 3}} [r lmpop 2 mylist{t} mylist2{t} left count 3]
|
||||
assert_equal 2 [r llen mylist2{t}]
|
||||
assert_equal {mylist2{t} {5 4}} [r lmpop 2 mylist{t} mylist2{t} right count 2]
|
||||
assert_equal 0 [r exists mylist{t}]
|
||||
|
||||
# Pop up all elements.
|
||||
create_list mylist{t} "a b c"
|
||||
create_list mylist2{t} "1 2 3"
|
||||
assert_equal {mylist{t} {a b c}} [r lmpop 2 mylist{t} mylist2{t} left count 10]
|
||||
assert_equal 0 [r llen mylist{t}]
|
||||
assert_equal {mylist2{t} {3 2 1}} [r lmpop 2 mylist{t} mylist2{t} right count 10]
|
||||
assert_equal 0 [r llen mylist2{t}]
|
||||
assert_equal 0 [r exists mylist{t} mylist2{t}]
|
||||
}
|
||||
|
||||
test {LMPOP propagate as pop with count command to replica} {
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
# left/right propagate as lpop/rpop with count
|
||||
r lpush mylist{t} a b c
|
||||
|
||||
# Pop elements from one list.
|
||||
r lmpop 1 mylist{t} left count 1
|
||||
r lmpop 1 mylist{t} right count 1
|
||||
|
||||
# Now the list have only one element
|
||||
r lmpop 2 mylist{t} mylist2{t} left count 10
|
||||
|
||||
# No elements so we don't propagate.
|
||||
r lmpop 2 mylist{t} mylist2{t} left count 10
|
||||
|
||||
# Pop elements from the second list.
|
||||
r rpush mylist2{t} 1 2 3
|
||||
r lmpop 2 mylist{t} mylist2{t} left count 2
|
||||
r lmpop 2 mylist{t} mylist2{t} right count 1
|
||||
|
||||
# Pop all elements.
|
||||
r rpush mylist{t} a b c
|
||||
r rpush mylist2{t} 1 2 3
|
||||
r lmpop 2 mylist{t} mylist2{t} left count 10
|
||||
r lmpop 2 mylist{t} mylist2{t} right count 10
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{lpush mylist{t} a b c}
|
||||
{lpop mylist{t} 1}
|
||||
{rpop mylist{t} 1}
|
||||
{lpop mylist{t} 1}
|
||||
{rpush mylist2{t} 1 2 3}
|
||||
{lpop mylist2{t} 2}
|
||||
{rpop mylist2{t} 1}
|
||||
{rpush mylist{t} a b c}
|
||||
{rpush mylist2{t} 1 2 3}
|
||||
{lpop mylist{t} 3}
|
||||
{rpop mylist2{t} 3}
|
||||
}
|
||||
} {} {needs:repl}
|
||||
|
||||
foreach {type large} [array get largevalue] {
|
||||
test "LRANGE basics - $type" {
|
||||
create_list mylist "$large 1 2 3 4 5 6 7 8 9"
|
||||
@ -1034,6 +1310,7 @@ start_server {
|
||||
r ping
|
||||
} {PONG}
|
||||
|
||||
foreach {pop} {BLPOP BLMPOP_RIGHT} {
|
||||
test "client unblock tests" {
|
||||
r del l
|
||||
set rd [redis_deferring_client]
|
||||
@ -1041,19 +1318,19 @@ start_server {
|
||||
set id [$rd read]
|
||||
|
||||
# test default args
|
||||
$rd blpop l 0
|
||||
bpop_command $rd $pop l 0
|
||||
wait_for_blocked_client
|
||||
r client unblock $id
|
||||
assert_equal {} [$rd read]
|
||||
|
||||
# test with timeout
|
||||
$rd blpop l 0
|
||||
bpop_command $rd $pop l 0
|
||||
wait_for_blocked_client
|
||||
r client unblock $id TIMEOUT
|
||||
assert_equal {} [$rd read]
|
||||
|
||||
# test with error
|
||||
$rd blpop l 0
|
||||
bpop_command $rd $pop l 0
|
||||
wait_for_blocked_client
|
||||
r client unblock $id ERROR
|
||||
catch {[$rd read]} e
|
||||
@ -1069,11 +1346,12 @@ start_server {
|
||||
assert_equal $e {invalid command name "0"}
|
||||
|
||||
# finally, see the this client and list are still functional
|
||||
$rd blpop l 0
|
||||
bpop_command $rd $pop l 0
|
||||
wait_for_blocked_client
|
||||
r lpush l foo
|
||||
assert_equal {l foo} [$rd read]
|
||||
} {}
|
||||
}
|
||||
|
||||
test {List ziplist of various encodings} {
|
||||
r del k
|
||||
|
Loading…
x
Reference in New Issue
Block a user