diff --git a/src/blocked.c b/src/blocked.c index 87f816faa..8641fcf8c 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -62,7 +62,8 @@ #include "server.h" -int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); +int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto); +int getListPositionFromObjectOrReply(client *c, robj *arg, int *position); /* This structure represents the blocked key information that we store * in the client structure. Each client blocked on keys, has a @@ -231,10 +232,9 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { } robj *dstkey = receiver->bpop.target; - int where = (receiver->lastcmd && - receiver->lastcmd->proc == blpopCommand) ? - LIST_HEAD : LIST_TAIL; - robj *value = listTypePop(o,where); + int wherefrom = receiver->bpop.listpos.wherefrom; + int whereto = receiver->bpop.listpos.whereto; + robj *value = listTypePop(o, wherefrom); if (value) { /* Protect receiver->bpop.target, that will be @@ -245,11 +245,11 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) { if (serveClientBlockedOnList(receiver, rl->key,dstkey,rl->db,value, - where) == C_ERR) + wherefrom, whereto) == C_ERR) { /* If we failed serving the client we need * to also undo the POP operation. */ - listTypePush(o,value,where); + listTypePush(o,value,wherefrom); } if (dstkey) decrRefCount(dstkey); @@ -466,8 +466,8 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { * one new element via some write operation are accumulated into * the server.ready_keys list. This function will run the list and will * serve clients accordingly. Note that the function will iterate again and - * again as a result of serving BRPOPLPUSH we can have new blocking clients - * to serve because of the PUSH side of BRPOPLPUSH. + * again as a result of serving BLMOVE we can have new blocking clients + * to serve because of the PUSH side of BLMOVE. * * This function is normally "fair", that is, it will server clients * using a FIFO behavior. However this fairness is violated in certain @@ -485,7 +485,7 @@ void handleClientsBlockedOnKeys(void) { /* Point server.ready_keys to a fresh list and save the current one * locally. This way as we run the old list we are free to call * signalKeyAsReady() that may push new elements in server.ready_keys - * when handling clients blocked into BRPOPLPUSH. */ + * when handling clients blocked into BLMOVE. */ l = server.ready_keys; server.ready_keys = listCreate(); @@ -500,7 +500,7 @@ void handleClientsBlockedOnKeys(void) { /* Even if we are not inside call(), increment the call depth * in order to make sure that keys are expired against a fixed * reference time, and not against the wallclock time. This - * way we can lookup an object multiple times (BRPOPLPUSH does + * way we can lookup an object multiple times (BLMOVE does * that) without the risk of it being freed in the second * lookup, invalidating the first one. * See https://github.com/antirez/redis/pull/6554. */ @@ -560,7 +560,7 @@ void handleClientsBlockedOnKeys(void) { * stream keys, we also provide an array of streamID structures: clients will * be unblocked only when items with an ID greater or equal to the specified * one is appended to the stream. */ -void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) { +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids) { dictEntry *de; list *l; int j; @@ -568,6 +568,8 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo c->bpop.timeout = timeout; c->bpop.target = target; + if (listpos != NULL) c->bpop.listpos = *listpos; + if (target != NULL) incrRefCount(target); for (j = 0; j < numkeys; j++) { diff --git a/src/module.c b/src/module.c index fb8f83e9c..3f3d92aea 100644 --- a/src/module.c +++ b/src/module.c @@ -4492,7 +4492,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF "Blocking module command called from transaction"); } else { if (keys) { - blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL); + blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL,NULL); } else { blockClient(c,BLOCKED_MODULE); } diff --git a/src/server.c b/src/server.c index d8aaef06d..a9a18ea19 100644 --- a/src/server.c +++ b/src/server.c @@ -302,6 +302,10 @@ struct redisCommand redisCommandTable[] = { "write use-memory no-script @list @blocking", 0,NULL,1,2,1,0,0,0}, + {"blmove",blmoveCommand,6, + "write use-memory no-script @list @blocking", + 0,NULL,1,2,1,0,0,0}, + {"blpop",blpopCommand,-3, "write no-script @list @blocking", 0,NULL,1,-2,1,0,0,0}, @@ -338,6 +342,10 @@ struct redisCommand redisCommandTable[] = { "write use-memory @list", 0,NULL,1,2,1,0,0,0}, + {"lmove",lmoveCommand,5, + "write use-memory @list", + 0,NULL,1,2,1,0,0,0}, + {"sadd",saddCommand,-3, "write use-memory fast @set", 0,NULL,1,1,1,0,0,0}, @@ -2385,10 +2393,15 @@ void createSharedObjects(void) { shared.lpop = createStringObject("LPOP",4); shared.lpush = createStringObject("LPUSH",5); shared.rpoplpush = createStringObject("RPOPLPUSH",9); + shared.lmove = createStringObject("LMOVE",5); + shared.blmove = createStringObject("BLMOVE",6); shared.zpopmin = createStringObject("ZPOPMIN",7); shared.zpopmax = createStringObject("ZPOPMAX",7); shared.multi = createStringObject("MULTI",5); shared.exec = createStringObject("EXEC",4); + /* Used in the LMOVE/BLMOVE commands */ + shared.left = createStringObject("left",4); + shared.right = createStringObject("right",5); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); @@ -2523,6 +2536,7 @@ void initServerConfig(void) { server.xclaimCommand = lookupCommandByCString("xclaim"); server.xgroupCommand = lookupCommandByCString("xgroup"); server.rpoplpushCommand = lookupCommandByCString("rpoplpush"); + server.lmoveCommand = lookupCommandByCString("lmove"); /* Debugging */ server.watchdog_period = 0; diff --git a/src/server.h b/src/server.h index 114034962..8ddc0b75f 100644 --- a/src/server.h +++ b/src/server.h @@ -703,7 +703,13 @@ typedef struct blockingState { dict *keys; /* The keys we are waiting to terminate a blocking * operation such as BLPOP or XREAD. Or NULL. */ robj *target; /* The key that should receive the element, - * for BRPOPLPUSH. */ + * for BLMOVE. */ + struct listPos { + int wherefrom; /* Where to pop from */ + int whereto; /* Where to push to */ + } listpos; /* The positions in the src/dst lists + * where we want to pop/push an element + * for BLPOP, BRPOP and BLMOVE. */ /* BLOCK_STREAM */ size_t xread_count; /* XREAD COUNT option. */ @@ -893,8 +899,8 @@ struct sharedObjectsStruct { *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, - *rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, - *multi, *exec, + *rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax, + *emptyscan, *multi, *exec, *left, *right, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -1140,7 +1146,7 @@ struct redisServer { *lpopCommand, *rpopCommand, *zpopminCommand, *zpopmaxCommand, *sremCommand, *execCommand, *expireCommand, *pexpireCommand, *xclaimCommand, - *xgroupCommand, *rpoplpushCommand; + *xgroupCommand, *rpoplpushCommand, *lmoveCommand; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ @@ -2226,7 +2232,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key, int type); -void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); +void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids); /* timeout.c -- Blocked clients timeout and connections timeout. */ void addClientToTimeoutTable(client *c); @@ -2337,6 +2343,7 @@ void sortCommand(client *c); void lremCommand(client *c); void lposCommand(client *c); void rpoplpushCommand(client *c); +void lmoveCommand(client *c); void infoCommand(client *c); void mgetCommand(client *c); void monitorCommand(client *c); @@ -2380,6 +2387,7 @@ void discardCommand(client *c); void blpopCommand(client *c); void brpopCommand(client *c); void brpoplpushCommand(client *c); +void blmoveCommand(client *c); void appendCommand(client *c); void strlenCommand(client *c); void zrankCommand(client *c); diff --git a/src/t_list.c b/src/t_list.c index 4edcc8be0..e7a69b1e9 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -647,6 +647,102 @@ void lremCommand(client *c) { addReplyLongLong(c,removed); } +void lmoveHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value, + int where) { + /* Create the list if the key does not exist */ + if (!dstobj) { + dstobj = createQuicklistObject(); + quicklistSetOptions(dstobj->ptr, server.list_max_ziplist_size, + server.list_compress_depth); + dbAdd(c->db,dstkey,dstobj); + } + signalModifiedKey(c,c->db,dstkey); + listTypePush(dstobj,value,where); + notifyKeyspaceEvent(NOTIFY_LIST, + where == LIST_HEAD ? "lpush" : "rpush", + dstkey, + c->db->id); + /* Always send the pushed value to the client. */ + addReplyBulk(c,value); +} + +int getListPositionFromObjectOrReply(client *c, robj *arg, int *position) { + if (strcasecmp(arg->ptr,"right") == 0) { + *position = LIST_TAIL; + } else if (strcasecmp(arg->ptr,"left") == 0) { + *position = LIST_HEAD; + } else { + addReply(c,shared.syntaxerr); + return C_ERR; + } + return C_OK; +} + +robj *getStringObjectFromListPosition(int position) { + if (position == LIST_HEAD) { + return shared.left; + } else { + // LIST_TAIL + return shared.right; + } +} + +void lmoveGenericCommand(client *c, int wherefrom, int whereto) { + robj *sobj, *value; + if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp])) + == NULL || checkType(c,sobj,OBJ_LIST)) return; + + if (listTypeLength(sobj) == 0) { + /* This may only happen after loading very old RDB files. Recent + * versions of Redis delete keys of empty lists. */ + addReplyNull(c); + } else { + robj *dobj = lookupKeyWrite(c->db,c->argv[2]); + robj *touchedkey = c->argv[1]; + + if (checkType(c,dobj,OBJ_LIST)) return; + value = listTypePop(sobj,wherefrom); + /* We saved touched key, and protect it, since lmoveHandlePush + * may change the client command argument vector (it does not + * currently). */ + incrRefCount(touchedkey); + lmoveHandlePush(c,c->argv[2],dobj,value,whereto); + + /* listTypePop returns an object with its refcount incremented */ + decrRefCount(value); + + /* Delete the source list when it is empty */ + notifyKeyspaceEvent(NOTIFY_LIST, + wherefrom == LIST_HEAD ? "lpop" : "rpop", + touchedkey, + c->db->id); + if (listTypeLength(sobj) == 0) { + dbDelete(c->db,touchedkey); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del", + touchedkey,c->db->id); + } + signalModifiedKey(c,c->db,touchedkey); + decrRefCount(touchedkey); + server.dirty++; + if (c->cmd->proc == blmoveCommand) { + rewriteClientCommandVector(c,5,shared.lmove, + c->argv[1],c->argv[2],c->argv[3],c->argv[4]); + } else if (c->cmd->proc == brpoplpushCommand) { + rewriteClientCommandVector(c,3,shared.rpoplpush, + c->argv[1],c->argv[2]); + } + } +} + +void lmoveCommand(client *c) { + int wherefrom, whereto; + if (getListPositionFromObjectOrReply(c,c->argv[3],&wherefrom) + != C_OK) return; + if (getListPositionFromObjectOrReply(c,c->argv[4],&whereto) + != C_OK) return; + lmoveGenericCommand(c, wherefrom, whereto); +} + /* This is the semantic of this command: * RPOPLPUSH srclist dstlist: * IF LLEN(srclist) > 0 @@ -662,60 +758,8 @@ void lremCommand(client *c) { * since the element is not just returned but pushed against another list * as well. This command was originally proposed by Ezra Zygmuntowicz. */ - -void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) { - /* Create the list if the key does not exist */ - if (!dstobj) { - dstobj = createQuicklistObject(); - quicklistSetOptions(dstobj->ptr, server.list_max_ziplist_size, - server.list_compress_depth); - dbAdd(c->db,dstkey,dstobj); - } - signalModifiedKey(c,c->db,dstkey); - listTypePush(dstobj,value,LIST_HEAD); - notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id); - /* Always send the pushed value to the client. */ - addReplyBulk(c,value); -} - void rpoplpushCommand(client *c) { - robj *sobj, *value; - if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp])) - == NULL || checkType(c,sobj,OBJ_LIST)) return; - - if (listTypeLength(sobj) == 0) { - /* This may only happen after loading very old RDB files. Recent - * versions of Redis delete keys of empty lists. */ - addReplyNull(c); - } else { - robj *dobj = lookupKeyWrite(c->db,c->argv[2]); - robj *touchedkey = c->argv[1]; - - if (checkType(c,dobj,OBJ_LIST)) return; - value = listTypePop(sobj,LIST_TAIL); - /* We saved touched key, and protect it, since rpoplpushHandlePush - * may change the client command argument vector (it does not - * currently). */ - incrRefCount(touchedkey); - rpoplpushHandlePush(c,c->argv[2],dobj,value); - - /* listTypePop returns an object with its refcount incremented */ - decrRefCount(value); - - /* Delete the source list when it is empty */ - notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id); - if (listTypeLength(sobj) == 0) { - dbDelete(c->db,touchedkey); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del", - touchedkey,c->db->id); - } - signalModifiedKey(c,c->db,touchedkey); - decrRefCount(touchedkey); - server.dirty++; - if (c->cmd->proc == brpoplpushCommand) { - rewriteClientCommandVector(c,3,shared.rpoplpush,c->argv[1],c->argv[2]); - } - } + lmoveGenericCommand(c, LIST_TAIL, LIST_HEAD); } /*----------------------------------------------------------------------------- @@ -727,30 +771,34 @@ void rpoplpushCommand(client *c) { * in the context of the specified 'db', doing the following: * * 1) Provide the client with the 'value' element. - * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the - * 'value' element on the destination list (the LPUSH side of the command). - * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into + * 2) If the dstkey is not NULL (we are serving a BLMOVE) also push the + * 'value' element on the destination list (the "push" side of the command). + * 3) Propagate the resulting BRPOP, BLPOP and additional xPUSH if any into * the AOF and replication channel. * - * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the + * The argument 'wherefrom' is LIST_TAIL or LIST_HEAD, and indicates if the * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that * we can propagate the command properly. * + * The argument 'whereto' is LIST_TAIL or LIST_HEAD, and indicates if the + * 'value' element is to be pushed to the head or tail so that we can + * propagate the command properly. + * * The function returns C_OK if we are able to serve the client, otherwise * C_ERR is returned to signal the caller that the list POP operation * should be undone as the client was not served: This only happens for - * BRPOPLPUSH that fails to push the value to the destination key as it is + * BLMOVE that fails to push the value to the destination key as it is * of the wrong type. */ -int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where) +int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto) { - robj *argv[3]; + robj *argv[5]; if (dstkey == NULL) { /* Propagate the [LR]POP operation. */ - argv[0] = (where == LIST_HEAD) ? shared.lpop : - shared.rpop; + argv[0] = (wherefrom == LIST_HEAD) ? shared.lpop : + shared.rpop; argv[1] = key; - propagate((where == LIST_HEAD) ? + propagate((wherefrom == LIST_HEAD) ? server.lpopCommand : server.rpopCommand, db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL); @@ -760,30 +808,33 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb addReplyBulk(receiver,value); /* Notify event. */ - char *event = (where == LIST_HEAD) ? "lpop" : "rpop"; + char *event = (wherefrom == LIST_HEAD) ? "lpop" : "rpop"; notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id); } else { - /* BRPOPLPUSH */ + /* BLMOVE */ robj *dstobj = lookupKeyWrite(receiver->db,dstkey); if (!(dstobj && checkType(receiver,dstobj,OBJ_LIST))) { - rpoplpushHandlePush(receiver,dstkey,dstobj, - value); - /* Propagate the RPOPLPUSH operation. */ - argv[0] = shared.rpoplpush; + lmoveHandlePush(receiver,dstkey,dstobj,value,whereto); + /* Propagate the LMOVE/RPOPLPUSH operation. */ + int isbrpoplpush = (receiver->lastcmd->proc == brpoplpushCommand); + argv[0] = isbrpoplpush ? shared.rpoplpush : shared.lmove; argv[1] = key; argv[2] = dstkey; - propagate(server.rpoplpushCommand, - db->id,argv,3, + argv[3] = getStringObjectFromListPosition(wherefrom); + argv[4] = getStringObjectFromListPosition(whereto); + propagate(isbrpoplpush ? server.rpoplpushCommand : server.lmoveCommand, + db->id,argv,(isbrpoplpush ? 3 : 5), PROPAGATE_AOF| PROPAGATE_REPL); - /* Notify event ("lpush" was notified by rpoplpushHandlePush). */ - notifyKeyspaceEvent(NOTIFY_LIST,"rpop",key,receiver->db->id); + /* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */ + notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop", + key,receiver->db->id); } else { - /* BRPOPLPUSH failed because of wrong + /* BLMOVE failed because of wrong * destination type. */ return C_ERR; } @@ -844,7 +895,8 @@ void blockingPopGenericCommand(client *c, int where) { } /* If the keys do not exist we must block */ - blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL); + struct listPos pos = {where}; + blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL); } void blpopCommand(client *c) { @@ -855,12 +907,7 @@ void brpopCommand(client *c) { blockingPopGenericCommand(c,LIST_TAIL); } -void brpoplpushCommand(client *c) { - mstime_t timeout; - - if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS) - != C_OK) return; - +void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeout) { robj *key = lookupKeyWrite(c->db, c->argv[1]); if (checkType(c,key,OBJ_LIST)) return; @@ -871,12 +918,32 @@ void brpoplpushCommand(client *c) { addReplyNull(c); } else { /* The list is empty and the client blocks. */ - blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL); + struct listPos pos = {wherefrom, whereto}; + blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],&pos,NULL); } } else { /* The list exists and has elements, so - * the regular rpoplpushCommand is executed. */ + * the regular lmoveCommand is executed. */ serverAssertWithInfo(c,key,listTypeLength(key) > 0); - rpoplpushCommand(c); + lmoveGenericCommand(c,wherefrom,whereto); } } + +void blmoveCommand(client *c) { + mstime_t timeout; + int wherefrom, whereto; + if (getListPositionFromObjectOrReply(c,c->argv[3],&wherefrom) + != C_OK) return; + if (getListPositionFromObjectOrReply(c,c->argv[4],&whereto) + != C_OK) return; + if (getTimeoutFromObjectOrReply(c,c->argv[5],&timeout,UNIT_SECONDS) + != C_OK) return; + blmoveGenericCommand(c,wherefrom,whereto,timeout); +} + +void brpoplpushCommand(client *c) { + mstime_t timeout; + if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS) + != C_OK) return; + blmoveGenericCommand(c, LIST_TAIL, LIST_HEAD, timeout); +} diff --git a/src/t_stream.c b/src/t_stream.c index 285684987..6eb6363fc 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1628,7 +1628,7 @@ void xreadCommand(client *c) { goto cleanup; } blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, - timeout, NULL, ids); + timeout, NULL, NULL, ids); /* If no COUNT is given and we block, set a relatively small count: * in case the ID provided is too low, we do not want the server to * block just to serve this client a huge stream of messages. */ diff --git a/src/t_zset.c b/src/t_zset.c index 9fddf37d1..5fc7643c3 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -3364,7 +3364,7 @@ void blockingGenericZpopCommand(client *c, int where) { } /* If the keys do not exist we must block */ - blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL); + blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL,NULL); } // BZPOPMIN key [key ...] timeout diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index d2c8b8e56..06414dd8a 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -78,6 +78,7 @@ start_server {tags {"repl"}} { } test {BRPOPLPUSH replication, when blocking against empty list} { + $A config resetstat set rd [redis_deferring_client] $rd brpoplpush a b 5 r lpush a foo @@ -86,9 +87,12 @@ start_server {tags {"repl"}} { } else { fail "Master and replica have different digest: [$A debug digest] VS [$B debug digest]" } + assert_match {*calls=1,*} [cmdrstat rpoplpush $A] + assert_match {} [cmdrstat lmove $A] } test {BRPOPLPUSH replication, list exists} { + $A config resetstat set rd [redis_deferring_client] r lpush c 1 r lpush c 2 @@ -96,6 +100,39 @@ start_server {tags {"repl"}} { $rd brpoplpush c d 5 after 1000 assert_equal [$A debug digest] [$B debug digest] + assert_match {*calls=1,*} [cmdrstat rpoplpush $A] + assert_match {} [cmdrstat lmove $A] + } + + foreach wherefrom {left right} { + foreach whereto {left right} { + test "BLMOVE ($wherefrom, $whereto) replication, when blocking against empty list" { + $A config resetstat + set rd [redis_deferring_client] + $rd blmove a b $wherefrom $whereto 5 + r lpush a foo + wait_for_condition 50 100 { + [$A debug digest] eq [$B debug digest] + } else { + fail "Master and replica have different digest: [$A debug digest] VS [$B debug digest]" + } + assert_match {*calls=1,*} [cmdrstat lmove $A] + assert_match {} [cmdrstat rpoplpush $A] + } + + test "BLMOVE ($wherefrom, $whereto) replication, list exists" { + $A config resetstat + set rd [redis_deferring_client] + r lpush c 1 + r lpush c 2 + r lpush c 3 + $rd blmove c d $wherefrom $whereto 5 + after 1000 + assert_equal [$A debug digest] [$B debug digest] + assert_match {*calls=1,*} [cmdrstat lmove $A] + assert_match {} [cmdrstat rpoplpush $A] + } + } } test {BLPOP followed by role change, issue #2473} { diff --git a/tests/support/util.tcl b/tests/support/util.tcl index b9a65358f..bc1394ce4 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -519,3 +519,9 @@ proc get_child_pid {idx} { return $child_pid } + +proc cmdrstat {cmd r} { + if {[regexp "\r\ncmdstat_$cmd:(.*?)\r\n" [$r info commandstats] _ value]} { + set _ $value + } +} diff --git a/tests/unit/introspection-2.tcl b/tests/unit/introspection-2.tcl index 55f656956..0968c2933 100644 --- a/tests/unit/introspection-2.tcl +++ b/tests/unit/introspection-2.tcl @@ -1,7 +1,5 @@ proc cmdstat {cmd} { - if {[regexp "\r\ncmdstat_$cmd:(.*?)\r\n" [r info commandstats] _ value]} { - set _ $value - } + return [cmdrstat $cmd r] } start_server {tags {"introspection"}} { diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 95f18b086..600a7ce70 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -194,6 +194,7 @@ start_server { test "BRPOPLPUSH - $type" { r del target + r rpush target bar set rd [redis_deferring_client] create_list blist "a b $large c d" @@ -201,9 +202,38 @@ start_server { $rd brpoplpush blist target 1 assert_equal d [$rd read] - assert_equal d [r rpop target] + assert_equal d [r lpop target] assert_equal "a b $large c" [r lrange blist 0 -1] } + + foreach wherefrom {left right} { + foreach whereto {left right} { + test "BLMOVE $wherefrom $whereto - $type" { + r del target + r rpush target bar + + set rd [redis_deferring_client] + create_list blist "a b $large c d" + + $rd blmove blist target $wherefrom $whereto 1 + set poppedelement [$rd read] + + if {$wherefrom eq "right"} { + assert_equal d $poppedelement + assert_equal "a b $large c" [r lrange blist 0 -1] + } else { + assert_equal a $poppedelement + assert_equal "b $large c d" [r lrange blist 0 -1] + } + + if {$whereto eq "right"} { + assert_equal $poppedelement [r rpop target] + } else { + assert_equal $poppedelement [r lpop target] + } + } + } + } } test "BLPOP, LPUSH + DEL should not awake blocked client" { @@ -285,24 +315,60 @@ start_server { test "BRPOPLPUSH with zero timeout should block indefinitely" { set rd [redis_deferring_client] r del blist target + r rpush target bar $rd brpoplpush blist target 0 - after 1000 + wait_for_condition 100 10 { + [s blocked_clients] == 1 + } else { + fail "Timeout waiting for blocked clients" + } r rpush blist foo assert_equal foo [$rd read] - assert_equal {foo} [r lrange target 0 -1] + assert_equal {foo bar} [r lrange target 0 -1] } - test "BRPOPLPUSH with a client BLPOPing the target list" { - set rd [redis_deferring_client] - set rd2 [redis_deferring_client] - r del blist target - $rd2 blpop target 0 - $rd brpoplpush blist target 0 - after 1000 - r rpush blist foo - assert_equal foo [$rd read] - assert_equal {target foo} [$rd2 read] - assert_equal 0 [r exists target] + foreach wherefrom {left right} { + foreach whereto {left right} { + test "BLMOVE $wherefrom $whereto with zero timeout should block indefinitely" { + set rd [redis_deferring_client] + r del blist target + r rpush target bar + $rd blmove blist target $wherefrom $whereto 0 + wait_for_condition 100 10 { + [s blocked_clients] == 1 + } else { + fail "Timeout waiting for blocked clients" + } + r rpush blist foo + assert_equal foo [$rd read] + if {$whereto eq "right"} { + assert_equal {bar foo} [r lrange target 0 -1] + } else { + assert_equal {foo bar} [r lrange target 0 -1] + } + } + } + } + + foreach wherefrom {left right} { + foreach whereto {left right} { + test "BLMOVE ($wherefrom, $whereto) with a client BLPOPing the target list" { + set rd [redis_deferring_client] + set rd2 [redis_deferring_client] + r del blist target + $rd2 blpop target 0 + $rd blmove blist target $wherefrom $whereto 0 + wait_for_condition 100 10 { + [s blocked_clients] == 2 + } else { + fail "Timeout waiting for blocked clients" + } + r rpush blist foo + assert_equal foo [$rd read] + assert_equal {target foo} [$rd2 read] + assert_equal 0 [r exists target] + } + } } test "BRPOPLPUSH with wrong source type" { @@ -325,7 +391,11 @@ start_server { r del blist target r set target nolist $rd brpoplpush blist target 0 - after 1000 + wait_for_condition 100 10 { + [s blocked_clients] == 1 + } else { + fail "Timeout waiting for blocked clients" + } r rpush blist foo assert_error "WRONGTYPE*" {$rd read} assert_equal {foo} [r lrange blist 0 -1] @@ -355,14 +425,14 @@ start_server { assert_equal {foo} [r lrange target2 0 -1] } - test "Linked BRPOPLPUSH" { + test "Linked LMOVEs" { set rd1 [redis_deferring_client] set rd2 [redis_deferring_client] r del list1 list2 list3 - $rd1 brpoplpush list1 list2 0 - $rd2 brpoplpush list2 list3 0 + $rd1 blmove list1 list2 right left 0 + $rd2 blmove list2 list3 left right 0 r rpush list1 foo @@ -451,7 +521,16 @@ start_server { set rd [redis_deferring_client] $rd brpoplpush foo_list bar_list 1 - after 2000 + wait_for_condition 100 10 { + [s blocked_clients] == 1 + } else { + fail "Timeout waiting for blocked client" + } + wait_for_condition 500 10 { + [s blocked_clients] == 0 + } else { + fail "Timeout waiting for client to unblock" + } $rd read } {} @@ -676,6 +755,29 @@ start_server { assert_encoding quicklist mylist2 } + foreach wherefrom {left right} { + foreach whereto {left right} { + test "LMOVE $wherefrom $whereto base case - $type" { + r del mylist1 mylist2 + + if {$wherefrom eq "right"} { + create_list mylist1 "c d $large a" + } else { + create_list mylist1 "a $large c d" + } + assert_equal a [r lmove mylist1 mylist2 $wherefrom $whereto] + assert_equal $large [r lmove mylist1 mylist2 $wherefrom $whereto] + assert_equal "c d" [r lrange mylist1 0 -1] + if {$whereto eq "right"} { + assert_equal "a $large" [r lrange mylist2 0 -1] + } else { + assert_equal "$large a" [r lrange mylist2 0 -1] + } + assert_encoding quicklist mylist2 + } + } + } + test "RPOPLPUSH with the same list as src and dst - $type" { create_list mylist "a $large c" assert_equal "a $large c" [r lrange mylist 0 -1] @@ -683,6 +785,26 @@ start_server { assert_equal "c a $large" [r lrange mylist 0 -1] } + foreach wherefrom {left right} { + foreach whereto {left right} { + test "LMOVE $wherefrom $whereto with the same list as src and dst - $type" { + if {$wherefrom eq "right"} { + create_list mylist "a $large c" + assert_equal "a $large c" [r lrange mylist 0 -1] + } else { + create_list mylist "c a $large" + assert_equal "c a $large" [r lrange mylist 0 -1] + } + assert_equal c [r lmove mylist mylist $wherefrom $whereto] + if {$whereto eq "right"} { + assert_equal "a $large c" [r lrange mylist 0 -1] + } else { + assert_equal "c a $large" [r lrange mylist 0 -1] + } + } + } + } + foreach {othertype otherlarge} [array get largevalue] { test "RPOPLPUSH with $type source and existing target $othertype" { create_list srclist "a b c $large" @@ -698,6 +820,35 @@ start_server { assert_encoding quicklist dstlist } } + + foreach wherefrom {left right} { + foreach whereto {left right} { + test "LMOVE $wherefrom $whereto with $type source and existing target $othertype" { + create_list dstlist "$otherlarge" + + if {$wherefrom eq "right"} { + create_list srclist "a b c $large" + } else { + create_list srclist "$large c a b" + } + assert_equal $large [r lmove srclist dstlist $wherefrom $whereto] + assert_equal c [r lmove srclist dstlist $wherefrom $whereto] + assert_equal "a b" [r lrange srclist 0 -1] + + if {$whereto eq "right"} { + assert_equal "$otherlarge $large c" [r lrange dstlist 0 -1] + } else { + assert_equal "c $large $otherlarge" [r lrange dstlist 0 -1] + } + + # When we lmoved a large value, dstlist should be + # converted to the same encoding as srclist. + if {$type eq "linkedlist"} { + assert_encoding quicklist dstlist + } + } + } + } } }