From f44186e5755e943f39c1446ad9576ab97de5039d Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Fri, 25 Dec 2020 21:49:24 +0200 Subject: [PATCH] Adds count to L/RPOP (#8179) Adds: `L/RPOP [count]` Implements no. 2 of the following strategies: 1. Loop on listTypePop - this would result in multiple calls for memory freeing and allocating (see https://github.com/redis/redis/pull/8179/commits/769167a079b0e110d28e4a8099dce1ecd45682b5) 2. Iterate the range to build the reply, then call quickListDelRange - this requires two iterations and **is the current choice** 3. Refactor quicklist to have a pop variant of quickListDelRange - probably optimal but more complex Also: * There's a historical check for NULL after calling listTypePop that was converted to an assert. * This refactors common logic shared between LRANGE and the new form of LPOP/RPOP into addListRangeReply (adds test for b/w compat) * Consequently, it may have made sense to have `LRANGE l -1 -2` and `LRANGE l 9 0` be legit and return a reverse reply. Due to historical reasons that would be, however, a breaking change. * Added minimal comments to existing commands to adhere to the style, make core dev life easier and get commit karma, naturally. --- src/server.c | 4 +- src/server.h | 2 +- src/t_list.c | 168 +++++++++++++++++++++++++++------------ tests/unit/type/list.tcl | 17 ++++ 4 files changed, 138 insertions(+), 53 deletions(-) diff --git a/src/server.c b/src/server.c index f28c6e114..ad78abeba 100644 --- a/src/server.c +++ b/src/server.c @@ -288,11 +288,11 @@ struct redisCommand redisCommandTable[] = { "write use-memory @list", 0,NULL,1,1,1,0,0,0}, - {"rpop",rpopCommand,2, + {"rpop",rpopCommand,-2, "write fast @list", 0,NULL,1,1,1,0,0,0}, - {"lpop",lpopCommand,2, + {"lpop",lpopCommand,-2, "write fast @list", 0,NULL,1,1,1,0,0,0}, diff --git a/src/server.h b/src/server.h index a19b5ad9e..1dc761959 100644 --- a/src/server.h +++ b/src/server.h @@ -1849,7 +1849,7 @@ void listTypeConvert(robj *subject, int enc); robj *listTypeDup(robj *o); void unblockClientWaitingData(client *c); void popGenericCommand(client *c, int where); -void listElementsRemoved(client *c, robj *key, int where, robj *o); +void listElementsRemoved(client *c, robj *key, int where, robj *o, long count); /* MULTI/EXEC/WATCH... */ void unwatchAllKeys(client *c); diff --git a/src/t_list.c b/src/t_list.c index c8323c612..b64b37a8a 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -206,7 +206,7 @@ robj *listTypeDup(robj *o) { lobj->encoding = OBJ_ENCODING_QUICKLIST; break; default: - serverPanic("Wrong encoding."); + serverPanic("Unknown list encoding"); break; } return lobj; @@ -216,6 +216,7 @@ robj *listTypeDup(robj *o) { * List Commands *----------------------------------------------------------------------------*/ +/* Implements LPUSH/RPUSH. */ void pushGenericCommand(client *c, int where) { int j, pushed = 0; robj *lobj = lookupKeyWrite(c->db,c->argv[1]); @@ -244,14 +245,17 @@ void pushGenericCommand(client *c, int where) { server.dirty += pushed; } +/* LPUSH [ ...] */ void lpushCommand(client *c) { pushGenericCommand(c,LIST_HEAD); } +/* RPUSH [ ...] */ void rpushCommand(client *c) { pushGenericCommand(c,LIST_TAIL); } +/* Implements LPUSHX/RPUSHX. */ void pushxGenericCommand(client *c, int where) { int j, pushed = 0; robj *subject; @@ -274,14 +278,17 @@ void pushxGenericCommand(client *c, int where) { server.dirty += pushed; } +/* LPUSHX [ ...] */ void lpushxCommand(client *c) { pushxGenericCommand(c,LIST_HEAD); } +/* RPUSH [ ...] */ void rpushxCommand(client *c) { pushxGenericCommand(c,LIST_TAIL); } +/* LINSERT (BEFORE|AFTER) */ void linsertCommand(client *c) { int where; robj *subject; @@ -326,12 +333,14 @@ void linsertCommand(client *c) { addReplyLongLong(c,listTypeLength(subject)); } +/* LLEN */ void llenCommand(client *c) { robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero); if (o == NULL || checkType(c,o,OBJ_LIST)) return; addReplyLongLong(c,listTypeLength(o)); } +/* LINDEX */ void lindexCommand(client *c) { robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]); if (o == NULL || checkType(c,o,OBJ_LIST)) return; @@ -359,6 +368,7 @@ void lindexCommand(client *c) { } } +/* LSET */ void lsetCommand(client *c) { robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); if (o == NULL || checkType(c,o,OBJ_LIST)) return; @@ -385,53 +395,15 @@ void lsetCommand(client *c) { } } -void listElementsRemoved(client *c, robj *key, int where, robj *o) { - char *event = (where == LIST_HEAD) ? "lpop" : "rpop"; +/* A helper for replying with a list's range between the inclusive start and end + * indexes as multi-bulk, with support for negative indexes. Note that start + * must be less than end or an empty array is returned. When the reverse + * argument is set to a non-zero value, the reply is reversed so that elements + * are returned from end to start. */ +void addListRangeReply(client *c, robj *o, long start, long end, int reverse) { + long rangelen, llen = listTypeLength(o); - notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id); - if (listTypeLength(o) == 0) { - notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); - dbDelete(c->db, key); - } - signalModifiedKey(c, c->db, key); - server.dirty++; -} - -void popGenericCommand(client *c, int where) { - robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp]); - if (o == NULL || checkType(c, o, OBJ_LIST)) - return; - - robj *value = listTypePop(o, where); - if (value == NULL) { - addReplyNull(c); - } else { - addReplyBulk(c,value); - decrRefCount(value); - listElementsRemoved(c,c->argv[1],where,o); - } -} - -void lpopCommand(client *c) { - popGenericCommand(c,LIST_HEAD); -} - -void rpopCommand(client *c) { - popGenericCommand(c,LIST_TAIL); -} - -void lrangeCommand(client *c) { - robj *o; - long start, end, llen, rangelen; - - if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || - (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; - - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL - || checkType(c,o,OBJ_LIST)) return; - llen = listTypeLength(o); - - /* convert negative indexes */ + /* Convert negative indexes. */ if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; @@ -448,7 +420,9 @@ void lrangeCommand(client *c) { /* Return the result in form of a multi-bulk reply */ addReplyArrayLen(c,rangelen); if (o->encoding == OBJ_ENCODING_QUICKLIST) { - listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL); + int from = reverse ? end : start; + int direction = reverse ? LIST_HEAD : LIST_TAIL; + listTypeIterator *iter = listTypeInitIterator(o,from,direction); while(rangelen--) { listTypeEntry entry; @@ -462,10 +436,98 @@ void lrangeCommand(client *c) { } listTypeReleaseIterator(iter); } else { - serverPanic("List encoding is not QUICKLIST!"); + serverPanic("Unknown list encoding"); } } +/* A housekeeping helper for list elements popping tasks. */ +void listElementsRemoved(client *c, robj *key, int where, robj *o, long count) { + char *event = (where == LIST_HEAD) ? "lpop" : "rpop"; + + notifyKeyspaceEvent(NOTIFY_LIST, event, key, c->db->id); + if (listTypeLength(o) == 0) { + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); + dbDelete(c->db, key); + } + signalModifiedKey(c, c->db, key); + server.dirty += count; +} + +/* Implements the generic list pop operation for LPOP/RPOP. + * The where argument specifies which end of the list is operated on. An + * optional count may be provided as the third argument of the client's + * command. */ +void popGenericCommand(client *c, int where) { + long count = 0; + robj *value; + + if (c->argc > 3) { + addReplyErrorFormat(c,"wrong number of arguments for '%s' command", + c->cmd->name); + return; + } else if (c->argc == 3) { + /* Parse the optional count argument. */ + if (getPositiveLongFromObjectOrReply(c,c->argv[2],&count,NULL) != C_OK) + return; + if (count == 0) { + /* Fast exit path. */ + addReplyNullArray(c); + return; + } + } + + robj *o = lookupKeyWriteOrReply(c, c->argv[1], shared.null[c->resp]); + if (o == NULL || checkType(c, o, OBJ_LIST)) + return; + + if (!count) { + /* Pop a single element. This is POP's original behavior that replies + * with a bulk string. */ + value = listTypePop(o,where); + serverAssert(value != NULL); + addReplyBulk(c,value); + decrRefCount(value); + listElementsRemoved(c,c->argv[1],where,o,1); + } else { + /* Pop a range of elements. An addition to the original POP command, + * which replies with a multi-bulk. */ + long llen = listTypeLength(o); + long rangelen = (count > llen) ? llen : count; + long rangestart = (where == LIST_HEAD) ? 0 : -rangelen; + long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1; + int reverse = (where == LIST_HEAD) ? 0 : 1; + + addListRangeReply(c,o,rangestart,rangeend,reverse); + quicklistDelRange(o->ptr,rangestart,rangelen); + listElementsRemoved(c,c->argv[1],where,o,rangelen); + } +} + +/* LPOP [count] */ +void lpopCommand(client *c) { + popGenericCommand(c,LIST_HEAD); +} + +/* RPOP [count] */ +void rpopCommand(client *c) { + popGenericCommand(c,LIST_TAIL); +} + +/* LRANGE */ +void lrangeCommand(client *c) { + robj *o; + long start, end; + + if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || + (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; + + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL + || checkType(c,o,OBJ_LIST)) return; + + addListRangeReply(c,o,start,end,0); +} + +/* LTRIM */ void ltrimCommand(client *c) { robj *o; long start, end, llen, ltrim, rtrim; @@ -629,6 +691,7 @@ void lposCommand(client *c) { } } +/* LREM */ void lremCommand(client *c) { robj *subject, *obj; obj = c->argv[3]; @@ -756,6 +819,7 @@ void lmoveGenericCommand(client *c, int wherefrom, int whereto) { } } +/* LMOVE (LEFT|RIGHT) (LEFT|RIGHT) */ void lmoveCommand(client *c) { int wherefrom, whereto; if (getListPositionFromObjectOrReply(c,c->argv[3],&wherefrom) @@ -888,7 +952,7 @@ void blockingPopGenericCommand(client *c, int where) { addReplyBulk(c,c->argv[j]); addReplyBulk(c,value); decrRefCount(value); - listElementsRemoved(c,c->argv[j],where,o); + listElementsRemoved(c,c->argv[j],where,o,1); /* Replicate it as an [LR]POP instead of B[LR]POP. */ rewriteClientCommandVector(c,2, @@ -912,10 +976,12 @@ void blockingPopGenericCommand(client *c, int where) { blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL); } +/* BLPOP [ ...] */ void blpopCommand(client *c) { blockingPopGenericCommand(c,LIST_HEAD); } +/* BLPOP [ ...] */ void brpopCommand(client *c) { blockingPopGenericCommand(c,LIST_TAIL); } @@ -942,6 +1008,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou } } +/* BLMOVE (LEFT|RIGHT) (LEFT|RIGHT) */ void blmoveCommand(client *c) { mstime_t timeout; int wherefrom, whereto; @@ -954,6 +1021,7 @@ void blmoveCommand(client *c) { blmoveGenericCommand(c,wherefrom,whereto,timeout); } +/* BRPOPLPUSH */ void brpoplpushCommand(client *c) { mstime_t timeout; if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS) diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 61ca23377..9be5dd93b 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -123,6 +123,17 @@ start_server { test {R/LPOP against empty list} { r lpop non-existing-list } {} + + test {R/LPOP with the optional count argument} { + assert_equal 7 [r lpush listcount aa bb cc dd ee ff gg] + assert_equal {} [r lpop listcount 0] + assert_equal {gg} [r lpop listcount 1] + assert_equal {ff ee} [r lpop listcount 2] + assert_equal {aa bb} [r rpop listcount 2] + assert_equal {cc} [r rpop listcount 1] + assert_equal {dd} [r rpop listcount 123] + assert_error "*ERR*range*" {r lpop forbarqaz -123} + } test {Variadic RPUSH/LPUSH} { r del mylist @@ -947,6 +958,12 @@ start_server { assert_equal {} [r lrange nosuchkey 0 1] } + test {LRANGE with start > end yields an empty array for backward compatibility} { + create_list mylist "1 2 3" + assert_equal {} [r lrange mylist 1 0] + assert_equal {} [r lrange mylist -1 -2] + } + foreach {type large} [array get largevalue] { proc trim_list {type min max} { upvar 1 large large