From b5029dfdadb387ba8a3b8517ec4a7117bf4070f4 Mon Sep 17 00:00:00 2001 From: "Jonah H. Harris" Date: Thu, 7 Jan 2021 03:58:53 -0500 Subject: [PATCH] Add ZRANGESTORE command, and improve ZSTORE command (#7844) Add ZRANGESTORE command, and improve ZSTORE command to deprecated Z[REV]RANGE[BYSCORE|BYLEX]. Syntax for the new ZRANGESTORE command: ZRANGESTORE [BYSCORE | BYLEX] [REV] [LIMIT offset count] New syntax for ZRANGE: ZRANGE [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count] Old syntax for ZRANGE: ZRANGE [WITHSCORES] Other ZRANGE commands remain unchanged. The implementation uses common code for all of these, by utilizing a consumer interface that in one command response to the client, and in the other command stores a zset key. Co-authored-by: Oran Agra --- src/server.c | 4 + src/server.h | 1 + src/t_zset.c | 667 +++++++++++++++++++++++++-------------- tests/unit/type/zset.tcl | 82 +++++ 4 files changed, 521 insertions(+), 233 deletions(-) diff --git a/src/server.c b/src/server.c index dc5d52af8..6140b21ac 100644 --- a/src/server.c +++ b/src/server.c @@ -464,6 +464,10 @@ struct redisCommand redisCommandTable[] = { "read-only @sortedset", 0,NULL,1,1,1,0,0,0}, + {"zrangestore",zrangestoreCommand,-5, + "write use-memory @sortedset", + 0,NULL,1,2,1,0,0,0}, + {"zrangebyscore",zrangebyscoreCommand,-4, "read-only @sortedset", 0,NULL,1,1,1,0,0,0}, diff --git a/src/server.h b/src/server.h index 8b98f803d..b5610432c 100644 --- a/src/server.h +++ b/src/server.h @@ -2517,6 +2517,7 @@ void zinterstoreCommand(client *c); void zdiffstoreCommand(client *c); void zunionCommand(client *c); void zinterCommand(client *c); +void zrangestoreCommand(client *c); void zdiffCommand(client *c); void zscanCommand(client *c); void hkeysCommand(client *c); diff --git a/src/t_zset.c b/src/t_zset.c index 25c7dda4c..3d63c41c6 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1822,11 +1822,15 @@ void zremCommand(client *c) { addReplyLongLong(c,deleted); } +typedef enum { + ZRANGE_AUTO = 0, + ZRANGE_RANK, + ZRANGE_SCORE, + ZRANGE_LEX, +} zrange_type; + /* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */ -#define ZRANGE_RANK 0 -#define ZRANGE_SCORE 1 -#define ZRANGE_LEX 2 -void zremrangeGenericCommand(client *c, int rangetype) { +void zremrangeGenericCommand(client *c, zrange_type rangetype) { robj *key = c->argv[1]; robj *zobj; int keyremoved = 0; @@ -1834,22 +1838,28 @@ void zremrangeGenericCommand(client *c, int rangetype) { zrangespec range; zlexrangespec lexrange; long start, end, llen; + char *notify_type = NULL; /* Step 1: Parse the range. */ if (rangetype == ZRANGE_RANK) { + notify_type = "zremrangebyrank"; if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) || (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)) return; } else if (rangetype == ZRANGE_SCORE) { + notify_type = "zremrangebyscore"; if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) { addReplyError(c,"min or max is not a float"); return; } } else if (rangetype == ZRANGE_LEX) { + notify_type = "zremrangebylex"; if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) { addReplyError(c,"min or max not valid string range item"); return; } + } else { + serverPanic("unknown rangetype %d", (int)rangetype); } /* Step 2: Lookup & range sanity checks if needed. */ @@ -1875,6 +1885,7 @@ void zremrangeGenericCommand(client *c, int rangetype) { /* Step 3: Perform the range deletion operation. */ if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { switch(rangetype) { + case ZRANGE_AUTO: case ZRANGE_RANK: zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted); break; @@ -1892,6 +1903,7 @@ void zremrangeGenericCommand(client *c, int rangetype) { } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; switch(rangetype) { + case ZRANGE_AUTO: case ZRANGE_RANK: deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); break; @@ -1913,9 +1925,8 @@ void zremrangeGenericCommand(client *c, int rangetype) { /* Step 4: Notifications and reply. */ if (deleted) { - char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"}; signalModifiedKey(c,c->db,key); - notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id); + notifyKeyspaceEvent(NOTIFY_ZSET,notify_type,key,c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); } @@ -2778,27 +2789,180 @@ void zdiffCommand(client *c) { zunionInterDiffGenericCommand(c, NULL, 1, SET_OP_DIFF); } -void zrangeGenericCommand(client *c, int reverse) { - robj *key = c->argv[1]; - robj *zobj; - int withscores = 0; - long start; - long end; - long llen; - long rangelen; +typedef enum { + ZRANGE_DIRECTION_AUTO = 0, + ZRANGE_DIRECTION_FORWARD, + ZRANGE_DIRECTION_REVERSE +} zrange_direction; - if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || - (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; +typedef enum { + ZRANGE_CONSUMER_TYPE_CLIENT = 0, + ZRANGE_CONSUMER_TYPE_INTERNAL +} zrange_consumer_type; - if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) { - withscores = 1; - } else if (c->argc >= 5) { - addReplyErrorObject(c,shared.syntaxerr); - return; +typedef struct zrange_result_handler zrange_result_handler; + +typedef void (*zrangeResultBeginFunction)(zrange_result_handler *c); +typedef void (*zrangeResultFinalizeFunction)( + zrange_result_handler *c, size_t result_count); +typedef void (*zrangeResultEmitCBufferFunction)( + zrange_result_handler *c, const void *p, size_t len, double score); +typedef void (*zrangeResultEmitLongLongFunction)( + zrange_result_handler *c, long long ll, double score); + +void zrangeGenericCommand (zrange_result_handler *handler, int argc_start, int store, + zrange_type rangetype, zrange_direction direction); + +/* Interface struct for ZRANGE/ZRANGESTORE generic implementation. + * There is one implementation of this interface that sends a RESP reply to clients. + * and one implementation that stores the range result into a zset object. */ +struct zrange_result_handler { + zrange_consumer_type type; + client *client; + robj *dstkey; + robj *dstobj; + void *userdata; + int withscores; + int should_emit_array_length; + zrangeResultBeginFunction beginResultEmission; + zrangeResultFinalizeFunction finalizeResultEmission; + zrangeResultEmitCBufferFunction emitResultFromCBuffer; + zrangeResultEmitLongLongFunction emitResultFromLongLong; +}; + +/* Result handler methods for responding the ZRANGE to clients. */ +static void zrangeResultBeginClient(zrange_result_handler *handler) { + handler->userdata = addReplyDeferredLen(handler->client); +} + +static void zrangeResultEmitCBufferToClient(zrange_result_handler *handler, + const void *value, size_t value_length_in_bytes, double score) +{ + if (handler->should_emit_array_length) { + addReplyArrayLen(handler->client, 2); } - if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL - || checkType(c,zobj,OBJ_ZSET)) return; + addReplyBulkCBuffer(handler->client, value, value_length_in_bytes); + + if (handler->withscores) { + addReplyDouble(handler->client, score); + } +} + +static void zrangeResultEmitLongLongToClient(zrange_result_handler *handler, + long long value, double score) +{ + if (handler->should_emit_array_length) { + addReplyArrayLen(handler->client, 2); + } + + addReplyBulkLongLong(handler->client, value); + + if (handler->withscores) { + addReplyDouble(handler->client, score); + } +} + +static void zrangeResultFinalizeClient(zrange_result_handler *handler, + size_t result_count) +{ + if (handler->withscores && (handler->client->resp == 2)) { + result_count *= 2; + } + + setDeferredArrayLen(handler->client, handler->userdata, result_count); +} + +/* Result handler methods for storing the ZRANGESTORE to a zset. */ +static void zrangeResultBeginStore(zrange_result_handler *handler) +{ + handler->dstobj = createZsetZiplistObject(); +} + +static void zrangeResultEmitCBufferForStore(zrange_result_handler *handler, + const void *value, size_t value_length_in_bytes, double score) +{ + double newscore; + int retflags = 0; + sds ele = sdsnewlen(value, value_length_in_bytes); + int retval = zsetAdd(handler->dstobj, score, ele, &retflags, &newscore); + sdsfree(ele); + serverAssert(retval); +} + +static void zrangeResultEmitLongLongForStore(zrange_result_handler *handler, + long long value, double score) +{ + double newscore; + int retflags = 0; + sds ele = sdsfromlonglong(value); + int retval = zsetAdd(handler->dstobj, score, ele, &retflags, &newscore); + sdsfree(ele); + serverAssert(retval); +} + +static void zrangeResultFinalizeStore(zrange_result_handler *handler, size_t result_count) +{ + if (result_count) { + setKey(handler->client, handler->client->db, handler->dstkey, handler->dstobj); + addReplyLongLong(handler->client, result_count); + notifyKeyspaceEvent(NOTIFY_ZSET, "zrangestore", handler->dstkey, handler->client->db->id); + server.dirty++; + } else { + addReply(handler->client, shared.czero); + if (dbDelete(handler->client->db, handler->dstkey)) { + signalModifiedKey(handler->client, handler->client->db, handler->dstkey); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", handler->dstkey, handler->client->db->id); + server.dirty++; + } + } + decrRefCount(handler->dstobj); +} + +/* Initialize the consumer interface type with the requested type. */ +static void zrangeResultHandlerInit(zrange_result_handler *handler, + client *client, zrange_consumer_type type) +{ + memset(handler, 0, sizeof(*handler)); + + handler->client = client; + + switch (type) { + case ZRANGE_CONSUMER_TYPE_CLIENT: + handler->beginResultEmission = zrangeResultBeginClient; + handler->finalizeResultEmission = zrangeResultFinalizeClient; + handler->emitResultFromCBuffer = zrangeResultEmitCBufferToClient; + handler->emitResultFromLongLong = zrangeResultEmitLongLongToClient; + break; + + case ZRANGE_CONSUMER_TYPE_INTERNAL: + handler->beginResultEmission = zrangeResultBeginStore; + handler->finalizeResultEmission = zrangeResultFinalizeStore; + handler->emitResultFromCBuffer = zrangeResultEmitCBufferForStore; + handler->emitResultFromLongLong = zrangeResultEmitLongLongForStore; + break; + } +} + +static void zrangeResultHandlerScoreEmissionEnable(zrange_result_handler *handler) { + handler->withscores = 1; + handler->should_emit_array_length = (handler->client->resp > 2); +} + +static void zrangeResultHandlerDestinationKeySet (zrange_result_handler *handler, + robj *dstkey) +{ + handler->dstkey = dstkey; +} + +/* This command implements ZRANGE, ZREVRANGE. */ +void genericZrangebyrankCommand(zrange_result_handler *handler, + robj *zobj, long start, long end, int withscores, int reverse) { + + client *c = handler->client; + long llen; + long rangelen; + size_t result_cardinality; /* Sanitize indexes. */ llen = zsetLength(zobj); @@ -2806,22 +2970,17 @@ void zrangeGenericCommand(client *c, int reverse) { if (end < 0) end = llen+end; if (start < 0) start = 0; + handler->beginResultEmission(handler); + /* Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { - addReply(c,shared.emptyarray); + handler->finalizeResultEmission(handler, 0); return; } if (end >= llen) end = llen-1; rangelen = (end-start)+1; - - /* Return the result in form of a multi-bulk reply. RESP3 clients - * will receive sub arrays with score->element, while RESP2 returned - * a flat array. */ - if (withscores && c->resp == 2) - addReplyArrayLen(c, rangelen*2); - else - addReplyArrayLen(c, rangelen); + result_cardinality = rangelen; if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; @@ -2829,6 +2988,7 @@ void zrangeGenericCommand(client *c, int reverse) { unsigned char *vstr; unsigned int vlen; long long vlong; + double score = 0.0; if (reverse) eptr = ziplistIndex(zl,-2-(2*start)); @@ -2842,12 +3002,14 @@ void zrangeGenericCommand(client *c, int reverse) { serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL); serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); - if (withscores && c->resp > 2) addReplyArrayLen(c,2); - if (vstr == NULL) - addReplyBulkLongLong(c,vlong); - else - addReplyBulkCBuffer(c,vstr,vlen); - if (withscores) addReplyDouble(c,zzlGetScore(sptr)); + if (withscores) /* don't bother to extract the score if it's gonna be ignored. */ + score = zzlGetScore(sptr); + + if (vstr == NULL) { + handler->emitResultFromLongLong(handler, vlong, score); + } else { + handler->emitResultFromCBuffer(handler, vstr, vlen, score); + } if (reverse) zzlPrev(zl,&eptr,&sptr); @@ -2859,7 +3021,6 @@ void zrangeGenericCommand(client *c, int reverse) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; - sds ele; /* Check if starting point is trivial, before doing log(N) lookup. */ if (reverse) { @@ -2874,83 +3035,53 @@ void zrangeGenericCommand(client *c, int reverse) { while(rangelen--) { serverAssertWithInfo(c,zobj,ln != NULL); - ele = ln->ele; - if (withscores && c->resp > 2) addReplyArrayLen(c,2); - addReplyBulkCBuffer(c,ele,sdslen(ele)); - if (withscores) addReplyDouble(c,ln->score); + sds ele = ln->ele; + handler->emitResultFromCBuffer(handler, ele, sdslen(ele), ln->score); ln = reverse ? ln->backward : ln->level[0].forward; } } else { serverPanic("Unknown sorted set encoding"); } + + handler->finalizeResultEmission(handler, result_cardinality); } +/* ZRANGESTORE [BYSCORE | BYLEX] [REV] [LIMIT offset count] */ +void zrangestoreCommand (client *c) { + robj *dstkey = c->argv[1]; + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_INTERNAL); + zrangeResultHandlerDestinationKeySet(&handler, dstkey); + zrangeGenericCommand(&handler, 2, 1, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO); +} + +/* ZRANGE [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count] */ void zrangeCommand(client *c) { - zrangeGenericCommand(c,0); + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_AUTO, ZRANGE_DIRECTION_AUTO); } +/* ZREVRANGE [WITHSCORES] */ void zrevrangeCommand(client *c) { - zrangeGenericCommand(c,1); + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_RANK, ZRANGE_DIRECTION_REVERSE); } /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */ -void genericZrangebyscoreCommand(client *c, int reverse) { - zrangespec range; - robj *key = c->argv[1]; - robj *zobj; - long offset = 0, limit = -1; - int withscores = 0; +void genericZrangebyscoreCommand(zrange_result_handler *handler, + zrangespec *range, robj *zobj, int withscores, long offset, + long limit, int reverse) { + + client *c = handler->client; unsigned long rangelen = 0; - void *replylen = NULL; - int minidx, maxidx; - /* Parse the range arguments. */ - if (reverse) { - /* Range is given as [max,min] */ - maxidx = 2; minidx = 3; - } else { - /* Range is given as [min,max] */ - minidx = 2; maxidx = 3; - } - - if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) { - addReplyError(c,"min or max is not a float"); - return; - } - - /* Parse optional extra arguments. Note that ZCOUNT will exactly have - * 4 arguments, so we'll never enter the following code path. */ - if (c->argc > 4) { - int remaining = c->argc - 4; - int pos = 4; - - while (remaining) { - if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) { - pos++; remaining--; - withscores = 1; - } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) { - if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) - != C_OK) || - (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) - != C_OK)) - { - return; - } - pos += 3; remaining -= 3; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - } - - /* Ok, lookup the key and get the range */ - if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL || - checkType(c,zobj,OBJ_ZSET)) return; + handler->beginResultEmission(handler); /* For invalid offset, return directly. */ if (offset > 0 && offset >= (long)zsetLength(zobj)) { - addReply(c,shared.emptyarray); + handler->finalizeResultEmission(handler, 0); return; } @@ -2960,29 +3091,17 @@ void genericZrangebyscoreCommand(client *c, int reverse) { unsigned char *vstr; unsigned int vlen; long long vlong; - double score; /* If reversed, get the last node in range as starting point. */ if (reverse) { - eptr = zzlLastInRange(zl,&range); + eptr = zzlLastInRange(zl,range); } else { - eptr = zzlFirstInRange(zl,&range); - } - - /* No "first" element in the specified interval. */ - if (eptr == NULL) { - addReply(c,shared.emptyarray); - return; + eptr = zzlFirstInRange(zl,range); } /* Get score pointer for the first element. */ - serverAssertWithInfo(c,zobj,eptr != NULL); - sptr = ziplistNext(zl,eptr); - - /* We don't know in advance how many matching elements there are in the - * list, so we push this object that will represent the multi-bulk - * length in the output buffer, and will "fix" it later */ - replylen = addReplyDeferredLen(c); + if (eptr) + sptr = ziplistNext(zl,eptr); /* If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. */ @@ -2995,13 +3114,13 @@ void genericZrangebyscoreCommand(client *c, int reverse) { } while (eptr && limit--) { - score = zzlGetScore(sptr); + double score = zzlGetScore(sptr); /* Abort when the node is no longer in range. */ if (reverse) { - if (!zslValueGteMin(score,&range)) break; + if (!zslValueGteMin(score,range)) break; } else { - if (!zslValueLteMax(score,&range)) break; + if (!zslValueLteMax(score,range)) break; } /* We know the element exists, so ziplistGet should always @@ -3009,13 +3128,11 @@ void genericZrangebyscoreCommand(client *c, int reverse) { serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); rangelen++; - if (withscores && c->resp > 2) addReplyArrayLen(c,2); if (vstr == NULL) { - addReplyBulkLongLong(c,vlong); + handler->emitResultFromLongLong(handler, vlong, score); } else { - addReplyBulkCBuffer(c,vstr,vlen); + handler->emitResultFromCBuffer(handler, vstr, vlen, score); } - if (withscores) addReplyDouble(c,score); /* Move to next node */ if (reverse) { @@ -3031,22 +3148,11 @@ void genericZrangebyscoreCommand(client *c, int reverse) { /* If reversed, get the last node in range as starting point. */ if (reverse) { - ln = zslLastInRange(zsl,&range); + ln = zslLastInRange(zsl,range); } else { - ln = zslFirstInRange(zsl,&range); + ln = zslFirstInRange(zsl,range); } - /* No "first" element in the specified interval. */ - if (ln == NULL) { - addReply(c,shared.emptyarray); - return; - } - - /* We don't know in advance how many matching elements there are in the - * list, so we push this object that will represent the multi-bulk - * length in the output buffer, and will "fix" it later */ - replylen = addReplyDeferredLen(c); - /* If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. */ while (ln && offset--) { @@ -3060,15 +3166,14 @@ void genericZrangebyscoreCommand(client *c, int reverse) { while (ln && limit--) { /* Abort when the node is no longer in range. */ if (reverse) { - if (!zslValueGteMin(ln->score,&range)) break; + if (!zslValueGteMin(ln->score,range)) break; } else { - if (!zslValueLteMax(ln->score,&range)) break; + if (!zslValueLteMax(ln->score,range)) break; } rangelen++; - if (withscores && c->resp > 2) addReplyArrayLen(c,2); - addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele)); - if (withscores) addReplyDouble(c,ln->score); + handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), + ((withscores) ? ln->score : ln->score)); /* Move to next node */ if (reverse) { @@ -3081,16 +3186,21 @@ void genericZrangebyscoreCommand(client *c, int reverse) { serverPanic("Unknown sorted set encoding"); } - if (withscores && c->resp == 2) rangelen *= 2; - setDeferredArrayLen(c, replylen, rangelen); + handler->finalizeResultEmission(handler, rangelen); } +/* ZRANGEBYSCORE [WITHSCORES] [LIMIT offset count] */ void zrangebyscoreCommand(client *c) { - genericZrangebyscoreCommand(c,0); + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_FORWARD); } +/* ZREVRANGEBYSCORE [WITHSCORES] [LIMIT offset count] */ void zrevrangebyscoreCommand(client *c) { - genericZrangebyscoreCommand(c,1); + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_SCORE, ZRANGE_DIRECTION_REVERSE); } void zcountCommand(client *c) { @@ -3250,58 +3360,14 @@ void zlexcountCommand(client *c) { } /* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */ -void genericZrangebylexCommand(client *c, int reverse) { - zlexrangespec range; - robj *key = c->argv[1]; - robj *zobj; - long offset = 0, limit = -1; +void genericZrangebylexCommand(zrange_result_handler *handler, + zlexrangespec *range, robj *zobj, int withscores, long offset, long limit, + int reverse) +{ + client *c = handler->client; unsigned long rangelen = 0; - void *replylen = NULL; - int minidx, maxidx; - /* Parse the range arguments. */ - if (reverse) { - /* Range is given as [max,min] */ - maxidx = 2; minidx = 3; - } else { - /* Range is given as [min,max] */ - minidx = 2; maxidx = 3; - } - - if (zslParseLexRange(c->argv[minidx],c->argv[maxidx],&range) != C_OK) { - addReplyError(c,"min or max not valid string range item"); - return; - } - - /* Parse optional extra arguments. Note that ZCOUNT will exactly have - * 4 arguments, so we'll never enter the following code path. */ - if (c->argc > 4) { - int remaining = c->argc - 4; - int pos = 4; - - while (remaining) { - if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) { - if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) || - (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) { - zslFreeLexRange(&range); - return; - } - pos += 3; remaining -= 3; - } else { - zslFreeLexRange(&range); - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - } - - /* Ok, lookup the key and get the range */ - if ((zobj = lookupKeyReadOrReply(c,key,shared.emptyarray)) == NULL || - checkType(c,zobj,OBJ_ZSET)) - { - zslFreeLexRange(&range); - return; - } + handler->beginResultEmission(handler); if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; @@ -3312,26 +3378,14 @@ void genericZrangebylexCommand(client *c, int reverse) { /* If reversed, get the last node in range as starting point. */ if (reverse) { - eptr = zzlLastInLexRange(zl,&range); + eptr = zzlLastInLexRange(zl,range); } else { - eptr = zzlFirstInLexRange(zl,&range); - } - - /* No "first" element in the specified interval. */ - if (eptr == NULL) { - addReply(c,shared.emptyarray); - zslFreeLexRange(&range); - return; + eptr = zzlFirstInLexRange(zl,range); } /* Get score pointer for the first element. */ - serverAssertWithInfo(c,zobj,eptr != NULL); - sptr = ziplistNext(zl,eptr); - - /* We don't know in advance how many matching elements there are in the - * list, so we push this object that will represent the multi-bulk - * length in the output buffer, and will "fix" it later */ - replylen = addReplyDeferredLen(c); + if (eptr) + sptr = ziplistNext(zl,eptr); /* If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. */ @@ -3344,11 +3398,15 @@ void genericZrangebylexCommand(client *c, int reverse) { } while (eptr && limit--) { + double score = 0; + if (withscores) /* don't bother to extract the score if it's gonna be ignored. */ + score = zzlGetScore(sptr); + /* Abort when the node is no longer in range. */ if (reverse) { - if (!zzlLexValueGteMin(eptr,&range)) break; + if (!zzlLexValueGteMin(eptr,range)) break; } else { - if (!zzlLexValueLteMax(eptr,&range)) break; + if (!zzlLexValueLteMax(eptr,range)) break; } /* We know the element exists, so ziplistGet should always @@ -3357,9 +3415,9 @@ void genericZrangebylexCommand(client *c, int reverse) { rangelen++; if (vstr == NULL) { - addReplyBulkLongLong(c,vlong); + handler->emitResultFromLongLong(handler, vlong, score); } else { - addReplyBulkCBuffer(c,vstr,vlen); + handler->emitResultFromCBuffer(handler, vstr, vlen, score); } /* Move to next node */ @@ -3376,23 +3434,11 @@ void genericZrangebylexCommand(client *c, int reverse) { /* If reversed, get the last node in range as starting point. */ if (reverse) { - ln = zslLastInLexRange(zsl,&range); + ln = zslLastInLexRange(zsl,range); } else { - ln = zslFirstInLexRange(zsl,&range); + ln = zslFirstInLexRange(zsl,range); } - /* No "first" element in the specified interval. */ - if (ln == NULL) { - addReply(c,shared.emptyarray); - zslFreeLexRange(&range); - return; - } - - /* We don't know in advance how many matching elements there are in the - * list, so we push this object that will represent the multi-bulk - * length in the output buffer, and will "fix" it later */ - replylen = addReplyDeferredLen(c); - /* If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. */ while (ln && offset--) { @@ -3406,13 +3452,13 @@ void genericZrangebylexCommand(client *c, int reverse) { while (ln && limit--) { /* Abort when the node is no longer in range. */ if (reverse) { - if (!zslLexValueGteMin(ln->ele,&range)) break; + if (!zslLexValueGteMin(ln->ele,range)) break; } else { - if (!zslLexValueLteMax(ln->ele,&range)) break; + if (!zslLexValueLteMax(ln->ele,range)) break; } rangelen++; - addReplyBulkCBuffer(c,ln->ele,sdslen(ln->ele)); + handler->emitResultFromCBuffer(handler, ln->ele, sdslen(ln->ele), ln->score); /* Move to next node */ if (reverse) { @@ -3425,16 +3471,171 @@ void genericZrangebylexCommand(client *c, int reverse) { serverPanic("Unknown sorted set encoding"); } - zslFreeLexRange(&range); - setDeferredArrayLen(c, replylen, rangelen); + handler->finalizeResultEmission(handler, rangelen); } +/* ZRANGEBYLEX [LIMIT offset count] */ void zrangebylexCommand(client *c) { - genericZrangebylexCommand(c,0); + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_FORWARD); } +/* ZREVRANGEBYLEX [LIMIT offset count] */ void zrevrangebylexCommand(client *c) { - genericZrangebylexCommand(c,1); + zrange_result_handler handler; + zrangeResultHandlerInit(&handler, c, ZRANGE_CONSUMER_TYPE_CLIENT); + zrangeGenericCommand(&handler, 1, 0, ZRANGE_LEX, ZRANGE_DIRECTION_REVERSE); +} + +/** + * This function handles ZRANGE and ZRANGESTORE, and also the deprecated + * Z[REV]RANGE[BYPOS|BYLEX] commands. + * + * The simple ZRANGE and ZRANGESTORE can take _AUTO in rangetype and direction, + * other command pass explicit value. + * + * The argc_start points to the src key argument, so following syntax is like: + * [BYSCORE | BYLEX] [REV] [WITHSCORES] [LIMIT offset count] + */ +void zrangeGenericCommand(zrange_result_handler *handler, int argc_start, int store, + zrange_type rangetype, zrange_direction direction) +{ + client *c = handler->client; + robj *key = c->argv[argc_start]; + robj *zobj; + zrangespec range; + zlexrangespec lexrange; + int minidx = argc_start + 1; + int maxidx = argc_start + 2; + + /* Options common to all */ + long opt_start = 0; + long opt_end = 0; + int opt_withscores = 0; + long opt_offset = 0; + long opt_limit = -1; + + /* Step 1: Skip the args and parse remaining optional arguments. */ + for (int j=argc_start + 3; j < c->argc; j++) { + int leftargs = c->argc-j-1; + if (!store && !strcasecmp(c->argv[j]->ptr,"withscores")) { + opt_withscores = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) { + if ((getLongFromObjectOrReply(c, c->argv[j+1], &opt_offset, NULL) != C_OK) || + (getLongFromObjectOrReply(c, c->argv[j+2], &opt_limit, NULL) != C_OK)) + { + return; + } + j += 2; + } else if (direction == ZRANGE_DIRECTION_AUTO && + !strcasecmp(c->argv[j]->ptr,"rev")) + { + direction = ZRANGE_DIRECTION_REVERSE; + } else if (rangetype == ZRANGE_AUTO && + !strcasecmp(c->argv[j]->ptr,"bylex")) + { + rangetype = ZRANGE_LEX; + } else if (rangetype == ZRANGE_AUTO && + !strcasecmp(c->argv[j]->ptr,"byscore")) + { + rangetype = ZRANGE_SCORE; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + + /* Use defaults if not overriden by arguments. */ + if (direction == ZRANGE_DIRECTION_AUTO) + direction = ZRANGE_DIRECTION_FORWARD; + if (rangetype == ZRANGE_AUTO) + rangetype = ZRANGE_RANK; + + /* Check for conflicting arguments. */ + if (opt_limit != -1 && rangetype == ZRANGE_RANK) { + addReplyError(c,"syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX"); + return; + } + if (opt_withscores && rangetype == ZRANGE_LEX) { + addReplyError(c,"syntax error, WITHSCORES not supported in combination with BYLEX"); + return; + } + + if (direction == ZRANGE_DIRECTION_REVERSE && + ((ZRANGE_SCORE == rangetype) || (ZRANGE_LEX == rangetype))) + { + /* Range is given as [max,min] */ + int tmp = maxidx; + maxidx = minidx; + minidx = tmp; + } + + /* Step 2: Parse the range. */ + switch (rangetype) { + case ZRANGE_AUTO: + case ZRANGE_RANK: + /* Z[REV]RANGE, ZRANGESTORE [REV]RANGE */ + if ((getLongFromObjectOrReply(c, c->argv[minidx], &opt_start,NULL) != C_OK) || + (getLongFromObjectOrReply(c, c->argv[maxidx], &opt_end,NULL) != C_OK)) + { + return; + } + break; + + case ZRANGE_SCORE: + /* Z[REV]RANGEBYSCORE, ZRANGESTORE [REV]RANGEBYSCORE */ + if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK) { + addReplyError(c, "min or max is not a float"); + return; + } + break; + + case ZRANGE_LEX: + /* Z[REV]RANGEBYLEX, ZRANGESTORE [REV]RANGEBYLEX */ + if (zslParseLexRange(c->argv[minidx], c->argv[maxidx], &lexrange) != C_OK) { + addReplyError(c, "min or max not valid string range item"); + return; + } + break; + } + + if (opt_withscores || store) { + zrangeResultHandlerScoreEmissionEnable(handler); + } + + /* Step 3: Lookup the key and get the range. */ + if (((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL) + || checkType(c, zobj, OBJ_ZSET)) { + goto cleanup; + } + + /* Step 4: Pass this to the command-specific handler. */ + switch (rangetype) { + case ZRANGE_AUTO: + case ZRANGE_RANK: + genericZrangebyrankCommand(handler, zobj, opt_start, opt_end, + opt_withscores || store, direction == ZRANGE_DIRECTION_REVERSE); + break; + + case ZRANGE_SCORE: + genericZrangebyscoreCommand(handler, &range, zobj, opt_withscores || store, + opt_offset, opt_limit, direction == ZRANGE_DIRECTION_REVERSE); + break; + + case ZRANGE_LEX: + genericZrangebylexCommand(handler, &lexrange, zobj, opt_withscores || store, + opt_offset, opt_limit, direction == ZRANGE_DIRECTION_REVERSE); + break; + } + + /* Instead of returning here, we'll just fall-through the clean-up. */ + +cleanup: + + if (rangetype == ZRANGE_LEX) { + zslFreeLexRange(&lexrange); + } } void zcardCommand(client *c) { diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index c1406797b..8318ebb63 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -1472,4 +1472,86 @@ start_server {tags {"zset"}} { } r config set zset-max-ziplist-entries $original_max } + + test {ZRANGESTORE basic} { + r flushall + r zadd z1 1 a 2 b 3 c 4 d + set res [r zrangestore z2 z1 0 -1] + assert_equal $res 4 + r zrange z2 0 -1 withscores + } {a 1 b 2 c 3 d 4} + + test {ZRANGESTORE range} { + set res [r zrangestore z2 z1 1 2] + assert_equal $res 2 + r zrange z2 0 -1 withscores + } {b 2 c 3} + + test {ZRANGESTORE BYLEX} { + set res [r zrangestore z2 z1 \[b \[c BYLEX] + assert_equal $res 2 + r zrange z2 0 -1 withscores + } {b 2 c 3} + + test {ZRANGESTORE BYSCORE} { + set res [r zrangestore z2 z1 1 2 BYSCORE] + assert_equal $res 2 + r zrange z2 0 -1 withscores + } {a 1 b 2} + + test {ZRANGESTORE BYSCORE LIMIT} { + set res [r zrangestore z2 z1 0 5 BYSCORE LIMIT 0 2] + assert_equal $res 2 + r zrange z2 0 -1 withscores + } {a 1 b 2} + + test {ZRANGESTORE BYSCORE REV LIMIT} { + set res [r zrangestore z2 z1 5 0 BYSCORE REV LIMIT 0 2] + assert_equal $res 2 + r zrange z2 0 -1 withscores + } {c 3 d 4} + + test {ZRANGE BYSCORE REV LIMIT} { + r zrange z1 5 0 BYSCORE REV LIMIT 0 2 WITHSCORES + } {d 4 c 3} + + test {ZRANGESTORE - empty range} { + set res [r zrangestore z2 z1 5 6] + assert_equal $res 0 + r exists z2 + } {0} + + test {ZRANGESTORE BYLEX - empty range} { + set res [r zrangestore z2 z1 \[f \[g BYLEX] + assert_equal $res 0 + r exists z2 + } {0} + + test {ZRANGESTORE BYSCORE - empty range} { + set res [r zrangestore z2 z1 5 6 BYSCORE] + assert_equal $res 0 + r exists z2 + } {0} + + test {ZRANGE BYLEX} { + r zrange z1 \[b \[c BYLEX + } {b c} + + test {ZRANGESTORE invalid syntax} { + catch {r zrangestore z2 z1 0 -1 limit 1 2} err + assert_match "*syntax*" $err + catch {r zrangestore z2 z1 0 -1 WITHSCORES} err + assert_match "*syntax*" $err + } + + test {ZRANGE invalid syntax} { + catch {r zrange z1 0 -1 limit 1 2} err + assert_match "*syntax*" $err + catch {r zrange z1 0 -1 BYLEX WITHSCORES} err + assert_match "*syntax*" $err + catch {r zrevrange z1 0 -1 BYSCORE} err + assert_match "*syntax*" $err + catch {r zrangebyscore z1 0 -1 REV} err + assert_match "*syntax*" $err + } }