Add ZINTER/ZUNION command

Syntax: ZINTER/ZUNION numkeys key [key ...] [WEIGHTS weight [weight ...]]
[AGGREGATE SUM|MIN|MAX] [WITHSCORES]

see #7624
This commit is contained in:
bodong.ybd 2020-09-11 16:59:15 +08:00 committed by Oran Agra
parent 66a13ccbdf
commit e08bf16637
5 changed files with 146 additions and 21 deletions

View File

@ -1388,7 +1388,7 @@ void getKeysFreeResult(int *result) {
/* Helper function to extract keys from following commands: /* Helper function to extract keys from following commands:
* ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options> * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
* ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */ * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { int *zunionInterStoreGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
int i, num, *keys; int i, num, *keys;
UNUSED(cmd); UNUSED(cmd);
@ -1416,6 +1416,31 @@ int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *nu
return keys; return keys;
} }
/* Helper function to extract keys from following commands:
* ZUNION <num-keys> <key> <key> ... <key> <options>
* ZINTER <num-keys> <key> <key> ... <key> <options> */
int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
int i, num, *keys;
UNUSED(cmd);
num = atoi(argv[1]->ptr);
/* Sanity check. Don't return any key if the command is going to
* reply with syntax error. */
if (num < 1 || num > (argc-2)) {
*numkeys = 0;
return NULL;
}
keys = getKeysTempBuffer;
if (num>MAX_KEYS_BUFFER)
keys = zmalloc(sizeof(int)*(num));
/* Add all key positions for argv[2...n] to keys[] */
for (i = 0; i < num; i++) keys[i] = 2+i;
*numkeys = num;
return keys;
}
/* Helper function to extract keys from the following commands: /* Helper function to extract keys from the following commands:
* EVAL <script> <num-keys> <key> <key> ... <key> [more stuff] * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
* EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */ * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */

View File

@ -428,10 +428,18 @@ struct redisCommand redisCommandTable[] = {
{"zunionstore",zunionstoreCommand,-4, {"zunionstore",zunionstoreCommand,-4,
"write use-memory @sortedset", "write use-memory @sortedset",
0,zunionInterGetKeys,0,0,0,0,0,0}, 0,zunionInterStoreGetKeys,0,0,0,0,0,0},
{"zinterstore",zinterstoreCommand,-4, {"zinterstore",zinterstoreCommand,-4,
"write use-memory @sortedset", "write use-memory @sortedset",
0,zunionInterStoreGetKeys,0,0,0,0,0,0},
{"zunion",zunionCommand,-3,
"read-only @sortedset",
0,zunionInterGetKeys,0,0,0,0,0,0},
{"zinter",zinterCommand,-3,
"read-only @sortedset",
0,zunionInterGetKeys,0,0,0,0,0,0}, 0,zunionInterGetKeys,0,0,0,0,0,0},
{"zrange",zrangeCommand,-4, {"zrange",zrangeCommand,-4,

View File

@ -2175,6 +2175,7 @@ void freeObjAsync(robj *o);
int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
void getKeysFreeResult(int *result); void getKeysFreeResult(int *result);
int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys); int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys);
int *zunionInterStoreGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys);
int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
@ -2391,6 +2392,8 @@ void hstrlenCommand(client *c);
void zremrangebyrankCommand(client *c); void zremrangebyrankCommand(client *c);
void zunionstoreCommand(client *c); void zunionstoreCommand(client *c);
void zinterstoreCommand(client *c); void zinterstoreCommand(client *c);
void zunionCommand(client *c);
void zinterCommand(client *c);
void zscanCommand(client *c); void zscanCommand(client *c);
void hkeysCommand(client *c); void hkeysCommand(client *c);
void hvalsCommand(client *c); void hvalsCommand(client *c);

View File

