Adds new pop-push commands (LMOVE, BLMOVE) (#6929)
Adding [B]LMOVE <src> <dst> RIGHT|LEFT RIGHT|LEFT. deprecating [B]RPOPLPUSH. Note that when receiving a BRPOPLPUSH we'll still propagate an RPOPLPUSH, but on BLMOVE RIGHT LEFT we'll propagate an LMOVE improvement to existing tests - Replace "after 1000" with "wait_for_condition" when wait for clients to block/unblock. - Add a pre-existing element to target list on basic tests so that we can check if the new element was added to the correct side of the list. - check command stats on the replica to make sure the right command was replicated Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
2127f7c8eb
commit
c3f9e01794
@ -62,7 +62,8 @@
|
||||
|
||||
#include "server.h"
|
||||
|
||||
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
|
||||
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto);
|
||||
int getListPositionFromObjectOrReply(client *c, robj *arg, int *position);
|
||||
|
||||
/* This structure represents the blocked key information that we store
|
||||
* in the client structure. Each client blocked on keys, has a
|
||||
@ -231,10 +232,9 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
||||
}
|
||||
|
||||
robj *dstkey = receiver->bpop.target;
|
||||
int where = (receiver->lastcmd &&
|
||||
receiver->lastcmd->proc == blpopCommand) ?
|
||||
LIST_HEAD : LIST_TAIL;
|
||||
robj *value = listTypePop(o,where);
|
||||
int wherefrom = receiver->bpop.listpos.wherefrom;
|
||||
int whereto = receiver->bpop.listpos.whereto;
|
||||
robj *value = listTypePop(o, wherefrom);
|
||||
|
||||
if (value) {
|
||||
/* Protect receiver->bpop.target, that will be
|
||||
@ -245,11 +245,11 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
||||
|
||||
if (serveClientBlockedOnList(receiver,
|
||||
rl->key,dstkey,rl->db,value,
|
||||
where) == C_ERR)
|
||||
wherefrom, whereto) == C_ERR)
|
||||
{
|
||||
/* If we failed serving the client we need
|
||||
* to also undo the POP operation. */
|
||||
listTypePush(o,value,where);
|
||||
listTypePush(o,value,wherefrom);
|
||||
}
|
||||
|
||||
if (dstkey) decrRefCount(dstkey);
|
||||
@ -466,8 +466,8 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
|
||||
* one new element via some write operation are accumulated into
|
||||
* the server.ready_keys list. This function will run the list and will
|
||||
* serve clients accordingly. Note that the function will iterate again and
|
||||
* again as a result of serving BRPOPLPUSH we can have new blocking clients
|
||||
* to serve because of the PUSH side of BRPOPLPUSH.
|
||||
* again as a result of serving BLMOVE we can have new blocking clients
|
||||
* to serve because of the PUSH side of BLMOVE.
|
||||
*
|
||||
* This function is normally "fair", that is, it will server clients
|
||||
* using a FIFO behavior. However this fairness is violated in certain
|
||||
@ -485,7 +485,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
/* Point server.ready_keys to a fresh list and save the current one
|
||||
* locally. This way as we run the old list we are free to call
|
||||
* signalKeyAsReady() that may push new elements in server.ready_keys
|
||||
* when handling clients blocked into BRPOPLPUSH. */
|
||||
* when handling clients blocked into BLMOVE. */
|
||||
l = server.ready_keys;
|
||||
server.ready_keys = listCreate();
|
||||
|
||||
@ -500,7 +500,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
/* Even if we are not inside call(), increment the call depth
|
||||
* in order to make sure that keys are expired against a fixed
|
||||
* reference time, and not against the wallclock time. This
|
||||
* way we can lookup an object multiple times (BRPOPLPUSH does
|
||||
* way we can lookup an object multiple times (BLMOVE does
|
||||
* that) without the risk of it being freed in the second
|
||||
* lookup, invalidating the first one.
|
||||
* See https://github.com/antirez/redis/pull/6554. */
|
||||
@ -560,7 +560,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
* stream keys, we also provide an array of streamID structures: clients will
|
||||
* be unblocked only when items with an ID greater or equal to the specified
|
||||
* one is appended to the stream. */
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) {
|
||||
dictEntry *de;
|
||||
list *l;
|
||||
int j;
|
||||
@ -568,6 +568,8 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
|
||||
c->bpop.timeout = timeout;
|
||||
c->bpop.target = target;
|
||||
|
||||
if (listpos != NULL) c->bpop.listpos = *listpos;
|
||||
|
||||
if (target != NULL) incrRefCount(target);
|
||||
|
||||
for (j = 0; j < numkeys; j++) {
|
||||
|
@ -4492,7 +4492,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
|
||||
"Blocking module command called from transaction");
|
||||
} else {
|
||||
if (keys) {
|
||||
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL);
|
||||
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL,NULL);
|
||||
} else {
|
||||
blockClient(c,BLOCKED_MODULE);
|
||||
}
|
||||
|
14
src/server.c
14
src/server.c
@ -302,6 +302,10 @@ struct redisCommand redisCommandTable[] = {
|
||||
"write use-memory no-script @list @blocking",
|
||||
0,NULL,1,2,1,0,0,0},
|
||||
|
||||
{"blmove",blmoveCommand,6,
|
||||
"write use-memory no-script @list @blocking",
|
||||
0,NULL,1,2,1,0,0,0},
|
||||
|
||||
{"blpop",blpopCommand,-3,
|
||||
"write no-script @list @blocking",
|
||||
0,NULL,1,-2,1,0,0,0},
|
||||
@ -338,6 +342,10 @@ struct redisCommand redisCommandTable[] = {
|
||||
"write use-memory @list",
|
||||
0,NULL,1,2,1,0,0,0},
|
||||
|
||||
{"lmove",lmoveCommand,5,
|
||||
"write use-memory @list",
|
||||
0,NULL,1,2,1,0,0,0},
|
||||
|
||||
{"sadd",saddCommand,-3,
|
||||
"write use-memory fast @set",
|
||||
0,NULL,1,1,1,0,0,0},
|
||||
@ -2385,10 +2393,15 @@ void createSharedObjects(void) {
|
||||
shared.lpop = createStringObject("LPOP",4);
|
||||
shared.lpush = createStringObject("LPUSH",5);
|
||||
shared.rpoplpush = createStringObject("RPOPLPUSH",9);
|
||||
shared.lmove = createStringObject("LMOVE",5);
|
||||
shared.blmove = createStringObject("BLMOVE",6);
|
||||
shared.zpopmin = createStringObject("ZPOPMIN",7);
|
||||
shared.zpopmax = createStringObject("ZPOPMAX",7);
|
||||
shared.multi = createStringObject("MULTI",5);
|
||||
shared.exec = createStringObject("EXEC",4);
|
||||
/* Used in the LMOVE/BLMOVE commands */
|
||||
shared.left = createStringObject("left",4);
|
||||
shared.right = createStringObject("right",5);
|
||||
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
|
||||
shared.integers[j] =
|
||||
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
|
||||
@ -2523,6 +2536,7 @@ void initServerConfig(void) {
|
||||
server.xclaimCommand = lookupCommandByCString("xclaim");
|
||||
server.xgroupCommand = lookupCommandByCString("xgroup");
|
||||
server.rpoplpushCommand = lookupCommandByCString("rpoplpush");
|
||||
server.lmoveCommand = lookupCommandByCString("lmove");
|
||||
|
||||
/* Debugging */
|
||||
server.watchdog_period = 0;
|
||||
|
18
src/server.h
18
src/server.h
@ -703,7 +703,13 @@ typedef struct blockingState {
|
||||
dict *keys; /* The keys we are waiting to terminate a blocking
|
||||
* operation such as BLPOP or XREAD. Or NULL. */
|
||||
robj *target; /* The key that should receive the element,
|
||||
* for BRPOPLPUSH. */
|
||||
* for BLMOVE. */
|
||||
struct listPos {
|
||||
int wherefrom; /* Where to pop from */
|
||||
int whereto; /* Where to push to */
|
||||
} listpos; /* The positions in the src/dst lists
|
||||
* where we want to pop/push an element
|
||||
* for BLPOP, BRPOP and BLMOVE. */
|
||||
|
||||
/* BLOCK_STREAM */
|
||||
size_t xread_count; /* XREAD COUNT option. */
|
||||
@ -893,8 +899,8 @@ struct sharedObjectsStruct {
|
||||
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
|
||||
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
||||
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
|
||||
*rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan,
|
||||
*multi, *exec,
|
||||
*rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax,
|
||||
*emptyscan, *multi, *exec, *left, *right,
|
||||
*select[PROTO_SHARED_SELECT_CMDS],
|
||||
*integers[OBJ_SHARED_INTEGERS],
|
||||
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
|
||||
@ -1140,7 +1146,7 @@ struct redisServer {
|
||||
*lpopCommand, *rpopCommand, *zpopminCommand,
|
||||
*zpopmaxCommand, *sremCommand, *execCommand,
|
||||
*expireCommand, *pexpireCommand, *xclaimCommand,
|
||||
*xgroupCommand, *rpoplpushCommand;
|
||||
*xgroupCommand, *rpoplpushCommand, *lmoveCommand;
|
||||
/* Fields used only for stats */
|
||||
time_t stat_starttime; /* Server start time */
|
||||
long long stat_numcommands; /* Number of processed commands */
|
||||
@ -2226,7 +2232,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
|
||||
void disconnectAllBlockedClients(void);
|
||||
void handleClientsBlockedOnKeys(void);
|
||||
void signalKeyAsReady(redisDb *db, robj *key, int type);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids);
|
||||
|
||||
/* timeout.c -- Blocked clients timeout and connections timeout. */
|
||||
void addClientToTimeoutTable(client *c);
|
||||
@ -2337,6 +2343,7 @@ void sortCommand(client *c);
|
||||
void lremCommand(client *c);
|
||||
void lposCommand(client *c);
|
||||
void rpoplpushCommand(client *c);
|
||||
void lmoveCommand(client *c);
|
||||
void infoCommand(client *c);
|
||||
void mgetCommand(client *c);
|
||||
void monitorCommand(client *c);
|
||||
@ -2380,6 +2387,7 @@ void discardCommand(client *c);
|
||||
void blpopCommand(client *c);
|
||||
void brpopCommand(client *c);
|
||||
void brpoplpushCommand(client *c);
|
||||
void blmoveCommand(client *c);
|
||||
void appendCommand(client *c);
|
||||
void strlenCommand(client *c);
|
||||
void zrankCommand(client *c);
|
||||
|
235
src/t_list.c
235
src/t_list.c
@ -647,6 +647,102 @@ void lremCommand(client *c) {
|
||||
addReplyLongLong(c,removed);
|
||||
}
|
||||
|
||||
void lmoveHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value,
|
||||
int where) {
|
||||
/* Create the list if the key does not exist */
|
||||
if (!dstobj) {
|
||||
dstobj = createQuicklistObject();
|
||||
quicklistSetOptions(dstobj->ptr, server.list_max_ziplist_size,
|
||||
server.list_compress_depth);
|
||||
dbAdd(c->db,dstkey,dstobj);
|
||||
}
|
||||
signalModifiedKey(c,c->db,dstkey);
|
||||
listTypePush(dstobj,value,where);
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,
|
||||
where == LIST_HEAD ? "lpush" : "rpush",
|
||||
dstkey,
|
||||
c->db->id);
|
||||
/* Always send the pushed value to the client. */
|
||||
addReplyBulk(c,value);
|
||||
}
|
||||
|
||||
int getListPositionFromObjectOrReply(client *c, robj *arg, int *position) {
|
||||
if (strcasecmp(arg->ptr,"right") == 0) {
|
||||
*position = LIST_TAIL;
|
||||
} else if (strcasecmp(arg->ptr,"left") == 0) {
|
||||
*position = LIST_HEAD;
|
||||
} else {
|
||||
addReply(c,shared.syntaxerr);
|
||||
return C_ERR;
|
||||
}
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
robj *getStringObjectFromListPosition(int position) {
|
||||
if (position == LIST_HEAD) {
|
||||
return shared.left;
|
||||
} else {
|
||||
// LIST_TAIL
|
||||
return shared.right;
|
||||
}
|
||||
}
|
||||
|
||||
void lmoveGenericCommand(client *c, int wherefrom, int whereto) {
|
||||
robj *sobj, *value;
|
||||
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
|
||||
== NULL || checkType(c,sobj,OBJ_LIST)) return;
|
||||
|
||||
if (listTypeLength(sobj) == 0) {
|
||||
/* This may only happen after loading very old RDB files. Recent
|
||||
* versions of Redis delete keys of empty lists. */
|
||||
addReplyNull(c);
|
||||
} else {
|
||||
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
|
||||
robj *touchedkey = c->argv[1];
|
||||
|
||||
if (checkType(c,dobj,OBJ_LIST)) return;
|
||||
value = listTypePop(sobj,wherefrom);
|
||||
/* We saved touched key, and protect it, since lmoveHandlePush
|
||||
* may change the client command argument vector (it does not
|
||||
* currently). */
|
||||
incrRefCount(touchedkey);
|
||||
lmoveHandlePush(c,c->argv[2],dobj,value,whereto);
|
||||
|
||||
/* listTypePop returns an object with its refcount incremented */
|
||||
decrRefCount(value);
|
||||
|
||||
/* Delete the source list when it is empty */
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,
|
||||
wherefrom == LIST_HEAD ? "lpop" : "rpop",
|
||||
touchedkey,
|
||||
c->db->id);
|
||||
if (listTypeLength(sobj) == 0) {
|
||||
dbDelete(c->db,touchedkey);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
|
||||
touchedkey,c->db->id);
|
||||
}
|
||||
signalModifiedKey(c,c->db,touchedkey);
|
||||
decrRefCount(touchedkey);
|
||||
server.dirty++;
|
||||
if (c->cmd->proc == blmoveCommand) {
|
||||
rewriteClientCommandVector(c,5,shared.lmove,
|
||||
c->argv[1],c->argv[2],c->argv[3],c->argv[4]);
|
||||
} else if (c->cmd->proc == brpoplpushCommand) {
|
||||
rewriteClientCommandVector(c,3,shared.rpoplpush,
|
||||
c->argv[1],c->argv[2]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void lmoveCommand(client *c) {
|
||||
int wherefrom, whereto;
|
||||
if (getListPositionFromObjectOrReply(c,c->argv[3],&wherefrom)
|
||||
!= C_OK) return;
|
||||
if (getListPositionFromObjectOrReply(c,c->argv[4],&whereto)
|
||||
!= C_OK) return;
|
||||
lmoveGenericCommand(c, wherefrom, whereto);
|
||||
}
|
||||
|
||||
/* This is the semantic of this command:
|
||||
* RPOPLPUSH srclist dstlist:
|
||||
* IF LLEN(srclist) > 0
|
||||
@ -662,60 +758,8 @@ void lremCommand(client *c) {
|
||||
* since the element is not just returned but pushed against another list
|
||||
* as well. This command was originally proposed by Ezra Zygmuntowicz.
|
||||
*/
|
||||
|
||||
void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
|
||||
/* Create the list if the key does not exist */
|
||||
if (!dstobj) {
|
||||
dstobj = createQuicklistObject();
|
||||
quicklistSetOptions(dstobj->ptr, server.list_max_ziplist_size,
|
||||
server.list_compress_depth);
|
||||
dbAdd(c->db,dstkey,dstobj);
|
||||
}
|
||||
signalModifiedKey(c,c->db,dstkey);
|
||||
listTypePush(dstobj,value,LIST_HEAD);
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);
|
||||
/* Always send the pushed value to the client. */
|
||||
addReplyBulk(c,value);
|
||||
}
|
||||
|
||||
void rpoplpushCommand(client *c) {
|
||||
robj *sobj, *value;
|
||||
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
|
||||
== NULL || checkType(c,sobj,OBJ_LIST)) return;
|
||||
|
||||
if (listTypeLength(sobj) == 0) {
|
||||
/* This may only happen after loading very old RDB files. Recent
|
||||
* versions of Redis delete keys of empty lists. */
|
||||
addReplyNull(c);
|
||||
} else {
|
||||
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
|
||||
robj *touchedkey = c->argv[1];
|
||||
|
||||
if (checkType(c,dobj,OBJ_LIST)) return;
|
||||
value = listTypePop(sobj,LIST_TAIL);
|
||||
/* We saved touched key, and protect it, since rpoplpushHandlePush
|
||||
* may change the client command argument vector (it does not
|
||||
* currently). */
|
||||
incrRefCount(touchedkey);
|
||||
rpoplpushHandlePush(c,c->argv[2],dobj,value);
|
||||
|
||||
/* listTypePop returns an object with its refcount incremented */
|
||||
decrRefCount(value);
|
||||
|
||||
/* Delete the source list when it is empty */
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
|
||||
if (listTypeLength(sobj) == 0) {
|
||||
dbDelete(c->db,touchedkey);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
|
||||
touchedkey,c->db->id);
|
||||
}
|
||||
signalModifiedKey(c,c->db,touchedkey);
|
||||
decrRefCount(touchedkey);
|
||||
server.dirty++;
|
||||
if (c->cmd->proc == brpoplpushCommand) {
|
||||
rewriteClientCommandVector(c,3,shared.rpoplpush,c->argv[1],c->argv[2]);
|
||||
}
|
||||
}
|
||||
lmoveGenericCommand(c, LIST_TAIL, LIST_HEAD);
|
||||
}
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
@ -727,30 +771,34 @@ void rpoplpushCommand(client *c) {
|
||||
* in the context of the specified 'db', doing the following:
|
||||
*
|
||||
* 1) Provide the client with the 'value' element.
|
||||
* 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
|
||||
* 'value' element on the destination list (the LPUSH side of the command).
|
||||
* 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
|
||||
* 2) If the dstkey is not NULL (we are serving a BLMOVE) also push the
|
||||
* 'value' element on the destination list (the "push" side of the command).
|
||||
* 3) Propagate the resulting BRPOP, BLPOP and additional xPUSH if any into
|
||||
* the AOF and replication channel.
|
||||
*
|
||||
* The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the
|
||||
* The argument 'wherefrom' is LIST_TAIL or LIST_HEAD, and indicates if the
|
||||
* 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
|
||||
* we can propagate the command properly.
|
||||
*
|
||||
* The argument 'whereto' is LIST_TAIL or LIST_HEAD, and indicates if the
|
||||
* 'value' element is to be pushed to the head or tail so that we can
|
||||
* propagate the command properly.
|
||||
*
|
||||
* The function returns C_OK if we are able to serve the client, otherwise
|
||||
* C_ERR is returned to signal the caller that the list POP operation
|
||||
* should be undone as the client was not served: This only happens for
|
||||
* BRPOPLPUSH that fails to push the value to the destination key as it is
|
||||
* BLMOVE that fails to push the value to the destination key as it is
|
||||
* of the wrong type. */
|
||||
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
|
||||
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto)
|
||||
{
|
||||
robj *argv[3];
|
||||
robj *argv[5];
|
||||
|
||||
if (dstkey == NULL) {
|
||||
/* Propagate the [LR]POP operation. */
|
||||
argv[0] = (where == LIST_HEAD) ? shared.lpop :
|
||||
shared.rpop;
|
||||
argv[0] = (wherefrom == LIST_HEAD) ? shared.lpop :
|
||||
shared.rpop;
|
||||
argv[1] = key;
|
||||
propagate((where == LIST_HEAD) ?
|
||||
propagate((wherefrom == LIST_HEAD) ?
|
||||
server.lpopCommand : server.rpopCommand,
|
||||
db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
|
||||
@ -760,30 +808,33 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
|
||||
addReplyBulk(receiver,value);
|
||||
|
||||
/* Notify event. */
|
||||
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
|
||||
char *event = (wherefrom == LIST_HEAD) ? "lpop" : "rpop";
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
|
||||
} else {
|
||||
/* BRPOPLPUSH */
|
||||
/* BLMOVE */
|
||||
robj *dstobj =
|
||||
lookupKeyWrite(receiver->db,dstkey);
|
||||
if (!(dstobj &&
|
||||
checkType(receiver,dstobj,OBJ_LIST)))
|
||||
{
|
||||
rpoplpushHandlePush(receiver,dstkey,dstobj,
|
||||
value);
|
||||
/* Propagate the RPOPLPUSH operation. */
|
||||
argv[0] = shared.rpoplpush;
|
||||
lmoveHandlePush(receiver,dstkey,dstobj,value,whereto);
|
||||
/* Propagate the LMOVE/RPOPLPUSH operation. */
|
||||
int isbrpoplpush = (receiver->lastcmd->proc == brpoplpushCommand);
|
||||
argv[0] = isbrpoplpush ? shared.rpoplpush : shared.lmove;
|
||||
argv[1] = key;
|
||||
argv[2] = dstkey;
|
||||
propagate(server.rpoplpushCommand,
|
||||
db->id,argv,3,
|
||||
argv[3] = getStringObjectFromListPosition(wherefrom);
|
||||
argv[4] = getStringObjectFromListPosition(whereto);
|
||||
propagate(isbrpoplpush ? server.rpoplpushCommand : server.lmoveCommand,
|
||||
db->id,argv,(isbrpoplpush ? 3 : 5),
|
||||
PROPAGATE_AOF|
|
||||
PROPAGATE_REPL);
|
||||
|
||||
/* Notify event ("lpush" was notified by rpoplpushHandlePush). */
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,"rpop",key,receiver->db->id);
|
||||
/* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop",
|
||||
key,receiver->db->id);
|
||||
} else {
|
||||
/* BRPOPLPUSH failed because of wrong
|
||||
/* BLMOVE failed because of wrong
|
||||
* destination type. */
|
||||
return C_ERR;
|
||||
}
|
||||
@ -844,7 +895,8 @@ void blockingPopGenericCommand(client *c, int where) {
|
||||
}
|
||||
|
||||
/* If the keys do not exist we must block */
|
||||
blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
|
||||
struct listPos pos = {where};
|
||||
blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL);
|
||||
}
|
||||
|
||||
void blpopCommand(client *c) {
|
||||
@ -855,12 +907,7 @@ void brpopCommand(client *c) {
|
||||
blockingPopGenericCommand(c,LIST_TAIL);
|
||||
}
|
||||
|
||||
void brpoplpushCommand(client *c) {
|
||||
mstime_t timeout;
|
||||
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
|
||||
!= C_OK) return;
|
||||
|
||||
void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeout) {
|
||||
robj *key = lookupKeyWrite(c->db, c->argv[1]);
|
||||
if (checkType(c,key,OBJ_LIST)) return;
|
||||
|
||||
@ -871,12 +918,32 @@ void brpoplpushCommand(client *c) {
|
||||
addReplyNull(c);
|
||||
} else {
|
||||
/* The list is empty and the client blocks. */
|
||||
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL);
|
||||
struct listPos pos = {wherefrom, whereto};
|
||||
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],&pos,NULL);
|
||||
}
|
||||
} else {
|
||||
/* The list exists and has elements, so
|
||||
* the regular rpoplpushCommand is executed. */
|
||||
* the regular lmoveCommand is executed. */
|
||||
serverAssertWithInfo(c,key,listTypeLength(key) > 0);
|
||||
rpoplpushCommand(c);
|
||||
lmoveGenericCommand(c,wherefrom,whereto);
|
||||
}
|
||||
}
|
||||
|
||||
void blmoveCommand(client *c) {
|
||||
mstime_t timeout;
|
||||
int wherefrom, whereto;
|
||||
if (getListPositionFromObjectOrReply(c,c->argv[3],&wherefrom)
|
||||
!= C_OK) return;
|
||||
if (getListPositionFromObjectOrReply(c,c->argv[4],&whereto)
|
||||
!= C_OK) return;
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[5],&timeout,UNIT_SECONDS)
|
||||
!= C_OK) return;
|
||||
blmoveGenericCommand(c,wherefrom,whereto,timeout);
|
||||
}
|
||||
|
||||
void brpoplpushCommand(client *c) {
|
||||
mstime_t timeout;
|
||||
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
|
||||
!= C_OK) return;
|
||||
blmoveGenericCommand(c, LIST_TAIL, LIST_HEAD, timeout);
|
||||
}
|
||||
|
@ -1628,7 +1628,7 @@ void xreadCommand(client *c) {
|
||||
goto cleanup;
|
||||
}
|
||||
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
|
||||
timeout, NULL, ids);
|
||||
timeout, NULL, NULL, ids);
|
||||
/* If no COUNT is given and we block, set a relatively small count:
|
||||
* in case the ID provided is too low, we do not want the server to
|
||||
* block just to serve this client a huge stream of messages. */
|
||||
|
@ -3364,7 +3364,7 @@ void blockingGenericZpopCommand(client *c, int where) {
|
||||
}
|
||||
|
||||
/* If the keys do not exist we must block */
|
||||
blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
|
||||
blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL,NULL);
|
||||
}
|
||||
|
||||
// BZPOPMIN key [key ...] timeout
|
||||
|
@ -78,6 +78,7 @@ start_server {tags {"repl"}} {
|
||||
}
|
||||
|
||||
test {BRPOPLPUSH replication, when blocking against empty list} {
|
||||
$A config resetstat
|
||||
set rd [redis_deferring_client]
|
||||
$rd brpoplpush a b 5
|
||||
r lpush a foo
|
||||
@ -86,9 +87,12 @@ start_server {tags {"repl"}} {
|
||||
} else {
|
||||
fail "Master and replica have different digest: [$A debug digest] VS [$B debug digest]"
|
||||
}
|
||||
assert_match {*calls=1,*} [cmdrstat rpoplpush $A]
|
||||
assert_match {} [cmdrstat lmove $A]
|
||||
}
|
||||
|
||||
test {BRPOPLPUSH replication, list exists} {
|
||||
$A config resetstat
|
||||
set rd [redis_deferring_client]
|
||||
r lpush c 1
|
||||
r lpush c 2
|
||||
@ -96,6 +100,39 @@ start_server {tags {"repl"}} {
|
||||
$rd brpoplpush c d 5
|
||||
after 1000
|
||||
assert_equal [$A debug digest] [$B debug digest]
|
||||
assert_match {*calls=1,*} [cmdrstat rpoplpush $A]
|
||||
assert_match {} [cmdrstat lmove $A]
|
||||
}
|
||||
|
||||
foreach wherefrom {left right} {
|
||||
foreach whereto {left right} {
|
||||
test "BLMOVE ($wherefrom, $whereto) replication, when blocking against empty list" {
|
||||
$A config resetstat
|
||||
set rd [redis_deferring_client]
|
||||
$rd blmove a b $wherefrom $whereto 5
|
||||
r lpush a foo
|
||||
wait_for_condition 50 100 {
|
||||
[$A debug digest] eq [$B debug digest]
|
||||
} else {
|
||||
fail "Master and replica have different digest: [$A debug digest] VS [$B debug digest]"
|
||||
}
|
||||
assert_match {*calls=1,*} [cmdrstat lmove $A]
|
||||
assert_match {} [cmdrstat rpoplpush $A]
|
||||
}
|
||||
|
||||
test "BLMOVE ($wherefrom, $whereto) replication, list exists" {
|
||||
$A config resetstat
|
||||
set rd [redis_deferring_client]
|
||||
r lpush c 1
|
||||
r lpush c 2
|
||||
r lpush c 3
|
||||
$rd blmove c d $wherefrom $whereto 5
|
||||
after 1000
|
||||
assert_equal [$A debug digest] [$B debug digest]
|
||||
assert_match {*calls=1,*} [cmdrstat lmove $A]
|
||||
assert_match {} [cmdrstat rpoplpush $A]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test {BLPOP followed by role change, issue #2473} {
|
||||
|
@ -519,3 +519,9 @@ proc get_child_pid {idx} {
|
||||
|
||||
return $child_pid
|
||||
}
|
||||
|
||||
proc cmdrstat {cmd r} {
|
||||
if {[regexp "\r\ncmdstat_$cmd:(.*?)\r\n" [$r info commandstats] _ value]} {
|
||||
set _ $value
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,5 @@
|
||||
proc cmdstat {cmd} {
|
||||
if {[regexp "\r\ncmdstat_$cmd:(.*?)\r\n" [r info commandstats] _ value]} {
|
||||
set _ $value
|
||||
}
|
||||
return [cmdrstat $cmd r]
|
||||
}
|
||||
|
||||
start_server {tags {"introspection"}} {
|
||||
|
@ -194,6 +194,7 @@ start_server {
|
||||
|
||||
test "BRPOPLPUSH - $type" {
|
||||
r del target
|
||||
r rpush target bar
|
||||
|
||||
set rd [redis_deferring_client]
|
||||
create_list blist "a b $large c d"
|
||||
@ -201,9 +202,38 @@ start_server {
|
||||
$rd brpoplpush blist target 1
|
||||
assert_equal d [$rd read]
|
||||
|
||||
assert_equal d [r rpop target]
|
||||
assert_equal d [r lpop target]
|
||||
assert_equal "a b $large c" [r lrange blist 0 -1]
|
||||
}
|
||||
|
||||
foreach wherefrom {left right} {
|
||||
foreach whereto {left right} {
|
||||
test "BLMOVE $wherefrom $whereto - $type" {
|
||||
r del target
|
||||
r rpush target bar
|
||||
|
||||
set rd [redis_deferring_client]
|
||||
create_list blist "a b $large c d"
|
||||
|
||||
$rd blmove blist target $wherefrom $whereto 1
|
||||
set poppedelement [$rd read]
|
||||
|
||||
if {$wherefrom eq "right"} {
|
||||
assert_equal d $poppedelement
|
||||
assert_equal "a b $large c" [r lrange blist 0 -1]
|
||||
} else {
|
||||
assert_equal a $poppedelement
|
||||
assert_equal "b $large c d" [r lrange blist 0 -1]
|
||||
}
|
||||
|
||||
if {$whereto eq "right"} {
|
||||
assert_equal $poppedelement [r rpop target]
|
||||
} else {
|
||||
assert_equal $poppedelement [r lpop target]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test "BLPOP, LPUSH + DEL should not awake blocked client" {
|
||||
@ -285,24 +315,60 @@ start_server {
|
||||
test "BRPOPLPUSH with zero timeout should block indefinitely" {
|
||||
set rd [redis_deferring_client]
|
||||
r del blist target
|
||||
r rpush target bar
|
||||
$rd brpoplpush blist target 0
|
||||
after 1000
|
||||
wait_for_condition 100 10 {
|
||||
[s blocked_clients] == 1
|
||||
} else {
|
||||
fail "Timeout waiting for blocked clients"
|
||||
}
|
||||
r rpush blist foo
|
||||
assert_equal foo [$rd read]
|
||||
assert_equal {foo} [r lrange target 0 -1]
|
||||
assert_equal {foo bar} [r lrange target 0 -1]
|
||||
}
|
||||
|
||||
test "BRPOPLPUSH with a client BLPOPing the target list" {
|
||||
set rd [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
r del blist target
|
||||
$rd2 blpop target 0
|
||||
$rd brpoplpush blist target 0
|
||||
after 1000
|
||||
r rpush blist foo
|
||||
assert_equal foo [$rd read]
|
||||
assert_equal {target foo} [$rd2 read]
|
||||
assert_equal 0 [r exists target]
|
||||
foreach wherefrom {left right} {
|
||||
foreach whereto {left right} {
|
||||
test "BLMOVE $wherefrom $whereto with zero timeout should block indefinitely" {
|
||||
set rd [redis_deferring_client]
|
||||
r del blist target
|
||||
r rpush target bar
|
||||
$rd blmove blist target $wherefrom $whereto 0
|
||||
wait_for_condition 100 10 {
|
||||
[s blocked_clients] == 1
|
||||
} else {
|
||||
fail "Timeout waiting for blocked clients"
|
||||
}
|
||||
r rpush blist foo
|
||||
assert_equal foo [$rd read]
|
||||
if {$whereto eq "right"} {
|
||||
assert_equal {bar foo} [r lrange target 0 -1]
|
||||
} else {
|
||||
assert_equal {foo bar} [r lrange target 0 -1]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
foreach wherefrom {left right} {
|
||||
foreach whereto {left right} {
|
||||
test "BLMOVE ($wherefrom, $whereto) with a client BLPOPing the target list" {
|
||||
set rd [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
r del blist target
|
||||
$rd2 blpop target 0
|
||||
$rd blmove blist target $wherefrom $whereto 0
|
||||
wait_for_condition 100 10 {
|
||||
[s blocked_clients] == 2
|
||||
} else {
|
||||
fail "Timeout waiting for blocked clients"
|
||||
}
|
||||
r rpush blist foo
|
||||
assert_equal foo [$rd read]
|
||||
assert_equal {target foo} [$rd2 read]
|
||||
assert_equal 0 [r exists target]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test "BRPOPLPUSH with wrong source type" {
|
||||
@ -325,7 +391,11 @@ start_server {
|
||||
r del blist target
|
||||
r set target nolist
|
||||
$rd brpoplpush blist target 0
|
||||
after 1000
|
||||
wait_for_condition 100 10 {
|
||||
[s blocked_clients] == 1
|
||||
} else {
|
||||
fail "Timeout waiting for blocked clients"
|
||||
}
|
||||
r rpush blist foo
|
||||
assert_error "WRONGTYPE*" {$rd read}
|
||||
assert_equal {foo} [r lrange blist 0 -1]
|
||||
@ -355,14 +425,14 @@ start_server {
|
||||
assert_equal {foo} [r lrange target2 0 -1]
|
||||
}
|
||||
|
||||
test "Linked BRPOPLPUSH" {
|
||||
test "Linked LMOVEs" {
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
|
||||
r del list1 list2 list3
|
||||
|
||||
$rd1 brpoplpush list1 list2 0
|
||||
$rd2 brpoplpush list2 list3 0
|
||||
$rd1 blmove list1 list2 right left 0
|
||||
$rd2 blmove list2 list3 left right 0
|
||||
|
||||
r rpush list1 foo
|
||||
|
||||
@ -451,7 +521,16 @@ start_server {
|
||||
set rd [redis_deferring_client]
|
||||
|
||||
$rd brpoplpush foo_list bar_list 1
|
||||
after 2000
|
||||
wait_for_condition 100 10 {
|
||||
[s blocked_clients] == 1
|
||||
} else {
|
||||
fail "Timeout waiting for blocked client"
|
||||
}
|
||||
wait_for_condition 500 10 {
|
||||
[s blocked_clients] == 0
|
||||
} else {
|
||||
fail "Timeout waiting for client to unblock"
|
||||
}
|
||||
$rd read
|
||||
} {}
|
||||
|
||||
@ -676,6 +755,29 @@ start_server {
|
||||
assert_encoding quicklist mylist2
|
||||
}
|
||||
|
||||
foreach wherefrom {left right} {
|
||||
foreach whereto {left right} {
|
||||
test "LMOVE $wherefrom $whereto base case - $type" {
|
||||
r del mylist1 mylist2
|
||||
|
||||
if {$wherefrom eq "right"} {
|
||||
create_list mylist1 "c d $large a"
|
||||
} else {
|
||||
create_list mylist1 "a $large c d"
|
||||
}
|
||||
assert_equal a [r lmove mylist1 mylist2 $wherefrom $whereto]
|
||||
assert_equal $large [r lmove mylist1 mylist2 $wherefrom $whereto]
|
||||
assert_equal "c d" [r lrange mylist1 0 -1]
|
||||
if {$whereto eq "right"} {
|
||||
assert_equal "a $large" [r lrange mylist2 0 -1]
|
||||
} else {
|
||||
assert_equal "$large a" [r lrange mylist2 0 -1]
|
||||
}
|
||||
assert_encoding quicklist mylist2
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test "RPOPLPUSH with the same list as src and dst - $type" {
|
||||
create_list mylist "a $large c"
|
||||
assert_equal "a $large c" [r lrange mylist 0 -1]
|
||||
@ -683,6 +785,26 @@ start_server {
|
||||
assert_equal "c a $large" [r lrange mylist 0 -1]
|
||||
}
|
||||
|
||||
foreach wherefrom {left right} {
|
||||
foreach whereto {left right} {
|
||||
test "LMOVE $wherefrom $whereto with the same list as src and dst - $type" {
|
||||
if {$wherefrom eq "right"} {
|
||||
create_list mylist "a $large c"
|
||||
assert_equal "a $large c" [r lrange mylist 0 -1]
|
||||
} else {
|
||||
create_list mylist "c a $large"
|
||||
assert_equal "c a $large" [r lrange mylist 0 -1]
|
||||
}
|
||||
assert_equal c [r lmove mylist mylist $wherefrom $whereto]
|
||||
if {$whereto eq "right"} {
|
||||
assert_equal "a $large c" [r lrange mylist 0 -1]
|
||||
} else {
|
||||
assert_equal "c a $large" [r lrange mylist 0 -1]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
foreach {othertype otherlarge} [array get largevalue] {
|
||||
test "RPOPLPUSH with $type source and existing target $othertype" {
|
||||
create_list srclist "a b c $large"
|
||||
@ -698,6 +820,35 @@ start_server {
|
||||
assert_encoding quicklist dstlist
|
||||
}
|
||||
}
|
||||
|
||||
foreach wherefrom {left right} {
|
||||
foreach whereto {left right} {
|
||||
test "LMOVE $wherefrom $whereto with $type source and existing target $othertype" {
|
||||
create_list dstlist "$otherlarge"
|
||||
|
||||
if {$wherefrom eq "right"} {
|
||||
create_list srclist "a b c $large"
|
||||
} else {
|
||||
create_list srclist "$large c a b"
|
||||
}
|
||||
assert_equal $large [r lmove srclist dstlist $wherefrom $whereto]
|
||||
assert_equal c [r lmove srclist dstlist $wherefrom $whereto]
|
||||
assert_equal "a b" [r lrange srclist 0 -1]
|
||||
|
||||
if {$whereto eq "right"} {
|
||||
assert_equal "$otherlarge $large c" [r lrange dstlist 0 -1]
|
||||
} else {
|
||||
assert_equal "c $large $otherlarge" [r lrange dstlist 0 -1]
|
||||
}
|
||||
|
||||
# When we lmoved a large value, dstlist should be
|
||||
# converted to the same encoding as srclist.
|
||||
if {$type eq "linkedlist"} {
|
||||
assert_encoding quicklist dstlist
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user