diff --git a/src/geo.c b/src/geo.c index ac25a20c6..d168d9841 100644 --- a/src/geo.c +++ b/src/geo.c @@ -300,7 +300,7 @@ int geoGetPointsInRange(robj *zobj, double min, double max, GeoShape *shape, geo zskiplist *zsl = zs->zsl; zskiplistNode *ln; - if ((ln = zslFirstInRange(zsl, &range)) == NULL) { + if ((ln = zslNthInRange(zsl, &range, 0)) == NULL) { /* Nothing exists starting at our min. No results. */ return 0; } diff --git a/src/module.c b/src/module.c index 5598da180..56500f007 100644 --- a/src/module.c +++ b/src/module.c @@ -4880,8 +4880,8 @@ int zsetInitScoreRange(RedisModuleKey *key, double min, double max, int minex, i } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = key->value->ptr; zskiplist *zsl = zs->zsl; - key->u.zset.current = first ? zslFirstInRange(zsl,zrs) : - zslLastInRange(zsl,zrs); + key->u.zset.current = first ? zslNthInRange(zsl,zrs,0) : + zslNthInRange(zsl,zrs,-1); } else { serverPanic("Unsupported zset encoding"); } @@ -4944,8 +4944,8 @@ int zsetInitLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleStr } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = key->value->ptr; zskiplist *zsl = zs->zsl; - key->u.zset.current = first ? zslFirstInLexRange(zsl,zlrs) : - zslLastInLexRange(zsl,zlrs); + key->u.zset.current = first ? zslNthInLexRange(zsl,zlrs,0) : + zslNthInLexRange(zsl,zlrs,-1); } else { serverPanic("Unsupported zset encoding"); } diff --git a/src/server.h b/src/server.h index 939da4365..52a2f8584 100644 --- a/src/server.h +++ b/src/server.h @@ -521,6 +521,7 @@ typedef enum { #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */ #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */ +#define ZSKIPLIST_MAX_SEARCH 10 /* Append only defines */ #define AOF_FSYNC_NO 0 @@ -2981,8 +2982,7 @@ void zslFree(zskiplist *zsl); zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele); unsigned char *zzlInsert(unsigned char *zl, sds ele, double score); int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node); -zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range); -zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range); +zskiplistNode *zslNthInRange(zskiplist *zsl, zrangespec *range, long n); double zzlGetScore(unsigned char *sptr); void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); @@ -3005,8 +3005,7 @@ void zslFreeLexRange(zlexrangespec *spec); int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec); unsigned char *zzlFirstInLexRange(unsigned char *zl, zlexrangespec *range); unsigned char *zzlLastInLexRange(unsigned char *zl, zlexrangespec *range); -zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range); -zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range); +zskiplistNode *zslNthInLexRange(zskiplist *zsl, zlexrangespec *range, long n); int zzlLexValueGteMin(unsigned char *p, zlexrangespec *spec); int zzlLexValueLteMax(unsigned char *p, zlexrangespec *spec); int zslLexValueGteMin(sds value, zlexrangespec *spec); diff --git a/src/t_zset.c b/src/t_zset.c index 7717a4a14..1242732d1 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -67,6 +67,8 @@ int zslLexValueGteMin(sds value, zlexrangespec *spec); int zslLexValueLteMax(sds value, zlexrangespec *spec); void zsetConvertAndExpand(robj *zobj, int encoding, unsigned long cap); +zskiplistNode *zslGetElementByRankFromNode(zskiplistNode *start_node, int start_level, unsigned long rank); +zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank); /* Create a skiplist node with the specified number of levels. * The SDS string 'ele' is referenced by the node after the call. */ @@ -328,54 +330,82 @@ int zslIsInRange(zskiplist *zsl, zrangespec *range) { return 1; } -/* Find the first node that is contained in the specified range. - * Returns NULL when no element is contained in the range. */ -zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) { +/* Find the Nth node that is contained in the specified range. N should be 0-based. + * Negative N works for reversed order (-1 represents the last element). Returns + * NULL when no element is contained in the range. */ +zskiplistNode *zslNthInRange(zskiplist *zsl, zrangespec *range, long n) { zskiplistNode *x; int i; + long edge_rank = 0; + long last_highest_level_rank = 0; + zskiplistNode *last_highest_level_node = NULL; + unsigned long rank_diff; /* If everything is out of range, return early. */ if (!zslIsInRange(zsl,range)) return NULL; + /* Go forward while *OUT* of range at level of zsl->level-1. */ x = zsl->header; - for (i = zsl->level-1; i >= 0; i--) { - /* Go forward while *OUT* of range. */ - while (x->level[i].forward && - !zslValueGteMin(x->level[i].forward->score,range)) + i = zsl->level - 1; + while (x->level[i].forward && !zslValueGteMin(x->level[i].forward->score, range)) { + edge_rank += x->level[i].span; + x = x->level[i].forward; + } + /* Remember the last node which has zsl->level-1 levels and its rank. */ + last_highest_level_node = x; + last_highest_level_rank = edge_rank; + + if (n >= 0) { + for (i = zsl->level - 2; i >= 0; i--) { + /* Go forward while *OUT* of range. */ + while (x->level[i].forward && !zslValueGteMin(x->level[i].forward->score, range)) { + /* Count the rank of the last element smaller than the range. */ + edge_rank += x->level[i].span; x = x->level[i].forward; + } + } + /* Check if zsl is long enough. */ + if ((unsigned long)(edge_rank + n) >= zsl->length) return NULL; + if (n < ZSKIPLIST_MAX_SEARCH) { + /* If offset is small, we can just jump node by node */ + /* rank+1 is the first element in range, so we need n+1 steps to reach target. */ + for (i = 0; i < n + 1; i++) { + x = x->level[0].forward; + } + } else { + /* If offset is big, we can jump from the last zsl->level-1 node. */ + rank_diff = edge_rank + 1 + n - last_highest_level_rank; + x = zslGetElementByRankFromNode(last_highest_level_node, zsl->level - 1, rank_diff); + } + /* Check if score <= max. */ + if (x && !zslValueLteMax(x->score,range)) return NULL; + } else { + for (i = zsl->level - 1; i >= 0; i--) { + /* Go forward while *IN* range. */ + while (x->level[i].forward && zslValueLteMax(x->level[i].forward->score, range)) { + /* Count the rank of the last element in range. */ + edge_rank += x->level[i].span; + x = x->level[i].forward; + } + } + /* Check if the range is big enough. */ + if (edge_rank < -n) return NULL; + if (n + 1 > -ZSKIPLIST_MAX_SEARCH) { + /* If offset is small, we can just jump node by node */ + /* rank is the -1th element in range, so we need -n-1 steps to reach target. */ + for (i = 0; i < -n - 1; i++) { + x = x->backward; + } + } else { + /* If offset is big, we can jump from the last zsl->level-1 node. */ + /* rank is the last element in range, n is -1-based, so we need n+1 to count backwards. */ + rank_diff = edge_rank + 1 + n - last_highest_level_rank; + x = zslGetElementByRankFromNode(last_highest_level_node, zsl->level - 1, rank_diff); + } + /* Check if score >= min. */ + if (x && !zslValueGteMin(x->score, range)) return NULL; } - /* This is an inner range, so the next node cannot be NULL. */ - x = x->level[0].forward; - serverAssert(x != NULL); - - /* Check if score <= max. */ - if (!zslValueLteMax(x->score,range)) return NULL; - return x; -} - -/* Find the last node that is contained in the specified range. - * Returns NULL when no element is contained in the range. */ -zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) { - zskiplistNode *x; - int i; - - /* If everything is out of range, return early. */ - if (!zslIsInRange(zsl,range)) return NULL; - - x = zsl->header; - for (i = zsl->level-1; i >= 0; i--) { - /* Go forward while *IN* range. */ - while (x->level[i].forward && - zslValueLteMax(x->level[i].forward->score,range)) - x = x->level[i].forward; - } - - /* This is an inner range, so this node cannot be NULL. */ - serverAssert(x != NULL); - - /* Check if score >= min. */ - if (!zslValueGteMin(x->score,range)) return NULL; return x; } @@ -498,14 +528,14 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) { return 0; } -/* Finds an element by its rank. The rank argument needs to be 1-based. */ -zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { +/* Finds an element by its rank from start node. The rank argument needs to be 1-based. */ +zskiplistNode *zslGetElementByRankFromNode(zskiplistNode *start_node, int start_level, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; int i; - x = zsl->header; - for (i = zsl->level-1; i >= 0; i--) { + x = start_node; + for (i = start_level; i >= 0; i--) { while (x->level[i].forward && (traversed + x->level[i].span) <= rank) { traversed += x->level[i].span; @@ -518,6 +548,11 @@ zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { return NULL; } +/* Finds an element by its rank. The rank argument needs to be 1-based. */ +zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank) { + return zslGetElementByRankFromNode(zsl->header, zsl->level - 1, rank); +} + /* Populate the rangespec according to the objects min and max. */ static int zslParseRange(robj *min, robj *max, zrangespec *spec) { char *eptr; @@ -666,54 +701,81 @@ int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) { return 1; } -/* Find the first node that is contained in the specified lex range. - * Returns NULL when no element is contained in the range. */ -zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range) { +/* Find the Nth node that is contained in the specified range. N should be 0-based. + * Negative N works for reversed order (-1 represents the last element). Returns + * NULL when no element is contained in the range. */ +zskiplistNode *zslNthInLexRange(zskiplist *zsl, zlexrangespec *range, long n) { zskiplistNode *x; int i; + long edge_rank = 0; + long last_highest_level_rank = 0; + zskiplistNode *last_highest_level_node = NULL; + unsigned long rank_diff; /* If everything is out of range, return early. */ if (!zslIsInLexRange(zsl,range)) return NULL; + /* Go forward while *OUT* of range at level of zsl->level-1. */ x = zsl->header; - for (i = zsl->level-1; i >= 0; i--) { - /* Go forward while *OUT* of range. */ - while (x->level[i].forward && - !zslLexValueGteMin(x->level[i].forward->ele,range)) + i = zsl->level - 1; + while (x->level[i].forward && !zslLexValueGteMin(x->level[i].forward->ele, range)) { + edge_rank += x->level[i].span; + x = x->level[i].forward; + } + /* Remember the last node which has zsl->level-1 levels and its rank. */ + last_highest_level_node = x; + last_highest_level_rank = edge_rank; + + if (n >= 0) { + for (i = zsl->level - 2; i >= 0; i--) { + /* Go forward while *OUT* of range. */ + while (x->level[i].forward && !zslLexValueGteMin(x->level[i].forward->ele, range)) { + /* Count the rank of the last element smaller than the range. */ + edge_rank += x->level[i].span; x = x->level[i].forward; + } + } + /* Check if zsl is long enough. */ + if ((unsigned long)(edge_rank + n) >= zsl->length) return NULL; + if (n < ZSKIPLIST_MAX_SEARCH) { + /* If offset is small, we can just jump node by node */ + /* rank+1 is the first element in range, so we need n+1 steps to reach target. */ + for (i = 0; i < n + 1; i++) { + x = x->level[0].forward; + } + } else { + /* If offset is big, we caasn jump from the last zsl->level-1 node. */ + rank_diff = edge_rank + 1 + n - last_highest_level_rank; + x = zslGetElementByRankFromNode(last_highest_level_node, zsl->level - 1, rank_diff); + } + /* Check if score <= max. */ + if (x && !zslLexValueLteMax(x->ele,range)) return NULL; + } else { + for (i = zsl->level - 1; i >= 0; i--) { + /* Go forward while *IN* range. */ + while (x->level[i].forward && zslLexValueLteMax(x->level[i].forward->ele, range)) { + /* Count the rank of the last element in range. */ + edge_rank += x->level[i].span; + x = x->level[i].forward; + } + } + /* Check if the range is big enough. */ + if (edge_rank < -n) return NULL; + if (n + 1 > -ZSKIPLIST_MAX_SEARCH) { + /* If offset is small, we can just jump node by node */ + for (i = 0; i < -n - 1; i++) { + x = x->backward; + } + } else { + /* If offset is big, we can jump from the last zsl->level-1 node. */ + /* rank is the last element in range, n is -1-based, so we need n+1 to count backwards. */ + rank_diff = edge_rank + 1 + n - last_highest_level_rank; + x = zslGetElementByRankFromNode(last_highest_level_node, zsl->level - 1, rank_diff); + } + /* Check if score >= min. */ + if (x && !zslLexValueGteMin(x->ele, range)) return NULL; } - /* This is an inner range, so the next node cannot be NULL. */ - x = x->level[0].forward; - serverAssert(x != NULL); - - /* Check if score <= max. */ - if (!zslLexValueLteMax(x->ele,range)) return NULL; - return x; -} - -/* Find the last node that is contained in the specified range. - * Returns NULL when no element is contained in the range. */ -zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range) { - zskiplistNode *x; - int i; - - /* If everything is out of range, return early. */ - if (!zslIsInLexRange(zsl,range)) return NULL; - - x = zsl->header; - for (i = zsl->level-1; i >= 0; i--) { - /* Go forward while *IN* range. */ - while (x->level[i].forward && - zslLexValueLteMax(x->level[i].forward->ele,range)) - x = x->level[i].forward; - } - - /* This is an inner range, so this node cannot be NULL. */ - serverAssert(x != NULL); - - /* Check if score >= min. */ - if (!zslLexValueGteMin(x->ele,range)) return NULL; return x; } @@ -3269,19 +3331,9 @@ void genericZrangebyscoreCommand(zrange_result_handler *handler, /* If reversed, get the last node in range as starting point. */ if (reverse) { - ln = zslLastInRange(zsl,range); + ln = zslNthInRange(zsl,range,-offset-1); } else { - ln = zslFirstInRange(zsl,range); - } - - /* If there is an offset, just traverse the number of elements without - * checking the score because that is done in the next loop. */ - while (ln && offset--) { - if (reverse) { - ln = ln->backward; - } else { - ln = ln->level[0].forward; - } + ln = zslNthInRange(zsl,range,offset); } while (ln && limit--) { @@ -3377,7 +3429,7 @@ void zcountCommand(client *c) { unsigned long rank; /* Find first element in range */ - zn = zslFirstInRange(zsl, &range); + zn = zslNthInRange(zsl, &range, 0); /* Use rank of first element, if any, to determine preliminary count */ if (zn != NULL) { @@ -3385,7 +3437,7 @@ void zcountCommand(client *c) { count = (zsl->length - (rank - 1)); /* Find last element in range */ - zn = zslLastInRange(zsl, &range); + zn = zslNthInRange(zsl, &range, -1); /* Use rank of last element, if any, to determine the actual count */ if (zn != NULL) { @@ -3455,7 +3507,7 @@ void zlexcountCommand(client *c) { unsigned long rank; /* Find first element in range */ - zn = zslFirstInLexRange(zsl, &range); + zn = zslNthInLexRange(zsl, &range, 0); /* Use rank of first element, if any, to determine preliminary count */ if (zn != NULL) { @@ -3463,7 +3515,7 @@ void zlexcountCommand(client *c) { count = (zsl->length - (rank - 1)); /* Find last element in range */ - zn = zslLastInLexRange(zsl, &range); + zn = zslNthInLexRange(zsl, &range, -1); /* Use rank of last element, if any, to determine the actual count */ if (zn != NULL) { @@ -3550,19 +3602,9 @@ void genericZrangebylexCommand(zrange_result_handler *handler, /* If reversed, get the last node in range as starting point. */ if (reverse) { - ln = zslLastInLexRange(zsl,range); + ln = zslNthInLexRange(zsl,range,-offset-1); } else { - ln = zslFirstInLexRange(zsl,range); - } - - /* If there is an offset, just traverse the number of elements without - * checking the score because that is done in the next loop. */ - while (ln && offset--) { - if (reverse) { - ln = ln->backward; - } else { - ln = ln->level[0].forward; - } + ln = zslNthInLexRange(zsl,range,offset); } while (ln && limit--) { diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index 33427d89c..4f11dea94 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -506,6 +506,13 @@ start_server {tags {"zset"}} { create_zset zset {-inf a 1 b 2 c 3 d 4 e 5 f +inf g} } + proc create_long_zset {key length} { + r del $key + for {set i 0} {$i < $length} {incr i 1} { + r zadd $key $i i$i + } + } + test "ZRANGEBYSCORE/ZREVRANGEBYSCORE/ZCOUNT basics - $encoding" { create_default_zset @@ -574,6 +581,16 @@ start_server {tags {"zset"}} { assert_equal {d c b} [r zrevrangebyscore zset 10 0 LIMIT 2 3] assert_equal {d c b} [r zrevrangebyscore zset 10 0 LIMIT 2 10] assert_equal {} [r zrevrangebyscore zset 10 0 LIMIT 20 10] + # zrangebyscore uses different logic when offset > ZSKIPLIST_MAX_SEARCH + create_long_zset zset 30 + assert_equal {i12 i13 i14} [r zrangebyscore zset 0 20 LIMIT 12 3] + assert_equal {i14 i15} [r zrangebyscore zset 0 20 LIMIT 14 2] + assert_equal {i19 i20 i21} [r zrangebyscore zset 0 30 LIMIT 19 3] + assert_equal {i29} [r zrangebyscore zset 10 30 LIMIT 19 2] + assert_equal {i17 i16 i15} [r zrevrangebyscore zset 30 10 LIMIT 12 3] + assert_equal {i6 i5} [r zrevrangebyscore zset 20 0 LIMIT 14 2] + assert_equal {i2 i1 i0} [r zrevrangebyscore zset 20 0 LIMIT 18 5] + assert_equal {i0} [r zrevrangebyscore zset 20 0 LIMIT 20 5] } test "ZRANGEBYSCORE with LIMIT and WITHSCORES - $encoding" { @@ -595,6 +612,14 @@ start_server {tags {"zset"}} { 0 omega} } + proc create_long_lex_zset {} { + create_zset zset {0 alpha 0 bar 0 cool 0 down + 0 elephant 0 foo 0 great 0 hill + 0 island 0 jacket 0 key 0 lip + 0 max 0 null 0 omega 0 point + 0 query 0 result 0 sea 0 tree} + } + test "ZRANGEBYLEX/ZREVRANGEBYLEX/ZLEXCOUNT basics - $encoding" { create_default_lex_zset @@ -640,7 +665,7 @@ start_server {tags {"zset"}} { assert_equal 1 [r zlexcount zset (maxstring +] } - test "ZRANGEBYSLEX with LIMIT - $encoding" { + test "ZRANGEBYLEX with LIMIT - $encoding" { create_default_lex_zset assert_equal {alpha bar} [r zrangebylex zset - \[cool LIMIT 0 2] assert_equal {bar cool} [r zrangebylex zset - \[cool LIMIT 1 2] @@ -651,6 +676,22 @@ start_server {tags {"zset"}} { assert_equal {bar cool down} [r zrangebylex zset \[bar \[down LIMIT 0 100] assert_equal {omega hill great foo elephant} [r zrevrangebylex zset + \[d LIMIT 0 5] assert_equal {omega hill great foo} [r zrevrangebylex zset + \[d LIMIT 0 4] + assert_equal {great foo elephant} [r zrevrangebylex zset + \[d LIMIT 2 3] + # zrangebylex uses different logic when offset > ZSKIPLIST_MAX_SEARCH + create_long_lex_zset + assert_equal {max null} [r zrangebylex zset - \[tree LIMIT 12 2] + assert_equal {point query} [r zrangebylex zset - \[tree LIMIT 15 2] + assert_equal {} [r zrangebylex zset \[max \[tree LIMIT 10 0] + assert_equal {} [r zrangebylex zset \[max \[tree LIMIT 12 0] + assert_equal {max} [r zrangebylex zset \[max \[null LIMIT 0 1] + assert_equal {null} [r zrangebylex zset \[max \[null LIMIT 1 1] + assert_equal {max null omega point} [r zrangebylex zset \[max \[point LIMIT 0 100] + assert_equal {tree sea result query point} [r zrevrangebylex zset + \[o LIMIT 0 5] + assert_equal {tree sea result query} [r zrevrangebylex zset + \[o LIMIT 0 4] + assert_equal {omega null max lip} [r zrevrangebylex zset + \[l LIMIT 5 4] + assert_equal {elephant down} [r zrevrangebylex zset + \[a LIMIT 15 2] + assert_equal {bar alpha} [r zrevrangebylex zset + - LIMIT 18 6] + assert_equal {hill great foo} [r zrevrangebylex zset + \[c LIMIT 12 3] } test "ZRANGEBYLEX with invalid lex range specifiers - $encoding" {