Add ZRANGESTORE command, and improve ZSTORE command (#7844)

Add ZRANGESTORE command, and improve ZSTORE command to deprecated Z[REV]RANGE[BYSCORE|BYLEX].

Syntax for the new ZRANGESTORE command:
ZRANGESTORE [BYSCORE | BYLEX] [REV] [LIMIT offset count]

New syntax for ZRANGE:
ZRANGE [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count]

Old syntax for ZRANGE:
ZRANGE [WITHSCORES]

Other ZRANGE commands remain unchanged.

The implementation uses common code for all of these, by utilizing a consumer interface that in one
command response to the client, and in the other command stores a zset key.

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
Jonah H. Harris 2021-01-07 03:58:53 -05:00 committed by GitHub
parent cfcd0fa6f7
commit b5029dfdad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 521 additions and 233 deletions

View File

@ -464,6 +464,10 @@ struct redisCommand redisCommandTable[] = {
"read-only @sortedset",
0,NULL,1,1,1,0,0,0},
{"zrangestore",zrangestoreCommand,-5,
"write use-memory @sortedset",
0,NULL,1,2,1,0,0,0},
{"zrangebyscore",zrangebyscoreCommand,-4,
"read-only @sortedset",
0,NULL,1,1,1,0,0,0},

View File

@ -2517,6 +2517,7 @@ void zinterstoreCommand(client *c);
void zdiffstoreCommand(client *c);
void zunionCommand(client *c);
void zinterCommand(client *c);
void zrangestoreCommand(client *c);
void zdiffCommand(client *c);
void zscanCommand(client *c);
void hkeysCommand(client *c);

View File

@ -1822,11 +1822,15 @@ void zremCommand(client *c) {
addReplyLongLong(c,deleted);
}
typedef enum {
ZRANGE_AUTO = 0,
ZRANGE_RANK,
ZRANGE_SCORE,
ZRANGE_LEX,
} zrange_type;
/* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */
#define ZRANGE_RANK 0
#define ZRANGE_SCORE 1
#define ZRANGE_LEX 2
void zremrangeGenericCommand(client *c, int rangetype) {
void zremrangeGenericCommand(client *c, zrange_type rangetype) {
robj *key = c->argv[1];
robj *zobj;
int keyremoved = 0;
@ -1834,22 +1838,28 @@ void zremrangeGenericCommand(client *c, int rangetype) {
zrangespec range;
zlexrangespec lexrange;
long start, end, llen;
char *notify_type = NULL;
/* Step 1: Parse the range. */
if (rangetype == ZRANGE_RANK) {
notify_type = "zremrangebyrank";
if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) ||
(getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK))
return;
} else if (rangetype == ZRANGE_SCORE) {
notify_type = "zremrangebyscore";
if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) {
addReplyError(c,"min or max is not a float");
return;
}
} else if (rangetype == ZRANGE_LEX) {
notify_type = "zremrangebylex";
if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) {
addReplyError(c,"min or max not valid string range item");
return;
}
} else {
serverPanic("unknown rangetype %d", (int)rangetype);
}
/* Step 2: Lookup & range sanity checks if needed. */
@ -1875,6 +1885,7 @@ void zremrangeGenericCommand(client *c, int rangetype) {
/* Step 3: Perform the range deletion operation. */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
switch(rangetype) {
case ZRANGE_AUTO:
case ZRANGE_RANK:
zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
break;
@ -1892,6 +1903,7 @@ void zremrangeGenericCommand(client *c, int rangetype) {
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
switch(rangetype) {
case ZRANGE_AUTO:
case ZRANGE_RANK:
deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
break;
@ -1913,9 +1925,8 @@ void zremrangeGenericCommand(client *c, int rangetype) {
/* Step 4: Notifications and reply. */
if (deleted) {
char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id);
notifyKeyspaceEvent(NOTIFY_ZSET,notify_type,key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
}
@ -2778,27 +2789,180 @@ void zdiffCommand(client *c) {
zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_DIFF);
}
void zrangeGenericCommand(client *c, int reverse) {
robj *key = c->argv[1];
robj *zobj;
int withscores = 0;
long start;
long end;
long llen;
long rangelen;
typedef enum {
ZRANGE_DIRECTION_AUTO = 0,
ZRANGE_DIRECTION_FORWARD,
ZRANGE_DIRECTION_REVERSE
} zrange_direction;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
typedef enum {
ZRANGE_CONSUMER_TYPE_CLIENT = 0,
ZRANGE_CONSUMER_TYPE_INTERNAL
} zrange_consumer_type;
if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
withscores = 1;
} else if (c->argc >= 5) {
addReplyErrorObject(c,shared.syntaxerr);
return;
typedef struct zrange_result_handler zrange_result_handler;
typedef void (*zrangeResultBeginFunction)(zrange_result_handler *c);
typedef void (*zrangeResultFinalizeFunction)(
zrange_result_handler *c, size_t result_count);
typedef void (*zrangeResultEmitCBufferFunction)(
zrange_result_handler *c, const void *p, size_t len, double score);
typedef void (*zrangeResultEmitLongLongFunction)(
zrange_result_handler *c, long long ll, double score);
void zrangeGenericCommand (zrange_result_handler *handler, int argc_start, int store,
zrange_type rangetype, zrange_direction direction);
/* Interface struct for ZRANGE/ZRANGESTORE generic implementation.
* There is one implementation of this interface that sends a RESP reply to clients.
* and one implementation that stores the range result into a zset object. */
struct zrange_result_handler {
zrange_consumer_type type;
client *client;
robj *dstkey;
robj *dstobj;
void *userdata;
int withscores;
int should_emit_array_length;
zrangeResultBeginFunction beginResultEmission;
zrangeResultFinalizeFunction finalizeResultEmission;
zrangeResultEmitCBufferFunction emitResultFromCBuffer;
zrangeResultEmitLongLongFunction emitResultFromLongLong;
};
/* Result handler methods for responding the ZRANGE to clients. */
static void zrangeResultBeginClient(zrange_result_handler *handler) {
handler->userdata = addReplyDeferredLen(handler->client);
}
static void zrangeResultEmitCBufferToClient(zrange_result_handler *handler,
const void *value, size_t value_length_in_bytes, double score)
{
if (handler->should_emit_array_length) {
addReplyArrayLen(handler->client, 2);
}
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL
|| checkType(c,zobj,OBJ_ZSET)) return;
addReplyBulkCBuffer(handler->client, value, value_length_in_bytes);
if (handler->withscores) {
addReplyDouble(handler->client, score);
}
}
static void zrangeResultEmitLongLongToClient(zrange_result_handler *handler,
long long value, double score)
{
if (handler->should_emit_array_length) {
addReplyArrayLen(handler->client, 2);
}
addReplyBulkLongLong(handler->client, value);
if (handler->withscores) {
addReplyDouble(handler->client, score);
}
}
static void zrangeResultFinalizeClient(zrange_result_handler *handler,
size_t result_count)
{
if (handler->withscores && (handler->client->resp == 2)) {
result_count *= 2;
}
setDeferredArrayLen(handler->client, handler->userdata, result_count);
}
/* Result handler methods for storing the ZRANGESTORE to a zset. */
static void zrangeResultBeginStore(zrange_result_handler *handler)
{
handler->dstobj = createZsetZiplistObject();
}
static void zrangeResultEmitCBufferForStore(zrange_result_handler *handler,
const void *value, size_t value_length_in_bytes, double score)
{
double newscore;
int retflags = 0;
sds ele = sdsnewlen(value, value_length_in_bytes);
int retval = zsetAdd(handler->dstobj, score, ele, &retflags, &newscore);
sdsfree(ele);
serverAssert(retval);
}
static void zrangeResultEmitLongLongForStore(zrange_result_handler *handler,
long long value, double score)
{
double newscore;
int retflags = 0;
sds ele = sdsfromlonglong(value);
int retval = zsetAdd(handler->dstobj, score, ele, &retflags, &newscore);
sdsfree(ele);
serverAssert(retval);
}
static void zrangeResultFinalizeStore(zrange_result_handler *handler, size_t result_count)
{
if (result_count) {
setKey(handler->client, handler->client->db, handler->dstkey, handler->dstobj);
addReplyLongLong(handler->client, result_count);
notifyKeyspaceEvent(NOTIFY_ZSET, "zrangestore", handler->dstkey, handler->client->db->id);
server.dirty++;
} else {
addReply(handler->client, shared.czero);
if (dbDelete(handler->client->db, handler->dstkey)) {
signalModifiedKey(handler->client, handler->client->db, handler->dstkey);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", handler->dstkey, handler->client->db->id);
server.dirty++;
}
}
decrRefCount(handler->dstobj);
}
/* Initialize the consumer interface type with the requested type. */
static void zrangeResultHandlerInit(zrange_result_handler *handler,
client *client, zrange_consumer_type type)
{
memset(handler, 0, sizeof(*handler));
handler->client = client;
switch (type) {
case ZRANGE_CONSUMER_TYPE_CLIENT:
handler->beginResultEmission = zrangeResultBeginClient;
handler->finalizeResultEmission = zrangeResultFinalizeClient;
handler->emitResultFromCBuffer = zrangeResultEmitCBufferToClient;
handler->emitResultFromLongLong = zrangeResultEmitLongLongToClient;
break;
case ZRANGE_CONSUMER_TYPE_INTERNAL:
handler->beginResultEmission = zrangeResultBeginStore;
handler->finalizeResultEmission = zrangeResultFinalizeStore;
handler->emitResultFromCBuffer = zrangeResultEmitCBufferForStore;
handler->emitResultFromLongLong = zrangeResultEmitLongLongForStore;
break;
}
}
static void zrangeResultHandlerScoreEmissionEnable(zrange_result_handler *handler) {
handler->withscores = 1;
handler->should_emit_array_length = (handler->client->resp > 2);
}
static void zrangeResultHandlerDestinationKeySet (zrange_result_handler *handler,
robj *dstkey)
{
handler->dstkey = dstkey;
}
/* This command implements ZRANGE, ZREVRANGE. */
void genericZrangebyrankCommand(zrange_result_handler *handler,
robj *zobj, long start, long end, int withscores, int reverse) {
client *c = handler->client;
long llen;
long rangelen;
size_t result_cardinality;
/* Sanitize indexes. */
llen = zsetLength(zobj);
@ -2806,22 +2970,17 @@ void zrangeGenericCommand(client *c, int reverse) {
if (end < 0) end = llen+end;
if (start < 0) start = 0;
handler->beginResultEmission(handler);
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
addReply(c,shared.emptyarray);
handler->finalizeResultEmission(handler, 0);
return;
}
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;
/* Return the result in form of a multi-bulk reply. RESP3 clients
* will receive sub arrays with score->element, while RESP2 returned
* a flat array. */
if (withscores && c->resp == 2)
addReplyArrayLen(c, rangelen*2);
else
addReplyArrayLen(c, rangelen);
result_cardinality = rangelen;
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
@ -2829,6 +2988,7 @@ void zrangeGenericCommand(client *c, int reverse) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
double score = 0.0;
if (reverse)
eptr = ziplistIndex(zl,-2-(2*start));
@ -2842,12 +3002,14 @@ void zrangeGenericCommand(client *c, int reverse) {
serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (withscores && c->resp > 2) addReplyArrayLen(c,2);
if (vstr == NULL)
addReplyBulkLongLong(c,vlong);
else
addReplyBulkCBuffer(c,vstr,vlen);
if (withscores) addReplyDouble(c,zzlGetScore(sptr));
if (withscores) /* don't bother to extract the score if it's gonna be ignored. */
score = zzlGetScore(sptr);
if (vstr == NULL) {
handler->emitResultFromLongLong(handler, vlong, score);
} else {
handler->emitResultFromCBuffer(handler, vstr, vlen, score);
}
if (reverse)
zzlPrev(zl,&eptr,&sptr);
@ -2859,7 +3021,6 @@ void zrangeGenericCommand(client *c, int reverse) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
sds ele;
/* Check if starting point is trivial, before doing log(N) lookup. */
if (reverse) {
@ -2874,83 +3035,53 @@ void zrangeGenericCommand(client *c, int reverse) {
while(rangelen--) {
serverAssertWithInfo(c,zobj,ln != NULL);
ele = ln->ele;
if (withscores && c->resp > 2) addReplyArrayLen(c,2);
addReplyBulkCBuffer(c,ele,sdslen(ele));
if (withscores) addReplyDouble(c,ln->score);
sds ele = ln->ele;
handler->emitResultFromCBuffer(handler, ele, sdslen(ele), ln->score);
ln = reverse ? ln->backward : ln->level[0].forward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
handler->finalizeResultEmission(handler, result_cardinality);
}
/* ZRANGESTORE <dst> <src> <min> <max> [BYSCORE | BYLEX] [REV] [LIMIT offset count] */
void zrangestoreCommand (client *c) {
robj *dstkey = c->argv[1];
zrange_result_handler handler;
zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_INTERNAL);
zrangeResultHandlerDestinationKeySet(&handler, dstkey);
zrangeGenericCommand(&handler, 2, 1, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO);
}
/* ZRANGE <key> <min> <max> [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count] */
void zrangeCommand(client *c) {
zrangeGenericCommand(c,0);
zrange_result_handler handler;
zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
zrangeGenericCommand(&handler, 1, 0, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO);
}
/* ZREVRANGE <key> <min> <max> [WITHSCORES] */
void zrevrangeCommand(client *c) {
zrangeGenericCommand(c,1);
zrange_result_handler handler;
zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
zrangeGenericCommand(&handler, 1, 0, ZRANGE_RANK, ZRANGE_DIRECTION_REVERSE);
}
/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
void genericZrangebyscoreCommand(client *c, int reverse) {
zrangespec range;
robj *key = c->argv[1];
robj *zobj;
long offset = 0, limit = -1;
int withscores = 0;
void genericZrangebyscoreCommand(zrange_result_handler *handler,
zrangespec *range, robj *zobj, int withscores, long offset,
long limit, int reverse) {
client *c = handler->client;
unsigned long rangelen = 0;
void *replylen = NULL;
int minidx, maxidx;
/* Parse the range arguments. */
if (reverse) {
/* Range is given as [max,min] */
maxidx = 2; minidx = 3;
} else {
/* Range is given as [min,max] */
minidx = 2; maxidx = 3;
}
if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
addReplyError(c,"min or max is not a float");
return;
}
/* Parse optional extra arguments. Note that ZCOUNT will exactly have
* 4 arguments, so we'll never enter the following code path. */
if (c->argc > 4) {
int remaining = c->argc - 4;
int pos = 4;
while (remaining) {
if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
pos++; remaining--;
withscores = 1;
} else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL)
!= C_OK) ||
(getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL)
!= C_OK))
{
return;
}
pos += 3; remaining -= 3;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
}
/* Ok, lookup the key and get the range */
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL ||
checkType(c,zobj,OBJ_ZSET)) return;
handler->beginResultEmission(handler);
/* For invalid offset, return directly. */
if (offset > 0 && offset >= (long)zsetLength(zobj)) {
addReply(c,shared.emptyarray);
handler->finalizeResultEmission(handler, 0);
return;
}
@ -2960,29 +3091,17 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
double score;
/* If reversed, get the last node in range as starting point. */
if (reverse) {
eptr = zzlLastInRange(zl,&range);
eptr = zzlLastInRange(zl,range);
} else {
eptr = zzlFirstInRange(zl,&range);
}
/* No "first" element in the specified interval. */
if (eptr == NULL) {
addReply(c,shared.emptyarray);
return;
eptr = zzlFirstInRange(zl,range);
}
/* Get score pointer for the first element. */
serverAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addReplyDeferredLen(c);
if (eptr)
sptr = ziplistNext(zl,eptr);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
@ -2995,13 +3114,13 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
}
while (eptr && limit--) {
score = zzlGetScore(sptr);
double score = zzlGetScore(sptr);
/* Abort when the node is no longer in range. */
if (reverse) {
if (!zslValueGteMin(score,&range)) break;
if (!zslValueGteMin(score,range)) break;
} else {
if (!zslValueLteMax(score,&range)) break;
if (!zslValueLteMax(score,range)) break;
}
/* We know the element exists, so ziplistGet should always
@ -3009,13 +3128,11 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
rangelen++;
if (withscores && c->resp > 2) addReplyArrayLen(c,2);
if (vstr == NULL) {
addReplyBulkLongLong(c,vlong);
handler->emitResultFromLongLong(handler, vlong, score);
} else {
addReplyBulkCBuffer(c,vstr,vlen);
handler->emitResultFromCBuffer(handler, vstr, vlen, score);
}
if (withscores) addReplyDouble(c,score);
/* Move to next node */
if (reverse) {
@ -3031,22 +3148,11 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
/* If reversed, get the last node in range as starting point. */
if (reverse) {
ln = zslLastInRange(zsl,&range);
ln = zslLastInRange(zsl,range);
} else {
ln = zslFirstInRange(zsl,&range);
ln = zslFirstInRange(zsl,range);
}
/* No "first" element in the specified interval. */
if (ln == NULL) {
addReply(c,shared.emptyarray);
return;
}
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addReplyDeferredLen(c);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (ln && offset--) {
@ -3060,15 +3166,14 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
while (ln && limit--) {
/* Abort when the node is no longer in range. */
if (reverse) {
if (!zslValueGteMin(ln->score,&range)) break;
if (!zslValueGteMin(ln->score,range)) break;
} else {
if (!zslValueLteMax(ln->score,&range)) break;
if (!zslValueLteMax(ln->score,range)) break;
}
rangelen++;
if (withscores && c->resp > 2) addReplyArrayLen(c,2);
addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
if (withscores) addReplyDouble(c,ln->score);
handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele),
((withscores) ? ln->score : ln->score));
/* Move to next node */
if (reverse) {
@ -3081,16 +3186,21 @@ void genericZrangebyscoreCommand(client *c, int reverse) {
serverPanic("Unknown sorted set encoding");
}
if (withscores && c->resp == 2) rangelen *= 2;
setDeferredArrayLen(c, replylen, rangelen);
handler->finalizeResultEmission(handler, rangelen);
}
/* ZRANGEBYSCORE <key> <min> <max> [WITHSCORES] [LIMIT offset count] */
void zrangebyscoreCommand(client *c) {
genericZrangebyscoreCommand(c,0);
zrange_result_handler handler;
zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_FORWARD);
}
/* ZREVRANGEBYSCORE <key> <min> <max> [WITHSCORES] [LIMIT offset count] */
void zrevrangebyscoreCommand(client *c) {
genericZrangebyscoreCommand(c,1);
zrange_result_handler handler;
zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_REVERSE);
}
void zcountCommand(client *c) {
@ -3250,58 +3360,14 @@ void zlexcountCommand(client *c) {
}
/* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
void genericZrangebylexCommand(client *c, int reverse) {
zlexrangespec range;
robj *key = c->argv[1];
robj *zobj;
long offset = 0, limit = -1;
void genericZrangebylexCommand(zrange_result_handler *handler,
zlexrangespec *range, robj *zobj, int withscores, long offset, long limit,
int reverse)
{
client *c = handler->client;
unsigned long rangelen = 0;
void *replylen = NULL;
int minidx, maxidx;
/* Parse the range arguments. */
if (reverse) {
/* Range is given as [max,min] */
maxidx = 2; minidx = 3;
} else {
/* Range is given as [min,max] */
minidx = 2; maxidx = 3;
}
if (zslParseLexRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) {
addReplyError(c,"min or max not valid string range item");
return;
}
/* Parse optional extra arguments. Note that ZCOUNT will exactly have
* 4 arguments, so we'll never enter the following code path. */
if (c->argc > 4) {
int remaining = c->argc - 4;
int pos = 4;
while (remaining) {
if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) {
zslFreeLexRange(&range);
return;
}
pos += 3; remaining -= 3;
} else {
zslFreeLexRange(&range);
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
}
/* Ok, lookup the key and get the range */
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL ||
checkType(c,zobj,OBJ_ZSET))
{
zslFreeLexRange(&range);
return;
}
handler->beginResultEmission(handler);
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
@ -3312,26 +3378,14 @@ void genericZrangebylexCommand(client *c, int reverse) {
/* If reversed, get the last node in range as starting point. */
if (reverse) {
eptr = zzlLastInLexRange(zl,&range);
eptr = zzlLastInLexRange(zl,range);
} else {
eptr = zzlFirstInLexRange(zl,&range);
}
/* No "first" element in the specified interval. */
if (eptr == NULL) {
addReply(c,shared.emptyarray);
zslFreeLexRange(&range);
return;
eptr = zzlFirstInLexRange(zl,range);
}
/* Get score pointer for the first element. */
serverAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addReplyDeferredLen(c);
if (eptr)
sptr = ziplistNext(zl,eptr);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
@ -3344,11 +3398,15 @@ void genericZrangebylexCommand(client *c, int reverse) {
}
while (eptr && limit--) {
double score = 0;
if (withscores) /* don't bother to extract the score if it's gonna be ignored. */
score = zzlGetScore(sptr);
/* Abort when the node is no longer in range. */
if (reverse) {
if (!zzlLexValueGteMin(eptr,&range)) break;
if (!zzlLexValueGteMin(eptr,range)) break;
} else {
if (!zzlLexValueLteMax(eptr,&range)) break;
if (!zzlLexValueLteMax(eptr,range)) break;
}
/* We know the element exists, so ziplistGet should always
@ -3357,9 +3415,9 @@ void genericZrangebylexCommand(client *c, int reverse) {
rangelen++;
if (vstr == NULL) {
addReplyBulkLongLong(c,vlong);
handler->emitResultFromLongLong(handler, vlong, score);
} else {
addReplyBulkCBuffer(c,vstr,vlen);
handler->emitResultFromCBuffer(handler, vstr, vlen, score);
}
/* Move to next node */
@ -3376,23 +3434,11 @@ void genericZrangebylexCommand(client *c, int reverse) {
/* If reversed, get the last node in range as starting point. */
if (reverse) {
ln = zslLastInLexRange(zsl,&range);
ln = zslLastInLexRange(zsl,range);
} else {
ln = zslFirstInLexRange(zsl,&range);
ln = zslFirstInLexRange(zsl,range);
}
/* No "first" element in the specified interval. */
if (ln == NULL) {
addReply(c,shared.emptyarray);
zslFreeLexRange(&range);
return;
}
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addReplyDeferredLen(c);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (ln && offset--) {
@ -3406,13 +3452,13 @@ void genericZrangebylexCommand(client *c, int reverse) {
while (ln && limit--) {
/* Abort when the node is no longer in range. */
if (reverse) {
if (!zslLexValueGteMin(ln->ele,&range)) break;
if (!zslLexValueGteMin(ln->ele,range)) break;
} else {
if (!zslLexValueLteMax(ln->ele,&range)) break;
if (!zslLexValueLteMax(ln->ele,range)) break;
}
rangelen++;
addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele));
handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), ln->score);
/* Move to next node */
if (reverse) {
@ -3425,16 +3471,171 @@ void genericZrangebylexCommand(client *c, int reverse) {
serverPanic("Unknown sorted set encoding");
}
zslFreeLexRange(&range);
setDeferredArrayLen(c, replylen, rangelen);
handler->finalizeResultEmission(handler, rangelen);
}
/* ZRANGEBYLEX <key> <min> <max> [LIMIT offset count] */
void zrangebylexCommand(client *c) {
genericZrangebylexCommand(c,0);
zrange_result_handler handler;
zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_FORWARD);
}
/* ZREVRANGEBYLEX <key> <min> <max> [LIMIT offset count] */
void zrevrangebylexCommand(client *c) {
genericZrangebylexCommand(c,1);
zrange_result_handler handler;
zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT);
zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_REVERSE);
}
/**
* This function handles ZRANGE and ZRANGESTORE, and also the deprecated
* Z[REV]RANGE[BYPOS|BYLEX] commands.
*
* The simple ZRANGE and ZRANGESTORE can take _AUTO in rangetype and direction,
* other command pass explicit value.
*
* The argc_start points to the src key argument, so following syntax is like:
* <src> <min> <max> [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count]
*/
void zrangeGenericCommand(zrange_result_handler *handler, int argc_start, int store,
zrange_type rangetype, zrange_direction direction)
{
client *c = handler->client;
robj *key = c->argv[argc_start];
robj *zobj;
zrangespec range;
zlexrangespec lexrange;
int minidx = argc_start + 1;
int maxidx = argc_start + 2;
/* Options common to all */
long opt_start = 0;
long opt_end = 0;
int opt_withscores = 0;
long opt_offset = 0;
long opt_limit = -1;
/* Step 1: Skip the <src> <min> <max> args and parse remaining optional arguments. */
for (int j=argc_start + 3; j < c->argc; j++) {
int leftargs = c->argc-j-1;
if (!store && !strcasecmp(c->argv[j]->ptr,"withscores")) {
opt_withscores = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
if ((getLongFromObjectOrReply(c, c->argv[j+1], &opt_offset, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[j+2], &opt_limit, NULL) != C_OK))
{
return;
}
j += 2;
} else if (direction == ZRANGE_DIRECTION_AUTO &&
!strcasecmp(c->argv[j]->ptr,"rev"))
{
direction = ZRANGE_DIRECTION_REVERSE;
} else if (rangetype == ZRANGE_AUTO &&
!strcasecmp(c->argv[j]->ptr,"bylex"))
{
rangetype = ZRANGE_LEX;
} else if (rangetype == ZRANGE_AUTO &&
!strcasecmp(c->argv[j]->ptr,"byscore"))
{
rangetype = ZRANGE_SCORE;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
/* Use defaults if not overriden by arguments. */
if (direction == ZRANGE_DIRECTION_AUTO)
direction = ZRANGE_DIRECTION_FORWARD;
if (rangetype == ZRANGE_AUTO)
rangetype = ZRANGE_RANK;
/* Check for conflicting arguments. */
if (opt_limit != -1 && rangetype == ZRANGE_RANK) {
addReplyError(c,"syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX");
return;
}
if (opt_withscores && rangetype == ZRANGE_LEX) {
addReplyError(c,"syntax error, WITHSCORES not supported in combination with BYLEX");
return;
}
if (direction == ZRANGE_DIRECTION_REVERSE &&
((ZRANGE_SCORE == rangetype) || (ZRANGE_LEX == rangetype)))
{
/* Range is given as [max,min] */
int tmp = maxidx;
maxidx = minidx;
minidx = tmp;
}
/* Step 2: Parse the range. */
switch (rangetype) {
case ZRANGE_AUTO:
case ZRANGE_RANK:
/* Z[REV]RANGE, ZRANGESTORE [REV]RANGE */
if ((getLongFromObjectOrReply(c, c->argv[minidx], &opt_start,NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[maxidx], &opt_end,NULL) != C_OK))
{
return;
}
break;
case ZRANGE_SCORE:
/* Z[REV]RANGEBYSCORE, ZRANGESTORE [REV]RANGEBYSCORE */
if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK) {
addReplyError(c, "min or max is not a float");
return;
}
break;
case ZRANGE_LEX:
/* Z[REV]RANGEBYLEX, ZRANGESTORE [REV]RANGEBYLEX */
if (zslParseLexRange(c->argv[minidx], c->argv[maxidx], &lexrange) != C_OK) {
addReplyError(c, "min or max not valid string range item");
return;
}
break;
}
if (opt_withscores || store) {
zrangeResultHandlerScoreEmissionEnable(handler);
}
/* Step 3: Lookup the key and get the range. */
if (((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL)
|| checkType(c, zobj, OBJ_ZSET)) {
goto cleanup;
}
/* Step 4: Pass this to the command-specific handler. */
switch (rangetype) {
case ZRANGE_AUTO:
case ZRANGE_RANK:
genericZrangebyrankCommand(handler, zobj, opt_start, opt_end,
opt_withscores || store, direction == ZRANGE_DIRECTION_REVERSE);
break;
case ZRANGE_SCORE:
genericZrangebyscoreCommand(handler, &range, zobj, opt_withscores || store,
opt_offset, opt_limit, direction == ZRANGE_DIRECTION_REVERSE);
break;
case ZRANGE_LEX:
genericZrangebylexCommand(handler, &lexrange, zobj, opt_withscores || store,
opt_offset, opt_limit, direction == ZRANGE_DIRECTION_REVERSE);
break;
}
/* Instead of returning here, we'll just fall-through the clean-up. */
cleanup:
if (rangetype == ZRANGE_LEX) {
zslFreeLexRange(&lexrange);
}
}
void zcardCommand(client *c) {

View File

@ -1472,4 +1472,86 @@ start_server {tags {"zset"}} {
}
r config set zset-max-ziplist-entries $original_max
}
test {ZRANGESTORE basic} {
r flushall
r zadd z1 1 a 2 b 3 c 4 d
set res [r zrangestore z2 z1 0 -1]
assert_equal $res 4
r zrange z2 0 -1 withscores
} {a 1 b 2 c 3 d 4}
test {ZRANGESTORE range} {
set res [r zrangestore z2 z1 1 2]
assert_equal $res 2
r zrange z2 0 -1 withscores
} {b 2 c 3}
test {ZRANGESTORE BYLEX} {
set res [r zrangestore z2 z1 \[b \[c BYLEX]
assert_equal $res 2
r zrange z2 0 -1 withscores
} {b 2 c 3}
test {ZRANGESTORE BYSCORE} {
set res [r zrangestore z2 z1 1 2 BYSCORE]
assert_equal $res 2
r zrange z2 0 -1 withscores
} {a 1 b 2}
test {ZRANGESTORE BYSCORE LIMIT} {
set res [r zrangestore z2 z1 0 5 BYSCORE LIMIT 0 2]
assert_equal $res 2
r zrange z2 0 -1 withscores
} {a 1 b 2}
test {ZRANGESTORE BYSCORE REV LIMIT} {
set res [r zrangestore z2 z1 5 0 BYSCORE REV LIMIT 0 2]
assert_equal $res 2
r zrange z2 0 -1 withscores
} {c 3 d 4}
test {ZRANGE BYSCORE REV LIMIT} {
r zrange z1 5 0 BYSCORE REV LIMIT 0 2 WITHSCORES
} {d 4 c 3}
test {ZRANGESTORE - empty range} {
set res [r zrangestore z2 z1 5 6]
assert_equal $res 0
r exists z2
} {0}
test {ZRANGESTORE BYLEX - empty range} {
set res [r zrangestore z2 z1 \[f \[g BYLEX]
assert_equal $res 0
r exists z2
} {0}
test {ZRANGESTORE BYSCORE - empty range} {
set res [r zrangestore z2 z1 5 6 BYSCORE]
assert_equal $res 0
r exists z2
} {0}
test {ZRANGE BYLEX} {
r zrange z1 \[b \[c BYLEX
} {b c}
test {ZRANGESTORE invalid syntax} {
catch {r zrangestore z2 z1 0 -1 limit 1 2} err
assert_match "*syntax*" $err
catch {r zrangestore z2 z1 0 -1 WITHSCORES} err
assert_match "*syntax*" $err
}
test {ZRANGE invalid syntax} {
catch {r zrange z1 0 -1 limit 1 2} err
assert_match "*syntax*" $err
catch {r zrange z1 0 -1 BYLEX WITHSCORES} err
assert_match "*syntax*" $err
catch {r zrevrange z1 0 -1 BYSCORE} err
assert_match "*syntax*" $err
catch {r zrangebyscore z1 0 -1 REV} err
assert_match "*syntax*" $err
}
}