futriix-old/src/t_zset.c
Matt Stancliff 7f4ac3d19c [In-Progress] Add Geo Commands
Current todo:
  - replace functions in zset.{c,h} with a new unified Redis
    zset access API.

Once we get the zset interface fixed, we can squash
relevant commits in this branch and have one nice commit
to merge into unstable.

This commit adds:
  - Geo commands
  - Tests; runnable with: ./runtest --single unit/geo
  - Geo helpers in deps/geohash-int/
  - src/geo.{c,h} and src/geojson.{c,h} implementing geo commands
  - Updated build configurations to get everything working
  - TEMPORARY: src/zset.{c,h} implementing zset score and zset
    range reading without writing to client output buffers.
  - Modified linkage of one t_zset.c function for use in zset.c

Conflicts:
	src/Makefile
	src/redis.c
2015-06-22 09:07:13 +02:00

2919 lines
94 KiB
C

/*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*-----------------------------------------------------------------------------
* Sorted set API
*----------------------------------------------------------------------------*/
/* ZSETs are ordered sets using two data structures to hold the same elements
* in order to get O(log(N)) INSERT and REMOVE operations into a sorted
* data structure.
*
* The elements are added to a hash table mapping Redis objects to scores.
* At the same time the elements are added to a skip list mapping scores
* to Redis objects (so objects are sorted by scores in this "view"). */
/* This skiplist implementation is almost a C translation of the original
* algorithm described by William Pugh in "Skip Lists: A Probabilistic
* Alternative to Balanced Trees", modified in three ways:
* a) this implementation allows for repeated scores.
* b) the comparison is not just by key (our 'score') but by satellite data.
* c) there is a back pointer, so it's a doubly linked list with the back
* pointers being only at "level 1". This allows to traverse the list
* from tail to head, useful for ZREVRANGE. */
#include "redis.h"
#include <math.h>
static int zslLexValueGteMin(robj *value, zlexrangespec *spec);
static int zslLexValueLteMax(robj *value, zlexrangespec *spec);
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->obj = obj;
return zn;
}
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
void zslFreeNode(zskiplistNode *node) {
decrRefCount(node->obj);
zfree(node);
}
void zslFree(zskiplist *zsl) {
zskiplistNode *node = zsl->header->level[0].forward, *next;
zfree(zsl->header);
while(node) {
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zsl);
}
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
/* Delete an element with matching score/object from the skiplist. */
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
}
return 0; /* not found */
}
static int zslValueGteMin(double value, zrangespec *spec) {
return spec->minex ? (value > spec->min) : (value >= spec->min);
}
int zslValueLteMax(double value, zrangespec *spec) {
return spec->maxex ? (value < spec->max) : (value <= spec->max);
}
/* Returns if there is a part of the zset is in range. */
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
/* Test for ranges that will always be empty. */
if (range->min > range->max ||
(range->min == range->max && (range->minex || range->maxex)))
return 0;
x = zsl->tail;
if (x == NULL || !zslValueGteMin(x->score,range))
return 0;
x = zsl->header->level[0].forward;
if (x == NULL || !zslValueLteMax(x->score,range))
return 0;
return 1;
}
/* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *OUT* of range. */
while (x->level[i].forward &&
!zslValueGteMin(x->level[i].forward->score,range))
x = x->level[i].forward;
}
/* This is an inner range, so the next node cannot be NULL. */
x = x->level[0].forward;
redisAssert(x != NULL);
/* Check if score <= max. */
if (!zslValueLteMax(x->score,range)) return NULL;
return x;
}
/* Find the last node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *IN* range. */
while (x->level[i].forward &&
zslValueLteMax(x->level[i].forward->score,range))
x = x->level[i].forward;
}
/* This is an inner range, so this node cannot be NULL. */
redisAssert(x != NULL);
/* Check if score >= min. */
if (!zslValueGteMin(x->score,range)) return NULL;
return x;
}
/* Delete all the elements with score between min and max from the skiplist.
* Min and max are inclusive, so a score >= min || score <= max is deleted.
* Note that this function takes the reference to the hash table view of the
* sorted set, in order to remove the elements from the hash table too. */
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (range->minex ?
x->level[i].forward->score <= range->min :
x->level[i].forward->score < range->min))
x = x->level[i].forward;
update[i] = x;
}
/* Current node is the last with score < or <= min. */
x = x->level[0].forward;
/* Delete nodes while in range. */
while (x &&
(range->maxex ? x->score < range->max : x->score <= range->max))
{
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
x = next;
}
return removed;
}
unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
!zslLexValueGteMin(x->level[i].forward->obj,range))
x = x->level[i].forward;
update[i] = x;
}
/* Current node is the last with score < or <= min. */
x = x->level[0].forward;
/* Delete nodes while in range. */
while (x && zslLexValueLteMax(x->obj,range)) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
x = next;
}
return removed;
}
/* Delete all the elements with rank between start and end from the skiplist.
* Start and end are inclusive. Note that start and end need to be 1-based */
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long traversed = 0, removed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) < start) {
traversed += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
traversed++;
x = x->level[0].forward;
while (x && traversed <= end) {
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update);
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
traversed++;
x = next;
}
return removed;
}
/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}
return 0;
}
/* Finds an element by its rank. The rank argument needs to be 1-based. */
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}
/* Populate the rangespec according to the objects min and max. */
static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
char *eptr;
spec->minex = spec->maxex = 0;
/* Parse the min-max interval. If one of the values is prefixed
* by the "(" character, it's considered "open". For instance
* ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
* ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
if (min->encoding == REDIS_ENCODING_INT) {
spec->min = (long)min->ptr;
} else {
if (((char*)min->ptr)[0] == '(') {
spec->min = strtod((char*)min->ptr+1,&eptr);
if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
spec->minex = 1;
} else {
spec->min = strtod((char*)min->ptr,&eptr);
if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
}
}
if (max->encoding == REDIS_ENCODING_INT) {
spec->max = (long)max->ptr;
} else {
if (((char*)max->ptr)[0] == '(') {
spec->max = strtod((char*)max->ptr+1,&eptr);
if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
spec->maxex = 1;
} else {
spec->max = strtod((char*)max->ptr,&eptr);
if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
}
}
return REDIS_OK;
}
/* ------------------------ Lexicographic ranges ---------------------------- */
/* Parse max or min argument of ZRANGEBYLEX.
* (foo means foo (open interval)
* [foo means foo (closed interval)
* - means the min string possible
* + means the max string possible
*
* If the string is valid the *dest pointer is set to the redis object
* that will be used for the comparision, and ex will be set to 0 or 1
* respectively if the item is exclusive or inclusive. REDIS_OK will be
* returned.
*
* If the string is not a valid range REDIS_ERR is returned, and the value
* of *dest and *ex is undefined. */
int zslParseLexRangeItem(robj *item, robj **dest, int *ex) {
char *c = item->ptr;
switch(c[0]) {
case '+':
if (c[1] != '\0') return REDIS_ERR;
*ex = 0;
*dest = shared.maxstring;
incrRefCount(shared.maxstring);
return REDIS_OK;
case '-':
if (c[1] != '\0') return REDIS_ERR;
*ex = 0;
*dest = shared.minstring;
incrRefCount(shared.minstring);
return REDIS_OK;
case '(':
*ex = 1;
*dest = createStringObject(c+1,sdslen(c)-1);
return REDIS_OK;
case '[':
*ex = 0;
*dest = createStringObject(c+1,sdslen(c)-1);
return REDIS_OK;
default:
return REDIS_ERR;
}
}
/* Populate the rangespec according to the objects min and max.
*
* Return REDIS_OK on success. On error REDIS_ERR is returned.
* When OK is returned the structure must be freed with zslFreeLexRange(),
* otherwise no release is needed. */
static int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
/* The range can't be valid if objects are integer encoded.
* Every item must start with ( or [. */
if (min->encoding == REDIS_ENCODING_INT ||
max->encoding == REDIS_ENCODING_INT) return REDIS_ERR;
spec->min = spec->max = NULL;
if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == REDIS_ERR ||
zslParseLexRangeItem(max, &spec->max, &spec->maxex) == REDIS_ERR) {
if (spec->min) decrRefCount(spec->min);
if (spec->max) decrRefCount(spec->max);
return REDIS_ERR;
} else {
return REDIS_OK;
}
}
/* Free a lex range structure, must be called only after zelParseLexRange()
* populated the structure with success (REDIS_OK returned). */
void zslFreeLexRange(zlexrangespec *spec) {
decrRefCount(spec->min);
decrRefCount(spec->max);
}
/* This is just a wrapper to compareStringObjects() that is able to
* handle shared.minstring and shared.maxstring as the equivalent of
* -inf and +inf for strings */
int compareStringObjectsForLexRange(robj *a, robj *b) {
if (a == b) return 0; /* This makes sure that we handle inf,inf and
-inf,-inf ASAP. One special case less. */
if (a == shared.minstring || b == shared.maxstring) return -1;
if (a == shared.maxstring || b == shared.minstring) return 1;
return compareStringObjects(a,b);
}
static int zslLexValueGteMin(robj *value, zlexrangespec *spec) {
return spec->minex ?
(compareStringObjectsForLexRange(value,spec->min) > 0) :
(compareStringObjectsForLexRange(value,spec->min) >= 0);
}
static int zslLexValueLteMax(robj *value, zlexrangespec *spec) {
return spec->maxex ?
(compareStringObjectsForLexRange(value,spec->max) < 0) :
(compareStringObjectsForLexRange(value,spec->max) <= 0);
}
/* Returns if there is a part of the zset is in the lex range. */
int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
zskiplistNode *x;
/* Test for ranges that will always be empty. */
if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
(compareStringObjects(range->min,range->max) == 0 &&
(range->minex || range->maxex)))
return 0;
x = zsl->tail;
if (x == NULL || !zslLexValueGteMin(x->obj,range))
return 0;
x = zsl->header->level[0].forward;
if (x == NULL || !zslLexValueLteMax(x->obj,range))
return 0;
return 1;
}
/* Find the first node that is contained in the specified lex range.
* Returns NULL when no element is contained in the range. */
zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInLexRange(zsl,range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *OUT* of range. */
while (x->level[i].forward &&
!zslLexValueGteMin(x->level[i].forward->obj,range))
x = x->level[i].forward;
}
/* This is an inner range, so the next node cannot be NULL. */
x = x->level[0].forward;
redisAssert(x != NULL);
/* Check if score <= max. */
if (!zslLexValueLteMax(x->obj,range)) return NULL;
return x;
}
/* Find the last node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInLexRange(zsl,range)) return NULL;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *IN* range. */
while (x->level[i].forward &&
zslLexValueLteMax(x->level[i].forward->obj,range))
x = x->level[i].forward;
}
/* This is an inner range, so this node cannot be NULL. */
redisAssert(x != NULL);
/* Check if score >= min. */
if (!zslLexValueGteMin(x->obj,range)) return NULL;
return x;
}
/*-----------------------------------------------------------------------------
* Ziplist-backed sorted set API
*----------------------------------------------------------------------------*/
double zzlGetScore(unsigned char *sptr) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
char buf[128];
double score;
redisAssert(sptr != NULL);
redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
if (vstr) {
memcpy(buf,vstr,vlen);
buf[vlen] = '\0';
score = strtod(buf,NULL);
} else {
score = vlong;
}
return score;
}
/* Return a ziplist element as a Redis string object.
* This simple abstraction can be used to simplifies some code at the
* cost of some performance. */
robj *ziplistGetObject(unsigned char *sptr) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
redisAssert(sptr != NULL);
redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
if (vstr) {
return createStringObject((char*)vstr,vlen);
} else {
return createStringObjectFromLongLong(vlong);
}
}
/* Compare element in sorted set with given element. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
unsigned char vbuf[32];
int minlen, cmp;
redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL) {
/* Store string representation of long long in buf. */
vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
vstr = vbuf;
}
minlen = (vlen < clen) ? vlen : clen;
cmp = memcmp(vstr,cstr,minlen);
if (cmp == 0) return vlen-clen;
return cmp;
}
unsigned int zzlLength(unsigned char *zl) {
return ziplistLen(zl)/2;
}
/* Move to next entry based on the values in eptr and sptr. Both are set to
* NULL when there is no next entry. */
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
unsigned char *_eptr, *_sptr;
redisAssert(*eptr != NULL && *sptr != NULL);
_eptr = ziplistNext(zl,*sptr);
if (_eptr != NULL) {
_sptr = ziplistNext(zl,_eptr);
redisAssert(_sptr != NULL);
} else {
/* No next entry. */
_sptr = NULL;
}
*eptr = _eptr;
*sptr = _sptr;
}
/* Move to the previous entry based on the values in eptr and sptr. Both are
* set to NULL when there is no next entry. */
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
unsigned char *_eptr, *_sptr;
redisAssert(*eptr != NULL && *sptr != NULL);
_sptr = ziplistPrev(zl,*eptr);
if (_sptr != NULL) {
_eptr = ziplistPrev(zl,_sptr);
redisAssert(_eptr != NULL);
} else {
/* No previous entry. */
_eptr = NULL;
}
*eptr = _eptr;
*sptr = _sptr;
}
/* Returns if there is a part of the zset is in range. Should only be used
* internally by zzlFirstInRange and zzlLastInRange. */
int zzlIsInRange(unsigned char *zl, zrangespec *range) {
unsigned char *p;
double score;
/* Test for ranges that will always be empty. */
if (range->min > range->max ||
(range->min == range->max && (range->minex || range->maxex)))
return 0;
p = ziplistIndex(zl,-1); /* Last score. */
if (p == NULL) return 0; /* Empty sorted set */
score = zzlGetScore(p);
if (!zslValueGteMin(score,range))
return 0;
p = ziplistIndex(zl,1); /* First score. */
redisAssert(p != NULL);
score = zzlGetScore(p);
if (!zslValueLteMax(score,range))
return 0;
return 1;
}
/* Find pointer to the first element contained in the specified range.
* Returns NULL when no element is contained in the range. */
unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
double score;
/* If everything is out of range, return early. */
if (!zzlIsInRange(zl,range)) return NULL;
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
redisAssert(sptr != NULL);
score = zzlGetScore(sptr);
if (zslValueGteMin(score,range)) {
/* Check if score <= max. */
if (zslValueLteMax(score,range))
return eptr;
return NULL;
}
/* Move to next element. */
eptr = ziplistNext(zl,sptr);
}
return NULL;
}
/* Find pointer to the last element contained in the specified range.
* Returns NULL when no element is contained in the range. */
unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range) {
unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
double score;
/* If everything is out of range, return early. */
if (!zzlIsInRange(zl,range)) return NULL;
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
redisAssert(sptr != NULL);
score = zzlGetScore(sptr);
if (zslValueLteMax(score,range)) {
/* Check if score >= min. */
if (zslValueGteMin(score,range))
return eptr;
return NULL;
}
/* Move to previous element by moving to the score of previous element.
* When this returns NULL, we know there also is no element. */
sptr = ziplistPrev(zl,eptr);
if (sptr != NULL)
redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
else
eptr = NULL;
}
return NULL;
}
static int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec) {
robj *value = ziplistGetObject(p);
int res = zslLexValueGteMin(value,spec);
decrRefCount(value);
return res;
}
static int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec) {
robj *value = ziplistGetObject(p);
int res = zslLexValueLteMax(value,spec);
decrRefCount(value);
return res;
}
/* Returns if there is a part of the zset is in range. Should only be used
* internally by zzlFirstInRange and zzlLastInRange. */
int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
unsigned char *p;
/* Test for ranges that will always be empty. */
if (compareStringObjectsForLexRange(range->min,range->max) > 1 ||
(compareStringObjects(range->min,range->max) == 0 &&
(range->minex || range->maxex)))
return 0;
p = ziplistIndex(zl,-2); /* Last element. */
if (p == NULL) return 0;
if (!zzlLexValueGteMin(p,range))
return 0;
p = ziplistIndex(zl,0); /* First element. */
redisAssert(p != NULL);
if (!zzlLexValueLteMax(p,range))
return 0;
return 1;
}
/* Find pointer to the first element contained in the specified lex range.
* Returns NULL when no element is contained in the range. */
unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
/* If everything is out of range, return early. */
if (!zzlIsInLexRange(zl,range)) return NULL;
while (eptr != NULL) {
if (zzlLexValueGteMin(eptr,range)) {
/* Check if score <= max. */
if (zzlLexValueLteMax(eptr,range))
return eptr;
return NULL;
}
/* Move to next element. */
sptr = ziplistNext(zl,eptr); /* This element score. Skip it. */
redisAssert(sptr != NULL);
eptr = ziplistNext(zl,sptr); /* Next element. */
}
return NULL;
}
/* Find pointer to the last element contained in the specified lex range.
* Returns NULL when no element is contained in the range. */
unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range) {
unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
/* If everything is out of range, return early. */
if (!zzlIsInLexRange(zl,range)) return NULL;
while (eptr != NULL) {
if (zzlLexValueLteMax(eptr,range)) {
/* Check if score >= min. */
if (zzlLexValueGteMin(eptr,range))
return eptr;
return NULL;
}
/* Move to previous element by moving to the score of previous element.
* When this returns NULL, we know there also is no element. */
sptr = ziplistPrev(zl,eptr);
if (sptr != NULL)
redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
else
eptr = NULL;
}
return NULL;
}
unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
ele = getDecodedObject(ele);
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(NULL,ele,sptr != NULL);
if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
/* Matching element, pull out score. */
if (score != NULL) *score = zzlGetScore(sptr);
decrRefCount(ele);
return eptr;
}
/* Move to next element. */
eptr = ziplistNext(zl,sptr);
}
decrRefCount(ele);
return NULL;
}
/* Delete (element,score) pair from ziplist. Use local copy of eptr because we
* don't want to modify the one given as argument. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
unsigned char *p = eptr;
/* TODO: add function to ziplist API to delete N elements from offset. */
zl = ziplistDelete(zl,&p);
zl = ziplistDelete(zl,&p);
return zl;
}
unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
unsigned char *sptr;
char scorebuf[128];
int scorelen;
size_t offset;
redisAssertWithInfo(NULL,ele,sdsEncodedObject(ele));
scorelen = d2string(scorebuf,sizeof(scorebuf),score);
if (eptr == NULL) {
zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
} else {
/* Keep offset relative to zl, as it might be re-allocated. */
offset = eptr-zl;
zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
eptr = zl+offset;
/* Insert score after the element. */
redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
}
return zl;
}
/* Insert (element,score) pair in ziplist. This function assumes the element is
* not yet present in the list. */
unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
double s;
ele = getDecodedObject(ele);
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(NULL,ele,sptr != NULL);
s = zzlGetScore(sptr);
if (s > score) {
/* First element with score larger than score for element to be
* inserted. This means we should take its spot in the list to
* maintain ordering. */
zl = zzlInsertAt(zl,eptr,ele,score);
break;
} else if (s == score) {
/* Ensure lexicographical ordering for elements. */
if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
zl = zzlInsertAt(zl,eptr,ele,score);
break;
}
}
/* Move to next element. */
eptr = ziplistNext(zl,sptr);
}
/* Push on tail of list when it was not yet inserted. */
if (eptr == NULL)
zl = zzlInsertAt(zl,NULL,ele,score);
decrRefCount(ele);
return zl;
}
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
unsigned char *eptr, *sptr;
double score;
unsigned long num = 0;
if (deleted != NULL) *deleted = 0;
eptr = zzlFirstInRange(zl,range);
if (eptr == NULL) return zl;
/* When the tail of the ziplist is deleted, eptr will point to the sentinel
* byte and ziplistNext will return NULL. */
while ((sptr = ziplistNext(zl,eptr)) != NULL) {
score = zzlGetScore(sptr);
if (zslValueLteMax(score,range)) {
/* Delete both the element and the score. */
zl = ziplistDelete(zl,&eptr);
zl = ziplistDelete(zl,&eptr);
num++;
} else {
/* No longer in range. */
break;
}
}
if (deleted != NULL) *deleted = num;
return zl;
}
unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
unsigned char *eptr, *sptr;
unsigned long num = 0;
if (deleted != NULL) *deleted = 0;
eptr = zzlFirstInLexRange(zl,range);
if (eptr == NULL) return zl;
/* When the tail of the ziplist is deleted, eptr will point to the sentinel
* byte and ziplistNext will return NULL. */
while ((sptr = ziplistNext(zl,eptr)) != NULL) {
if (zzlLexValueLteMax(eptr,range)) {
/* Delete both the element and the score. */
zl = ziplistDelete(zl,&eptr);
zl = ziplistDelete(zl,&eptr);
num++;
} else {
/* No longer in range. */
break;
}
}
if (deleted != NULL) *deleted = num;
return zl;
}
/* Delete all the elements with rank between start and end from the skiplist.
* Start and end are inclusive. Note that start and end need to be 1-based */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
unsigned int num = (end-start)+1;
if (deleted) *deleted = num;
zl = ziplistDeleteRange(zl,2*(start-1),2*num);
return zl;
}
/*-----------------------------------------------------------------------------
* Common sorted set API
*----------------------------------------------------------------------------*/
unsigned int zsetLength(robj *zobj) {
int length = -1;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
length = zzlLength(zobj->ptr);
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
length = ((zset*)zobj->ptr)->zsl->length;
} else {
redisPanic("Unknown sorted set encoding");
}
return length;
}
void zsetConvert(robj *zobj, int encoding) {
zset *zs;
zskiplistNode *node, *next;
robj *ele;
double score;
if (zobj->encoding == encoding) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (encoding != REDIS_ENCODING_SKIPLIST)
redisPanic("Unknown target encoding");
zs = zmalloc(sizeof(*zs));
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
eptr = ziplistIndex(zl,0);
redisAssertWithInfo(NULL,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(NULL,zobj,sptr != NULL);
while (eptr != NULL) {
score = zzlGetScore(sptr);
redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
ele = createStringObjectFromLongLong(vlong);
else
ele = createStringObject((char*)vstr,vlen);
/* Has incremented refcount since it was just created. */
node = zslInsert(zs->zsl,score,ele);
redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */
zzlNext(zl,&eptr,&sptr);
}
zfree(zobj->ptr);
zobj->ptr = zs;
zobj->encoding = REDIS_ENCODING_SKIPLIST;
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
unsigned char *zl = ziplistNew();
if (encoding != REDIS_ENCODING_ZIPLIST)
redisPanic("Unknown target encoding");
/* Approach similar to zslFree(), since we want to free the skiplist at
* the same time as creating the ziplist. */
zs = zobj->ptr;
dictRelease(zs->dict);
node = zs->zsl->header->level[0].forward;
zfree(zs->zsl->header);
zfree(zs->zsl);
while (node) {
ele = getDecodedObject(node->obj);
zl = zzlInsertAt(zl,NULL,ele,node->score);
decrRefCount(ele);
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zs);
zobj->ptr = zl;
zobj->encoding = REDIS_ENCODING_ZIPLIST;
} else {
redisPanic("Unknown sorted set encoding");
}
}
/*-----------------------------------------------------------------------------
* Sorted set commands
*----------------------------------------------------------------------------*/
/* This generic command implements both ZADD and ZINCRBY. */
#define ZADD_NONE 0
#define ZADD_INCR (1<<0) /* Increment the score instead of setting it. */
#define ZADD_NX (1<<1) /* Don't touch elements not already existing. */
#define ZADD_XX (1<<2) /* Only touch elements already exisitng. */
#define ZADD_CH (1<<3) /* Return num of elements added or updated. */
void zaddGenericCommand(redisClient *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *ele;
robj *zobj;
robj *curobj;
double score = 0, *scores = NULL, curscore = 0.0;
int j, elements;
int scoreidx = 0;
/* The following vars are used in order to track what the command actually
* did during the execution, to reply to the client and to trigger the
* notification of keyspace change. */
int added = 0; /* Number of new elements added. */
int updated = 0; /* Number of elements with updated score. */
int processed = 0; /* Number of elements processed, may remain zero with
options like XX. */
/* Parse options. At the end 'scoreidx' is set to the argument position
* of the score of the first score-element pair. */
scoreidx = 2;
while(scoreidx < c->argc) {
char *opt = c->argv[scoreidx]->ptr;
if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
else break;
scoreidx++;
}
/* Turn options into simple to check vars. */
int incr = (flags & ZADD_INCR) != 0;
int nx = (flags & ZADD_NX) != 0;
int xx = (flags & ZADD_XX) != 0;
int ch = (flags & ZADD_CH) != 0;
/* After the options, we expect to have an even number of args, since
* we expect any number of score-element pairs. */
elements = c->argc-scoreidx;
if (elements % 2) {
addReply(c,shared.syntaxerr);
return;
}
elements /= 2; /* Now this holds the number of score-element pairs. */
/* Check for incompatible options. */
if (nx && xx) {
addReplyError(c,
"XX and NX options at the same time are not compatible");
return;
}
if (incr && elements > 1) {
addReplyError(c,
"INCR option supports a single increment-element pair");
return;
}
/* Start parsing all the scores, we need to emit any syntax error
* before executing additions to the sorted set, as the command should
* either execute fully or nothing at all. */
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
!= REDIS_OK) goto cleanup;
}
/* Lookup the key and create the sorted set if does not exist. */
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetZiplistObject();
}
dbAdd(c->db,key,zobj);
} else {
if (zobj->type != REDIS_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}
for (j = 0; j < elements; j++) {
score = scores[j];
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *eptr;
/* Prefer non-encoded element when dealing with ziplists. */
ele = c->argv[scoreidx+1+j*2];
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
if (nx) continue;
if (incr) {
score += curscore;
if (isnan(score)) {
addReplyError(c,nanerr);
goto cleanup;
}
}
/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
server.dirty++;
updated++;
}
processed++;
} else if (!xx) {
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
server.dirty++;
added++;
processed++;
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
ele = c->argv[scoreidx+1+j*2] =
tryObjectEncoding(c->argv[scoreidx+1+j*2]);
de = dictFind(zs->dict,ele);
if (de != NULL) {
if (nx) continue;
curobj = dictGetKey(de);
curscore = *(double*)dictGetVal(de);
if (incr) {
score += curscore;
if (isnan(score)) {
addReplyError(c,nanerr);
/* Don't need to check if the sorted set is empty
* because we know it has at least one element. */
goto cleanup;
}
}
/* Remove and re-insert when score changed. We can safely
* delete the key object from the skiplist, since the
* dictionary still has a reference to it. */
if (score != curscore) {
redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
znode = zslInsert(zs->zsl,score,curobj);
incrRefCount(curobj); /* Re-inserted in skiplist. */
dictGetVal(de) = &znode->score; /* Update score ptr. */
server.dirty++;
updated++;
}
processed++;
} else if (!xx) {
znode = zslInsert(zs->zsl,score,ele);
incrRefCount(ele); /* Inserted in skiplist. */
redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */
server.dirty++;
added++;
processed++;
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
if (processed)
addReplyDouble(c,score);
else
addReply(c,shared.nullbulk);
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}
cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}
void zaddCommand(redisClient *c) {
zaddGenericCommand(c,ZADD_NONE);
}
void zincrbyCommand(redisClient *c) {
zaddGenericCommand(c,ZADD_INCR);
}
void zremCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
int deleted = 0, keyremoved = 0, j;
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *eptr;
for (j = 2; j < c->argc; j++) {
if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
deleted++;
zobj->ptr = zzlDelete(zobj->ptr,eptr);
if (zzlLength(zobj->ptr) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
break;
}
}
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
dictEntry *de;
double score;
for (j = 2; j < c->argc; j++) {
de = dictFind(zs->dict,c->argv[j]);
if (de != NULL) {
deleted++;
/* Delete from the skiplist */
score = *(double*)dictGetVal(de);
redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
/* Delete from the hash table */
dictDelete(zs->dict,c->argv[j]);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
break;
}
}
}
} else {
redisPanic("Unknown sorted set encoding");
}
if (deleted) {
notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,"zrem",key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
signalModifiedKey(c->db,key);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
}
/* Implements ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZREMRANGEBYLEX commands. */
#define ZRANGE_RANK 0
#define ZRANGE_SCORE 1
#define ZRANGE_LEX 2
void zremrangeGenericCommand(redisClient *c, int rangetype) {
robj *key = c->argv[1];
robj *zobj;
int keyremoved = 0;
unsigned long deleted = 0;
zrangespec range;
zlexrangespec lexrange;
long start, end, llen;
/* Step 1: Parse the range. */
if (rangetype == ZRANGE_RANK) {
if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != REDIS_OK))
return;
} else if (rangetype == ZRANGE_SCORE) {
if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
addReplyError(c,"min or max is not a float");
return;
}
} else if (rangetype == ZRANGE_LEX) {
if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != REDIS_OK) {
addReplyError(c,"min or max not valid string range item");
return;
}
}
/* Step 2: Lookup & range sanity checks if needed. */
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) goto cleanup;
if (rangetype == ZRANGE_RANK) {
/* Sanitize indexes. */
llen = zsetLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
addReply(c,shared.czero);
goto cleanup;
}
if (end >= llen) end = llen-1;
}
/* Step 3: Perform the range deletion operation. */
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
switch(rangetype) {
case ZRANGE_RANK:
zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
break;
case ZRANGE_SCORE:
zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted);
break;
case ZRANGE_LEX:
zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted);
break;
}
if (zzlLength(zobj->ptr) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
switch(rangetype) {
case ZRANGE_RANK:
deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
break;
case ZRANGE_SCORE:
deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict);
break;
case ZRANGE_LEX:
deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict);
break;
}
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) {
dbDelete(c->db,key);
keyremoved = 1;
}
} else {
redisPanic("Unknown sorted set encoding");
}
/* Step 4: Notifications and reply. */
if (deleted) {
char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"};
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,event[rangetype],key,c->db->id);
if (keyremoved)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
}
server.dirty += deleted;
addReplyLongLong(c,deleted);
cleanup:
if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);
}
void zremrangebyrankCommand(redisClient *c) {
zremrangeGenericCommand(c,ZRANGE_RANK);
}
void zremrangebyscoreCommand(redisClient *c) {
zremrangeGenericCommand(c,ZRANGE_SCORE);
}
void zremrangebylexCommand(redisClient *c) {
zremrangeGenericCommand(c,ZRANGE_LEX);
}
typedef struct {
robj *subject;
int type; /* Set, sorted set */
int encoding;
double weight;
union {
/* Set iterators. */
union _iterset {
struct {
intset *is;
int ii;
} is;
struct {
dict *dict;
dictIterator *di;
dictEntry *de;
} ht;
} set;
/* Sorted set iterators. */
union _iterzset {
struct {
unsigned char *zl;
unsigned char *eptr, *sptr;
} zl;
struct {
zset *zs;
zskiplistNode *node;
} sl;
} zset;
} iter;
} zsetopsrc;
/* Use dirty flags for pointers that need to be cleaned up in the next
* iteration over the zsetopval. The dirty flag for the long long value is
* special, since long long values don't need cleanup. Instead, it means that
* we already checked that "ell" holds a long long, or tried to convert another
* representation into a long long value. When this was successful,
* OPVAL_VALID_LL is set as well. */
#define OPVAL_DIRTY_ROBJ 1
#define OPVAL_DIRTY_LL 2
#define OPVAL_VALID_LL 4
/* Store value retrieved from the iterator. */
typedef struct {
int flags;
unsigned char _buf[32]; /* Private buffer. */
robj *ele;
unsigned char *estr;
unsigned int elen;
long long ell;
double score;
} zsetopval;
typedef union _iterset iterset;
typedef union _iterzset iterzset;
void zuiInitIterator(zsetopsrc *op) {
if (op->subject == NULL)
return;
if (op->type == REDIS_SET) {
iterset *it = &op->iter.set;
if (op->encoding == REDIS_ENCODING_INTSET) {
it->is.is = op->subject->ptr;
it->is.ii = 0;
} else if (op->encoding == REDIS_ENCODING_HT) {
it->ht.dict = op->subject->ptr;
it->ht.di = dictGetIterator(op->subject->ptr);
it->ht.de = dictNext(it->ht.di);
} else {
redisPanic("Unknown set encoding");
}
} else if (op->type == REDIS_ZSET) {
iterzset *it = &op->iter.zset;
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
it->zl.zl = op->subject->ptr;
it->zl.eptr = ziplistIndex(it->zl.zl,0);
if (it->zl.eptr != NULL) {
it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
redisAssert(it->zl.sptr != NULL);
}
} else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
it->sl.zs = op->subject->ptr;
it->sl.node = it->sl.zs->zsl->header->level[0].forward;
} else {
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unsupported type");
}
}
void zuiClearIterator(zsetopsrc *op) {
if (op->subject == NULL)
return;
if (op->type == REDIS_SET) {
iterset *it = &op->iter.set;
if (op->encoding == REDIS_ENCODING_INTSET) {
REDIS_NOTUSED(it); /* skip */
} else if (op->encoding == REDIS_ENCODING_HT) {
dictReleaseIterator(it->ht.di);
} else {
redisPanic("Unknown set encoding");
}
} else if (op->type == REDIS_ZSET) {
iterzset *it = &op->iter.zset;
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
REDIS_NOTUSED(it); /* skip */
} else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
REDIS_NOTUSED(it); /* skip */
} else {
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unsupported type");
}
}
int zuiLength(zsetopsrc *op) {
if (op->subject == NULL)
return 0;
if (op->type == REDIS_SET) {
if (op->encoding == REDIS_ENCODING_INTSET) {
return intsetLen(op->subject->ptr);
} else if (op->encoding == REDIS_ENCODING_HT) {
dict *ht = op->subject->ptr;
return dictSize(ht);
} else {
redisPanic("Unknown set encoding");
}
} else if (op->type == REDIS_ZSET) {
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
return zzlLength(op->subject->ptr);
} else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = op->subject->ptr;
return zs->zsl->length;
} else {
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unsupported type");
}
}
/* Check if the current value is valid. If so, store it in the passed structure
* and move to the next element. If not valid, this means we have reached the
* end of the structure and can abort. */
int zuiNext(zsetopsrc *op, zsetopval *val) {
if (op->subject == NULL)
return 0;
if (val->flags & OPVAL_DIRTY_ROBJ)
decrRefCount(val->ele);
memset(val,0,sizeof(zsetopval));
if (op->type == REDIS_SET) {
iterset *it = &op->iter.set;
if (op->encoding == REDIS_ENCODING_INTSET) {
int64_t ell;
if (!intsetGet(it->is.is,it->is.ii,&ell))
return 0;
val->ell = ell;
val->score = 1.0;
/* Move to next element. */
it->is.ii++;
} else if (op->encoding == REDIS_ENCODING_HT) {
if (it->ht.de == NULL)
return 0;
val->ele = dictGetKey(it->ht.de);
val->score = 1.0;
/* Move to next element. */
it->ht.de = dictNext(it->ht.di);
} else {
redisPanic("Unknown set encoding");
}
} else if (op->type == REDIS_ZSET) {
iterzset *it = &op->iter.zset;
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
/* No need to check both, but better be explicit. */
if (it->zl.eptr == NULL || it->zl.sptr == NULL)
return 0;
redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
val->score = zzlGetScore(it->zl.sptr);
/* Move to next element. */
zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
} else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
if (it->sl.node == NULL)
return 0;
val->ele = it->sl.node->obj;
val->score = it->sl.node->score;
/* Move to next element. */
it->sl.node = it->sl.node->level[0].forward;
} else {
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unsupported type");
}
return 1;
}
int zuiLongLongFromValue(zsetopval *val) {
if (!(val->flags & OPVAL_DIRTY_LL)) {
val->flags |= OPVAL_DIRTY_LL;
if (val->ele != NULL) {
if (val->ele->encoding == REDIS_ENCODING_INT) {
val->ell = (long)val->ele->ptr;
val->flags |= OPVAL_VALID_LL;
} else if (sdsEncodedObject(val->ele)) {
if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
val->flags |= OPVAL_VALID_LL;
} else {
redisPanic("Unsupported element encoding");
}
} else if (val->estr != NULL) {
if (string2ll((char*)val->estr,val->elen,&val->ell))
val->flags |= OPVAL_VALID_LL;
} else {
/* The long long was already set, flag as valid. */
val->flags |= OPVAL_VALID_LL;
}
}
return val->flags & OPVAL_VALID_LL;
}
robj *zuiObjectFromValue(zsetopval *val) {
if (val->ele == NULL) {
if (val->estr != NULL) {
val->ele = createStringObject((char*)val->estr,val->elen);
} else {
val->ele = createStringObjectFromLongLong(val->ell);
}
val->flags |= OPVAL_DIRTY_ROBJ;
}
return val->ele;
}
int zuiBufferFromValue(zsetopval *val) {
if (val->estr == NULL) {
if (val->ele != NULL) {
if (val->ele->encoding == REDIS_ENCODING_INT) {
val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
val->estr = val->_buf;
} else if (sdsEncodedObject(val->ele)) {
val->elen = sdslen(val->ele->ptr);
val->estr = val->ele->ptr;
} else {
redisPanic("Unsupported element encoding");
}
} else {
val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
val->estr = val->_buf;
}
}
return 1;
}
/* Find value pointed to by val in the source pointer to by op. When found,
* return 1 and store its score in target. Return 0 otherwise. */
int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
if (op->subject == NULL)
return 0;
if (op->type == REDIS_SET) {
if (op->encoding == REDIS_ENCODING_INTSET) {
if (zuiLongLongFromValue(val) &&
intsetFind(op->subject->ptr,val->ell))
{
*score = 1.0;
return 1;
} else {
return 0;
}
} else if (op->encoding == REDIS_ENCODING_HT) {
dict *ht = op->subject->ptr;
zuiObjectFromValue(val);
if (dictFind(ht,val->ele) != NULL) {
*score = 1.0;
return 1;
} else {
return 0;
}
} else {
redisPanic("Unknown set encoding");
}
} else if (op->type == REDIS_ZSET) {
zuiObjectFromValue(val);
if (op->encoding == REDIS_ENCODING_ZIPLIST) {
if (zzlFind(op->subject->ptr,val->ele,score) != NULL) {
/* Score is already set by zzlFind. */
return 1;
} else {
return 0;
}
} else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = op->subject->ptr;
dictEntry *de;
if ((de = dictFind(zs->dict,val->ele)) != NULL) {
*score = *(double*)dictGetVal(de);
return 1;
} else {
return 0;
}
} else {
redisPanic("Unknown sorted set encoding");
}
} else {
redisPanic("Unsupported type");
}
}
int zuiCompareByCardinality(const void *s1, const void *s2) {
return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
}
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
if (aggregate == REDIS_AGGR_SUM) {
*target = *target + val;
/* The result of adding two doubles is NaN when one variable
* is +inf and the other is -inf. When these numbers are added,
* we maintain the convention of the result being 0.0. */
if (isnan(*target)) *target = 0.0;
} else if (aggregate == REDIS_AGGR_MIN) {
*target = val < *target ? val : *target;
} else if (aggregate == REDIS_AGGR_MAX) {
*target = val > *target ? val : *target;
} else {
/* safety net */
redisPanic("Unknown ZUNION/INTER aggregate type");
}
}
void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
int i, j;
long setnum;
int aggregate = REDIS_AGGR_SUM;
zsetopsrc *src;
zsetopval zval;
robj *tmp;
unsigned int maxelelen = 0;
robj *dstobj;
zset *dstzset;
zskiplistNode *znode;
int touched = 0;
/* expect setnum input keys to be given */
if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
return;
if (setnum < 1) {
addReplyError(c,
"at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
return;
}
/* test if the expected number of keys would overflow */
if (setnum > c->argc-3) {
addReply(c,shared.syntaxerr);
return;
}
/* read keys to be used for input */
src = zcalloc(sizeof(zsetopsrc) * setnum);
for (i = 0, j = 3; i < setnum; i++, j++) {
robj *obj = lookupKeyWrite(c->db,c->argv[j]);
if (obj != NULL) {
if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
zfree(src);
addReply(c,shared.wrongtypeerr);
return;
}
src[i].subject = obj;
src[i].type = obj->type;
src[i].encoding = obj->encoding;
} else {
src[i].subject = NULL;
}
/* Default all weights to 1. */
src[i].weight = 1.0;
}
/* parse optional extra arguments */
if (j < c->argc) {
int remaining = c->argc - j;
while (remaining) {
if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
j++; remaining--;
for (i = 0; i < setnum; i++, j++, remaining--) {
if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
"weight value is not a float") != REDIS_OK)
{
zfree(src);
return;
}
}
} else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
j++; remaining--;
if (!strcasecmp(c->argv[j]->ptr,"sum")) {
aggregate = REDIS_AGGR_SUM;
} else if (!strcasecmp(c->argv[j]->ptr,"min")) {
aggregate = REDIS_AGGR_MIN;
} else if (!strcasecmp(c->argv[j]->ptr,"max")) {
aggregate = REDIS_AGGR_MAX;
} else {
zfree(src);
addReply(c,shared.syntaxerr);
return;
}
j++; remaining--;
} else {
zfree(src);
addReply(c,shared.syntaxerr);
return;
}
}
}
/* sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
dstobj = createZsetObject();
dstzset = dstobj->ptr;
memset(&zval, 0, sizeof(zval));
if (op == REDIS_OP_INTER) {
/* Skip everything if the smallest input is empty. */
if (zuiLength(&src[0]) > 0) {
/* Precondition: as src[0] is non-empty and the inputs are ordered
* by size, all src[i > 0] are non-empty too. */
zuiInitIterator(&src[0]);
while (zuiNext(&src[0],&zval)) {
double score, value;
score = src[0].weight * zval.score;
if (isnan(score)) score = 0;
for (j = 1; j < setnum; j++) {
/* It is not safe to access the zset we are
* iterating, so explicitly check for equal object. */
if (src[j].subject == src[0].subject) {
value = zval.score*src[j].weight;
zunionInterAggregate(&score,value,aggregate);
} else if (zuiFind(&src[j],&zval,&value)) {
value *= src[j].weight;
zunionInterAggregate(&score,value,aggregate);
} else {
break;
}
}
/* Only continue when present in every input. */
if (j == setnum) {
tmp = zuiObjectFromValue(&zval);
znode = zslInsert(dstzset->zsl,score,tmp);
incrRefCount(tmp); /* added to skiplist */
dictAdd(dstzset->dict,tmp,&znode->score);
incrRefCount(tmp); /* added to dictionary */
if (sdsEncodedObject(tmp)) {
if (sdslen(tmp->ptr) > maxelelen)
maxelelen = sdslen(tmp->ptr);
}
}
}
zuiClearIterator(&src[0]);
}
} else if (op == REDIS_OP_UNION) {
dict *accumulator = dictCreate(&setDictType,NULL);
dictIterator *di;
dictEntry *de;
double score;
if (setnum) {
/* Our union is at least as large as the largest set.
* Resize the dictionary ASAP to avoid useless rehashing. */
dictExpand(accumulator,zuiLength(&src[setnum-1]));
}
/* Step 1: Create a dictionary of elements -> aggregated-scores
* by iterating one sorted set after the other. */
for (i = 0; i < setnum; i++) {
if (zuiLength(&src[i]) == 0) continue;
zuiInitIterator(&src[i]);
while (zuiNext(&src[i],&zval)) {
/* Initialize value */
score = src[i].weight * zval.score;
if (isnan(score)) score = 0;
/* Search for this element in the accumulating dictionary. */
de = dictFind(accumulator,zuiObjectFromValue(&zval));
/* If we don't have it, we need to create a new entry. */
if (de == NULL) {
tmp = zuiObjectFromValue(&zval);
/* Remember the longest single element encountered,
* to understand if it's possible to convert to ziplist
* at the end. */
if (sdsEncodedObject(tmp)) {
if (sdslen(tmp->ptr) > maxelelen)
maxelelen = sdslen(tmp->ptr);
}
/* Add the element with its initial score. */
de = dictAddRaw(accumulator,tmp);
incrRefCount(tmp);
dictSetDoubleVal(de,score);
} else {
/* Update the score with the score of the new instance
* of the element found in the current sorted set.
*
* Here we access directly the dictEntry double
* value inside the union as it is a big speedup
* compared to using the getDouble/setDouble API. */
zunionInterAggregate(&de->v.d,score,aggregate);
}
}
zuiClearIterator(&src[i]);
}
/* Step 2: convert the dictionary into the final sorted set. */
di = dictGetIterator(accumulator);
/* We now are aware of the final size of the resulting sorted set,
* let's resize the dictionary embedded inside the sorted set to the
* right size, in order to save rehashing time. */
dictExpand(dstzset->dict,dictSize(accumulator));
while((de = dictNext(di)) != NULL) {
robj *ele = dictGetKey(de);
score = dictGetDoubleVal(de);
znode = zslInsert(dstzset->zsl,score,ele);
incrRefCount(ele); /* added to skiplist */
dictAdd(dstzset->dict,ele,&znode->score);
incrRefCount(ele); /* added to dictionary */
}
dictReleaseIterator(di);
/* We can free the accumulator dictionary now. */
dictRelease(accumulator);
} else {
redisPanic("Unknown operator");
}
if (dbDelete(c->db,dstkey)) {
signalModifiedKey(c->db,dstkey);
touched = 1;
server.dirty++;
}
if (dstzset->zsl->length) {
/* Convert to ziplist when in limits. */
if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
maxelelen <= server.zset_max_ziplist_value)
zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
dbAdd(c->db,dstkey,dstobj);
addReplyLongLong(c,zsetLength(dstobj));
if (!touched) signalModifiedKey(c->db,dstkey);
notifyKeyspaceEvent(REDIS_NOTIFY_ZSET,
(op == REDIS_OP_UNION) ? "zunionstore" : "zinterstore",
dstkey,c->db->id);
server.dirty++;
} else {
decrRefCount(dstobj);
addReply(c,shared.czero);
if (touched)
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",dstkey,c->db->id);
}
zfree(src);
}
void zunionstoreCommand(redisClient *c) {
zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
}
void zinterstoreCommand(redisClient *c) {
zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
}
void zrangeGenericCommand(redisClient *c, int reverse) {
robj *key = c->argv[1];
robj *zobj;
int withscores = 0;
long start;
long end;
int llen;
int rangelen;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
withscores = 1;
} else if (c->argc >= 5) {
addReply(c,shared.syntaxerr);
return;
}
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
|| checkType(c,zobj,REDIS_ZSET)) return;
/* Sanitize indexes. */
llen = zsetLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
addReply(c,shared.emptymultibulk);
return;
}
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;
/* Return the result in form of a multi-bulk reply */
addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (reverse)
eptr = ziplistIndex(zl,-2-(2*start));
else
eptr = ziplistIndex(zl,2*start);
redisAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
while (rangelen--) {
redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
addReplyBulkLongLong(c,vlong);
else
addReplyBulkCBuffer(c,vstr,vlen);
if (withscores)
addReplyDouble(c,zzlGetScore(sptr));
if (reverse)
zzlPrev(zl,&eptr,&sptr);
else
zzlNext(zl,&eptr,&sptr);
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
robj *ele;
/* Check if starting point is trivial, before doing log(N) lookup. */
if (reverse) {
ln = zsl->tail;
if (start > 0)
ln = zslGetElementByRank(zsl,llen-start);
} else {
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl,start+1);
}
while(rangelen--) {
redisAssertWithInfo(c,zobj,ln != NULL);
ele = ln->obj;
addReplyBulk(c,ele);
if (withscores)
addReplyDouble(c,ln->score);
ln = reverse ? ln->backward : ln->level[0].forward;
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
void zrangeCommand(redisClient *c) {
zrangeGenericCommand(c,0);
}
void zrevrangeCommand(redisClient *c) {
zrangeGenericCommand(c,1);
}
/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
void genericZrangebyscoreCommand(redisClient *c, int reverse) {
zrangespec range;
robj *key = c->argv[1];
robj *zobj;
long offset = 0, limit = -1;
int withscores = 0;
unsigned long rangelen = 0;
void *replylen = NULL;
int minidx, maxidx;
/* Parse the range arguments. */
if (reverse) {
/* Range is given as [max,min] */
maxidx = 2; minidx = 3;
} else {
/* Range is given as [min,max] */
minidx = 2; maxidx = 3;
}
if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
addReplyError(c,"min or max is not a float");
return;
}
/* Parse optional extra arguments. Note that ZCOUNT will exactly have
* 4 arguments, so we'll never enter the following code path. */
if (c->argc > 4) {
int remaining = c->argc - 4;
int pos = 4;
while (remaining) {
if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
pos++; remaining--;
withscores = 1;
} else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return;
pos += 3; remaining -= 3;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
}
/* Ok, lookup the key and get the range */
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
double score;
/* If reversed, get the last node in range as starting point. */
if (reverse) {
eptr = zzlLastInRange(zl,&range);
} else {
eptr = zzlFirstInRange(zl,&range);
}
/* No "first" element in the specified interval. */
if (eptr == NULL) {
addReply(c, shared.emptymultibulk);
return;
}
/* Get score pointer for the first element. */
redisAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addDeferredMultiBulkLength(c);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (eptr && offset--) {
if (reverse) {
zzlPrev(zl,&eptr,&sptr);
} else {
zzlNext(zl,&eptr,&sptr);
}
}
while (eptr && limit--) {
score = zzlGetScore(sptr);
/* Abort when the node is no longer in range. */
if (reverse) {
if (!zslValueGteMin(score,&range)) break;
} else {
if (!zslValueLteMax(score,&range)) break;
}
/* We know the element exists, so ziplistGet should always succeed */
redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
rangelen++;
if (vstr == NULL) {
addReplyBulkLongLong(c,vlong);
} else {
addReplyBulkCBuffer(c,vstr,vlen);
}
if (withscores) {
addReplyDouble(c,score);
}
/* Move to next node */
if (reverse) {
zzlPrev(zl,&eptr,&sptr);
} else {
zzlNext(zl,&eptr,&sptr);
}
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
/* If reversed, get the last node in range as starting point. */
if (reverse) {
ln = zslLastInRange(zsl,&range);
} else {
ln = zslFirstInRange(zsl,&range);
}
/* No "first" element in the specified interval. */
if (ln == NULL) {
addReply(c, shared.emptymultibulk);
return;
}
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addDeferredMultiBulkLength(c);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (ln && offset--) {
if (reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
}
while (ln && limit--) {
/* Abort when the node is no longer in range. */
if (reverse) {
if (!zslValueGteMin(ln->score,&range)) break;
} else {
if (!zslValueLteMax(ln->score,&range)) break;
}
rangelen++;
addReplyBulk(c,ln->obj);
if (withscores) {
addReplyDouble(c,ln->score);
}
/* Move to next node */
if (reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
}
} else {
redisPanic("Unknown sorted set encoding");
}
if (withscores) {
rangelen *= 2;
}
setDeferredMultiBulkLength(c, replylen, rangelen);
}
void zrangebyscoreCommand(redisClient *c) {
genericZrangebyscoreCommand(c,0);
}
void zrevrangebyscoreCommand(redisClient *c) {
genericZrangebyscoreCommand(c,1);
}
void zcountCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
zrangespec range;
int count = 0;
/* Parse the range arguments */
if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
addReplyError(c,"min or max is not a float");
return;
}
/* Lookup the sorted set */
if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
checkType(c, zobj, REDIS_ZSET)) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
double score;
/* Use the first element in range as the starting point */
eptr = zzlFirstInRange(zl,&range);
/* No "first" element */
if (eptr == NULL) {
addReply(c, shared.czero);
return;
}
/* First element is in range */
sptr = ziplistNext(zl,eptr);
score = zzlGetScore(sptr);
redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
/* Iterate over elements in range */
while (eptr) {
score = zzlGetScore(sptr);
/* Abort when the node is no longer in range. */
if (!zslValueLteMax(score,&range)) {
break;
} else {
count++;
zzlNext(zl,&eptr,&sptr);
}
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *zn;
unsigned long rank;
/* Find first element in range */
zn = zslFirstInRange(zsl, &range);
/* Use rank of first element, if any, to determine preliminary count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->obj);
count = (zsl->length - (rank - 1));
/* Find last element in range */
zn = zslLastInRange(zsl, &range);
/* Use rank of last element, if any, to determine the actual count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->obj);
count -= (zsl->length - rank);
}
}
} else {
redisPanic("Unknown sorted set encoding");
}
addReplyLongLong(c, count);
}
void zlexcountCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
zlexrangespec range;
int count = 0;
/* Parse the range arguments */
if (zslParseLexRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
addReplyError(c,"min or max not valid string range item");
return;
}
/* Lookup the sorted set */
if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
checkType(c, zobj, REDIS_ZSET))
{
zslFreeLexRange(&range);
return;
}
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
/* Use the first element in range as the starting point */
eptr = zzlFirstInLexRange(zl,&range);
/* No "first" element */
if (eptr == NULL) {
zslFreeLexRange(&range);
addReply(c, shared.czero);
return;
}
/* First element is in range */
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(c,zobj,zzlLexValueLteMax(eptr,&range));
/* Iterate over elements in range */
while (eptr) {
/* Abort when the node is no longer in range. */
if (!zzlLexValueLteMax(eptr,&range)) {
break;
} else {
count++;
zzlNext(zl,&eptr,&sptr);
}
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *zn;
unsigned long rank;
/* Find first element in range */
zn = zslFirstInLexRange(zsl, &range);
/* Use rank of first element, if any, to determine preliminary count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->obj);
count = (zsl->length - (rank - 1));
/* Find last element in range */
zn = zslLastInLexRange(zsl, &range);
/* Use rank of last element, if any, to determine the actual count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->obj);
count -= (zsl->length - rank);
}
}
} else {
redisPanic("Unknown sorted set encoding");
}
zslFreeLexRange(&range);
addReplyLongLong(c, count);
}
/* This command implements ZRANGEBYLEX, ZREVRANGEBYLEX. */
void genericZrangebylexCommand(redisClient *c, int reverse) {
zlexrangespec range;
robj *key = c->argv[1];
robj *zobj;
long offset = 0, limit = -1;
unsigned long rangelen = 0;
void *replylen = NULL;
int minidx, maxidx;
/* Parse the range arguments. */
if (reverse) {
/* Range is given as [max,min] */
maxidx = 2; minidx = 3;
} else {
/* Range is given as [min,max] */
minidx = 2; maxidx = 3;
}
if (zslParseLexRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
addReplyError(c,"min or max not valid string range item");
return;
}
/* Parse optional extra arguments. Note that ZCOUNT will exactly have
* 4 arguments, so we'll never enter the following code path. */
if (c->argc > 4) {
int remaining = c->argc - 4;
int pos = 4;
while (remaining) {
if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return;
pos += 3; remaining -= 3;
} else {
zslFreeLexRange(&range);
addReply(c,shared.syntaxerr);
return;
}
}
}
/* Ok, lookup the key and get the range */
if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
checkType(c,zobj,REDIS_ZSET))
{
zslFreeLexRange(&range);
return;
}
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
/* If reversed, get the last node in range as starting point. */
if (reverse) {
eptr = zzlLastInLexRange(zl,&range);
} else {
eptr = zzlFirstInLexRange(zl,&range);
}
/* No "first" element in the specified interval. */
if (eptr == NULL) {
addReply(c, shared.emptymultibulk);
zslFreeLexRange(&range);
return;
}
/* Get score pointer for the first element. */
redisAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addDeferredMultiBulkLength(c);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (eptr && offset--) {
if (reverse) {
zzlPrev(zl,&eptr,&sptr);
} else {
zzlNext(zl,&eptr,&sptr);
}
}
while (eptr && limit--) {
/* Abort when the node is no longer in range. */
if (reverse) {
if (!zzlLexValueGteMin(eptr,&range)) break;
} else {
if (!zzlLexValueLteMax(eptr,&range)) break;
}
/* We know the element exists, so ziplistGet should always
* succeed. */
redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
rangelen++;
if (vstr == NULL) {
addReplyBulkLongLong(c,vlong);
} else {
addReplyBulkCBuffer(c,vstr,vlen);
}
/* Move to next node */
if (reverse) {
zzlPrev(zl,&eptr,&sptr);
} else {
zzlNext(zl,&eptr,&sptr);
}
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
/* If reversed, get the last node in range as starting point. */
if (reverse) {
ln = zslLastInLexRange(zsl,&range);
} else {
ln = zslFirstInLexRange(zsl,&range);
}
/* No "first" element in the specified interval. */
if (ln == NULL) {
addReply(c, shared.emptymultibulk);
zslFreeLexRange(&range);
return;
}
/* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addDeferredMultiBulkLength(c);
/* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop. */
while (ln && offset--) {
if (reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
}
while (ln && limit--) {
/* Abort when the node is no longer in range. */
if (reverse) {
if (!zslLexValueGteMin(ln->obj,&range)) break;
} else {
if (!zslLexValueLteMax(ln->obj,&range)) break;
}
rangelen++;
addReplyBulk(c,ln->obj);
/* Move to next node */
if (reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
}
} else {
redisPanic("Unknown sorted set encoding");
}
zslFreeLexRange(&range);
setDeferredMultiBulkLength(c, replylen, rangelen);
}
void zrangebylexCommand(redisClient *c) {
genericZrangebylexCommand(c,0);
}
void zrevrangebylexCommand(redisClient *c) {
genericZrangebylexCommand(c,1);
}
void zcardCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
addReplyLongLong(c,zsetLength(zobj));
}
void zscoreCommand(redisClient *c) {
robj *key = c->argv[1];
robj *zobj;
double score;
if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
addReplyDouble(c,score);
else
addReply(c,shared.nullbulk);
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
dictEntry *de;
c->argv[2] = tryObjectEncoding(c->argv[2]);
de = dictFind(zs->dict,c->argv[2]);
if (de != NULL) {
score = *(double*)dictGetVal(de);
addReplyDouble(c,score);
} else {
addReply(c,shared.nullbulk);
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
void zrankGenericCommand(redisClient *c, int reverse) {
robj *key = c->argv[1];
robj *ele = c->argv[2];
robj *zobj;
unsigned long llen;
unsigned long rank;
if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
llen = zsetLength(zobj);
redisAssertWithInfo(c,ele,sdsEncodedObject(ele));
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
eptr = ziplistIndex(zl,0);
redisAssertWithInfo(c,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(c,zobj,sptr != NULL);
rank = 1;
while(eptr != NULL) {
if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
break;
rank++;
zzlNext(zl,&eptr,&sptr);
}
if (eptr != NULL) {
if (reverse)
addReplyLongLong(c,llen-rank);
else
addReplyLongLong(c,rank-1);
} else {
addReply(c,shared.nullbulk);
}
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
dictEntry *de;
double score;
ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
de = dictFind(zs->dict,ele);
if (de != NULL) {
score = *(double*)dictGetVal(de);
rank = zslGetRank(zsl,score,ele);
redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
if (reverse)
addReplyLongLong(c,llen-rank);
else
addReplyLongLong(c,rank-1);
} else {
addReply(c,shared.nullbulk);
}
} else {
redisPanic("Unknown sorted set encoding");
}
}
void zrankCommand(redisClient *c) {
zrankGenericCommand(c, 0);
}
void zrevrankCommand(redisClient *c) {
zrankGenericCommand(c, 1);
}
void zscanCommand(redisClient *c) {
robj *o;
unsigned long cursor;
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == REDIS_ERR) return;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
checkType(c,o,REDIS_ZSET)) return;
scanGenericCommand(c,o,cursor);
}