Fixes ZPOPMIN/ZPOPMAX wrong replies when count is 0 with non-zset (#9711)

Moves ZPOP ... 0 fast exit path after type check to reply with
WRONGTYPE. In the past it will return an empty array.

Also now count is not allowed to be negative.

see #9680

before:
```
127.0.0.1:6379> set zset str
OK
127.0.0.1:6379> zpopmin zset 0
(empty array)
127.0.0.1:6379> zpopmin zset -1
(empty array)
```

after:
```
127.0.0.1:6379> set zset str
OK
127.0.0.1:6379> zpopmin zset 0
(error) WRONGTYPE Operation against a key holding the wrong kind of value
127.0.0.1:6379> zpopmin zset -1
(error) ERR value is out of range, must be positive
```
This commit is contained in:
Binbin 2021-11-18 16:13:16 +08:00 committed by GitHub
parent 32215e7889
commit 91e77a0cfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 63 additions and 36 deletions

View File

@ -363,7 +363,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
argv[0] = where == ZSET_MIN ? shared.zpopmin : shared.zpopmax;
argv[1] = rl->key;
incrRefCount(rl->key);
if (count != 0) {
if (count != -1) {
/* Replicate it as command with COUNT. */
robj *count_obj = createStringObjectFromLongLong((count > llen) ? llen : count);
argv[2] = count_obj;
@ -371,7 +371,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
}
propagate(receiver->db->id, argv, argc, PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[1]);
if (count != 0) decrRefCount(argv[2]);
if (count != -1) decrRefCount(argv[2]);
/* The zset is empty and has been deleted. */
if (deleted) break;

View File

@ -6249,7 +6249,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
"Blocking module command called from transaction");
} else {
if (keys) {
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,0,timeout,NULL,NULL,NULL);
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,-1,timeout,NULL,NULL,NULL);
} else {
blockClient(c,BLOCKED_MODULE);
}

View File

