futriix/src/t_set.c
Meir Shpilraien (Spielrein) 508a138885
Fix replication inconsistency on modules that uses key space notifications (#10969)
Fix replication inconsistency on modules that uses key space notifications.

### The Problem

In general, key space notifications are invoked after the command logic was
executed (this is not always the case, we will discuss later about specific
command that do not follow this rules). For example, the `set x 1` will trigger
a `set` notification that will be invoked after the `set` logic was performed, so
if the notification logic will try to fetch `x`, it will see the new data that was written.
Consider the scenario on which the notification logic performs some write
commands. for example, the notification logic increase some counter,
`incr x{counter}`, indicating how many times `x` was changed.
The logical order by which the logic was executed is has follow:

```
set x 1
incr x{counter}
```

The issue is that the `set x 1` command is added to the replication buffer
at the end of the command invocation (specifically after the key space
notification logic was invoked and performed the `incr` command).
The replication/aof sees the commands in the wrong order:

```
incr x{counter}
set x 1
```

In this specific example the order is less important.
But if, for example, the notification would have deleted `x` then we would
end up with primary-replica inconsistency.

### The Solution

Put the command that cause the notification in its rightful place. In the
above example, the `set x 1` command logic was executed before the
notification logic, so it should be added to the replication buffer before
the commands that is invoked by the notification logic. To achieve this,
without a major code refactoring, we save a placeholder in the replication
buffer, when finishing invoking the command logic we check if the command
need to be replicated, and if it does, we use the placeholder to add it to the
replication buffer instead of appending it to the end.

To be efficient and not allocating memory on each command to save the
placeholder, the replication buffer array was modified to reuse memory
(instead of allocating it each time we want to replicate commands).
Also, to avoid saving a placeholder when not needed, we do it only for
WRITE or MAY_REPLICATE commands.

#### Additional Fixes

* Expire and Eviction notifications:
  * Expire/Eviction logical order was to first perform the Expire/Eviction
    and then the notification logic. The replication buffer got this in the
    other way around (first notification effect and then the `del` command).
    The PR fixes this issue.
  * The notification effect and the `del` command was not wrap with
    `multi-exec` (if needed). The PR also fix this issue.
* SPOP command:
  * On spop, the `spop` notification was fired before the command logic
    was executed. The change in this PR would have cause the replication
    order to be change (first `spop` command and then notification `logic`)
    although the logical order is first the notification logic and then the
    `spop` logic. The right fix would have been to move the notification to
    be fired after the command was executed (like all the other commands),
    but this can be considered a breaking change. To overcome this, the PR
    keeps the current behavior and changes the `spop` code to keep the right
    logical order when pushing commands to the replication buffer. Another PR
    will follow to fix the SPOP properly and match it to the other command (we
    split it to 2 separate PR's so it will be easy to cherry-pick this PR to 7.0 if
    we chose to).

#### Unhanded Known Limitations

* key miss event:
  * On key miss event, if a module performed some write command on the
    event (using `RM_Call`), the `dirty` counter would increase and the read
    command that cause the key miss event would be replicated to the replication
    and aof. This problem can also happened on a write command that open
    some keys but eventually decides not to perform any action. We decided
    not to handle this problem on this PR because the solution is complex
    and will cause additional risks in case we will want to cherry-pick this PR.
    We should decide if we want to handle it in future PR's. For now, modules
    writers is advice not to perform any write commands on key miss event.

#### Testing

* We already have tests to cover cases where a notification is invoking write
  commands that are also added to the replication buffer, the tests was modified
  to verify that the replica gets the command in the correct logical order.
* Test was added to verify that `spop` behavior was kept unchanged.
* Test was added to verify key miss event behave as expected.
* Test was added to verify the changes do not break lazy expiration.

#### Additional Changes

* `propagateNow` function can accept a special dbid, -1, indicating not
  to replicate `select`. We use this to replicate `multi/exec` on `propagatePendingCommands`
  function. The side effect of this change is that now the `select` command
  will appear inside the `multi/exec` block on the replication stream (instead of
  outside of the `multi/exec` block). Tests was modified to match this new behavior.
2022-08-18 10:16:32 +03:00

1253 lines
43 KiB
C

/*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
/*-----------------------------------------------------------------------------
* Set Commands
*----------------------------------------------------------------------------*/
void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
robj *dstkey, int op);
/* Factory method to return a set that *can* hold "value". When the object has
* an integer-encodable value, an intset will be returned. Otherwise a regular
* hash table. */
robj *setTypeCreate(sds value) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject();
return createSetObject();
}
/* Add the specified value into a set.
*
* If the value was already member of the set, nothing is done and 0 is
* returned, otherwise the new element is added and 1 is returned. */
int setTypeAdd(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) {
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
/* Convert to regular set when the intset contains
* too many entries. */
size_t max_entries = server.set_max_intset_entries;
/* limit to 1G entries due to intset internals. */
if (max_entries >= 1<<30) max_entries = 1<<30;
if (intsetLen(subject->ptr) > max_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
/* Failed to get integer from object, convert to regular set. */
setTypeConvert(subject,OBJ_ENCODING_HT);
/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}
int setTypeRemove(robj *setobj, sds value) {
long long llval;
if (setobj->encoding == OBJ_ENCODING_HT) {
if (dictDelete(setobj->ptr,value) == DICT_OK) {
if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr);
return 1;
}
} else if (setobj->encoding == OBJ_ENCODING_INTSET) {
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
int success;
setobj->ptr = intsetRemove(setobj->ptr,llval,&success);
if (success) return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}
int setTypeIsMember(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) {
return dictFind((dict*)subject->ptr,value) != NULL;
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
return intsetFind((intset*)subject->ptr,llval);
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}
setTypeIterator *setTypeInitIterator(robj *subject) {
setTypeIterator *si = zmalloc(sizeof(setTypeIterator));
si->subject = subject;
si->encoding = subject->encoding;
if (si->encoding == OBJ_ENCODING_HT) {
si->di = dictGetIterator(subject->ptr);
} else if (si->encoding == OBJ_ENCODING_INTSET) {
si->ii = 0;
} else {
serverPanic("Unknown set encoding");
}
return si;
}
void setTypeReleaseIterator(setTypeIterator *si) {
if (si->encoding == OBJ_ENCODING_HT)
dictReleaseIterator(si->di);
zfree(si);
}
/* Move to the next entry in the set. Returns the object at the current
* position.
*
* Since set elements can be internally be stored as SDS strings or
* simple arrays of integers, setTypeNext returns the encoding of the
* set object you are iterating, and will populate the appropriate pointer
* (sdsele) or (llele) accordingly.
*
* Note that both the sdsele and llele pointers should be passed and cannot
* be NULL since the function will try to defensively populate the non
* used field with values which are easy to trap if misused.
*
* When there are no longer elements -1 is returned. */
int setTypeNext(setTypeIterator *si, sds *sdsele, int64_t *llele) {
if (si->encoding == OBJ_ENCODING_HT) {
dictEntry *de = dictNext(si->di);
if (de == NULL) return -1;
*sdsele = dictGetKey(de);
*llele = -123456789; /* Not needed. Defensive. */
} else if (si->encoding == OBJ_ENCODING_INTSET) {
if (!intsetGet(si->subject->ptr,si->ii++,llele))
return -1;
*sdsele = NULL; /* Not needed. Defensive. */
} else {
serverPanic("Wrong set encoding in setTypeNext");
}
return si->encoding;
}
/* The not copy on write friendly version but easy to use version
* of setTypeNext() is setTypeNextObject(), returning new SDS
* strings. So if you don't retain a pointer to this object you should call
* sdsfree() against it.
*
* This function is the way to go for write operations where COW is not
* an issue. */
sds setTypeNextObject(setTypeIterator *si) {
int64_t intele;
sds sdsele;
int encoding;
encoding = setTypeNext(si,&sdsele,&intele);
switch(encoding) {
case -1: return NULL;
case OBJ_ENCODING_INTSET:
return sdsfromlonglong(intele);
case OBJ_ENCODING_HT:
return sdsdup(sdsele);
default:
serverPanic("Unsupported encoding");
}
return NULL; /* just to suppress warnings */
}
/* Return random element from a non empty set.
* The returned element can be an int64_t value if the set is encoded
* as an "intset" blob of integers, or an SDS string if the set
* is a regular set.
*
* The caller provides both pointers to be populated with the right
* object. The return value of the function is the object->encoding
* field of the object and is used by the caller to check if the
* int64_t pointer or the sds pointer was populated.
*
* Note that both the sdsele and llele pointers should be passed and cannot
* be NULL since the function will try to defensively populate the non
* used field with values which are easy to trap if misused. */
int setTypeRandomElement(robj *setobj, sds *sdsele, int64_t *llele) {
if (setobj->encoding == OBJ_ENCODING_HT) {
dictEntry *de = dictGetFairRandomKey(setobj->ptr);
*sdsele = dictGetKey(de);
*llele = -123456789; /* Not needed. Defensive. */
} else if (setobj->encoding == OBJ_ENCODING_INTSET) {
*llele = intsetRandom(setobj->ptr);
*sdsele = NULL; /* Not needed. Defensive. */
} else {
serverPanic("Unknown set encoding");
}
return setobj->encoding;
}
unsigned long setTypeSize(const robj *subject) {
if (subject->encoding == OBJ_ENCODING_HT) {
return dictSize((const dict*)subject->ptr);
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
return intsetLen((const intset*)subject->ptr);
} else {
serverPanic("Unknown set encoding");
}
}
/* Convert the set to specified encoding. The resulting dict (when converting
* to a hash table) is presized to hold the number of elements in the original
* set. */
void setTypeConvert(robj *setobj, int enc) {
setTypeIterator *si;
serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET &&
setobj->encoding == OBJ_ENCODING_INTSET);
if (enc == OBJ_ENCODING_HT) {
int64_t intele;
dict *d = dictCreate(&setDictType);
sds element;
/* Presize the dict to avoid rehashing */
dictExpand(d,intsetLen(setobj->ptr));
/* To add the elements we extract integers and create redis objects */
si = setTypeInitIterator(setobj);
while (setTypeNext(si,&element,&intele) != -1) {
element = sdsfromlonglong(intele);
serverAssert(dictAdd(d,element,NULL) == DICT_OK);
}
setTypeReleaseIterator(si);
setobj->encoding = OBJ_ENCODING_HT;
zfree(setobj->ptr);
setobj->ptr = d;
} else {
serverPanic("Unsupported set conversion");
}
}
/* This is a helper function for the COPY command.
* Duplicate a set object, with the guarantee that the returned object
* has the same encoding as the original one.
*
* The resulting object always has refcount set to 1 */
robj *setTypeDup(robj *o) {
robj *set;
setTypeIterator *si;
sds elesds;
int64_t intobj;
serverAssert(o->type == OBJ_SET);
/* Create a new set object that have the same encoding as the original object's encoding */
if (o->encoding == OBJ_ENCODING_INTSET) {
intset *is = o->ptr;
size_t size = intsetBlobLen(is);
intset *newis = zmalloc(size);
memcpy(newis,is,size);
set = createObject(OBJ_SET, newis);
set->encoding = OBJ_ENCODING_INTSET;
} else if (o->encoding == OBJ_ENCODING_HT) {
set = createSetObject();
dict *d = o->ptr;
dictExpand(set->ptr, dictSize(d));
si = setTypeInitIterator(o);
while (setTypeNext(si, &elesds, &intobj) != -1) {
setTypeAdd(set, elesds);
}
setTypeReleaseIterator(si);
} else {
serverPanic("Unknown set encoding");
}
return set;
}
void saddCommand(client *c) {
robj *set;
int j, added = 0;
set = lookupKeyWrite(c->db,c->argv[1]);
if (checkType(c,set,OBJ_SET)) return;
if (set == NULL) {
set = setTypeCreate(c->argv[2]->ptr);
dbAdd(c->db,c->argv[1],set);
}
for (j = 2; j < c->argc; j++) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
}
if (added) {
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
server.dirty += added;
addReplyLongLong(c,added);
}
void sremCommand(client *c) {
robj *set;
int j, deleted = 0, keyremoved = 0;
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,set,OBJ_SET)) return;
for (j = 2; j < c->argc; j++) {
if (setTypeRemove(set,c->argv[j]->ptr)) {
deleted++;
if (setTypeSize(set) == 0) {
dbDelete(c->db,c->argv[1]);
keyremoved = 1;
break;
}
}
}
if (deleted) {
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],
c->db->id);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
}
void smoveCommand(client *c) {
robj *srcset, *dstset, *ele;
srcset = lookupKeyWrite(c->db,c->argv[1]);
dstset = lookupKeyWrite(c->db,c->argv[2]);
ele = c->argv[3];
/* If the source key does not exist return 0 */
if (srcset == NULL) {
addReply(c,shared.czero);
return;
}
/* If the source key has the wrong type, or the destination key
* is set and has the wrong type, return with an error. */
if (checkType(c,srcset,OBJ_SET) ||
checkType(c,dstset,OBJ_SET)) return;
/* If srcset and dstset are equal, SMOVE is a no-op */
if (srcset == dstset) {
addReply(c,setTypeIsMember(srcset,ele->ptr) ?
shared.cone : shared.czero);
return;
}
/* If the element cannot be removed from the src set, return 0. */
if (!setTypeRemove(srcset,ele->ptr)) {
addReply(c,shared.czero);
return;
}
notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);
/* Remove the src set from the database when empty */
if (setTypeSize(srcset) == 0) {
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}
/* Create the destination set when it doesn't exist */
if (!dstset) {
dstset = setTypeCreate(ele->ptr);
dbAdd(c->db,c->argv[2],dstset);
}
signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++;
/* An extra key has changed when ele was successfully added to dstset */
if (setTypeAdd(dstset,ele->ptr)) {
server.dirty++;
signalModifiedKey(c,c->db,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[2],c->db->id);
}
addReply(c,shared.cone);
}
void sismemberCommand(client *c) {
robj *set;
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,set,OBJ_SET)) return;
if (setTypeIsMember(set,c->argv[2]->ptr))
addReply(c,shared.cone);
else
addReply(c,shared.czero);
}
void smismemberCommand(client *c) {
robj *set;
int j;
/* Don't abort when the key cannot be found. Non-existing keys are empty
* sets, where SMISMEMBER should respond with a series of zeros. */
set = lookupKeyRead(c->db,c->argv[1]);
if (set && checkType(c,set,OBJ_SET)) return;
addReplyArrayLen(c,c->argc - 2);
for (j = 2; j < c->argc; j++) {
if (set && setTypeIsMember(set,c->argv[j]->ptr))
addReply(c,shared.cone);
else
addReply(c,shared.czero);
}
}
void scardCommand(client *c) {
robj *o;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,OBJ_SET)) return;
addReplyLongLong(c,setTypeSize(o));
}
/* Handle the "SPOP key <count>" variant. The normal version of the
* command is handled by the spopCommand() function itself. */
/* How many times bigger should be the set compared to the remaining size
* for us to use the "create new set" strategy? Read later in the
* implementation for more info. */
#define SPOP_MOVE_STRATEGY_MUL 5
void spopWithCountCommand(client *c) {
long l;
unsigned long count, size;
robj *set;
/* Get the count argument */
if (getPositiveLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return;
count = (unsigned long) l;
/* Make sure a key with the name inputted exists, and that it's type is
* indeed a set. Otherwise, return nil */
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.emptyset[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return;
/* If count is zero, serve an empty set ASAP to avoid special
* cases later. */
if (count == 0) {
addReply(c,shared.emptyset[c->resp]);
return;
}
size = setTypeSize(set);
/* Generate an SPOP keyspace notification */
notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id);
server.dirty += (count >= size) ? size : count;
/* CASE 1:
* The number of requested elements is greater than or equal to
* the number of elements inside the set: simply return the whole set. */
if (count >= size) {
/* We just return the entire set */
sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION);
/* Delete the set as it is now empty */
dbDelete(c->db,c->argv[1]);
/* todo: Move the spop notification to be executed after the command logic.
* We can then decide if we want to keep the `alsoPropagate` or move to `rewriteClientCommandVector`. */
/* Propagate del command */
robj *propagate[2];
propagate[0] = shared.del;
propagate[1] = c->argv[1];
alsoPropagate(c->db->id,propagate,2,PROPAGATE_AOF|PROPAGATE_REPL);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
signalModifiedKey(c,c->db,c->argv[1]);
preventCommandPropagation(c);
return;
}
/* Case 2 and 3 require to replicate SPOP as a set of SREM commands.
* Prepare our replication argument vector. Also send the array length
* which is common to both the code paths. */
robj *propargv[3];
propargv[0] = shared.srem;
propargv[1] = c->argv[1];
addReplySetLen(c,count);
/* Common iteration vars. */
sds sdsele;
robj *objele;
int encoding;
int64_t llele;
unsigned long remaining = size-count; /* Elements left after SPOP. */
/* If we are here, the number of requested elements is less than the
* number of elements inside the set. Also we are sure that count < size.
* Use two different strategies.
*
* CASE 2: The number of elements to return is small compared to the
* set size. We can just extract random elements and return them to
* the set. */
if (remaining*SPOP_MOVE_STRATEGY_MUL > count) {
while(count--) {
/* Emit and remove. */
encoding = setTypeRandomElement(set,&sdsele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
addReplyBulkLongLong(c,llele);
objele = createStringObjectFromLongLong(llele);
set->ptr = intsetRemove(set->ptr,llele,NULL);
} else {
addReplyBulkCBuffer(c,sdsele,sdslen(sdsele));
objele = createStringObject(sdsele,sdslen(sdsele));
setTypeRemove(set,sdsele);
}
/* Replicate/AOF this command as an SREM operation */
propargv[2] = objele;
alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(objele);
}
} else {
/* CASE 3: The number of elements to return is very big, approaching
* the size of the set itself. After some time extracting random elements
* from such a set becomes computationally expensive, so we use
* a different strategy, we extract random elements that we don't
* want to return (the elements that will remain part of the set),
* creating a new set as we do this (that will be stored as the original
* set). Then we return the elements left in the original set and
* release it. */
robj *newset = NULL;
/* Create a new set with just the remaining elements. */
while(remaining--) {
encoding = setTypeRandomElement(set,&sdsele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
sdsele = sdsfromlonglong(llele);
} else {
sdsele = sdsdup(sdsele);
}
if (!newset) newset = setTypeCreate(sdsele);
setTypeAdd(newset,sdsele);
setTypeRemove(set,sdsele);
sdsfree(sdsele);
}
/* Transfer the old set to the client. */
setTypeIterator *si;
si = setTypeInitIterator(set);
while((encoding = setTypeNext(si,&sdsele,&llele)) != -1) {
if (encoding == OBJ_ENCODING_INTSET) {
addReplyBulkLongLong(c,llele);
objele = createStringObjectFromLongLong(llele);
} else {
addReplyBulkCBuffer(c,sdsele,sdslen(sdsele));
objele = createStringObject(sdsele,sdslen(sdsele));
}
/* Replicate/AOF this command as an SREM operation */
propargv[2] = objele;
alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(objele);
}
setTypeReleaseIterator(si);
/* Assign the new set as the key value. */
dbOverwrite(c->db,c->argv[1],newset);
}
/* Don't propagate the command itself even if we incremented the
* dirty counter. We don't want to propagate an SPOP command since
* we propagated the command as a set of SREMs operations using
* the alsoPropagate() API. */
preventCommandPropagation(c);
signalModifiedKey(c,c->db,c->argv[1]);
}
void spopCommand(client *c) {
robj *set, *ele;
sds sdsele;
int64_t llele;
int encoding;
if (c->argc == 3) {
spopWithCountCommand(c);
return;
} else if (c->argc > 3) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
/* Make sure a key with the name inputted exists, and that it's type is
* indeed a set */
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return;
/* Get a random element from the set */
encoding = setTypeRandomElement(set,&sdsele,&llele);
/* Remove the element from the set */
if (encoding == OBJ_ENCODING_INTSET) {
ele = createStringObjectFromLongLong(llele);
set->ptr = intsetRemove(set->ptr,llele,NULL);
} else {
ele = createStringObject(sdsele,sdslen(sdsele));
setTypeRemove(set,ele->ptr);
}
notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id);
/* Replicate/AOF this command as an SREM operation */
rewriteClientCommandVector(c,3,shared.srem,c->argv[1],ele);
/* Add the element to the reply */
addReplyBulk(c,ele);
decrRefCount(ele);
/* Delete the set if it's empty */
if (setTypeSize(set) == 0) {
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}
/* Set has been modified */
signalModifiedKey(c,c->db,c->argv[1]);
server.dirty++;
}
/* handle the "SRANDMEMBER key <count>" variant. The normal version of the
* command is handled by the srandmemberCommand() function itself. */
/* How many times bigger should be the set compared to the requested size
* for us to don't use the "remove elements" strategy? Read later in the
* implementation for more info. */
#define SRANDMEMBER_SUB_STRATEGY_MUL 3
void srandmemberWithCountCommand(client *c) {
long l;
unsigned long count, size;
int uniq = 1;
robj *set;
sds ele;
int64_t llele;
int encoding;
dict *d;
if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return;
if (l >= 0) {
count = (unsigned long) l;
} else {
/* A negative count means: return the same elements multiple times
* (i.e. don't remove the extracted element after every extraction). */
count = -l;
uniq = 0;
}
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray))
== NULL || checkType(c,set,OBJ_SET)) return;
size = setTypeSize(set);
/* If count is zero, serve it ASAP to avoid special cases later. */
if (count == 0) {
addReply(c,shared.emptyarray);
return;
}
/* CASE 1: The count was negative, so the extraction method is just:
* "return N random elements" sampling the whole set every time.
* This case is trivial and can be served without auxiliary data
* structures. This case is the only one that also needs to return the
* elements in random order. */
if (!uniq || count == 1) {
addReplyArrayLen(c,count);
while(count--) {
encoding = setTypeRandomElement(set,&ele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
addReplyBulkLongLong(c,llele);
} else {
addReplyBulkCBuffer(c,ele,sdslen(ele));
}
}
return;
}
/* CASE 2:
* The number of requested elements is greater than the number of
* elements inside the set: simply return the whole set. */
if (count >= size) {
setTypeIterator *si;
addReplyArrayLen(c,size);
si = setTypeInitIterator(set);
while ((encoding = setTypeNext(si,&ele,&llele)) != -1) {
if (encoding == OBJ_ENCODING_INTSET) {
addReplyBulkLongLong(c,llele);
} else {
addReplyBulkCBuffer(c,ele,sdslen(ele));
}
size--;
}
setTypeReleaseIterator(si);
serverAssert(size==0);
return;
}
/* For CASE 3 and CASE 4 we need an auxiliary dictionary. */
d = dictCreate(&sdsReplyDictType);
/* CASE 3:
* The number of elements inside the set is not greater than
* SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
* In this case we create a set from scratch with all the elements, and
* subtract random elements to reach the requested number of elements.
*
* This is done because if the number of requested elements is just
* a bit less than the number of elements in the set, the natural approach
* used into CASE 4 is highly inefficient. */
if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) {
setTypeIterator *si;
/* Add all the elements into the temporary dictionary. */
si = setTypeInitIterator(set);
dictExpand(d, size);
while ((encoding = setTypeNext(si,&ele,&llele)) != -1) {
int retval = DICT_ERR;
if (encoding == OBJ_ENCODING_INTSET) {
retval = dictAdd(d,sdsfromlonglong(llele),NULL);
} else {
retval = dictAdd(d,sdsdup(ele),NULL);
}
serverAssert(retval == DICT_OK);
}
setTypeReleaseIterator(si);
serverAssert(dictSize(d) == size);
/* Remove random elements to reach the right count. */
while (size > count) {
dictEntry *de;
de = dictGetFairRandomKey(d);
dictUnlink(d,dictGetKey(de));
sdsfree(dictGetKey(de));
dictFreeUnlinkedEntry(d,de);
size--;
}
}
/* CASE 4: We have a big set compared to the requested number of elements.
* In this case we can simply get random elements from the set and add
* to the temporary set, trying to eventually get enough unique elements
* to reach the specified count. */
else {
unsigned long added = 0;
sds sdsele;
dictExpand(d, count);
while (added < count) {
encoding = setTypeRandomElement(set,&ele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
sdsele = sdsfromlonglong(llele);
} else {
sdsele = sdsdup(ele);
}
/* Try to add the object to the dictionary. If it already exists
* free it, otherwise increment the number of objects we have
* in the result dictionary. */
if (dictAdd(d,sdsele,NULL) == DICT_OK)
added++;
else
sdsfree(sdsele);
}
}
/* CASE 3 & 4: send the result to the user. */
{
dictIterator *di;
dictEntry *de;
addReplyArrayLen(c,count);
di = dictGetIterator(d);
while((de = dictNext(di)) != NULL)
addReplyBulkSds(c,dictGetKey(de));
dictReleaseIterator(di);
dictRelease(d);
}
}
/* SRANDMEMBER <key> [<count>] */
void srandmemberCommand(client *c) {
robj *set;
sds ele;
int64_t llele;
int encoding;
if (c->argc == 3) {
srandmemberWithCountCommand(c);
return;
} else if (c->argc > 3) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
/* Handle variant without <count> argument. Reply with simple bulk string */
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))
== NULL || checkType(c,set,OBJ_SET)) return;
encoding = setTypeRandomElement(set,&ele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
addReplyBulkLongLong(c,llele);
} else {
addReplyBulkCBuffer(c,ele,sdslen(ele));
}
}
int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
if (setTypeSize(*(robj**)s1) > setTypeSize(*(robj**)s2)) return 1;
if (setTypeSize(*(robj**)s1) < setTypeSize(*(robj**)s2)) return -1;
return 0;
}
/* This is used by SDIFF and in this case we can receive NULL that should
* be handled as empty sets. */
int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) {
robj *o1 = *(robj**)s1, *o2 = *(robj**)s2;
unsigned long first = o1 ? setTypeSize(o1) : 0;
unsigned long second = o2 ? setTypeSize(o2) : 0;
if (first < second) return 1;
if (first > second) return -1;
return 0;
}
/* SINTER / SMEMBERS / SINTERSTORE / SINTERCARD
*
* 'cardinality_only' work for SINTERCARD, only return the cardinality
* with minimum processing and memory overheads.
*
* 'limit' work for SINTERCARD, stop searching after reaching the limit.
* Passing a 0 means unlimited.
*/
void sinterGenericCommand(client *c, robj **setkeys,
unsigned long setnum, robj *dstkey,
int cardinality_only, unsigned long limit) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *dstset = NULL;
sds elesds;
int64_t intobj;
void *replylen = NULL;
unsigned long j, cardinality = 0;
int encoding, empty = 0;
for (j = 0; j < setnum; j++) {
robj *setobj = lookupKeyRead(c->db, setkeys[j]);
if (!setobj) {
/* A NULL is considered an empty set */
empty += 1;
sets[j] = NULL;
continue;
}
if (checkType(c,setobj,OBJ_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
}
/* Set intersection with an empty set always results in an empty set.
* Return ASAP if there is an empty set. */
if (empty > 0) {
zfree(sets);
if (dstkey) {
if (dbDelete(c->db,dstkey)) {
signalModifiedKey(c,c->db,dstkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
server.dirty++;
}
addReply(c,shared.czero);
} else if (cardinality_only) {
addReplyLongLong(c,cardinality);
} else {
addReply(c,shared.emptyset[c->resp]);
}
return;
}
/* Sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);
/* The first thing we should output is the total number of elements...
* since this is a multi-bulk write, but at this stage we don't know
* the intersection set size, so we use a trick, append an empty object
* to the output list and save the pointer to later modify it with the
* right length */
if (dstkey) {
/* If we have a target key where to store the resulting set
* create this key with an empty set inside */
dstset = createIntsetObject();
} else if (!cardinality_only) {
replylen = addReplyDeferredLen(c);
}
/* Iterate all the elements of the first (smallest) set, and test
* the element against all the other sets, if at least one set does
* not include the element it is discarded */
si = setTypeInitIterator(sets[0]);
while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {
for (j = 1; j < setnum; j++) {
if (sets[j] == sets[0]) continue;
if (encoding == OBJ_ENCODING_INTSET) {
/* intset with intset is simple... and fast */
if (sets[j]->encoding == OBJ_ENCODING_INTSET &&
!intsetFind((intset*)sets[j]->ptr,intobj))
{
break;
/* in order to compare an integer with an object we
* have to use the generic function, creating an object
* for this */
} else if (sets[j]->encoding == OBJ_ENCODING_HT) {
elesds = sdsfromlonglong(intobj);
if (!setTypeIsMember(sets[j],elesds)) {
sdsfree(elesds);
break;
}
sdsfree(elesds);
}
} else if (encoding == OBJ_ENCODING_HT) {
if (!setTypeIsMember(sets[j],elesds)) {
break;
}
}
}
/* Only take action when all sets contain the member */
if (j == setnum) {
if (cardinality_only) {
cardinality++;
/* We stop the searching after reaching the limit. */
if (limit && cardinality >= limit)
break;
} else if (!dstkey) {
if (encoding == OBJ_ENCODING_HT)
addReplyBulkCBuffer(c,elesds,sdslen(elesds));
else
addReplyBulkLongLong(c,intobj);
cardinality++;
} else {
if (encoding == OBJ_ENCODING_INTSET) {
elesds = sdsfromlonglong(intobj);
setTypeAdd(dstset,elesds);
sdsfree(elesds);
} else {
setTypeAdd(dstset,elesds);
}
}
}
}
setTypeReleaseIterator(si);
if (cardinality_only) {
addReplyLongLong(c,cardinality);
} else if (dstkey) {
/* Store the resulting set into the target, if the intersection
* is not an empty set. */
if (setTypeSize(dstset) > 0) {
setKey(c,c->db,dstkey,dstset,0);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(NOTIFY_SET,"sinterstore",
dstkey,c->db->id);
server.dirty++;
} else {
addReply(c,shared.czero);
if (dbDelete(c->db,dstkey)) {
server.dirty++;
signalModifiedKey(c,c->db,dstkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
}
}
decrRefCount(dstset);
} else {
setDeferredSetLen(c,replylen,cardinality);
}
zfree(sets);
}
/* SINTER key [key ...] */
void sinterCommand(client *c) {
sinterGenericCommand(c, c->argv+1, c->argc-1, NULL, 0, 0);
}
/* SINTERCARD numkeys key [key ...] [LIMIT limit] */
void sinterCardCommand(client *c) {
long j;
long numkeys = 0; /* Number of keys. */
long limit = 0; /* 0 means not limit. */
if (getRangeLongFromObjectOrReply(c, c->argv[1], 1, LONG_MAX,
&numkeys, "numkeys should be greater than 0") != C_OK)
return;
if (numkeys > (c->argc - 2)) {
addReplyError(c, "Number of keys can't be greater than number of args");
return;
}
for (j = 2 + numkeys; j < c->argc; j++) {
char *opt = c->argv[j]->ptr;
int moreargs = (c->argc - 1) - j;
if (!strcasecmp(opt, "LIMIT") && moreargs) {
j++;
if (getPositiveLongFromObjectOrReply(c, c->argv[j], &limit,
"LIMIT can't be negative") != C_OK)
return;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
}
sinterGenericCommand(c, c->argv+2, numkeys, NULL, 1, limit);
}
/* SINTERSTORE destination key [key ...] */
void sinterstoreCommand(client *c) {
sinterGenericCommand(c, c->argv+2, c->argc-2, c->argv[1], 0, 0);
}
void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
robj *dstkey, int op) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *dstset = NULL;
sds ele;
int j, cardinality = 0;
int diff_algo = 1;
int sameset = 0;
for (j = 0; j < setnum; j++) {
robj *setobj = lookupKeyRead(c->db, setkeys[j]);
if (!setobj) {
sets[j] = NULL;
continue;
}
if (checkType(c,setobj,OBJ_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
if (j > 0 && sets[0] == sets[j]) {
sameset = 1;
}
}
/* Select what DIFF algorithm to use.
*
* Algorithm 1 is O(N*M) where N is the size of the element first set
* and M the total number of sets.
*
* Algorithm 2 is O(N) where N is the total number of elements in all
* the sets.
*
* We compute what is the best bet with the current input here. */
if (op == SET_OP_DIFF && sets[0] && !sameset) {
long long algo_one_work = 0, algo_two_work = 0;
for (j = 0; j < setnum; j++) {
if (sets[j] == NULL) continue;
algo_one_work += setTypeSize(sets[0]);
algo_two_work += setTypeSize(sets[j]);
}
/* Algorithm 1 has better constant times and performs less operations
* if there are elements in common. Give it some advantage. */
algo_one_work /= 2;
diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;
if (diff_algo == 1 && setnum > 1) {
/* With algorithm 1 it is better to order the sets to subtract
* by decreasing size, so that we are more likely to find
* duplicated elements ASAP. */
qsort(sets+1,setnum-1,sizeof(robj*),
qsortCompareSetsByRevCardinality);
}
}
/* We need a temp set object to store our union. If the dstkey
* is not NULL (that is, we are inside an SUNIONSTORE operation) then
* this set object will be the resulting object to set into the target key*/
dstset = createIntsetObject();
if (op == SET_OP_UNION) {
/* Union is trivial, just add every element of every set to the
* temporary set. */
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (setTypeAdd(dstset,ele)) cardinality++;
sdsfree(ele);
}
setTypeReleaseIterator(si);
}
} else if (op == SET_OP_DIFF && sameset) {
/* At least one of the sets is the same one (same key) as the first one, result must be empty. */
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) {
/* DIFF Algorithm 1:
*
* We perform the diff by iterating all the elements of the first set,
* and only adding it to the target set if the element does not exist
* into all the other sets.
*
* This way we perform at max N*M operations, where N is the size of
* the first set, and M the number of sets. */
si = setTypeInitIterator(sets[0]);
while((ele = setTypeNextObject(si)) != NULL) {
for (j = 1; j < setnum; j++) {
if (!sets[j]) continue; /* no key is an empty set. */
if (sets[j] == sets[0]) break; /* same set! */
if (setTypeIsMember(sets[j],ele)) break;
}
if (j == setnum) {
/* There is no other set with this element. Add it. */
setTypeAdd(dstset,ele);
cardinality++;
}
sdsfree(ele);
}
setTypeReleaseIterator(si);
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) {
/* DIFF Algorithm 2:
*
* Add all the elements of the first set to the auxiliary set.
* Then remove all the elements of all the next sets from it.
*
* This is O(N) where N is the sum of all the elements in every
* set. */
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (j == 0) {
if (setTypeAdd(dstset,ele)) cardinality++;
} else {
if (setTypeRemove(dstset,ele)) cardinality--;
}
sdsfree(ele);
}
setTypeReleaseIterator(si);
/* Exit if result set is empty as any additional removal
* of elements will have no effect. */
if (cardinality == 0) break;
}
}
/* Output the content of the resulting set, if not in STORE mode */
if (!dstkey) {
addReplySetLen(c,cardinality);
si = setTypeInitIterator(dstset);
while((ele = setTypeNextObject(si)) != NULL) {
addReplyBulkCBuffer(c,ele,sdslen(ele));
sdsfree(ele);
}
setTypeReleaseIterator(si);
server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset, -1) :
decrRefCount(dstset);
} else {
/* If we have a target key where to store the resulting set
* create this key with the result set inside */
if (setTypeSize(dstset) > 0) {
setKey(c,c->db,dstkey,dstset,0);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(NOTIFY_SET,
op == SET_OP_UNION ? "sunionstore" : "sdiffstore",
dstkey,c->db->id);
server.dirty++;
} else {
addReply(c,shared.czero);
if (dbDelete(c->db,dstkey)) {
server.dirty++;
signalModifiedKey(c,c->db,dstkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);
}
}
decrRefCount(dstset);
}
zfree(sets);
}
/* SUNION key [key ...] */
void sunionCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_UNION);
}
/* SUNIONSTORE destination key [key ...] */
void sunionstoreCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_UNION);
}
/* SDIFF key [key ...] */
void sdiffCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_DIFF);
}
/* SDIFFSTORE destination key [key ...] */
void sdiffstoreCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_DIFF);
}
void sscanCommand(client *c) {
robj *set;
unsigned long cursor;
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
checkType(c,set,OBJ_SET)) return;
scanGenericCommand(c,set,cursor);
}