Add COPY command (#7953)
Syntax: COPY <key> <new-key> [DB <dest-db>] [REPLACE] No support for module keys yet. Co-authored-by: tmgauss Co-authored-by: Itamar Haber <itamar@redislabs.com> Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
9812e88959
commit
ea7cf737a1
102
src/db.c
102
src/db.c
@ -1079,6 +1079,108 @@ void moveCommand(client *c) {
|
|||||||
addReply(c,shared.cone);
|
addReply(c,shared.cone);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void copyCommand(client *c) {
|
||||||
|
robj *o;
|
||||||
|
redisDb *src, *dst;
|
||||||
|
int srcid;
|
||||||
|
long long dbid, expire;
|
||||||
|
int j, replace = 0, delete = 0;
|
||||||
|
|
||||||
|
/* Obtain source and target DB pointers
|
||||||
|
* Default target DB is the same as the source DB
|
||||||
|
* Parse the REPLACE option and targetDB option. */
|
||||||
|
src = c->db;
|
||||||
|
dst = c->db;
|
||||||
|
srcid = c->db->id;
|
||||||
|
dbid = c->db->id;
|
||||||
|
for (j = 3; j < c->argc; j++) {
|
||||||
|
int additional = c->argc - j - 1;
|
||||||
|
if (!strcasecmp(c->argv[j]->ptr,"replace")) {
|
||||||
|
replace = 1;
|
||||||
|
} else if (!strcasecmp(c->argv[j]->ptr, "db") && additional >= 1) {
|
||||||
|
if (getLongLongFromObject(c->argv[j+1], &dbid) == C_ERR ||
|
||||||
|
dbid < INT_MIN || dbid > INT_MAX ||
|
||||||
|
selectDb(c, dbid) == C_ERR)
|
||||||
|
{
|
||||||
|
addReplyError(c,"invalid DB index");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
dst = c->db;
|
||||||
|
selectDb(c,srcid); /* Back to the source DB */
|
||||||
|
j++; /* Consume additional arg. */
|
||||||
|
} else {
|
||||||
|
addReply(c, shared.syntaxerr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) {
|
||||||
|
addReplyError(c,"Copying to another database is not allowed in cluster mode");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* If the user select the same DB as
|
||||||
|
* the source DB and using newkey as the same key
|
||||||
|
* it is probably an error. */
|
||||||
|
robj *key = c->argv[1];
|
||||||
|
robj *newkey = c->argv[2];
|
||||||
|
if (src == dst && (sdscmp(key->ptr, newkey->ptr) == 0)) {
|
||||||
|
addReply(c,shared.sameobjecterr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Check if the element exists and get a reference */
|
||||||
|
o = lookupKeyWrite(c->db, key);
|
||||||
|
if (!o) {
|
||||||
|
addReply(c,shared.czero);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
expire = getExpire(c->db,key);
|
||||||
|
|
||||||
|
/* Return zero if the key already exists in the target DB.
|
||||||
|
* If REPLACE option is selected, delete newkey from targetDB. */
|
||||||
|
if (lookupKeyWrite(dst,newkey) != NULL) {
|
||||||
|
if (replace) {
|
||||||
|
delete = 1;
|
||||||
|
} else {
|
||||||
|
addReply(c,shared.czero);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Duplicate object according to object's type. */
|
||||||
|
robj *newobj;
|
||||||
|
switch(o->type) {
|
||||||
|
case OBJ_STRING: newobj = dupStringObject(o); break;
|
||||||
|
case OBJ_LIST: newobj = listTypeDup(o); break;
|
||||||
|
case OBJ_SET: newobj = setTypeDup(o); break;
|
||||||
|
case OBJ_ZSET: newobj = zsetDup(o); break;
|
||||||
|
case OBJ_HASH: newobj = hashTypeDup(o); break;
|
||||||
|
case OBJ_STREAM: newobj = streamDup(o); break;
|
||||||
|
case OBJ_MODULE:
|
||||||
|
addReplyError(c, "Copying module type object is not supported");
|
||||||
|
return;
|
||||||
|
default: {
|
||||||
|
addReplyError(c, "unknown type object");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (delete) {
|
||||||
|
dbDelete(dst,newkey);
|
||||||
|
}
|
||||||
|
|
||||||
|
dbAdd(dst,newkey,newobj);
|
||||||
|
if (expire != -1) setExpire(c, dst, newkey, expire);
|
||||||
|
|
||||||
|
/* OK! key copied */
|
||||||
|
signalModifiedKey(c,dst,c->argv[2]);
|
||||||
|
notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id);
|
||||||
|
|
||||||
|
server.dirty++;
|
||||||
|
addReply(c,shared.cone);
|
||||||
|
}
|
||||||
|
|
||||||
/* Helper function for dbSwapDatabases(): scans the list of keys that have
|
/* Helper function for dbSwapDatabases(): scans the list of keys that have
|
||||||
* one or more blocked clients for B[LR]POP or other blocking commands
|
* one or more blocked clients for B[LR]POP or other blocking commands
|
||||||
* and signal the keys as ready if they are of the right type. See the comment
|
* and signal the keys as ready if they are of the right type. See the comment
|
||||||
|
@ -630,6 +630,10 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
"write fast @keyspace",
|
"write fast @keyspace",
|
||||||
0,NULL,1,1,1,0,0,0},
|
0,NULL,1,1,1,0,0,0},
|
||||||
|
|
||||||
|
{"copy",copyCommand,-3,
|
||||||
|
"write use-memory @keyspace",
|
||||||
|
0,NULL,1,2,1,0,0,0},
|
||||||
|
|
||||||
/* Like for SET, we can't mark rename as a fast command because
|
/* Like for SET, we can't mark rename as a fast command because
|
||||||
* overwriting the target key may result in an implicit slow DEL. */
|
* overwriting the target key may result in an implicit slow DEL. */
|
||||||
{"rename",renameCommand,3,
|
{"rename",renameCommand,3,
|
||||||
|
@ -1796,6 +1796,7 @@ void listTypeInsert(listTypeEntry *entry, robj *value, int where);
|
|||||||
int listTypeEqual(listTypeEntry *entry, robj *o);
|
int listTypeEqual(listTypeEntry *entry, robj *o);
|
||||||
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry);
|
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry);
|
||||||
void listTypeConvert(robj *subject, int enc);
|
void listTypeConvert(robj *subject, int enc);
|
||||||
|
robj *listTypeDup(robj *o);
|
||||||
void unblockClientWaitingData(client *c);
|
void unblockClientWaitingData(client *c);
|
||||||
void popGenericCommand(client *c, int where);
|
void popGenericCommand(client *c, int where);
|
||||||
void listElementsRemoved(client *c, robj *key, int where, robj *o);
|
void listElementsRemoved(client *c, robj *key, int where, robj *o);
|
||||||
@ -2027,6 +2028,7 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds o);
|
|||||||
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore);
|
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore);
|
||||||
long zsetRank(robj *zobj, sds ele, int reverse);
|
long zsetRank(robj *zobj, sds ele, int reverse);
|
||||||
int zsetDel(robj *zobj, sds ele);
|
int zsetDel(robj *zobj, sds ele);
|
||||||
|
robj *zsetDup(robj *o);
|
||||||
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg);
|
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg);
|
||||||
sds ziplistGetObject(unsigned char *sptr);
|
sds ziplistGetObject(unsigned char *sptr);
|
||||||
int zslValueGteMin(double value, zrangespec *spec);
|
int zslValueGteMin(double value, zrangespec *spec);
|
||||||
@ -2105,6 +2107,7 @@ int setTypeRandomElement(robj *setobj, sds *sdsele, int64_t *llele);
|
|||||||
unsigned long setTypeRandomElements(robj *set, unsigned long count, robj *aux_set);
|
unsigned long setTypeRandomElements(robj *set, unsigned long count, robj *aux_set);
|
||||||
unsigned long setTypeSize(const robj *subject);
|
unsigned long setTypeSize(const robj *subject);
|
||||||
void setTypeConvert(robj *subject, int enc);
|
void setTypeConvert(robj *subject, int enc);
|
||||||
|
robj *setTypeDup(robj *o);
|
||||||
|
|
||||||
/* Hash data type */
|
/* Hash data type */
|
||||||
#define HASH_SET_TAKE_FIELD (1<<0)
|
#define HASH_SET_TAKE_FIELD (1<<0)
|
||||||
@ -2129,6 +2132,7 @@ sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what);
|
|||||||
robj *hashTypeLookupWriteOrCreate(client *c, robj *key);
|
robj *hashTypeLookupWriteOrCreate(client *c, robj *key);
|
||||||
robj *hashTypeGetValueObject(robj *o, sds field);
|
robj *hashTypeGetValueObject(robj *o, sds field);
|
||||||
int hashTypeSet(robj *o, sds field, sds value, int flags);
|
int hashTypeSet(robj *o, sds field, sds value, int flags);
|
||||||
|
robj *hashTypeDup(robj *o);
|
||||||
|
|
||||||
/* Pub / Sub */
|
/* Pub / Sub */
|
||||||
int pubsubUnsubscribeAllChannels(client *c, int notify);
|
int pubsubUnsubscribeAllChannels(client *c, int notify);
|
||||||
@ -2336,6 +2340,7 @@ void bgsaveCommand(client *c);
|
|||||||
void bgrewriteaofCommand(client *c);
|
void bgrewriteaofCommand(client *c);
|
||||||
void shutdownCommand(client *c);
|
void shutdownCommand(client *c);
|
||||||
void moveCommand(client *c);
|
void moveCommand(client *c);
|
||||||
|
void copyCommand(client *c);
|
||||||
void renameCommand(client *c);
|
void renameCommand(client *c);
|
||||||
void renamenxCommand(client *c);
|
void renamenxCommand(client *c);
|
||||||
void lpushCommand(client *c);
|
void lpushCommand(client *c);
|
||||||
|
@ -118,5 +118,6 @@ int streamCompareID(streamID *a, streamID *b);
|
|||||||
void streamFreeNACK(streamNACK *na);
|
void streamFreeNACK(streamNACK *na);
|
||||||
void streamIncrID(streamID *id);
|
void streamIncrID(streamID *id);
|
||||||
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
|
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
|
||||||
|
robj *streamDup(robj *o);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
54
src/t_hash.c
54
src/t_hash.c
@ -504,6 +504,60 @@ void hashTypeConvert(robj *o, int enc) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This is a helper function for the COPY command.
|
||||||
|
* Duplicate a hash object, with the guarantee that the returned object
|
||||||
|
* has the same encoding as the original one.
|
||||||
|
*
|
||||||
|
* The resulting object always has refcount set to 1 */
|
||||||
|
robj *hashTypeDup(robj *o) {
|
||||||
|
robj *hobj;
|
||||||
|
hashTypeIterator *hi;
|
||||||
|
|
||||||
|
serverAssert(o->type == OBJ_HASH);
|
||||||
|
|
||||||
|
switch (o->encoding) {
|
||||||
|
case OBJ_ENCODING_ZIPLIST:
|
||||||
|
hobj = createHashObject();
|
||||||
|
break;
|
||||||
|
case OBJ_ENCODING_HT:
|
||||||
|
hobj = createHashObject();
|
||||||
|
hashTypeConvert(hobj, OBJ_ENCODING_HT);
|
||||||
|
dict *d = o->ptr;
|
||||||
|
dictExpand(hobj->ptr, dictSize(d));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
serverPanic("Wrong encoding.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(o->encoding == OBJ_ENCODING_ZIPLIST){
|
||||||
|
unsigned char *zl = o->ptr;
|
||||||
|
size_t sz = ziplistBlobLen(zl);
|
||||||
|
unsigned char *new_zl = zmalloc(sz);
|
||||||
|
memcpy(new_zl, zl, sz);
|
||||||
|
zfree(hobj->ptr);
|
||||||
|
hobj->ptr = new_zl;
|
||||||
|
} else if(o->encoding == OBJ_ENCODING_HT){
|
||||||
|
hi = hashTypeInitIterator(o);
|
||||||
|
while (hashTypeNext(hi) != C_ERR) {
|
||||||
|
sds field, value;
|
||||||
|
sds newfield, newvalue;
|
||||||
|
/* Extract a field-value pair from an original hash object.*/
|
||||||
|
field = hashTypeCurrentFromHashTable(hi, OBJ_HASH_KEY);
|
||||||
|
value = hashTypeCurrentFromHashTable(hi, OBJ_HASH_VALUE);
|
||||||
|
newfield = sdsdup(field);
|
||||||
|
newvalue = sdsdup(value);
|
||||||
|
|
||||||
|
/* Add a field-value pair to a new hash object. */
|
||||||
|
dictAdd(hobj->ptr,newfield,newvalue);
|
||||||
|
}
|
||||||
|
hashTypeReleaseIterator(hi);
|
||||||
|
} else {
|
||||||
|
serverPanic("Unknown hash encoding");
|
||||||
|
}
|
||||||
|
return hobj;
|
||||||
|
}
|
||||||
|
|
||||||
/*-----------------------------------------------------------------------------
|
/*-----------------------------------------------------------------------------
|
||||||
* Hash type commands
|
* Hash type commands
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
|
23
src/t_list.c
23
src/t_list.c
@ -190,6 +190,29 @@ void listTypeConvert(robj *subject, int enc) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This is a helper function for the COPY command.
|
||||||
|
* Duplicate a list object, with the guarantee that the returned object
|
||||||
|
* has the same encoding as the original one.
|
||||||
|
*
|
||||||
|
* The resulting object always has refcount set to 1 */
|
||||||
|
robj *listTypeDup(robj *o) {
|
||||||
|
robj *lobj;
|
||||||
|
|
||||||
|
serverAssert(o->type == OBJ_LIST);
|
||||||
|
|
||||||
|
switch (o->encoding) {
|
||||||
|
case OBJ_ENCODING_QUICKLIST:
|
||||||
|
lobj = createQuicklistObject();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
serverPanic("Wrong encoding.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
zfree(lobj->ptr);
|
||||||
|
lobj->ptr = quicklistDup(o->ptr);
|
||||||
|
return lobj;
|
||||||
|
}
|
||||||
|
|
||||||
/*-----------------------------------------------------------------------------
|
/*-----------------------------------------------------------------------------
|
||||||
* List Commands
|
* List Commands
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
|
46
src/t_set.c
46
src/t_set.c
@ -261,6 +261,52 @@ void setTypeConvert(robj *setobj, int enc) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This is a helper function for the COPY command.
|
||||||
|
* Duplicate a set object, with the guarantee that the returned object
|
||||||
|
* has the same encoding as the original one.
|
||||||
|
*
|
||||||
|
* The resulting object always has refcount set to 1 */
|
||||||
|
robj *setTypeDup(robj *o) {
|
||||||
|
robj *set;
|
||||||
|
setTypeIterator *si;
|
||||||
|
sds elesds;
|
||||||
|
int64_t intobj;
|
||||||
|
|
||||||
|
serverAssert(o->type == OBJ_SET);
|
||||||
|
|
||||||
|
/* Create a new set object that have the same encoding as the original object's encoding */
|
||||||
|
switch (o->encoding) {
|
||||||
|
case OBJ_ENCODING_INTSET:
|
||||||
|
set = createIntsetObject();
|
||||||
|
break;
|
||||||
|
case OBJ_ENCODING_HT:
|
||||||
|
set = createSetObject();
|
||||||
|
dict *d = o->ptr;
|
||||||
|
dictExpand(set->ptr, dictSize(d));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
serverPanic("Wrong encoding.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (set->encoding == OBJ_ENCODING_INTSET) {
|
||||||
|
intset *is = o->ptr;
|
||||||
|
size_t size = intsetBlobLen(is);
|
||||||
|
intset *newis = zmalloc(size);
|
||||||
|
memcpy(newis,is,size);
|
||||||
|
zfree(set->ptr);
|
||||||
|
set->ptr = newis;
|
||||||
|
} else if (set->encoding == OBJ_ENCODING_HT) {
|
||||||
|
si = setTypeInitIterator(o);
|
||||||
|
while (setTypeNext(si, &elesds, &intobj) != -1) {
|
||||||
|
setTypeAdd(set, elesds);
|
||||||
|
}
|
||||||
|
setTypeReleaseIterator(si);
|
||||||
|
} else {
|
||||||
|
serverPanic("Unknown set encoding");
|
||||||
|
}
|
||||||
|
return set;
|
||||||
|
}
|
||||||
|
|
||||||
void saddCommand(client *c) {
|
void saddCommand(client *c) {
|
||||||
robj *set;
|
robj *set;
|
||||||
int j, added = 0;
|
int j, added = 0;
|
||||||
|
104
src/t_stream.c
104
src/t_stream.c
@ -106,6 +106,110 @@ void streamNextID(streamID *last_id, streamID *new_id) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This is a helper function for the COPY command.
|
||||||
|
* Duplicate a Stream object, with the guarantee that the returned object
|
||||||
|
* has the same encoding as the original one.
|
||||||
|
*
|
||||||
|
* The resulting object always has refcount set to 1 */
|
||||||
|
robj *streamDup(robj *o) {
|
||||||
|
robj *sobj;
|
||||||
|
|
||||||
|
serverAssert(o->type == OBJ_STREAM);
|
||||||
|
|
||||||
|
switch (o->encoding) {
|
||||||
|
case OBJ_ENCODING_STREAM:
|
||||||
|
sobj = createStreamObject();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
serverPanic("Wrong encoding.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
stream *s;
|
||||||
|
stream *new_s;
|
||||||
|
s = o->ptr;
|
||||||
|
new_s = sobj->ptr;
|
||||||
|
|
||||||
|
raxIterator ri;
|
||||||
|
uint64_t rax_key[2];
|
||||||
|
raxStart(&ri, s->rax);
|
||||||
|
raxSeek(&ri, "^", NULL, 0);
|
||||||
|
size_t lp_bytes = 0; /* Total bytes in the listpack. */
|
||||||
|
unsigned char *lp = NULL; /* listpack pointer. */
|
||||||
|
/* Get a reference to the listpack node. */
|
||||||
|
while (raxNext(&ri)) {
|
||||||
|
lp = ri.data;
|
||||||
|
lp_bytes = lpBytes(lp);
|
||||||
|
unsigned char *new_lp = zmalloc(lp_bytes);
|
||||||
|
memcpy(new_lp, lp, lp_bytes);
|
||||||
|
memcpy(rax_key, ri.key, sizeof(rax_key));
|
||||||
|
raxInsert(new_s->rax, (unsigned char *)&rax_key, sizeof(rax_key),
|
||||||
|
new_lp, NULL);
|
||||||
|
}
|
||||||
|
new_s->length = s->length;
|
||||||
|
new_s->last_id = s->last_id;
|
||||||
|
raxStop(&ri);
|
||||||
|
|
||||||
|
if (s->cgroups == NULL) return sobj;
|
||||||
|
|
||||||
|
/* Consumer Groups */
|
||||||
|
raxIterator ri_cgroups;
|
||||||
|
raxStart(&ri_cgroups, s->cgroups);
|
||||||
|
raxSeek(&ri_cgroups, "^", NULL, 0);
|
||||||
|
while (raxNext(&ri_cgroups)) {
|
||||||
|
streamCG *cg = ri_cgroups.data;
|
||||||
|
streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
|
||||||
|
ri_cgroups.key_len, &cg->last_id);
|
||||||
|
|
||||||
|
serverAssert(new_cg != NULL);
|
||||||
|
|
||||||
|
/* Consumer Group PEL */
|
||||||
|
raxIterator ri_cg_pel;
|
||||||
|
raxStart(&ri_cg_pel,cg->pel);
|
||||||
|
raxSeek(&ri_cg_pel,"^",NULL,0);
|
||||||
|
while(raxNext(&ri_cg_pel)){
|
||||||
|
streamNACK *nack = ri_cg_pel.data;
|
||||||
|
streamNACK *new_nack = streamCreateNACK(NULL);
|
||||||
|
new_nack->delivery_time = nack->delivery_time;
|
||||||
|
new_nack->delivery_count = nack->delivery_count;
|
||||||
|
raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL);
|
||||||
|
}
|
||||||
|
raxStop(&ri_cg_pel);
|
||||||
|
|
||||||
|
/* Consumers */
|
||||||
|
raxIterator ri_consumers;
|
||||||
|
raxStart(&ri_consumers, cg->consumers);
|
||||||
|
raxSeek(&ri_consumers, "^", NULL, 0);
|
||||||
|
while (raxNext(&ri_consumers)) {
|
||||||
|
streamConsumer *consumer = ri_consumers.data;
|
||||||
|
streamConsumer *new_consumer;
|
||||||
|
new_consumer = zmalloc(sizeof(*new_consumer));
|
||||||
|
new_consumer->name = sdsdup(consumer->name);
|
||||||
|
new_consumer->pel = raxNew();
|
||||||
|
raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name,
|
||||||
|
sdslen(new_consumer->name), new_consumer, NULL);
|
||||||
|
new_consumer->seen_time = consumer->seen_time;
|
||||||
|
|
||||||
|
/* Consumer PEL */
|
||||||
|
raxIterator ri_cpel;
|
||||||
|
raxStart(&ri_cpel, consumer->pel);
|
||||||
|
raxSeek(&ri_cpel, "^", NULL, 0);
|
||||||
|
while (raxNext(&ri_cpel)) {
|
||||||
|
streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID));
|
||||||
|
|
||||||
|
serverAssert(new_nack != raxNotFound);
|
||||||
|
|
||||||
|
new_nack->consumer = new_consumer;
|
||||||
|
raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL);
|
||||||
|
}
|
||||||
|
raxStop(&ri_cpel);
|
||||||
|
}
|
||||||
|
raxStop(&ri_consumers);
|
||||||
|
}
|
||||||
|
raxStop(&ri_cgroups);
|
||||||
|
return sobj;
|
||||||
|
}
|
||||||
|
|
||||||
/* This is just a wrapper for lpAppend() to directly use a 64 bit integer
|
/* This is just a wrapper for lpAppend() to directly use a 64 bit integer
|
||||||
* instead of a string. */
|
* instead of a string. */
|
||||||
unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {
|
unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {
|
||||||
|
62
src/t_zset.c
62
src/t_zset.c
@ -1553,6 +1553,68 @@ long zsetRank(robj *zobj, sds ele, int reverse) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This is a helper function for the COPY command.
|
||||||
|
* Duplicate a sorted set object, with the guarantee that the returned object
|
||||||
|
* has the same encoding as the original one.
|
||||||
|
*
|
||||||
|
* The resulting object always has refcount set to 1 */
|
||||||
|
robj *zsetDup(robj *o) {
|
||||||
|
robj *zobj;
|
||||||
|
zset *zs;
|
||||||
|
zset *new_zs;
|
||||||
|
|
||||||
|
serverAssert(o->type == OBJ_ZSET);
|
||||||
|
|
||||||
|
/* Create a new sorted set object that have the same encoding as the original object's encoding */
|
||||||
|
switch (o->encoding) {
|
||||||
|
case OBJ_ENCODING_ZIPLIST:
|
||||||
|
zobj = createZsetZiplistObject();
|
||||||
|
break;
|
||||||
|
case OBJ_ENCODING_SKIPLIST:
|
||||||
|
zobj = createZsetObject();
|
||||||
|
zs = o->ptr;
|
||||||
|
new_zs = zobj->ptr;
|
||||||
|
dictExpand(new_zs->dict,dictSize(zs->dict));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
serverPanic("Wrong encoding.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||||
|
unsigned char *zl = o->ptr;
|
||||||
|
size_t sz = ziplistBlobLen(zl);
|
||||||
|
unsigned char *new_zl = zmalloc(sz);
|
||||||
|
memcpy(new_zl, zl, sz);
|
||||||
|
zfree(zobj->ptr);
|
||||||
|
zobj->ptr = new_zl;
|
||||||
|
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||||
|
zs = o->ptr;
|
||||||
|
new_zs = zobj->ptr;
|
||||||
|
zskiplist *zsl = zs->zsl;
|
||||||
|
zskiplistNode *ln;
|
||||||
|
sds ele;
|
||||||
|
long llen = zsetLength(o);
|
||||||
|
|
||||||
|
/* We copy the skiplist elements from the greatest to the
|
||||||
|
* smallest (that's trivial since the elements are already ordered in
|
||||||
|
* the skiplist): this improves the load process, since the next loaded
|
||||||
|
* element will always be the smaller, so adding to the skiplist
|
||||||
|
* will always immediately stop at the head, making the insertion
|
||||||
|
* O(1) instead of O(log(N)). */
|
||||||
|
ln = zsl->tail;
|
||||||
|
while (llen--) {
|
||||||
|
ele = ln->ele;
|
||||||
|
sds new_ele = sdsdup(ele);
|
||||||
|
zskiplistNode *znode = zslInsert(new_zs->zsl,ln->score,new_ele);
|
||||||
|
dictAdd(new_zs->dict,new_ele,&znode->score);
|
||||||
|
ln = ln->backward;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
serverPanic("Unknown sorted set encoding");
|
||||||
|
}
|
||||||
|
return zobj;
|
||||||
|
}
|
||||||
|
|
||||||
/*-----------------------------------------------------------------------------
|
/*-----------------------------------------------------------------------------
|
||||||
* Sorted set commands
|
* Sorted set commands
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
|
@ -169,6 +169,213 @@ start_server {tags {"keyspace"}} {
|
|||||||
format $res
|
format $res
|
||||||
} {0}
|
} {0}
|
||||||
|
|
||||||
|
test {COPY basic usage for string} {
|
||||||
|
r set mykey foobar
|
||||||
|
set res {}
|
||||||
|
r copy mykey mynewkey
|
||||||
|
lappend res [r get mynewkey]
|
||||||
|
lappend res [r dbsize]
|
||||||
|
r copy mykey mynewkey DB 10
|
||||||
|
r select 10
|
||||||
|
lappend res [r get mynewkey]
|
||||||
|
lappend res [r dbsize]
|
||||||
|
r select 9
|
||||||
|
format $res
|
||||||
|
} [list foobar 2 foobar 1]
|
||||||
|
|
||||||
|
test {COPY for string does not replace an existing key without REPLACE option} {
|
||||||
|
r set mykey2 hello
|
||||||
|
catch {r copy mykey2 mynewkey DB 10} e
|
||||||
|
set e
|
||||||
|
} {0}
|
||||||
|
|
||||||
|
test {COPY for string can replace an existing key with REPLACE option} {
|
||||||
|
r copy mykey2 mynewkey DB 10 REPLACE
|
||||||
|
r select 10
|
||||||
|
r get mynewkey
|
||||||
|
} {hello}
|
||||||
|
|
||||||
|
test {COPY for string ensures that copied data is independent of copying data} {
|
||||||
|
r flushdb
|
||||||
|
r select 9
|
||||||
|
r set mykey foobar
|
||||||
|
set res {}
|
||||||
|
r copy mykey mynewkey DB 10
|
||||||
|
r select 10
|
||||||
|
lappend res [r get mynewkey]
|
||||||
|
r set mynewkey hoge
|
||||||
|
lappend res [r get mynewkey]
|
||||||
|
r select 9
|
||||||
|
lappend res [r get mykey]
|
||||||
|
r select 10
|
||||||
|
r flushdb
|
||||||
|
r select 9
|
||||||
|
format $res
|
||||||
|
} [list foobar hoge foobar]
|
||||||
|
|
||||||
|
test {COPY for string does not copy data to no-integer DB} {
|
||||||
|
r set mykey foobar
|
||||||
|
catch {r copy mykey mynewkey DB notanumber} e
|
||||||
|
set e
|
||||||
|
} {*ERR*invalid DB index}
|
||||||
|
|
||||||
|
test {COPY can copy key expire metadata as well} {
|
||||||
|
r set mykey foobar ex 100
|
||||||
|
r copy mykey mynewkey REPLACE
|
||||||
|
assert {[r ttl mynewkey] > 0 && [r ttl mynewkey] <= 100}
|
||||||
|
assert {[r get mynewkey] eq "foobar"}
|
||||||
|
}
|
||||||
|
|
||||||
|
test {COPY does not create an expire if it does not exist} {
|
||||||
|
r set mykey foobar
|
||||||
|
assert {[r ttl mykey] == -1}
|
||||||
|
r copy mykey mynewkey REPLACE
|
||||||
|
assert {[r ttl mynewkey] == -1}
|
||||||
|
assert {[r get mynewkey] eq "foobar"}
|
||||||
|
}
|
||||||
|
|
||||||
|
test {COPY basic usage for list} {
|
||||||
|
r del mylist mynewlist
|
||||||
|
r lpush mylist a b c d
|
||||||
|
r copy mylist mynewlist
|
||||||
|
set digest [r debug digest-value mylist]
|
||||||
|
assert_equal $digest [r debug digest-value mynewlist]
|
||||||
|
assert_equal 1 [r object refcount mylist]
|
||||||
|
assert_equal 1 [r object refcount mynewlist]
|
||||||
|
r del mylist
|
||||||
|
assert_equal $digest [r debug digest-value mynewlist]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {COPY basic usage for intset set} {
|
||||||
|
r del set1 newset1
|
||||||
|
r sadd set1 1 2 3
|
||||||
|
assert_encoding intset set1
|
||||||
|
r copy set1 newset1
|
||||||
|
set digest [r debug digest-value set1]
|
||||||
|
assert_equal $digest [r debug digest-value newset1]
|
||||||
|
assert_equal 1 [r object refcount set1]
|
||||||
|
assert_equal 1 [r object refcount newset1]
|
||||||
|
r del set1
|
||||||
|
assert_equal $digest [r debug digest-value newset1]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {COPY basic usage for hashtable set} {
|
||||||
|
r del set2 newset2
|
||||||
|
r sadd set2 1 2 3 a
|
||||||
|
assert_encoding hashtable set2
|
||||||
|
r copy set2 newset2
|
||||||
|
set digest [r debug digest-value set2]
|
||||||
|
assert_equal $digest [r debug digest-value newset2]
|
||||||
|
assert_equal 1 [r object refcount set2]
|
||||||
|
assert_equal 1 [r object refcount newset2]
|
||||||
|
r del set2
|
||||||
|
assert_equal $digest [r debug digest-value newset2]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {COPY basic usage for ziplist sorted set} {
|
||||||
|
r del zset1 newzset1
|
||||||
|
r zadd zset1 123 foobar
|
||||||
|
assert_encoding ziplist zset1
|
||||||
|
r copy zset1 newzset1
|
||||||
|
set digest [r debug digest-value zset1]
|
||||||
|
assert_equal $digest [r debug digest-value newzset1]
|
||||||
|
assert_equal 1 [r object refcount zset1]
|
||||||
|
assert_equal 1 [r object refcount newzset1]
|
||||||
|
r del zset1
|
||||||
|
assert_equal $digest [r debug digest-value newzset1]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {COPY basic usage for skiplist sorted set} {
|
||||||
|
r del zset2 newzset2
|
||||||
|
set original_max [lindex [r config get zset-max-ziplist-entries] 1]
|
||||||
|
r config set zset-max-ziplist-entries 0
|
||||||
|
for {set j 0} {$j < 130} {incr j} {
|
||||||
|
r zadd zset2 [randomInt 50] ele-[randomInt 10]
|
||||||
|
}
|
||||||
|
assert_encoding skiplist zset2
|
||||||
|
r copy zset2 newzset2
|
||||||
|
set digest [r debug digest-value zset2]
|
||||||
|
assert_equal $digest [r debug digest-value newzset2]
|
||||||
|
assert_equal 1 [r object refcount zset2]
|
||||||
|
assert_equal 1 [r object refcount newzset2]
|
||||||
|
r del zset2
|
||||||
|
assert_equal $digest [r debug digest-value newzset2]
|
||||||
|
r config set zset-max-ziplist-entries $original_max
|
||||||
|
}
|
||||||
|
|
||||||
|
test {COPY basic usage for ziplist hash} {
|
||||||
|
r del hash1 newhash1
|
||||||
|
r hset hash1 tmp 17179869184
|
||||||
|
assert_encoding ziplist hash1
|
||||||
|
r copy hash1 newhash1
|
||||||
|
set digest [r debug digest-value hash1]
|
||||||
|
assert_equal $digest [r debug digest-value newhash1]
|
||||||
|
assert_equal 1 [r object refcount hash1]
|
||||||
|
assert_equal 1 [r object refcount newhash1]
|
||||||
|
r del hash1
|
||||||
|
assert_equal $digest [r debug digest-value newhash1]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {COPY basic usage for hashtable hash} {
|
||||||
|
r del hash2 newhash2
|
||||||
|
set original_max [lindex [r config get hash-max-ziplist-entries] 1]
|
||||||
|
r config set hash-max-ziplist-entries 0
|
||||||
|
for {set i 0} {$i < 64} {incr i} {
|
||||||
|
r hset hash2 [randomValue] [randomValue]
|
||||||
|
}
|
||||||
|
assert_encoding hashtable hash2
|
||||||
|
r copy hash2 newhash2
|
||||||
|
set digest [r debug digest-value hash2]
|
||||||
|
assert_equal $digest [r debug digest-value newhash2]
|
||||||
|
assert_equal 1 [r object refcount hash2]
|
||||||
|
assert_equal 1 [r object refcount newhash2]
|
||||||
|
r del hash2
|
||||||
|
assert_equal $digest [r debug digest-value newhash2]
|
||||||
|
r config set hash-max-ziplist-entries $original_max
|
||||||
|
}
|
||||||
|
|
||||||
|
test {COPY basic usage for stream} {
|
||||||
|
r del mystream mynewstream
|
||||||
|
for {set i 0} {$i < 1000} {incr i} {
|
||||||
|
r XADD mystream * item 2 value b
|
||||||
|
}
|
||||||
|
r copy mystream mynewstream
|
||||||
|
set digest [r debug digest-value mystream]
|
||||||
|
assert_equal $digest [r debug digest-value mynewstream]
|
||||||
|
assert_equal 1 [r object refcount mystream]
|
||||||
|
assert_equal 1 [r object refcount mynewstream]
|
||||||
|
r del mystream
|
||||||
|
assert_equal $digest [r debug digest-value mynewstream]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {COPY basic usage for stream-cgroups} {
|
||||||
|
r del x
|
||||||
|
r XADD x 100 a 1
|
||||||
|
set id [r XADD x 101 b 1]
|
||||||
|
r XADD x 102 c 1
|
||||||
|
r XADD x 103 e 1
|
||||||
|
r XADD x 104 f 1
|
||||||
|
r XADD x 105 g 1
|
||||||
|
r XGROUP CREATE x g1 0
|
||||||
|
r XGROUP CREATE x g2 0
|
||||||
|
r XREADGROUP GROUP g1 Alice COUNT 1 STREAMS x >
|
||||||
|
r XREADGROUP GROUP g1 Bob COUNT 1 STREAMS x >
|
||||||
|
r XREADGROUP GROUP g1 Bob NOACK COUNT 1 STREAMS x >
|
||||||
|
r XREADGROUP GROUP g2 Charlie COUNT 4 STREAMS x >
|
||||||
|
r XGROUP SETID x g1 $id
|
||||||
|
r XREADGROUP GROUP g1 Dave COUNT 3 STREAMS x >
|
||||||
|
r XDEL x 103
|
||||||
|
|
||||||
|
r copy x newx
|
||||||
|
set info [r xinfo stream x full]
|
||||||
|
assert_equal $info [r xinfo stream newx full]
|
||||||
|
assert_equal 1 [r object refcount x]
|
||||||
|
assert_equal 1 [r object refcount newx]
|
||||||
|
r del x
|
||||||
|
assert_equal $info [r xinfo stream newx full]
|
||||||
|
r flushdb
|
||||||
|
}
|
||||||
|
|
||||||
test {MOVE basic usage} {
|
test {MOVE basic usage} {
|
||||||
r set mykey foobar
|
r set mykey foobar
|
||||||
r move mykey 10
|
r move mykey 10
|
||||||
|
Loading…
x
Reference in New Issue
Block a user