Fix the missing server.dirty increment and redundant signalModifiedKey in serveClientBlockedOnList (#11326)
Mainly fix two minor bug 1. When handle BL*POP/BLMOVE commands with blocked clients, we should increment server.dirty. 2. `listPopRangeAndReplyWithKey()` in `serveClientBlockedOnList()` should not repeat calling `signalModifiedKey()` which has been called when an element was pushed on the list. (was skipped in all bpop commands, other than blmpop) Other optimization add `signal` param for `listElementsRemoved` to skip `signalModifiedKey()` to unify all pop operation. Unifying all pop operations also prepares us for #11303, so that we can avoid having to deal with the conversion from quicklist to listpack() in various places when the list shrinks.
This commit is contained in:
parent
3330ea1864
commit
f106beebfa
@ -2583,7 +2583,7 @@ robj *listTypeDup(robj *o);
|
||||
int listTypeDelRange(robj *o, long start, long stop);
|
||||
void unblockClientWaitingData(client *c);
|
||||
void popGenericCommand(client *c, int where);
|
||||
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int *deleted);
|
||||
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int signal, int *deleted);
|
||||
|
||||
/* MULTI/EXEC/WATCH... */
|
||||
void unwatchAllKeys(client *c);
|
||||
|
57
src/t_list.c
57
src/t_list.c
@ -399,7 +399,7 @@ void lsetCommand(client *c) {
|
||||
*
|
||||
* 'deleted' is an optional output argument to get an indication
|
||||
* if the key got deleted by this function. */
|
||||
void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long count, int *deleted) {
|
||||
void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long count, int signal, int *deleted) {
|
||||
long llen = listTypeLength(o);
|
||||
long rangelen = (count > llen) ? llen : count;
|
||||
long rangestart = (where == LIST_HEAD) ? 0 : -rangelen;
|
||||
@ -414,7 +414,7 @@ void listPopRangeAndReplyWithKey(client *c, robj *o, robj *key, int where, long
|
||||
/* Pop these elements. */
|
||||
listTypeDelRange(o, rangestart, rangelen);
|
||||
/* Maintain the notifications and dirty. */
|
||||
listElementsRemoved(c, key, where, o, rangelen, deleted);
|
||||
listElementsRemoved(c, key, where, o, rangelen, signal, deleted);
|
||||
}
|
||||
|
||||
/* A helper for replying with a list's range between the inclusive start and end
|
||||
@ -463,10 +463,12 @@ void addListRangeReply(client *c, robj *o, long start, long end, int reverse) {
|
||||
}
|
||||
|
||||
/* A housekeeping helper for list elements popping tasks.
|
||||
*
|
||||
* If 'signal' is 0, skip calling signalModifiedKey().
|
||||
*
|
||||
* 'deleted' is an optional output argument to get an indication
|
||||
* if the key got deleted by this function. */
|
||||
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int *deleted) {
|
||||
void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, int signal, int *deleted) {
|
||||
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
|
||||
|
||||
notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id);
|
||||
@ -478,7 +480,7 @@ void listElementsRemoved(client *c, robj *key, int where, robj *o, long count, i
|
||||
} else {
|
||||
if (deleted) *deleted = 0;
|
||||
}
|
||||
signalModifiedKey(c, c->db, key);
|
||||
if (signal) signalModifiedKey(c, c->db, key);
|
||||
server.dirty += count;
|
||||
}
|
||||
|
||||
@ -517,7 +519,7 @@ void popGenericCommand(client *c, int where) {
|
||||
serverAssert(value != NULL);
|
||||
addReplyBulk(c,value);
|
||||
decrRefCount(value);
|
||||
listElementsRemoved(c,c->argv[1],where,o,1,NULL);
|
||||
listElementsRemoved(c,c->argv[1],where,o,1,1,NULL);
|
||||
} else {
|
||||
/* Pop a range of elements. An addition to the original POP command,
|
||||
* which replies with a multi-bulk. */
|
||||
@ -529,7 +531,7 @@ void popGenericCommand(client *c, int where) {
|
||||
|
||||
addListRangeReply(c,o,rangestart,rangeend,reverse);
|
||||
listTypeDelRange(o,rangestart,rangelen);
|
||||
listElementsRemoved(c,c->argv[1],where,o,rangelen,NULL);
|
||||
listElementsRemoved(c,c->argv[1],where,o,rangelen,1,NULL);
|
||||
}
|
||||
}
|
||||
|
||||
@ -559,7 +561,7 @@ void mpopGenericCommand(client *c, robj **keys, int numkeys, int where, long cou
|
||||
if (llen == 0) continue;
|
||||
|
||||
/* Pop a range of elements in a nested arrays way. */
|
||||
listPopRangeAndReplyWithKey(c, o, key, where, count, NULL);
|
||||
listPopRangeAndReplyWithKey(c, o, key, where, count, 1, NULL);
|
||||
|
||||
/* Replicate it as [LR]POP COUNT. */
|
||||
robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
|
||||
@ -859,22 +861,11 @@ void lmoveGenericCommand(client *c, int wherefrom, int whereto) {
|
||||
value = listTypePop(sobj,wherefrom);
|
||||
serverAssert(value); /* assertion for valgrind (avoid NPD) */
|
||||
lmoveHandlePush(c,c->argv[2],dobj,value,whereto);
|
||||
listElementsRemoved(c,touchedkey,wherefrom,sobj,1,1,NULL);
|
||||
|
||||
/* listTypePop returns an object with its refcount incremented */
|
||||
decrRefCount(value);
|
||||
|
||||
/* Delete the source list when it is empty */
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,
|
||||
wherefrom == LIST_HEAD ? "lpop" : "rpop",
|
||||
touchedkey,
|
||||
c->db->id);
|
||||
if (listTypeLength(sobj) == 0) {
|
||||
dbDelete(c->db,touchedkey);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
|
||||
touchedkey,c->db->id);
|
||||
}
|
||||
signalModifiedKey(c,c->db,touchedkey);
|
||||
server.dirty++;
|
||||
if (c->cmd->proc == blmoveCommand) {
|
||||
rewriteClientCommandVector(c,5,shared.lmove,
|
||||
c->argv[1],c->argv[2],c->argv[3],c->argv[4]);
|
||||
@ -964,7 +955,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
|
||||
decrRefCount(argv[2]);
|
||||
|
||||
/* Pop a range of elements in a nested arrays way. */
|
||||
listPopRangeAndReplyWithKey(receiver, o, key, wherefrom, count, deleted);
|
||||
listPopRangeAndReplyWithKey(receiver, o, key, wherefrom, count, 0, deleted);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -978,9 +969,9 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
|
||||
addReplyBulk(receiver,key);
|
||||
addReplyBulk(receiver,value);
|
||||
|
||||
/* Notify event. */
|
||||
char *event = (wherefrom == LIST_HEAD) ? "lpop" : "rpop";
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
|
||||
/* We don't call signalModifiedKey() as it was already called
|
||||
* when an element was pushed on the list. */
|
||||
listElementsRemoved(receiver,key,wherefrom,o,1,0,deleted);
|
||||
} else {
|
||||
/* BLMOVE */
|
||||
robj *dstobj =
|
||||
@ -991,6 +982,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
|
||||
value = listTypePop(o, wherefrom);
|
||||
serverAssert(value != NULL);
|
||||
|
||||
/* "lpush" or "rpush" notify event will be notified by lmoveHandlePush. */
|
||||
lmoveHandlePush(receiver,dstkey,dstobj,value,whereto);
|
||||
/* Propagate the LMOVE/RPOPLPUSH operation. */
|
||||
int isbrpoplpush = (receiver->lastcmd->proc == brpoplpushCommand);
|
||||
@ -1001,22 +993,13 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
|
||||
argv[4] = getStringObjectFromListPosition(whereto);
|
||||
alsoPropagate(db->id,argv,(isbrpoplpush ? 3 : 5),PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
|
||||
/* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */
|
||||
notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop",
|
||||
key,receiver->db->id);
|
||||
/* We don't call signalModifiedKey() as it was already called
|
||||
* when an element was pushed on the list. */
|
||||
listElementsRemoved(receiver,key,wherefrom,o,1,0,deleted);
|
||||
}
|
||||
}
|
||||
|
||||
if (value) decrRefCount(value);
|
||||
|
||||
if (listTypeLength(o) == 0) {
|
||||
if (deleted) *deleted = 1;
|
||||
|
||||
dbDelete(receiver->db, key);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, receiver->db->id);
|
||||
}
|
||||
/* We don't call signalModifiedKey() as it was already called
|
||||
* when an element was pushed on the list. */
|
||||
}
|
||||
|
||||
/* Blocking RPOP/LPOP/LMPOP
|
||||
@ -1054,7 +1037,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i
|
||||
if (count != -1) {
|
||||
/* BLMPOP, non empty list, like a normal [LR]POP with count option.
|
||||
* The difference here we pop a range of elements in a nested arrays way. */
|
||||
listPopRangeAndReplyWithKey(c, o, key, where, count, NULL);
|
||||
listPopRangeAndReplyWithKey(c, o, key, where, count, 1, NULL);
|
||||
|
||||
/* Replicate it as [LR]POP COUNT. */
|
||||
robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
|
||||
@ -1073,7 +1056,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i
|
||||
addReplyBulk(c,key);
|
||||
addReplyBulk(c,value);
|
||||
decrRefCount(value);
|
||||
listElementsRemoved(c,key,where,o,1,NULL);
|
||||
listElementsRemoved(c,key,where,o,1,1,NULL);
|
||||
|
||||
/* Replicate it as an [LR]POP instead of B[LR]POP. */
|
||||
rewriteClientCommandVector(c,2,
|
||||
|
@ -1915,6 +1915,27 @@ foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
r ping
|
||||
} {PONG}
|
||||
|
||||
test "BLPOP/BLMOVE should increase dirty" {
|
||||
r del lst{t} lst1{t}
|
||||
set rd [redis_deferring_client]
|
||||
|
||||
set dirty [s rdb_changes_since_last_save]
|
||||
$rd blpop lst{t} 0
|
||||
r lpush lst{t} a
|
||||
assert_equal {lst{t} a} [$rd read]
|
||||
set dirty2 [s rdb_changes_since_last_save]
|
||||
assert {$dirty2 == $dirty + 2}
|
||||
|
||||
set dirty [s rdb_changes_since_last_save]
|
||||
$rd blmove lst{t} lst1{t} left left 0
|
||||
r lpush lst{t} a
|
||||
assert_equal {a} [$rd read]
|
||||
set dirty2 [s rdb_changes_since_last_save]
|
||||
assert {$dirty2 == $dirty + 2}
|
||||
|
||||
$rd close
|
||||
}
|
||||
|
||||
foreach {pop} {BLPOP BLMPOP_RIGHT} {
|
||||
test "client unblock tests" {
|
||||
r del l
|
||||
|
Loading…
x
Reference in New Issue
Block a user