@ -2196,7 +2196,15 @@ dictType setAccumulatorDictType = {
NULL /* val destructor */ NULL /* val destructor */
}; };
void zunionInterGenericCommand(client *c, robj *dstkey, int op) { /* The zunionInterGenericCommand() function is called in order to implement the
* following commands: ZUNION, ZINTER, ZUNIONSTORE, ZINTERSTORE.
*
* 'numkeysIndex' parameter position of key number. for ZUNION/ZINTER command, this
* value is 1, for ZUNIONSTORE/ZINTERSTORE command, this value is 2.
*
* 'op' SET_OP_INTER or SET_OP_UNION.
*/
void zunionInterGenericCommand(client *c, robj *dstkey, int numkeysIndex, int op) {
int i, j; int i, j;
long setnum; long setnum;
int aggregate = REDIS_AGGR_SUM; int aggregate = REDIS_AGGR_SUM;
@ -2207,9 +2215,10 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
robj *dstobj; robj *dstobj;
zset *dstzset; zset *dstzset;
zskiplistNode *znode; zskiplistNode *znode;
int withscores = 0;
/* expect setnum input keys to be given */ /* expect setnum input keys to be given */
if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != C_OK)) if ((getLongFromObjectOrReply(c, c->argv[numkeysIndex], &setnum, NULL) != C_OK))
return; return;
if (setnum < 1) { if (setnum < 1) {
@ -2219,14 +2228,14 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
} }
/* test if the expected number of keys would overflow */ /* test if the expected number of keys would overflow */
if (setnum > c->argc-3) { if (setnum > (c->argc-(numkeysIndex+1))) {
addReply(c,shared.syntaxerr); addReply(c,shared.syntaxerr);
return; return;
} }
/* read keys to be used for input */ /* read keys to be used for input */
src = zcalloc(sizeof(zsetopsrc) * setnum); src = zcalloc(sizeof(zsetopsrc) * setnum);
for (i = 0, j = 3; i < setnum; i++, j++) { for (i = 0, j = numkeysIndex+1; i < setnum; i++, j++) {
robj *obj = lookupKeyWrite(c->db,c->argv[j]); robj *obj = lookupKeyWrite(c->db,c->argv[j]);
if (obj != NULL) { if (obj != NULL) {
if (obj->type != OBJ_ZSET && obj->type != OBJ_SET) { if (obj->type != OBJ_ZSET && obj->type != OBJ_SET) {
@ -2279,6 +2288,11 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
return; return;
} }
j++; remaining--; j++; remaining--;
} else if (remaining >= 1 &&
!strcasecmp(c->argv[j]->ptr,"withscores"))
{
j++; remaining--;
withscores = 1;
} else { } else {
zfree(src); zfree(src);
addReply(c,shared.syntaxerr); addReply(c,shared.syntaxerr);
@ -2399,32 +2413,57 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) {
serverPanic("Unknown operator"); serverPanic("Unknown operator");
} }
if (dstkey) {
if (dstzset->zsl->length) { if (dstzset->zsl->length) {
zsetConvertToZiplistIfNeeded(dstobj,maxelelen); zsetConvertToZiplistIfNeeded(dstobj, maxelelen);
setKey(c,c->db,dstkey,dstobj); setKey(c, c->db, dstkey, dstobj);
addReplyLongLong(c,zsetLength(dstobj)); addReplyLongLong(c, zsetLength(dstobj));
notifyKeyspaceEvent(NOTIFY_ZSET, notifyKeyspaceEvent(NOTIFY_ZSET,
(op == SET_OP_UNION) ? "zunionstore" : "zinterstore", (op == SET_OP_UNION) ? "zunionstore" : "zinterstore",
dstkey,c->db->id); dstkey, c->db->id);
server.dirty++; server.dirty++;
} else { } else {
addReply(c,shared.czero); addReply(c, shared.czero);
if (dbDelete(c->db,dstkey)) { if (dbDelete(c->db, dstkey)) {
signalModifiedKey(c,c->db,dstkey); signalModifiedKey(c, c->db, dstkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC, "del", dstkey, c->db->id);
server.dirty++; server.dirty++;
} }
} }
} else {
unsigned long length = dstzset->zsl->length;
zskiplist *zsl = dstzset->zsl;
zskiplistNode *zn = zsl->header->level[0].forward;
if (withscores && c->resp == 2)
addReplyArrayLen(c, length*2);
else
addReplyArrayLen(c, length);
while (zn != NULL) {
if (withscores && c->resp > 2) addReplyArrayLen(c,2);
addReplyBulkCBuffer(c,zn->ele,sdslen(zn->ele));
if (withscores) addReplyDouble(c,zn->score);
zn = zn->level[0].forward;
}
}
decrRefCount(dstobj); decrRefCount(dstobj);
zfree(src); zfree(src);
} }
void zunionstoreCommand(client *c) { void zunionstoreCommand(client *c) {
zunionInterGenericCommand(c,c->argv[1], SET_OP_UNION); zunionInterGenericCommand(c, c->argv[1], 2, SET_OP_UNION);
} }
void zinterstoreCommand(client *c) { void zinterstoreCommand(client *c) {
zunionInterGenericCommand(c,c->argv[1], SET_OP_INTER); zunionInterGenericCommand(c, c->argv[1], 2, SET_OP_INTER);
}
void zunionCommand(client *c) {
zunionInterGenericCommand(c, NULL, 1, SET_OP_UNION);
}
void zinterCommand(client *c) {
zunionInterGenericCommand(c, NULL, 1, SET_OP_INTER);
} }
void zrangeGenericCommand(client *c, int reverse) { void zrangeGenericCommand(client *c, int reverse) {

View File

@ -612,6 +612,12 @@ start_server {tags {"zset"}} {
assert_equal 0 [r exists dst_key] assert_equal 0 [r exists dst_key]
} }
test "ZUNION/ZINTER against non-existing key - $encoding" {
r del zseta
assert_equal {} [r zunion 1 zseta]
assert_equal {} [r zinter 1 zseta]
}
test "ZUNIONSTORE with empty set - $encoding" { test "ZUNIONSTORE with empty set - $encoding" {
r del zseta zsetb r del zseta zsetb
r zadd zseta 1 a r zadd zseta 1 a
@ -620,6 +626,14 @@ start_server {tags {"zset"}} {
r zrange zsetc 0 -1 withscores r zrange zsetc 0 -1 withscores
} {a 1 b 2} } {a 1 b 2}
test "ZUNION/ZINTER with empty set - $encoding" {
r del zseta zsetb
r zadd zseta 1 a
r zadd zseta 2 b
assert_equal {a 1 b 2} [r zunion 2 zseta zsetb withscores]
assert_equal {} [r zinter 2 zseta zsetb withscores]
}
test "ZUNIONSTORE basics - $encoding" { test "ZUNIONSTORE basics - $encoding" {
r del zseta zsetb zsetc r del zseta zsetb zsetc
r zadd zseta 1 a r zadd zseta 1 a
@ -633,11 +647,29 @@ start_server {tags {"zset"}} {
assert_equal {a 1 b 3 d 3 c 5} [r zrange zsetc 0 -1 withscores] assert_equal {a 1 b 3 d 3 c 5} [r zrange zsetc 0 -1 withscores]
} }
test "ZUNION/ZINTER with integer members - $encoding" {
r del zsetd zsetf
r zadd zsetd 1 1
r zadd zsetd 2 2
r zadd zsetd 3 3
r zadd zsetf 1 1
r zadd zsetf 3 3
r zadd zsetf 4 4
assert_equal {1 2 2 2 4 4 3 6} [r zunion 2 zsetd zsetf withscores]
assert_equal {1 2 3 6} [r zinter 2 zsetd zsetf withscores]
}
test "ZUNIONSTORE with weights - $encoding" { test "ZUNIONSTORE with weights - $encoding" {
assert_equal 4 [r zunionstore zsetc 2 zseta zsetb weights 2 3] assert_equal 4 [r zunionstore zsetc 2 zseta zsetb weights 2 3]
assert_equal {a 2 b 7 d 9 c 12} [r zrange zsetc 0 -1 withscores] assert_equal {a 2 b 7 d 9 c 12} [r zrange zsetc 0 -1 withscores]
} }
test "ZUNION with weights - $encoding" {
assert_equal {a 2 b 7 d 9 c 12} [r zunion 2 zseta zsetb weights 2 3 withscores]
assert_equal {b 7 c 12} [r zinter 2 zseta zsetb weights 2 3 withscores]
}
test "ZUNIONSTORE with a regular set and weights - $encoding" { test "ZUNIONSTORE with a regular set and weights - $encoding" {
r del seta r del seta
r sadd seta a r sadd seta a
@ -653,21 +685,39 @@ start_server {tags {"zset"}} {
assert_equal {a 1 b 1 c 2 d 3} [r zrange zsetc 0 -1 withscores] assert_equal {a 1 b 1 c 2 d 3} [r zrange zsetc 0 -1 withscores]
} }
test "ZUNION/ZINTER with AGGREGATE MIN - $encoding" {
assert_equal {a 1 b 1 c 2 d 3} [r zunion 2 zseta zsetb aggregate min withscores]
assert_equal {b 1 c 2} [r zinter 2 zseta zsetb aggregate min withscores]
}
test "ZUNIONSTORE with AGGREGATE MAX - $encoding" { test "ZUNIONSTORE with AGGREGATE MAX - $encoding" {
assert_equal 4 [r zunionstore zsetc 2 zseta zsetb aggregate max] assert_equal 4 [r zunionstore zsetc 2 zseta zsetb aggregate max]
assert_equal {a 1 b 2 c 3 d 3} [r zrange zsetc 0 -1 withscores] assert_equal {a 1 b 2 c 3 d 3} [r zrange zsetc 0 -1 withscores]
} }
test "ZUNION/ZINTER with AGGREGATE MAX - $encoding" {
assert_equal {a 1 b 2 c 3 d 3} [r zunion 2 zseta zsetb aggregate max withscores]
assert_equal {b 2 c 3} [r zinter 2 zseta zsetb aggregate max withscores]
}
test "ZINTERSTORE basics - $encoding" { test "ZINTERSTORE basics - $encoding" {
assert_equal 2 [r zinterstore zsetc 2 zseta zsetb] assert_equal 2 [r zinterstore zsetc 2 zseta zsetb]
assert_equal {b 3 c 5} [r zrange zsetc 0 -1 withscores] assert_equal {b 3 c 5} [r zrange zsetc 0 -1 withscores]
} }
test "ZINTER basics - $encoding" {
assert_equal {b 3 c 5} [r zinter 2 zseta zsetb withscores]
}
test "ZINTERSTORE with weights - $encoding" { test "ZINTERSTORE with weights - $encoding" {
assert_equal 2 [r zinterstore zsetc 2 zseta zsetb weights 2 3] assert_equal 2 [r zinterstore zsetc 2 zseta zsetb weights 2 3]
assert_equal {b 7 c 12} [r zrange zsetc 0 -1 withscores] assert_equal {b 7 c 12} [r zrange zsetc 0 -1 withscores]
} }
test "ZINTER with weights - $encoding" {
assert_equal {b 7 c 12} [r zinter 2 zseta zsetb weights 2 3 withscores]
}
test "ZINTERSTORE with a regular set and weights - $encoding" { test "ZINTERSTORE with a regular set and weights - $encoding" {
r del seta r del seta
r sadd seta a r sadd seta a