diff --git a/src/redis.c b/src/redis.c index 1f5725b9c..12ce0f4d9 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1067,6 +1067,7 @@ void createSharedObjects(void) { shared.del = createStringObject("DEL",3); shared.rpop = createStringObject("RPOP",4); shared.lpop = createStringObject("LPOP",4); + shared.lpush = createStringObject("LPUSH",5); for (j = 0; j < REDIS_SHARED_INTEGERS; j++) { shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j); shared.integers[j]->encoding = REDIS_ENCODING_INT; @@ -1187,6 +1188,8 @@ void initServerConfig() { server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); server.lpushCommand = lookupCommandByCString("lpush"); + server.lpopCommand = lookupCommandByCString("lpop"); + server.rpopCommand = lookupCommandByCString("rpop"); /* Slow log */ server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN; @@ -1262,6 +1265,7 @@ void initServer() { server.slaves = listCreate(); server.monitors = listCreate(); server.unblocked_clients = listCreate(); + server.ready_keys = listCreate(); createSharedObjects(); adjustOpenFilesLimit(); @@ -1292,6 +1296,7 @@ void initServer() { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); + server.db[j].ready_keys = dictCreate(&setDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; } @@ -1682,6 +1687,8 @@ int processCommand(redisClient *c) { addReply(c,shared.queued); } else { call(c,REDIS_CALL_FULL); + if (listLength(server.ready_keys)) + handleClientsBlockedOnLists(); } return REDIS_OK; } diff --git a/src/redis.h b/src/redis.h index ec2bde221..24efef57b 100644 --- a/src/redis.h +++ b/src/redis.h @@ -300,6 +300,7 @@ typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ + dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -326,6 +327,22 @@ typedef struct blockingState { * for BRPOPLPUSH. */ } blockingState; +/* The following structure represents a node in the server.ready_keys list, + * where we accumulate all the keys that had clients blocked with a blocking + * operation such as B[LR]POP, but received new data in the context of the + * last executed command. + * + * After the execution of every command or script, we run this list to check + * if as a result we should serve data to clients blocked, unblocking them. + * Note that server.ready_keys will not have duplicates as there dictionary + * also called ready_keys in every structure representing a Redis database, + * where we make sure to remember if a given key was already added in the + * server.ready_keys list. */ +typedef struct readyList { + redisDb *db; + robj *key; +} readyList; + /* With multiplexing we need to take per-clinet state. * Clients are taken in a liked list. */ typedef struct redisClient { @@ -380,6 +397,7 @@ struct sharedObjectsStruct { *masterdownerr, *roslaveerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop, + *lpush, *select[REDIS_SHARED_SELECT_CMDS], *integers[REDIS_SHARED_INTEGERS], *mbulkhdr[REDIS_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -604,7 +622,8 @@ struct redisServer { off_t loading_loaded_bytes; time_t loading_start_time; /* Fast pointers to often looked up command */ - struct redisCommand *delCommand, *multiCommand, *lpushCommand; + struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, + *rpopCommand; /* Fields used only for stats */ time_t stat_starttime; /* Server start time */ long long stat_numcommands; /* Number of processed commands */ @@ -703,6 +722,7 @@ struct redisServer { /* Blocked clients */ unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */ list *unblocked_clients; /* list of clients to unblock before next loop */ + list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -917,7 +937,7 @@ int listTypeEqual(listTypeEntry *entry, robj *o); void listTypeDelete(listTypeEntry *entry); void listTypeConvert(robj *subject, int enc); void unblockClientWaitingData(redisClient *c); -int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele); +void handleClientsBlockedOnLists(void); void popGenericCommand(redisClient *c, int where); /* MULTI/EXEC/WATCH... */ diff --git a/src/t_list.c b/src/t_list.c index ca03916b9..77e40eb6b 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -1,5 +1,7 @@ #include "redis.h" +void signalListAsReady(redisClient *c, robj *key); + /*----------------------------------------------------------------------------- * List API *----------------------------------------------------------------------------*/ @@ -14,6 +16,11 @@ void listTypeTryConversion(robj *subject, robj *value) { listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); } +/* The function pushes an elmenet to the specified list object 'subject', + * at head or tail position as specified by 'where'. + * + * There is no need for the caller to incremnet the refcount of 'value' as + * the function takes care of it if needed. */ void listTypePush(robj *subject, robj *value, int where) { /* Check if we need to convert the ziplist */ listTypeTryConversion(subject,value); @@ -268,16 +275,10 @@ void pushGenericCommand(redisClient *c, int where) { return; } + if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]); + for (j = 2; j < c->argc; j++) { c->argv[j] = tryObjectEncoding(c->argv[j]); - if (may_have_waiting_clients) { - if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) { - waiting++; - continue; - } else { - may_have_waiting_clients = 0; - } - } if (!lobj) { lobj = createZiplistObject(); dbAdd(c->db,c->argv[1],lobj); @@ -288,18 +289,6 @@ void pushGenericCommand(redisClient *c, int where) { addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0)); if (pushed) signalModifiedKey(c->db,c->argv[1]); server.dirty += pushed; - - /* Alter the replication of the command accordingly to the number of - * list elements delivered to clients waiting into a blocking operation. - * We do that only if there were waiting clients, and only if still some - * element was pushed into the list (othewise dirty is 0 and nothign will - * be propagated). */ - if (waiting && pushed) { - /* CMD KEY a b C D E */ - for (j = 0; j < waiting; j++) decrRefCount(c->argv[j+2]); - memmove(c->argv+2,c->argv+2+waiting,sizeof(robj*)*pushed); - c->argc -= waiting; - } } void lpushCommand(redisClient *c) { @@ -666,29 +655,15 @@ void lremCommand(redisClient *c) { * as well. This command was originally proposed by Ezra Zygmuntowicz. */ -void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) { - if (!handleClientsWaitingListPush(origclient,dstkey,value)) { - /* Create the list if the key does not exist */ - if (!dstobj) { - dstobj = createZiplistObject(); - dbAdd(c->db,dstkey,dstobj); - } else { - signalModifiedKey(c->db,dstkey); - } - listTypePush(dstobj,value,REDIS_HEAD); - /* Additionally propagate this PUSH operation together with - * the operation performed by the command. */ - { - robj **argv = zmalloc(sizeof(robj*)*3); - argv[0] = createStringObject("LPUSH",5); - argv[1] = dstkey; - argv[2] = value; - incrRefCount(argv[1]); - incrRefCount(argv[2]); - alsoPropagate(server.lpushCommand,c->db->id,argv,3, - REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL); - } +void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) { + /* Create the list if the key does not exist */ + if (!dstobj) { + dstobj = createZiplistObject(); + dbAdd(c->db,dstkey,dstobj); + signalListAsReady(c,dstkey); } + signalModifiedKey(c->db,dstkey); + listTypePush(dstobj,value,REDIS_HEAD); /* Always send the pushed value to the client. */ addReplyBulk(c,value); } @@ -709,9 +684,10 @@ void rpoplpushCommand(redisClient *c) { if (dobj && checkType(c,dobj,REDIS_LIST)) return; value = listTypePop(sobj,REDIS_TAIL); /* We saved touched key, and protect it, since rpoplpushHandlePush - * may change the client command argument vector. */ + * may change the client command argument vector (it does not + * currently). */ incrRefCount(touchedkey); - rpoplpushHandlePush(c,c,c->argv[2],dobj,value); + rpoplpushHandlePush(c,c->argv[2],dobj,value); /* listTypePop returns an object with its refcount incremented */ decrRefCount(value); @@ -721,13 +697,6 @@ void rpoplpushCommand(redisClient *c) { signalModifiedKey(c->db,touchedkey); decrRefCount(touchedkey); server.dirty++; - - /* Replicate this as a simple RPOP since the LPUSH side is replicated - * by rpoplpushHandlePush() call if needed (it may not be needed - * if a client is blocking wait a push against the list). */ - rewriteClientCommandVector(c,2, - resetRefCount(createStringObject("RPOP",4)), - c->argv[1]); } } @@ -735,20 +704,10 @@ void rpoplpushCommand(redisClient *c) { * Blocking POP operations *----------------------------------------------------------------------------*/ -/* Currently Redis blocking operations support is limited to list POP ops, - * so the current implementation is not fully generic, but it is also not - * completely specific so it will not require a rewrite to support new - * kind of blocking operations in the future. - * - * Still it's important to note that list blocking operations can be already - * used as a notification mechanism in order to implement other blocking - * operations at application level, so there must be a very strong evidence - * of usefulness and generality before new blocking operations are implemented. - * - * This is how the current blocking POP works, we use BLPOP as example: +/* This is how the current blocking POP works, we use BLPOP as example: * - If the user calls BLPOP and the key exists and contains a non empty list * then LPOP is called instead. So BLPOP is semantically the same as LPOP - * if there is not to block. + * if blocking is not required. * - If instead BLPOP is called and the key does not exists or the list is * empty we need to block. In order to do so we remove the notification for * new data to read in the client socket (so that we'll not serve new @@ -756,12 +715,10 @@ void rpoplpushCommand(redisClient *c) { * in a dictionary (db->blocking_keys) mapping keys to a list of clients * blocking for this keys. * - If a PUSH operation against a key with blocked clients waiting is - * performed, we serve the first in the list: basically instead to push - * the new element inside the list we return it to the (first / oldest) - * blocking client, unblock the client, and remove it form the list. - * - * The above comment and the source code should be enough in order to understand - * the implementation and modify / fix it later. + * performed, we mark this key as "ready", and after the current command, + * MULTI/EXEC block, or script, is executed, we serve all the clients waiting + * for this list, from the one that blocked first, to the last, accordingly + * to the number of elements we have in the ready list. */ /* Set a client in blocking mode for the specified key, with the specified @@ -836,68 +793,192 @@ void unblockClientWaitingData(redisClient *c) { listAddNodeTail(server.unblocked_clients,c); } -/* This should be called from any function PUSHing into lists. - * 'c' is the "pushing client", 'key' is the key it is pushing data against, - * 'ele' is the element pushed. +/* If the specified key has clients blocked waiting for list pushes, this + * function will put the key reference into the server.ready_keys list. + * Note that db->ready_keys is an hash table that allows us to avoid putting + * the same key agains and again in the list in case of multiple pushes + * made by a script or in the context of MULTI/EXEC. * - * If the function returns 0 there was no client waiting for a list push - * against this key. + * The list will be finally processed by handleClientsBlockedOnLists() */ +void signalListAsReady(redisClient *c, robj *key) { + readyList *rl; + + /* No clients blocking for this key? No need to queue it. */ + if (dictFind(c->db->blocking_keys,key) == NULL) return; + + /* Key was already signaled? No need to queue it again. */ + if (dictFind(c->db->ready_keys,key) != NULL) return; + + /* Ok, we need to queue this key into server.ready_keys. */ + rl = zmalloc(sizeof(*rl)); + rl->key = key; + rl->db = c->db; + incrRefCount(key); + listAddNodeTail(server.ready_keys,rl); + + /* We also add the key in the db->ready_keys dictionary in order + * to avoid adding it multiple times into a list with a simple O(1) + * check. */ + incrRefCount(key); + redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK); +} + +/* This is an helper function for handleClientsBlockedOnLists(). It's work + * is to serve a specific client (receiver) that is blocked on 'key' + * in the context of the specified 'db', doing the following: * - * If the function returns 1 there was a client waiting for a list push - * against this key, the element was passed to this client thus it's not - * needed to actually add it to the list and the caller should return asap. */ -int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { - struct dictEntry *de; - redisClient *receiver; - int numclients; - list *clients; - listNode *ln; - robj *dstkey, *dstobj; + * 1) Provide the client with the 'value' element. + * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the + * 'value' element on the destionation list (the LPUSH side of the command). + * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into + * the AOF and replication channel. + * + * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the + * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that + * we can propagate the command properly. + * + * The function returns REDIS_OK if we are able to serve the client, otherwise + * REDIS_ERR is returned to signal the caller that the list POP operation + * should be undoed as the client was not served: This only happens for + * BRPOPLPUSH that fails to push the value to the destination key as it is + * of the wrong type. */ +int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where) +{ + robj *argv[3]; - de = dictFind(c->db->blocking_keys,key); - if (de == NULL) return 0; - clients = dictGetVal(de); - numclients = listLength(clients); + if (dstkey == NULL) { + /* Propagate the [LR]POP operation. */ + argv[0] = (where == REDIS_HEAD) ? shared.lpop : + shared.rpop; + argv[1] = key; + propagate((where == REDIS_HEAD) ? + server.lpopCommand : server.rpopCommand, + db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL); - /* Try to handle the push as long as there are clients waiting for a push. - * Note that "numclients" is used because the list of clients waiting for a - * push on "key" is deleted by unblockClient() when empty. - * - * This loop will have more than 1 iteration when there is a BRPOPLPUSH - * that cannot push the target list because it does not contain a list. If - * this happens, it simply tries the next client waiting for a push. */ - while (numclients--) { - ln = listFirst(clients); - redisAssertWithInfo(c,key,ln != NULL); - receiver = ln->value; - dstkey = receiver->bpop.target; - - /* Protect receiver->bpop.target, that will be freed by - * the next unblockClientWaitingData() call. */ - if (dstkey) incrRefCount(dstkey); - - /* This should remove the first element of the "clients" list. */ - unblockClientWaitingData(receiver); - - if (dstkey == NULL) { - /* BRPOP/BLPOP */ - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); - return 1; /* Serve just the first client as in B[RL]POP semantics */ + /* BRPOP/BLPOP */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,value); + } else { + /* BRPOPLPUSH */ + robj *dstobj = + lookupKeyWrite(receiver->db,dstkey); + if (!(dstobj && + checkType(receiver,dstobj,REDIS_LIST))) + { + /* Propagate the RPOP operation. */ + argv[0] = shared.rpop; + argv[1] = key; + propagate(server.rpopCommand, + db->id,argv,2, + REDIS_PROPAGATE_AOF| + REDIS_PROPAGATE_REPL); + rpoplpushHandlePush(receiver,dstkey,dstobj, + value); + /* Propagate the LPUSH operation. */ + argv[0] = shared.lpush; + argv[1] = dstkey; + argv[2] = value; + propagate(server.lpushCommand, + db->id,argv,3, + REDIS_PROPAGATE_AOF| + REDIS_PROPAGATE_REPL); } else { - /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */ - dstobj = lookupKeyWrite(receiver->db,dstkey); - if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) { - rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele); - decrRefCount(dstkey); - return 1; - } - decrRefCount(dstkey); + /* BRPOPLPUSH failed because of wrong + * destination type. */ + return REDIS_ERR; } } + return REDIS_OK; +} - return 0; +/* This function should be called by Redis every time a single command, + * a MULTI/EXEC block, or a Lua script, terminated its execution after + * being called by a client. + * + * All the keys with at least one client blocked that received at least + * one new element via some PUSH operation are accumulated into + * the server.ready_keys list. This function will run the list and will + * serve clients accordingly. Note that the function will iterate again and + * again as a result of serving BRPOPLPUSH we can have new blocking clients + * to serve because of the PUSH side of BRPOPLPUSH. */ +void handleClientsBlockedOnLists(void) { + while(listLength(server.ready_keys) != 0) { + list *l; + + /* Point server.ready_keys to a fresh list and save the current one + * locally. This way as we run the old list we are free to call + * signalListAsReady() that may push new elements in server.ready_keys + * when handling clients blocked into BRPOPLPUSH. */ + l = server.ready_keys; + server.ready_keys = listCreate(); + + while(listLength(l) != 0) { + listNode *ln = listFirst(l); + readyList *rl = ln->value; + + /* First of all remove this key from db->ready_keys so that + * we can safely call signalListAsReady() against this key. */ + dictDelete(rl->db->ready_keys,rl->key); + + /* If the key exists and it's a list, serve blocked clients + * with data. */ + robj *o = lookupKeyWrite(rl->db,rl->key); + if (o != NULL && o->type == REDIS_LIST) { + dictEntry *de; + + /* We serve clients in the same order they blocked for + * this key, from the first blocked to the last. */ + de = dictFind(rl->db->blocking_keys,rl->key); + if (de) { + list *clients = dictGetVal(de); + int numclients = listLength(clients); + + while(numclients--) { + listNode *clientnode = listFirst(clients); + redisClient *receiver = clientnode->value; + robj *dstkey = receiver->bpop.target; + int where = (receiver->lastcmd && + receiver->lastcmd->proc == blpopCommand) ? + REDIS_HEAD : REDIS_TAIL; + robj *value = listTypePop(o,where); + + if (value) { + /* Protect receiver->bpop.target, that will be + * freed by the next unblockClientWaitingData() + * call. */ + if (dstkey) incrRefCount(dstkey); + unblockClientWaitingData(receiver); + + if (serveClientBlockedOnList(receiver, + rl->key,dstkey,rl->db,value, + where) == REDIS_ERR) + { + /* If we failed serving the client we need + * to also undo the POP operation. */ + listTypePush(o,value,where); + } + + if (dstkey) decrRefCount(dstkey); + decrRefCount(value); + } else { + break; + } + } + } + + if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key); + /* We don't call signalModifiedKey() as it was already called + * when an element was pushed on the list. */ + } + + /* Free this item. */ + decrRefCount(rl->key); + zfree(rl); + listDelNode(l,ln); + } + listRelease(l); /* We have the new list on place at this point. */ + } } int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) { @@ -986,7 +1067,6 @@ void brpoplpushCommand(redisClient *c) { if (key == NULL) { if (c->flags & REDIS_MULTI) { - /* Blocking against an empty list in a multi state * returns immediately. */ addReply(c, shared.nullbulk); @@ -998,7 +1078,6 @@ void brpoplpushCommand(redisClient *c) { if (key->type != REDIS_LIST) { addReply(c, shared.wrongtypeerr); } else { - /* The list exists and has elements, so * the regular rpoplpushCommand is executed. */ redisAssertWithInfo(c,key,listTypeLength(key) > 0); diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 18e639d41..da94b0880 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -61,9 +61,13 @@ start_server {tags {"repl"}} { test {SET on the master should immediately propagate} { r -1 set mykey bar - if {$::valgrind} {after 2000} - r 0 get mykey - } {bar} + + wait_for_condition 500 100 { + [r 0 get mykey] eq {bar} + } else { + fail "SET on master did not propagated on slave" + } + } test {FLUSHALL should replicate} { r -1 flushall diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index 057918716..6dbdb6b63 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -344,5 +344,22 @@ start_server {tags {"scripting repl"}} { fail "Expected 2 in x, but value is '[r -1 get x]'" } } + + test {Replication of script multiple pushes to list with BLPOP} { + set rd [redis_deferring_client] + $rd brpop a 0 + r eval { + redis.call("lpush","a","1"); + redis.call("lpush","a","2"); + } 0 + set res [$rd read] + $rd close + wait_for_condition 50 100 { + [r -1 lrange a 0 -1] eq [r lrange a 0 -1] + } else { + fail "Expected list 'a' in slave and master to be the same, but they are respectively '[r -1 lrange a 0 -1]' and '[r lrange a 0 -1]'" + } + set res + } {a 1} } } diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 85dde5690..8f598a4ab 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -161,6 +161,47 @@ start_server { } } + test "BLPOP, LPUSH + DEL should not awake blocked client" { + set rd [redis_deferring_client] + r del list + + $rd blpop list 0 + r multi + r lpush list a + r del list + r exec + r del list + r lpush list b + $rd read + } {list b} + + test "BLPOP, LPUSH + DEL + SET should not awake blocked client" { + set rd [redis_deferring_client] + r del list + + $rd blpop list 0 + r multi + r lpush list a + r del list + r set list foo + r exec + r del list + r lpush list b + $rd read + } {list b} + + test "MULTI/EXEC is isolated from the point of view of BLPOP" { + set rd [redis_deferring_client] + r del list + $rd blpop list 0 + r multi + r lpush list a + r lpush list b + r lpush list c + r exec + $rd read + } {list c} + test "BLPOP with variadic LPUSH" { set rd [redis_deferring_client] r del blist target @@ -169,8 +210,8 @@ start_server { if {$::valgrind} {after 100} assert_equal 2 [r lpush blist foo bar] if {$::valgrind} {after 100} - assert_equal {blist foo} [$rd read] - assert_equal bar [lindex [r lrange blist 0 -1] 0] + assert_equal {blist bar} [$rd read] + assert_equal foo [lindex [r lrange blist 0 -1] 0] } test "BRPOPLPUSH with zero timeout should block indefinitely" { @@ -222,6 +263,16 @@ start_server { assert_equal {foo} [r lrange blist 0 -1] } + test "BRPOPLPUSH maintains order of elements after failure" { + set rd [redis_deferring_client] + r del blist target + r set target nolist + $rd brpoplpush blist target 0 + r rpush blist a b c + assert_error "ERR*wrong kind*" {$rd read} + r lrange blist 0 -1 + } {a b c} + test "BRPOPLPUSH with multiple blocked clients" { set rd1 [redis_deferring_client] set rd2 [redis_deferring_client] @@ -293,6 +344,41 @@ start_server { r exec } {foo bar {} {} {bar foo}} + test "PUSH resulting from BRPOPLPUSH affect WATCH" { + set blocked_client [redis_deferring_client] + set watching_client [redis_deferring_client] + r del srclist dstlist somekey + r set somekey somevalue + $blocked_client brpoplpush srclist dstlist 0 + $watching_client watch dstlist + $watching_client read + $watching_client multi + $watching_client read + $watching_client get somekey + $watching_client read + r lpush srclist element + $watching_client exec + $watching_client read + } {} + + test "BRPOPLPUSH does not affect WATCH while still blocked" { + set blocked_client [redis_deferring_client] + set watching_client [redis_deferring_client] + r del srclist dstlist somekey + r set somekey somevalue + $blocked_client brpoplpush srclist dstlist 0 + $watching_client watch dstlist + $watching_client read + $watching_client multi + $watching_client read + $watching_client get somekey + $watching_client read + $watching_client exec + # Blocked BLPOPLPUSH may create problems, unblock it. + r lpush srclist element + $watching_client read + } {somevalue} + test {BRPOPLPUSH timeout} { set rd [redis_deferring_client]