@ -846,7 +846,7 @@ typedef struct multiState {
* The fields used depend on client->btype. */
typedef struct blockingState {
/* Generic fields. */
long count; /* Elements to pop if count was specified (BLMPOP), 0 otherwise. */
long count; /* Elements to pop if count was specified (BLMPOP/BZMPOP), -1 otherwise. */
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
* is > timeout then the operation timed out. */

View File

@ -1040,9 +1040,9 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
* 'numkeys' is the number of keys.
* 'timeout_idx' parameter position of block timeout.
* 'where' LIST_HEAD for LEFT, LIST_TAIL for RIGHT.
* 'count' is the number of elements requested to pop, or 0 for plain single pop.
* 'count' is the number of elements requested to pop, or -1 for plain single pop.
*
* When count is 0, a reply of a single bulk-string will be used.
* When count is -1, a reply of a single bulk-string will be used.
* When count > 0, an array reply will be used. */
void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, int timeout_idx, long count) {
robj *o;
@ -1067,7 +1067,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i
/* Empty list, move to next key. */
if (llen == 0) continue;
if (count != 0) {
if (count != -1) {
/* BLMPOP, non empty list, like a normal [LR]POP with count option.
* The difference here we pop a range of elements in a nested arrays way. */
listPopRangeAndReplyWithKey(c, o, key, where, count, NULL);
@ -1112,12 +1112,12 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i
/* BLPOP <key> [<key> ...] <timeout> */
void blpopCommand(client *c) {
blockingPopGenericCommand(c,c->argv+1,c->argc-2,LIST_HEAD,c->argc-1,0);
blockingPopGenericCommand(c,c->argv+1,c->argc-2,LIST_HEAD,c->argc-1,-1);
}
/* BRPOP <key> [<key> ...] <timeout> */
void brpopCommand(client *c) {
blockingPopGenericCommand(c,c->argv+1,c->argc-2,LIST_TAIL,c->argc-1,0);
blockingPopGenericCommand(c,c->argv+1,c->argc-2,LIST_TAIL,c->argc-1,-1);
}
void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeout) {
@ -1132,7 +1132,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou
} else {
/* The list is empty and the client blocks. */
struct blockPos pos = {wherefrom, whereto};
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,0,timeout,c->argv[2],&pos,NULL);
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,-1,timeout,c->argv[2],&pos,NULL);
}
} else {
/* The list exists and has elements, so
@ -1171,7 +1171,7 @@ void lmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
long j;
long numkeys = 0; /* Number of keys. */
int where = 0; /* HEAD for LEFT, TAIL for RIGHT. */
long count = 0; /* Reply will consist of up to count elements, depending on the list's length. */
long count = -1; /* Reply will consist of up to count elements, depending on the list's length. */
/* Parse the numkeys. */
if (getRangeLongFromObjectOrReply(c, c->argv[numkeys_idx], 1, LONG_MAX,
@ -1192,7 +1192,7 @@ void lmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
char *opt = c->argv[j]->ptr;
int moreargs = (c->argc - 1) - j;
if (count == 0 && !strcasecmp(opt, "COUNT") && moreargs) {
if (count == -1 && !strcasecmp(opt, "COUNT") && moreargs) {
j++;
if (getRangeLongFromObjectOrReply(c, c->argv[j], 1, LONG_MAX,
&count,"count should be greater than 0") != C_OK)
@ -1203,7 +1203,7 @@ void lmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
}
}
if (count == 0) count = 1;
if (count == -1) count = 1;
if (is_block) {
/* BLOCK. We will handle CLIENT_DENY_BLOCKING flag in blockingPopGenericCommand. */

View File

@ -2182,7 +2182,7 @@ void xreadCommand(client *c) {
goto cleanup;
}
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
0, timeout, NULL, NULL, ids);
-1, timeout, NULL, NULL, ids);
/* If no COUNT is given and we block, set a relatively small count:
* in case the ID provided is too low, we do not want the server to
* block just to serve this client a huge stream of messages. */

View File

@ -3776,7 +3776,7 @@ void zscanCommand(client *c) {
* behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
* Or in ZMPOP/BZMPOP, because we also can take multiple keys.
*
* 'count' is the number of elements requested to pop, or 0 for plain single pop.
* 'count' is the number of elements requested to pop, or -1 for plain single pop.
*
* 'use_nested_array' when false it generates a flat array (with or without key name).
* When true, it generates a nested 2 level array of field + score pairs, or 3 level when emitkey is set.
@ -3819,10 +3819,16 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
return;
}
if (count == 0) {
/* ZPOPMIN/ZPOPMAX with count 0. */
addReply(c, shared.emptyarray);
return;
}
long result_count = 0;
/* When count is 0, we need to correct it to 1 for plain single pop. */
if (count == 0) count = 1;
/* When count is -1, we need to correct it to 1 for plain single pop. */
if (count == -1) count = 1;
long llen = zsetLength(zobj);
long rangelen = (count > llen) ? llen : count;
@ -3926,20 +3932,13 @@ void zpopMinMaxCommand(client *c, int where) {
return;
}
long count = 0; /* 0 for plain single pop. */
if (c->argc == 3) {
if (getLongFromObjectOrReply(c, c->argv[2], &count, NULL) != C_OK)
return;
long count = -1; /* -1 for plain single pop. */
if (c->argc == 3 && getPositiveLongFromObjectOrReply(c, c->argv[2], &count, NULL) != C_OK)
return;
if (count <= 0) {
addReply(c, shared.emptyarray);
return;
}
}
/* Respond with a single (flat) array in RESP2 or if count is 0
/* Respond with a single (flat) array in RESP2 or if count is -1
* (returning a single element). In RESP3, when count > 0 use nested array. */
int use_nested_array = (c->resp > 2 && count != 0);
int use_nested_array = (c->resp > 2 && count != -1);
genericZpopCommand(c, &c->argv[1], 1, where, 0, count, use_nested_array, 0, NULL);
}
@ -3962,7 +3961,7 @@ void zpopmaxCommand(client *c) {
*
* 'where' ZSET_MIN or ZSET_MAX.
*
* 'count' is the number of elements requested to pop, or 0 for plain single pop.
* 'count' is the number of elements requested to pop, or -1 for plain single pop.
*
* 'use_nested_array' when false it generates a flat array (with or without key name).
* When true, it generates a nested 3 level array of keyname, field + score pairs.
@ -3992,7 +3991,7 @@ void blockingGenericZpopCommand(client *c, robj **keys, int numkeys, int where,
/* Non empty zset, this is like a normal ZPOP[MIN|MAX]. */
genericZpopCommand(c, &key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, NULL);
if (count == 0) {
if (count == -1) {
/* Replicate it as ZPOP[MIN|MAX] instead of BZPOP[MIN|MAX]. */
rewriteClientCommandVector(c,2,
(where == ZSET_MAX) ? shared.zpopmax : shared.zpopmin,
@ -4023,12 +4022,12 @@ void blockingGenericZpopCommand(client *c, robj **keys, int numkeys, int where,
// BZPOPMIN key [key ...] timeout
void bzpopminCommand(client *c) {
blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MIN, c->argc-1, 0, 0, 0);
blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MIN, c->argc-1, -1, 0, 0);
}
// BZPOPMAX key [key ...] timeout
void bzpopmaxCommand(client *c) {
blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MAX, c->argc-1, 0, 0, 0);
blockingGenericZpopCommand(c, c->argv+1, c->argc-2, ZSET_MAX, c->argc-1, -1, 0, 0);
}
static void zarndmemberReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
@ -4287,7 +4286,7 @@ void zmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
long j;
long numkeys = 0; /* Number of keys. */
int where = 0; /* ZSET_MIN or ZSET_MAX. */
long count = 0; /* Reply will consist of up to count elements, depending on the zset's length. */
long count = -1; /* Reply will consist of up to count elements, depending on the zset's length. */
/* Parse the numkeys. */
if (getRangeLongFromObjectOrReply(c, c->argv[numkeys_idx], 1, LONG_MAX,
@ -4314,7 +4313,7 @@ void zmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
char *opt = c->argv[j]->ptr;
int moreargs = (c->argc - 1) - j;
if (count == 0 && !strcasecmp(opt, "COUNT") && moreargs) {
if (count == -1 && !strcasecmp(opt, "COUNT") && moreargs) {
j++;
if (getRangeLongFromObjectOrReply(c, c->argv[j], 1, LONG_MAX,
&count,"count should be greater than 0") != C_OK)
@ -4325,7 +4324,7 @@ void zmpopGenericCommand(client *c, int numkeys_idx, int is_block) {
}
}
if (count == 0) count = 1;
if (count == -1) count = 1;
if (is_block) {
/* BLOCK. We will handle CLIENT_DENY_BLOCKING flag in blockingGenericZpopCommand. */

View File

@ -1003,6 +1003,32 @@ start_server {tags {"zset"}} {
}
}
foreach {pop} {ZPOPMIN ZPOPMAX} {
test "$pop with the count 0 returns an empty array" {
r del zset
r zadd zset 1 a 2 b 3 c
assert_equal {} [r $pop zset 0]
# Make sure we can distinguish between an empty array and a null response
r readraw 1
assert_equal {*0} [r $pop zset 0]
r readraw 0
assert_equal 3 [r zcard zset]
}
test "$pop with negative count" {
r set zset foo
assert_error "ERR *must be positive" {r $pop zset -1}
r del zset
assert_error "ERR *must be positive" {r $pop zset -2}
r zadd zset 1 a 2 b 3 c
assert_error "ERR *must be positive" {r $pop zset -3}
}
}
foreach {popmin popmax} {ZPOPMIN ZPOPMAX ZMPOP_MIN ZMPOP_MAX} {
test "Basic $popmin/$popmax with a single key - $encoding" {
r del zset
@ -1115,7 +1141,9 @@ start_server {tags {"zset"}} {
test "ZPOP/ZMPOP against wrong type" {
r set foo{t} bar
assert_error "*WRONGTYPE*" {r zpopmin foo{t}}
assert_error "*WRONGTYPE*" {r zpopmin foo{t} 0}
assert_error "*WRONGTYPE*" {r zpopmax foo{t}}
assert_error "*WRONGTYPE*" {r zpopmax foo{t} 0}
assert_error "*WRONGTYPE*" {r zpopmin foo{t} 2}
assert_error "*WRONGTYPE*" {r zmpop 1 foo{t} min}