Merge commit '97e58ee026675e70fbd8843f7a86e98f53a3c791' into unstable
Former-commit-id: 8ab77465dbb3c0b1f859f24cdbb461937516eb6a
This commit is contained in:
commit
e245fdbb89
@ -549,7 +549,10 @@ sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
|
||||
dst = sdscatlen(dst,buf,len);
|
||||
|
||||
for (j = 0; j < argc; j++) {
|
||||
o = getDecodedObject(argv[j]);
|
||||
if (sdsEncodedObject(argv[j]))
|
||||
o = argv[j];
|
||||
else
|
||||
o = getDecodedObject(argv[j]);
|
||||
buf[0] = '$';
|
||||
len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(szFromObj(o)));
|
||||
buf[len++] = '\r';
|
||||
@ -557,7 +560,8 @@ sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
|
||||
dst = sdscatlen(dst,buf,len);
|
||||
dst = sdscatlen(dst,ptrFromObj(o),sdslen(szFromObj(o)));
|
||||
dst = sdscatlen(dst,"\r\n",2);
|
||||
decrRefCount(o);
|
||||
if (o != argv[j])
|
||||
decrRefCount(o);
|
||||
}
|
||||
return dst;
|
||||
}
|
||||
|
@ -5033,7 +5033,7 @@ void restoreCommand(client *c) {
|
||||
|
||||
rioInitWithBuffer(&payload,szFromObj(c->argv[3]));
|
||||
if (((type = rdbLoadObjectType(&payload)) == -1) ||
|
||||
((obj = rdbLoadObject(type,&payload,c->argv[1], OBJ_MVCC_INVALID)) == NULL))
|
||||
((obj = rdbLoadObject(type,&payload,szFromObj(c->argv[1]), OBJ_MVCC_INVALID)) == NULL))
|
||||
{
|
||||
addReplyError(c,"Bad data format");
|
||||
return;
|
||||
|
16
src/db.cpp
16
src/db.cpp
@ -214,7 +214,7 @@ int dbAddCore(redisDb *db, robj *key, robj *val) {
|
||||
val->type == OBJ_ZSET ||
|
||||
val->type == OBJ_STREAM)
|
||||
signalKeyAsReady(db, key);
|
||||
if (g_pserver->cluster_enabled) slotToKeyAdd(key);
|
||||
if (g_pserver->cluster_enabled) slotToKeyAdd(szFromObj(key));
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -392,7 +392,7 @@ int dbSyncDelete(redisDb *db, robj *key) {
|
||||
if (de != nullptr && ((robj*)dictGetVal(de))->FExpires())
|
||||
removeExpireCore(db, key, de);
|
||||
if (dictDelete(db->pdict,ptrFromObj(key)) == DICT_OK) {
|
||||
if (g_pserver->cluster_enabled) slotToKeyDel(key);
|
||||
if (g_pserver->cluster_enabled) slotToKeyDel(szFromObj(key));
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
@ -1952,11 +1952,11 @@ int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys)
|
||||
* a fast way a key that belongs to a specified hash slot. This is useful
|
||||
* while rehashing the cluster and in other conditions when we need to
|
||||
* understand if we have keys for a given hash slot. */
|
||||
void slotToKeyUpdateKey(robj *key, int add) {
|
||||
void slotToKeyUpdateKey(sds key, int add) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
|
||||
size_t keylen = sdslen(szFromObj(key));
|
||||
unsigned int hashslot = keyHashSlot(szFromObj(key),keylen);
|
||||
size_t keylen = sdslen(key);
|
||||
unsigned int hashslot = keyHashSlot(key,keylen);
|
||||
unsigned char buf[64];
|
||||
unsigned char *indexed = buf;
|
||||
|
||||
@ -1964,7 +1964,7 @@ void slotToKeyUpdateKey(robj *key, int add) {
|
||||
if (keylen+2 > 64) indexed = (unsigned char*)zmalloc(keylen+2, MALLOC_SHARED);
|
||||
indexed[0] = (hashslot >> 8) & 0xff;
|
||||
indexed[1] = hashslot & 0xff;
|
||||
memcpy(indexed+2,ptrFromObj(key),keylen);
|
||||
memcpy(indexed+2,key,keylen);
|
||||
int fModified = false;
|
||||
if (add) {
|
||||
fModified = raxInsert(g_pserver->cluster->slots_to_keys,indexed,keylen+2,NULL,NULL);
|
||||
@ -1975,11 +1975,11 @@ void slotToKeyUpdateKey(robj *key, int add) {
|
||||
if (indexed != buf) zfree(indexed);
|
||||
}
|
||||
|
||||
void slotToKeyAdd(robj *key) {
|
||||
void slotToKeyAdd(sds key) {
|
||||
slotToKeyUpdateKey(key,1);
|
||||
}
|
||||
|
||||
void slotToKeyDel(robj *key) {
|
||||
void slotToKeyDel(sds key) {
|
||||
slotToKeyUpdateKey(key,0);
|
||||
}
|
||||
|
||||
|
@ -380,7 +380,7 @@ void debugCommand(client *c) {
|
||||
"OOM -- Crash the server simulating an out-of-memory error.",
|
||||
"PANIC -- Crash the server simulating a panic.",
|
||||
"POPULATE <count> [prefix] [size] -- Create <count> string keys named key:<num>. If a prefix is specified is used instead of the 'key' prefix.",
|
||||
"RELOAD -- Save the RDB on disk and reload it back in memory.",
|
||||
"RELOAD [MERGE] [NOFLUSH] [NOSAVE] -- Save the RDB on disk and reload it back in memory. By default it will save the RDB file and load it back. With the NOFLUSH option the current database is not removed before loading the new one, but conficts in keys will kill the server with an exception. When MERGE is used, conflicting keys will be loaded (the key in the loaded RDB file will win). When NOSAVE is used, the server will not save the current dataset in the RDB file before loading. Use DEBUG RELOAD NOSAVE when you want just to load the RDB file you placed in the Redis working directory in order to replace the current dataset in memory. Use DEBUG RELOAD NOSAVE NOFLUSH MERGE when you want to add what is in the current RDB file placed in the Redis current directory, with the current memory content. Use DEBUG RELOAD when you want to verify Redis is able to persist the current dataset in the RDB file, flush the memory content, and load it back.",
|
||||
"RESTART -- Graceful restart: save config, db, restart.",
|
||||
"SDSLEN <key> -- Show low level SDS string info representing key and value.",
|
||||
"SEGFAULT -- Crash the server with sigsegv.",
|
||||
@ -425,15 +425,44 @@ NULL
|
||||
serverLog(LL_WARNING, "DEBUG LOG: %s", (char*)ptrFromObj(c->argv[2]));
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(szFromObj(c->argv[1]),"reload")) {
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
if (rdbSave(rsiptr) != C_OK) {
|
||||
addReply(c,shared.err);
|
||||
return;
|
||||
int flush = 1, save = 1;
|
||||
int flags = RDBFLAGS_NONE;
|
||||
|
||||
/* Parse the additional options that modify the RELOAD
|
||||
* behavior. */
|
||||
for (int j = 2; j < c->argc; j++) {
|
||||
char *opt = szFromObj(c->argv[j]);
|
||||
if (!strcasecmp(opt,"MERGE")) {
|
||||
flags |= RDBFLAGS_ALLOW_DUP;
|
||||
} else if (!strcasecmp(opt,"NOFLUSH")) {
|
||||
flush = 0;
|
||||
} else if (!strcasecmp(opt,"NOSAVE")) {
|
||||
save = 0;
|
||||
} else {
|
||||
addReplyError(c,"DEBUG RELOAD only supports the "
|
||||
"MERGE, NOFLUSH and NOSAVE options.");
|
||||
return;
|
||||
}
|
||||
}
|
||||
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
|
||||
|
||||
/* The default beahvior is to save the RDB file before loading
|
||||
* it back. */
|
||||
if (save) {
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
if (rdbSaveFile(g_pserver->rdb_filename,rsiptr) != C_OK) {
|
||||
addReply(c,shared.err);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* The default behavior is to remove the current dataset from
|
||||
* memory before loading the RDB file, however when MERGE is
|
||||
* used together with NOFLUSH, we are able to merge two datasets. */
|
||||
if (flush) emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
|
||||
|
||||
protectClient(c);
|
||||
int ret = rdbLoadFile(g_pserver->rdb_filename,NULL,RDBFLAGS_NONE);
|
||||
int ret = rdbLoadFile(g_pserver->rdb_filename,NULL,flags);
|
||||
unprotectClient(c);
|
||||
if (ret != C_OK) {
|
||||
addReplyError(c,"Error trying to load the RDB dump");
|
||||
|
@ -86,7 +86,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
|
||||
* field to NULL in order to lazy free it later. */
|
||||
if (de) {
|
||||
dictFreeUnlinkedEntry(db->pdict,de);
|
||||
if (g_pserver->cluster_enabled) slotToKeyDel(key);
|
||||
if (g_pserver->cluster_enabled) slotToKeyDel(szFromObj(key));
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
|
@ -362,7 +362,16 @@ void freeStreamObject(robj_roptr o) {
|
||||
}
|
||||
|
||||
void incrRefCount(robj_roptr o) {
|
||||
if (o->getrefcount(std::memory_order_relaxed) != OBJ_SHARED_REFCOUNT) o->addref();
|
||||
auto refcount = o->getrefcount(std::memory_order_relaxed);
|
||||
if (refcount < OBJ_FIRST_SPECIAL_REFCOUNT) {
|
||||
o->addref();
|
||||
} else {
|
||||
if (refcount == OBJ_SHARED_REFCOUNT) {
|
||||
/* Nothing to do: this refcount is immutable. */
|
||||
} else if (refcount == OBJ_STATIC_REFCOUNT) {
|
||||
serverPanic("You tried to retain an object allocated in the stack");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void decrRefCount(robj_roptr o) {
|
||||
|
45
src/rdb.cpp
45
src/rdb.cpp
@ -1504,7 +1504,7 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
|
||||
|
||||
/* Load a Redis object of the specified type from the specified file.
|
||||
* On success a newly allocated object is returned, otherwise NULL. */
|
||||
robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
|
||||
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
|
||||
robj *o = NULL, *ele, *dec;
|
||||
uint64_t len;
|
||||
unsigned int i;
|
||||
@ -1990,7 +1990,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
|
||||
exit(1);
|
||||
}
|
||||
RedisModuleIO io;
|
||||
moduleInitIOContext(io,mt,rdb,key);
|
||||
robj keyobj;
|
||||
initStaticStringObject(keyobj,key);
|
||||
moduleInitIOContext(io,mt,rdb,&keyobj);
|
||||
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
|
||||
/* Call the rdb_load method of the module providing the 10 bit
|
||||
* encoding version in the lower 10 bits of the module ID. */
|
||||
@ -2153,7 +2155,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
long long lru_clock = 0;
|
||||
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
|
||||
robj *subexpireKey = nullptr;
|
||||
robj *key = nullptr;
|
||||
sds key = nullptr;
|
||||
|
||||
rdb->update_cksum = rdbLoadProgressCallback;
|
||||
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
|
||||
@ -2290,10 +2292,12 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
incrRefCount(subexpireKey);
|
||||
} else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) {
|
||||
if (key == nullptr || subexpireKey == nullptr) {
|
||||
serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? szFromObj(key) : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)");
|
||||
serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? key : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)");
|
||||
}
|
||||
else {
|
||||
setExpire(NULL, db, key, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10));
|
||||
redisObject keyobj;
|
||||
initStaticStringObject(keyobj,key);
|
||||
setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10));
|
||||
decrRefCount(subexpireKey);
|
||||
subexpireKey = nullptr;
|
||||
}
|
||||
@ -2363,14 +2367,15 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
/* Read key */
|
||||
if (key != nullptr)
|
||||
{
|
||||
decrRefCount(key);
|
||||
sdsfree(key);
|
||||
key = nullptr;
|
||||
}
|
||||
|
||||
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
|
||||
if ((key = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
|
||||
goto eoferr;
|
||||
/* Read value */
|
||||
if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) {
|
||||
decrRefCount(key);
|
||||
if ((val = rdbLoadObject(type,rdb,key,mvcc_tstamp)) == NULL) {
|
||||
sdsfree(key);
|
||||
key = nullptr;
|
||||
goto eoferr;
|
||||
}
|
||||
@ -2381,26 +2386,34 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
* received from the master. In the latter case, the master is
|
||||
* responsible for key expiry. If we would expire keys here, the
|
||||
* snapshot taken by the master may not be reflected on the replica. */
|
||||
robj keyobj;
|
||||
initStaticStringObject(keyobj,key);
|
||||
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
|
||||
if (fStaleMvccKey || fExpiredKey) {
|
||||
if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, key) == nullptr) {
|
||||
if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) {
|
||||
// We have a key that we've already deleted and is not back in our database.
|
||||
// We'll need to inform the sending master of the delete if it is also a replica of us
|
||||
rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(key);
|
||||
robj *objKeyDup = createStringObject(key, sdslen(key));
|
||||
rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(objKeyDup);
|
||||
decrRefCount(objKeyDup);
|
||||
}
|
||||
decrRefCount(key);
|
||||
sdsfree(key);
|
||||
key = nullptr;
|
||||
decrRefCount(val);
|
||||
val = nullptr;
|
||||
} else {
|
||||
|
||||
|
||||
/* Add the new object in the hash table */
|
||||
int fInserted = dbMerge(db, key, val, rsi && rsi->fForceSetKey); // Note: dbMerge will incrRef
|
||||
int fInserted = dbMerge(db, &keyobj, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
|
||||
|
||||
if (fInserted)
|
||||
{
|
||||
/* Set the expire time if needed */
|
||||
if (expiretime != -1)
|
||||
setExpire(NULL,db,key,nullptr,expiretime);
|
||||
{
|
||||
setExpire(NULL,db,&keyobj,nullptr,expiretime);
|
||||
}
|
||||
|
||||
/* Set usage information (for eviction). */
|
||||
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);
|
||||
@ -2423,7 +2436,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
|
||||
if (key != nullptr)
|
||||
{
|
||||
decrRefCount(key);
|
||||
sdsfree(key);
|
||||
key = nullptr;
|
||||
}
|
||||
|
||||
@ -2458,7 +2471,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
eoferr:
|
||||
if (key != nullptr)
|
||||
{
|
||||
decrRefCount(key);
|
||||
sdsfree(key);
|
||||
key = nullptr;
|
||||
}
|
||||
if (subexpireKey != nullptr)
|
||||
|
@ -125,9 +125,10 @@
|
||||
#define RDB_LOAD_SDS (1<<2)
|
||||
|
||||
/* flags on the purpose of rdb save or load */
|
||||
#define RDBFLAGS_NONE 0
|
||||
#define RDBFLAGS_AOF_PREAMBLE (1<<0)
|
||||
#define RDBFLAGS_REPLICATION (1<<1)
|
||||
#define RDBFLAGS_NONE 0 /* No special RDB loading. */
|
||||
#define RDBFLAGS_AOF_PREAMBLE (1<<0) /* Load/save the RDB as AOF preamble. */
|
||||
#define RDBFLAGS_REPLICATION (1<<1) /* Load/save for SYNC. */
|
||||
#define RDBFLAGS_ALLOW_DUP (1<<2) /* Allow duplicated keys when loading.*/
|
||||
|
||||
int rdbSaveType(rio *rdb, unsigned char type);
|
||||
int rdbLoadType(rio *rdb);
|
||||
@ -152,7 +153,7 @@ int rdbSaveS3(char *path, rdbSaveInfo *rsi);
|
||||
int rdbLoadS3(char *path, rdbSaveInfo *rsi, int rdbflags);
|
||||
ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key);
|
||||
size_t rdbSavedObjectLen(robj *o, robj *key);
|
||||
robj *rdbLoadObject(int type, rio *rdb, robj *key, uint64_t mvcc_tstamp);
|
||||
robj *rdbLoadObject(int type, rio *rdb, sds key, uint64_t mvcc_tstamp);
|
||||
void backgroundSaveDoneHandler(int exitcode, int bysignal);
|
||||
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
|
||||
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
|
||||
|
@ -287,7 +287,7 @@ int redis_check_rdb(const char *rdbfilename, FILE *fp) {
|
||||
rdbstate.keys++;
|
||||
/* Read value */
|
||||
rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE;
|
||||
if ((val = rdbLoadObject(type,&rdb,key,OBJ_MVCC_INVALID)) == NULL) goto eoferr;
|
||||
if ((val = rdbLoadObject(type,&rdb,szFromObj(key),OBJ_MVCC_INVALID)) == NULL) goto eoferr;
|
||||
/* Check if the key already expired. */
|
||||
if (expiretime != -1 && expiretime < now)
|
||||
rdbstate.already_expired++;
|
||||
|
@ -774,6 +774,8 @@ typedef struct RedisModuleDigest {
|
||||
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
|
||||
|
||||
#define OBJ_SHARED_REFCOUNT (0x7FFFFFFF)
|
||||
#define OBJ_STATIC_REFCOUNT (OBJ_SHARED_REFCOUNT-1)
|
||||
#define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
|
||||
#define OBJ_MVCC_INVALID (0xFFFFFFFFFFFFFFFFULL)
|
||||
|
||||
#define MVCC_MS_SHIFT 20
|
||||
@ -1071,7 +1073,7 @@ const char *getObjectTypeName(robj_roptr o);
|
||||
* we'll update it when the structure is changed, to avoid bugs like
|
||||
* bug #85 introduced exactly in this way. */
|
||||
#define initStaticStringObject(_var,_ptr) do { \
|
||||
_var.setrefcount(1); \
|
||||
_var.setrefcount(OBJ_STATIC_REFCOUNT); \
|
||||
_var.type = OBJ_STRING; \
|
||||
_var.encoding = OBJ_ENCODING_RAW; \
|
||||
_var.m_ptr = _ptr; \
|
||||
@ -2655,6 +2657,7 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
|
||||
#define LOOKUP_NOTOUCH (1<<0)
|
||||
#define LOOKUP_UPDATEMVCC (1<<1)
|
||||
void dbAdd(redisDb *db, robj *key, robj *val);
|
||||
int dbAddRDBLoad(redisDb *db, sds key, robj *val);
|
||||
void dbOverwrite(redisDb *db, robj *key, robj *val);
|
||||
int dbMerge(redisDb *db, robj *key, robj *val, int fReplace);
|
||||
void genericSetKey(redisDb *db, robj *key, robj *val, int keepttl, int signal);
|
||||
@ -2682,8 +2685,8 @@ unsigned int delKeysInSlot(unsigned int hashslot);
|
||||
int verifyClusterConfigWithData(void);
|
||||
void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor);
|
||||
int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor);
|
||||
void slotToKeyAdd(robj *key);
|
||||
void slotToKeyDel(robj *key);
|
||||
void slotToKeyAdd(sds key);
|
||||
void slotToKeyDel(sds key);
|
||||
void slotToKeyFlush(void);
|
||||
int dbAsyncDelete(redisDb *db, robj *key);
|
||||
void emptyDbAsync(redisDb *db);
|
||||
|
Loading…
x
Reference in New Issue
Block a user