Replace cluster metadata with slot specific dictionaries (#11695)

This is an implementation of https://github.com/redis/redis/issues/10589 that eliminates 16 bytes per entry in cluster mode, that are currently used to create a linked list between entries in the same slot.  Main idea is splitting main dictionary into 16k smaller dictionaries (one per slot), so we can perform all slot specific operations, such as iteration, without any additional info in the `dictEntry`. For Redis cluster, the expectation is that there will be a larger number of keys, so the fixed overhead of 16k dictionaries will be The expire dictionary is also split up so that each slot is logically decoupled, so that in subsequent revisions we will be able to atomically flush a slot of data.

## Important changes
* Incremental rehashing - one big change here is that it's not one, but rather up to 16k dictionaries that can be rehashing at the same time, in order to keep track of them, we introduce a separate queue for dictionaries that are rehashing. Also instead of rehashing a single dictionary, cron job will now try to rehash as many as it can in 1ms.
* getRandomKey - now needs to not only select a random key, from the random bucket, but also needs to select a random dictionary. Fairness is a major concern here, as it's possible that keys can be unevenly distributed across the slots. In order to address this search we introduced binary index tree). With that data structure we are able to efficiently find a random slot using binary search in O(log^2(slot count)) time.
* Iteration efficiency - when iterating dictionary with a lot of empty slots, we want to skip them efficiently. We can do this using same binary index that is used for random key selection, this index allows us to find a slot for a specific key index. For example if there are 10 keys in the slot 0, then we can quickly find a slot that contains 11th key using binary search on top of the binary index tree.
* scan API - in order to perform a scan across the entire DB, the cursor now needs to not only save position within the dictionary but also the slot id. In this change we append slot id into LSB of the cursor so it can be passed around between client and the server. This has interesting side effect, now you'll be able to start scanning specific slot by simply providing slot id as a cursor value. The plan is to not document this as defined behavior, however. It's also worth nothing the SCAN API is now technically incompatible with previous versions, although practically we don't believe it's an issue.
* Checksum calculation optimizations - During command execution, we know that all of the keys are from the same slot (outside of a few notable exceptions such as cross slot scripts and modules). We don't want to compute the checksum multiple multiple times, hence we are relying on cached slot id in the client during the command executions. All operations that access random keys, either should pass in the known slot or recompute the slot. 
* Slot info in RDB - in order to resize individual dictionaries correctly, while loading RDB, it's not enough to know total number of keys (of course we could approximate number of keys per slot, but it won't be precise). To address this issue, we've added additional metadata into RDB that contains number of keys in each slot, which can be used as a hint during loading.
* DB size - besides `DBSIZE` API, we need to know size of the DB in many places want, in order to avoid scanning all dictionaries and summing up their sizes in a loop, we've introduced a new field into `redisDb` that keeps track of `key_count`. This way we can keep DBSIZE operation O(1). This is also kept for O(1) expires computation as well.

## Performance
This change improves SET performance in cluster mode by ~5%, most of the gains come from us not having to maintain linked lists for keys in slot, non-cluster mode has same performance. For workloads that rely on evictions, the performance is similar because of the extra overhead for finding keys to evict. 

RDB loading performance is slightly reduced, as the slot of each key needs to be computed during the load.

## Interface changes
* Removed `overhead.hashtable.slot-to-keys` to `MEMORY STATS`
* Scan API will now require 64 bits to store the cursor, even on 32 bit systems, as the slot information will be stored.
* New RDB version to support the new op code for SLOT information. 

---------

Co-authored-by: Vitaly Arbuzov <arvit@amazon.com>
Co-authored-by: Harkrishn Patro <harkrisp@amazon.com>
Co-authored-by: Roshan Khatri <rvkhatri@amazon.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
Vitaly 2023-10-14 23:58:26 -07:00 committed by GitHub
parent f0c1c730d4
commit 0270abda82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1154 additions and 562 deletions

View File

@ -2240,11 +2240,11 @@ werr:
}
int rewriteAppendOnlyFileRio(rio *aof) {
dictIterator *di = NULL;
dictEntry *de;
int j;
long key_count = 0;
long long updated_time = 0;
dbIterator *dbit = NULL;
/* Record timestamp at the beginning of rewriting AOF. */
if (server.aof_timestamp_enabled) {
@ -2257,17 +2257,13 @@ int rewriteAppendOnlyFileRio(rio *aof) {
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
/* SELECT the new DB */
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;
redisDb *db = server.db + j;
dbit = dbIteratorInit(db, DB_MAIN);
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
while((de = dbIteratorNext(dbit)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;
@ -2332,13 +2328,12 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (server.rdb_key_save_delay)
debugDelay(server.rdb_key_save_delay);
}
dictReleaseIterator(di);
di = NULL;
zfree(dbit);
}
return C_OK;
werr:
if (di) dictReleaseIterator(di);
if (dbit) zfree(dbit);
return C_ERR;
}

View File

@ -1050,9 +1050,6 @@ void clusterInit(void) {
exit(1);
}
/* Initialize data for the Slot to key API. */
slotToKeyInit(server.db);
/* The slots -> channels map is a radix tree. Initialize it here. */
server.cluster->slots_to_channels = raxNew();
@ -5113,7 +5110,7 @@ int verifyClusterConfigWithData(void) {
/* Make sure we only have keys in DB0. */
for (j = 1; j < server.dbnum; j++) {
if (dictSize(server.db[j].dict)) return C_ERR;
if (dbSize(&server.db[j], DB_MAIN)) return C_ERR;
}
/* Check that all the slots we see populated memory have a corresponding
@ -5986,7 +5983,7 @@ NULL
clusterReplyShards(c);
} else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
/* CLUSTER FLUSHSLOTS */
if (dictSize(server.db[0].dict) != 0) {
if (dbSize(&server.db[0], DB_MAIN) != 0) {
addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
return;
}
@ -6248,13 +6245,16 @@ NULL
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c,numkeys);
dictEntry *de = (*server.db->slots_to_keys).by_slot[slot].head;
for (unsigned int j = 0; j < numkeys; j++) {
dictIterator *iter = NULL;
dictEntry *de = NULL;
iter = dictGetIterator(server.db->dict[slot]);
for (unsigned int i = 0; i < numkeys; i++) {
de = dictNext(iter);
serverAssert(de != NULL);
sds sdskey = dictGetKey(de);
addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
de = dictEntryNextInSlot(de);
}
dictReleaseIterator(iter);
} else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
/* CLUSTER FORGET <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
@ -6303,7 +6303,7 @@ NULL
* slots nor keys to accept to replicate some other node.
* Slaves can switch to another master without issues. */
if (nodeIsMaster(myself) &&
(myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
(myself->numslots != 0 || dbSize(&server.db[0], DB_MAIN) != 0)) {
addReplyError(c,
"To set a master the node must be empty and "
"without assigned slots.");
@ -6462,7 +6462,7 @@ NULL
/* Slaves can be reset while containing data, but not master nodes
* that must be empty. */
if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
if (nodeIsMaster(myself) && dbSize(c->db, DB_MAIN) != 0) {
addReplyError(c,"CLUSTER RESET can't be called with "
"master nodes containing keys");
return;
@ -7544,98 +7544,16 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
return 0;
}
/* Slot to Key API. This is used by Redis Cluster in order to obtain in
* a fast way a key that belongs to a specified hash slot. This is useful
* while rehashing the cluster and in other conditions when we need to
* understand if we have keys for a given hash slot. */
void slotToKeyAddEntry(dictEntry *entry, redisDb *db) {
sds key = dictGetKey(entry);
unsigned int hashslot = keyHashSlot(key, sdslen(key));
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
slot_to_keys->count++;
/* Insert entry before the first element in the list. */
dictEntry *first = slot_to_keys->head;
dictEntryNextInSlot(entry) = first;
if (first != NULL) {
serverAssert(dictEntryPrevInSlot(first) == NULL);
dictEntryPrevInSlot(first) = entry;
}
serverAssert(dictEntryPrevInSlot(entry) == NULL);
slot_to_keys->head = entry;
}
void slotToKeyDelEntry(dictEntry *entry, redisDb *db) {
sds key = dictGetKey(entry);
unsigned int hashslot = keyHashSlot(key, sdslen(key));
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
slot_to_keys->count--;
/* Connect previous and next entries to each other. */
dictEntry *next = dictEntryNextInSlot(entry);
dictEntry *prev = dictEntryPrevInSlot(entry);
if (next != NULL) {
dictEntryPrevInSlot(next) = prev;
}
if (prev != NULL) {
dictEntryNextInSlot(prev) = next;
} else {
/* The removed entry was the first in the list. */
serverAssert(slot_to_keys->head == entry);
slot_to_keys->head = next;
}
}
/* Updates neighbour entries when an entry has been replaced (e.g. reallocated
* during active defrag). */
void slotToKeyReplaceEntry(dict *d, dictEntry *entry) {
dictEntry *next = dictEntryNextInSlot(entry);
dictEntry *prev = dictEntryPrevInSlot(entry);
if (next != NULL) {
dictEntryPrevInSlot(next) = entry;
}
if (prev != NULL) {
dictEntryNextInSlot(prev) = entry;
} else {
/* The replaced entry was the first in the list. */
sds key = dictGetKey(entry);
unsigned int hashslot = keyHashSlot(key, sdslen(key));
clusterDictMetadata *dictmeta = dictMetadata(d);
redisDb *db = dictmeta->db;
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
slot_to_keys->head = entry;
}
}
/* Initialize slots-keys map of given db. */
void slotToKeyInit(redisDb *db) {
db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping));
clusterDictMetadata *dictmeta = dictMetadata(db->dict);
dictmeta->db = db;
}
/* Empty slots-keys map of given db. */
void slotToKeyFlush(redisDb *db) {
memset(db->slots_to_keys, 0,
sizeof(clusterSlotToKeyMapping));
}
/* Free slots-keys map of given db. */
void slotToKeyDestroy(redisDb *db) {
zfree(db->slots_to_keys);
db->slots_to_keys = NULL;
}
/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int j = 0;
dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head;
while (de != NULL) {
dictIterator *iter = NULL;
dictEntry *de = NULL;
iter = dictGetSafeIterator(server.db->dict[hashslot]);
while((de = dictNext(iter)) != NULL) {
sds sdskey = dictGetKey(de);
de = dictEntryNextInSlot(de);
robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
@ -7646,12 +7564,13 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
j++;
server.dirty++;
}
dictReleaseIterator(iter);
return j;
}
unsigned int countKeysInSlot(unsigned int hashslot) {
return (*server.db->slots_to_keys).by_slot[hashslot].count;
unsigned int countKeysInSlot(unsigned int slot) {
return dictSize(server.db->dict[slot]);
}
/* -----------------------------------------------------------------------------

View File

@ -5,7 +5,9 @@
* Redis cluster data structures, defines, exported API.
*----------------------------------------------------------------------------*/
#define CLUSTER_SLOTS 16384
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */
@ -150,29 +152,6 @@ typedef struct clusterNode {
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
/* Slot to keys for a single slot. The keys in the same slot are linked together
* using dictEntry metadata. */
typedef struct slotToKeys {
uint64_t count; /* Number of keys in the slot. */
dictEntry *head; /* The first key-value entry in the slot. */
} slotToKeys;
/* Slot to keys mapping for all slots, opaque outside this file. */
struct clusterSlotToKeyMapping {
slotToKeys by_slot[CLUSTER_SLOTS];
};
/* Dict entry metadata for cluster mode, used for the Slot to Key API to form a
* linked list of the entries belonging to the same slot. */
typedef struct clusterDictEntryMetadata {
dictEntry *prev; /* Prev entry with key in the same slot */
dictEntry *next; /* Next entry with key in the same slot */
} clusterDictEntryMetadata;
typedef struct {
redisDb *db; /* A link back to the db this dict belongs to */
} clusterDictMetadata;
typedef struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch;
@ -300,7 +279,7 @@ typedef struct {
uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */
union {
clusterMsgPingExtHostname hostname;
clusterMsgPingExtHumanNodename human_nodename;
clusterMsgPingExtHumanNodename human_nodename;
clusterMsgPingExtForgottenNode forgotten_node;
clusterMsgPingExtShardId shard_id;
} ext[]; /* Actual extension information, formatted so that the data is 8
@ -424,12 +403,6 @@ unsigned long getClusterConnectionsCount(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);
void clusterPropagatePublish(robj *channel, robj *message, int sharded);
unsigned int keyHashSlot(char *key, int keylen);
void slotToKeyAddEntry(dictEntry *entry, redisDb *db);
void slotToKeyDelEntry(dictEntry *entry, redisDb *db);
void slotToKeyReplaceEntry(dict *d, dictEntry *entry);
void slotToKeyInit(redisDb *db);
void slotToKeyFlush(redisDb *db);
void slotToKeyDestroy(redisDb *db);
void clusterUpdateMyselfFlags(void);
void clusterUpdateMyselfIp(void);
void slotToChannelAdd(sds channel);
@ -439,6 +412,7 @@ void clusterUpdateMyselfAnnouncedPorts(void);
sds clusterGenNodesDescription(client *c, int filter, int tls_primary);
sds genClusterInfoString(void);
void freeClusterLink(clusterLink *link);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
void clusterUpdateMyselfHumanNodename(void);
int isValidAuxString(char *s, unsigned int length);
int getNodeDefaultClientPort(clusterNode *n);

578
src/db.c
View File

@ -37,6 +37,15 @@
#include <signal.h>
#include <ctype.h>
/* Structure for DB iterator that allows iterating across multiple slot specific dictionaries in cluster mode. */
struct dbIterator {
redisDb *db;
int slot;
int next_slot;
dictIterator di;
dbKeyType keyType;
};
/*-----------------------------------------------------------------------------
* C-level DB API
*----------------------------------------------------------------------------*/
@ -47,8 +56,73 @@
int expireIfNeeded(redisDb *db, robj *key, int flags);
int keyIsExpired(redisDb *db, robj *key);
void cumulativeKeyCountAdd(redisDb *db, int idx, long delta, dbKeyType keyType);
static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEntry *de);
dict *dbGetDictFromIterator(dbIterator *dbit) {
if (dbit->keyType == DB_MAIN)
return dbit->db->dict[dbit->slot];
else if (dbit->keyType == DB_EXPIRES)
return dbit->db->expires[dbit->slot];
else
serverAssert(0);
}
/* Returns next dictionary from the iterator, or NULL if iteration is complete. */
dict *dbIteratorNextDict(dbIterator *dbit) {
if (dbit->next_slot == -1) return NULL;
dbit->slot = dbit->next_slot;
dbit->next_slot = dbGetNextNonEmptySlot(dbit->db, dbit->slot, dbit->keyType);
return dbGetDictFromIterator(dbit);
}
int dbIteratorGetCurrentSlot(dbIterator *dbit) {
serverAssert(dbit->slot >= 0 && dbit->slot < CLUSTER_SLOTS);
return dbit->slot;
}
/* Returns next entry from the multi slot db. */
dictEntry *dbIteratorNext(dbIterator *dbit) {
dictEntry *de = dbit->di.d ? dictNext(&dbit->di) : NULL;
if (!de) { /* No current dict or reached the end of the dictionary. */
dict *d = dbIteratorNextDict(dbit);
if (!d) return NULL;
dictInitSafeIterator(&dbit->di, d);
de = dictNext(&dbit->di);
}
return de;
}
/* Returns DB iterator that can be used to iterate through sub-dictionaries.
* Primary database contains only one dictionary when node runs without cluster mode,
* or 16k dictionaries (one per slot) when node runs with cluster mode enabled. */
dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType) {
dbIterator *dbit = zmalloc(sizeof(*dbit));
dbit->db = db;
dbit->slot = -1;
dbit->keyType = keyType;
dbit->next_slot = findSlotByKeyIndex(dbit->db, 1, dbit->keyType); /* Finds first non-empty slot. */
dictInitSafeIterator(&dbit->di, NULL);
return dbit;
}
dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot) {
dbIterator *dbit = zmalloc(sizeof(*dbit));
dbit->db = db;
dbit->slot = slot;
dbit->keyType = keyType;
dbit->next_slot = dbGetNextNonEmptySlot(dbit->db, dbit->slot, dbit->keyType);
dictInitSafeIterator(&dbit->di, NULL);
return dbit;
}
/* Returns next non-empty slot strictly after given one, or -1 if provided slot is the last one. */
int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType) {
unsigned long long next_key = cumulativeKeyCountRead(db, slot, keyType) + 1;
return next_key <= dbSize(db, keyType) ? findSlotByKeyIndex(db, next_key, keyType) : -1;
}
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */
@ -86,7 +160,7 @@ void updateLFU(robj *val) {
* expired on replicas even if the master is lagging expiring our key via DELs
* in the replication link. */
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->dict,key->ptr);
dictEntry *de = dbFind(db, key->ptr, DB_MAIN);
robj *val = NULL;
if (de) {
val = dictGetVal(de);
@ -192,17 +266,20 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
* if the key already exists, otherwise, it can fall back to dbOverwrite. */
static void dbAddInternal(redisDb *db, robj *key, robj *val, int update_if_existing) {
dictEntry *existing;
dictEntry *de = dictAddRaw(db->dict, key->ptr, &existing);
int slot = getKeySlot(key->ptr);
dict *d = db->dict[slot];
dictEntry *de = dictAddRaw(d, key->ptr, &existing);
if (update_if_existing && existing) {
dbSetValue(db, key, val, 1, existing);
return;
}
serverAssertWithInfo(NULL, key, de != NULL);
dictSetKey(db->dict, de, sdsdup(key->ptr));
dictSetKey(d, de, sdsdup(key->ptr));
initObjectLRUOrLFU(val);
dictSetVal(db->dict, de, val);
dictSetVal(d, de, val);
db->sub_dict[DB_MAIN].key_count++;
cumulativeKeyCountAdd(db, slot, 1, DB_MAIN);
signalKeyAsReady(db, key, val->type);
if (server.cluster_enabled) slotToKeyAddEntry(de, db);
notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id);
}
@ -210,6 +287,28 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
dbAddInternal(db, key, val, 0);
}
/* Returns key's hash slot when cluster mode is enabled, or 0 when disabled.
* The only difference between this function and getKeySlot, is that it's not using cached key slot from the current_client
* and always calculates CRC hash.
* This is useful when slot needs to be calculated for a key that user didn't request for, such as in case of eviction. */
int calculateKeySlot(sds key) {
return server.cluster_enabled ? keyHashSlot(key, (int) sdslen(key)) : 0;
}
/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/
int getKeySlot(sds key) {
/* This is performance optimization that uses pre-set slot id from the current command,
* in order to avoid calculation of the key hash.
* This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set.
* It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot.
*/
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flags & CLIENT_EXECUTING_COMMAND) {
return server.current_client->slot;
}
return calculateKeySlot(key);
}
/* This is a special version of dbAdd() that is used only when loading
* keys from the RDB file: the key is passed as an SDS string that is
* retained by the function (and not freed by the caller).
@ -222,11 +321,14 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
* ownership of the SDS string, otherwise 0 is returned, and is up to the
* caller to free the SDS string. */
int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
dictEntry *de = dictAddRaw(db->dict, key, NULL);
int slot = getKeySlot(key);
dict *d = db->dict[slot];
dictEntry *de = dictAddRaw(d, key, NULL);
if (de == NULL) return 0;
initObjectLRUOrLFU(val);
dictSetVal(db->dict, de, val);
if (server.cluster_enabled) slotToKeyAddEntry(de, db);
dictSetVal(d, de, val);
db->sub_dict[DB_MAIN].key_count++;
cumulativeKeyCountAdd(db, slot, 1, DB_MAIN);
return 1;
}
@ -243,7 +345,7 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
*
* The program is aborted if the key was not already present. */
static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEntry *de) {
if (!de) de = dictFind(db->dict,key->ptr);
if (!de) de = dbFind(db, key->ptr, DB_MAIN);
serverAssertWithInfo(NULL,key,de != NULL);
robj *old = dictGetVal(de);
@ -263,13 +365,14 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEnt
/* Because of RM_StringDMA, old may be changed, so we need get old again */
old = dictGetVal(de);
}
dictSetVal(db->dict, de, val);
dict *d = db->dict[getKeySlot(key->ptr)];
dictSetVal(d, de, val);
if (server.lazyfree_lazy_server_del) {
freeObjAsync(key,old,db->id);
} else {
/* This is just decrRefCount(old); */
db->dict->type->valDestructor(db->dict, old);
d->type->valDestructor(d, old);
}
}
@ -321,18 +424,18 @@ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) {
robj *dbRandomKey(redisDb *db) {
dictEntry *de;
int maxtries = 100;
int allvolatile = dictSize(db->dict) == dictSize(db->expires);
int allvolatile = dbSize(db, DB_MAIN) == dbSize(db, DB_EXPIRES);
while(1) {
sds key;
robj *keyobj;
de = dictGetFairRandomKey(db->dict);
int randomSlot = getFairRandomSlot(db, DB_MAIN);
de = dictGetFairRandomKey(db->dict[randomSlot]);
if (de == NULL) return NULL;
key = dictGetKey(de);
keyobj = createStringObject(key,sdslen(key));
if (dictFind(db->expires,key)) {
if (dbFind(db, key, DB_EXPIRES)) {
if (allvolatile && server.masterhost && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
@ -353,11 +456,96 @@ robj *dbRandomKey(redisDb *db) {
}
}
/* Updates binary index tree (also known as Fenwick tree), increasing key count for a given slot.
* You can read more about this data structure here https://en.wikipedia.org/wiki/Fenwick_tree
* Time complexity is O(log(CLUSTER_SLOTS)). */
void cumulativeKeyCountAdd(redisDb *db, int slot, long delta, dbKeyType keyType) {
if (!server.cluster_enabled) return;
int idx = slot + 1; /* Unlike slots, BIT is 1-based, so we need to add 1. */
while (idx <= CLUSTER_SLOTS) {
if (delta < 0) {
serverAssert(db->sub_dict[keyType].slot_size_index[idx] >= (unsigned long long)labs(delta));
}
db->sub_dict[keyType].slot_size_index[idx] += delta;
idx += (idx & -idx);
}
}
/* Returns total (cumulative) number of keys up until given slot (inclusive).
* Time complexity is O(log(CLUSTER_SLOTS)). */
unsigned long long cumulativeKeyCountRead(redisDb *db, int slot, dbKeyType keyType) {
if (!server.cluster_enabled) {
serverAssert(slot == 0);
return dbSize(db, keyType);
}
int idx = slot + 1;
unsigned long long sum = 0;
while (idx > 0) {
sum += db->sub_dict[keyType].slot_size_index[idx];
idx -= (idx & -idx);
}
return sum;
}
/* Returns fair random slot, probability of each slot being returned is proportional to the number of elements that slot dictionary holds.
* Implementation uses binary search on top of binary index tree.
* This function guarantees that it returns a slot whose dict is non-empty, unless the entire db is empty.
* Time complexity of this function is O(log^2(CLUSTER_SLOTS)). */
int getFairRandomSlot(redisDb *db, dbKeyType keyType) {
unsigned long target = dbSize(db, keyType) ? (randomULong() % dbSize(db, keyType)) + 1 : 0;
int slot = findSlotByKeyIndex(db, target, keyType);
return slot;
}
static inline unsigned long dictSizebySlot(redisDb *db, int slot, dbKeyType keyType) {
if (keyType == DB_MAIN)
return dictSize(db->dict[slot]);
else if (keyType == DB_EXPIRES)
return dictSize(db->expires[slot]);
else
serverAssert(0);
}
/* Finds a slot containing target element in a key space ordered by slot id.
* Consider this example. Slots are represented by brackets and keys by dots:
* #0 #1 #2 #3 #4
* [..][....][...][.......][.]
* ^
* target
*
* In this case slot #3 contains key that we are trying to find.
*
* This function is 1 based and the range of the target is [1..dbSize], dbSize inclusive.
* Time complexity of this function is O(log^2(CLUSTER_SLOTS))
* */
int findSlotByKeyIndex(redisDb *db, unsigned long target, dbKeyType keyType) {
if (!server.cluster_enabled || dbSize(db, keyType) == 0) return 0;
serverAssert(target <= dbSize(db, keyType));
int lo = 0, hi = CLUSTER_SLOTS - 1;
/* We use binary search to find a slot, we are allowed to do this, because we have a quick way to find a total number of keys
* up until certain slot, using binary index tree. */
while (lo <= hi) {
int mid = lo + (hi - lo) / 2;
unsigned long keys_up_to_mid = cumulativeKeyCountRead(db, mid, keyType); /* Total number of keys up until a given slot (inclusive). */
unsigned long keys_in_mid = dictSizebySlot(db, mid, keyType);
if (target > keys_up_to_mid) { /* Target is to the right from mid. */
lo = mid + 1;
} else if (target <= keys_up_to_mid - keys_in_mid) { /* Target is to the left from mid. */
hi = mid - 1;
} else { /* Located target. */
return mid;
}
}
serverPanic("Unable to find a slot that contains target key.");
}
/* Helper for sync and async delete. */
int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
dictEntry **plink;
int table;
dictEntry *de = dictTwoPhaseUnlinkFind(db->dict,key->ptr,&plink,&table);
int slot = getKeySlot(key->ptr);
dict *d = db->dict[slot];
dictEntry *de = dictTwoPhaseUnlinkFind(d,key->ptr,&plink,&table);
if (de) {
robj *val = dictGetVal(de);
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
@ -373,14 +561,19 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
if (async) {
/* Because of dbUnshareStringValue, the val in de may change. */
freeObjAsync(key, dictGetVal(de), db->id);
dictSetVal(db->dict, de, NULL);
dictSetVal(d, de, NULL);
}
if (server.cluster_enabled) slotToKeyDelEntry(de, db);
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
dictTwoPhaseUnlinkFree(db->dict,de,plink,table);
if (dictSize(db->expires[slot]) > 0) {
if (dictDelete(db->expires[slot],key->ptr) == DICT_OK) {
cumulativeKeyCountAdd(db, slot, -1, DB_EXPIRES);
db->sub_dict[DB_EXPIRES].key_count--;
}
}
dictTwoPhaseUnlinkFree(d,de,plink,table);
cumulativeKeyCountAdd(db, slot, -1, DB_MAIN);
db->sub_dict[DB_MAIN].key_count--;
return 1;
} else {
return 0;
@ -462,16 +655,26 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
}
for (int j = startdb; j <= enddb; j++) {
removed += dictSize(dbarray[j].dict);
removed += dbSize(&dbarray[j], DB_MAIN);
if (async) {
emptyDbAsync(&dbarray[j]);
} else {
dictEmpty(dbarray[j].dict,callback);
dictEmpty(dbarray[j].expires,callback);
for (int k = 0; k < dbarray[j].dict_count; k++) {
dictEmpty(dbarray[j].dict[k],callback);
dictEmpty(dbarray[j].expires[k],callback);
}
}
/* Because all keys of database are removed, reset average ttl. */
dbarray[j].avg_ttl = 0;
dbarray[j].expires_cursor = 0;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
dbarray[j].sub_dict[subdict].key_count = 0;
dbarray[j].sub_dict[subdict].resize_cursor = 0;
if (server.cluster_enabled) {
unsigned long long *slot_size_index = dbarray[j].sub_dict[subdict].slot_size_index;
memset(slot_size_index, 0, sizeof(unsigned long long) * (CLUSTER_SLOTS + 1));
}
}
}
return removed;
@ -516,11 +719,6 @@ long long emptyData(int dbnum, int flags, void(callback)(dict*)) {
/* Empty redis database structure. */
removed = emptyDbStructure(server.db, dbnum, async, callback);
/* Flush slots to keys map if enable cluster, we can flush entire
* slots to keys map whatever dbnum because only support one DB
* in cluster mode. */
if (server.cluster_enabled) slotToKeyFlush(server.db);
if (dbnum == -1) flushSlaveKeysWithExpireList();
if (with_functions) {
@ -541,14 +739,12 @@ long long emptyData(int dbnum, int flags, void(callback)(dict*)) {
redisDb *initTempDb(void) {
redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum);
for (int i=0; i<server.dbnum; i++) {
tempDb[i].dict = dictCreate(&dbDictType);
tempDb[i].expires = dictCreate(&dbExpiresDictType);
tempDb[i].slots_to_keys = NULL;
}
if (server.cluster_enabled) {
/* Prepare temp slot to key map to be written during async diskless replication. */
slotToKeyInit(tempDb);
tempDb[i].dict_count = (server.cluster_enabled) ? CLUSTER_SLOTS : 1;
tempDb[i].dict = dictCreateMultiple(&dbDictType, tempDb[i].dict_count);
tempDb[i].expires = dictCreateMultiple(&dbExpiresDictType, tempDb[i].dict_count);
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
tempDb[i].sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL;
}
}
return tempDb;
@ -561,13 +757,17 @@ void discardTempDb(redisDb *tempDb, void(callback)(dict*)) {
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, async, callback);
for (int i=0; i<server.dbnum; i++) {
dictRelease(tempDb[i].dict);
dictRelease(tempDb[i].expires);
}
if (server.cluster_enabled) {
/* Release temp slot to key map. */
slotToKeyDestroy(tempDb);
for (int j=0; j<tempDb[i].dict_count; j++) {
dictRelease(tempDb[i].dict[j]);
dictRelease(tempDb[i].expires[j]);
}
zfree(tempDb[i].dict);
zfree(tempDb[i].expires);
if (server.cluster_enabled) {
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
zfree(tempDb[i].sub_dict[subdict].slot_size_index);
}
}
}
zfree(tempDb);
@ -584,7 +784,7 @@ long long dbTotalServerKeyCount(void) {
long long total = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
total += dictSize(server.db[j].dict);
total += dbSize(&server.db[j], DB_MAIN);
}
return total;
}
@ -783,17 +983,15 @@ void randomkeyCommand(client *c) {
}
void keysCommand(client *c) {
dictIterator *di;
dictEntry *de;
sds pattern = c->argv[1]->ptr;
int plen = sdslen(pattern), allkeys;
unsigned long numkeys = 0;
long numkeys = 0;
void *replylen = addReplyDeferredLen(c);
di = dictGetSafeIterator(c->db->dict);
dbIterator *dbit = dbIteratorInit(c->db, DB_MAIN);
allkeys = (pattern[0] == '*' && plen == 1);
robj keyobj;
while((de = dictNext(di)) != NULL) {
while ((de = dbIteratorNext(dbit)) != NULL) {
sds key = dictGetKey(de);
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
@ -806,7 +1004,7 @@ void keysCommand(client *c) {
if (c->flags & CLIENT_CLOSE_ASAP)
break;
}
dictReleaseIterator(di);
zfree(dbit);
setDeferredArrayLen(c,replylen,numkeys);
}
@ -886,15 +1084,8 @@ void scanCallback(void *privdata, const dictEntry *de) {
* if the cursor is valid, store it as unsigned integer into *cursor and
* returns C_OK. Otherwise return C_ERR and send an error to the
* client. */
int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
char *eptr;
/* Use strtoul() because we need an *unsigned* long, so
* getLongLongFromObject() does not cover the whole cursor space. */
errno = 0;
*cursor = strtoul(o->ptr, &eptr, 10);
if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
{
int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor) {
if (!string2ull(o->ptr, cursor)) {
addReplyError(c, "invalid cursor");
return C_ERR;
}
@ -952,7 +1143,7 @@ char *getObjectTypeName(robj *o) {
*
* In the case of a Hash object the function returns both the field and value
* of every element on the Hash. */
void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
int i, j;
listNode *node;
long count = 10;
@ -1022,7 +1213,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
/* Handle the case of a hash table. */
ht = NULL;
if (o == NULL) {
ht = c->db->dict;
ht = NULL;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
@ -1045,7 +1236,8 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
listSetFreeMethod(keys, (void (*)(void*))sdsfree);
}
if (ht) {
/* For main dictionary scan or data structure using hashtable. */
if (!o || ht) {
/* We set the max number of iterations to ten times the specified
* COUNT, so if the hash table is in a pathological state (very
* sparsely populated) we avoid to block too much time at the cost
@ -1071,7 +1263,13 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
.sampled = 0,
};
do {
cursor = dictScan(ht, cursor, scanCallback, &data);
/* In cluster mode there is a separate dictionary for each slot.
* If cursor is empty, we should try exploring next non-empty slot. */
if (o == NULL) {
cursor = dbScan(c->db, DB_MAIN, cursor, scanCallback, NULL, &data);
} else {
cursor = dictScan(ht, cursor, scanCallback, &data);
}
} while (cursor && maxiterations-- && data.sampled < count);
} else if (o->type == OBJ_SET) {
char *str;
@ -1158,15 +1356,111 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
listRelease(keys);
}
void addSlotIdToCursor(int slot, unsigned long long *cursor) {
if (!server.cluster_enabled) return;
/* Slot id can be -1 when iteration is over and there are no more slots to visit. */
if (slot < 0) return;
*cursor = (*cursor << CLUSTER_SLOT_MASK_BITS) | slot;
}
int getAndClearSlotIdFromCursor(unsigned long long *cursor) {
if (!server.cluster_enabled) return 0;
int slot = (int) (*cursor & CLUSTER_SLOT_MASK);
*cursor = *cursor >> CLUSTER_SLOT_MASK_BITS;
return slot;
}
/* The SCAN command completely relies on scanGenericCommand. */
void scanCommand(client *c) {
unsigned long cursor;
unsigned long long cursor;
if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
scanGenericCommand(c,NULL,cursor);
}
void dbsizeCommand(client *c) {
addReplyLongLong(c,dictSize(c->db->dict));
redisDb *db = c->db;
unsigned long long int size = dbSize(db, DB_MAIN);
addReplyLongLong(c, size);
}
unsigned long long int dbSize(redisDb *db, dbKeyType keyType) {
return db->sub_dict[keyType].key_count;
}
/* This method proivdes the cumulative sum of all the dictionary buckets
* across dictionaries in a database. */
unsigned long dbBuckets(redisDb *db, dbKeyType keyType) {
unsigned long buckets = 0;
dict *d;
dbIterator *dbit = dbIteratorInit(db, keyType);
while ((d = dbIteratorNextDict(dbit))) {
buckets += dictBuckets(d);
}
zfree(dbit);
return buckets;
}
size_t dbMemUsage(redisDb *db, dbKeyType keyType) {
size_t mem = 0;
unsigned long long keys_count = dbSize(db, keyType);
mem += keys_count * dictEntryMemUsage() +
dbBuckets(db, keyType) * sizeof(dictEntry*) +
db->dict_count * sizeof(dict);
if (keyType == DB_MAIN) {
mem+=keys_count * sizeof(robj);
}
return mem;
}
dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType){
int slot = getKeySlot(key);
if (keyType == DB_MAIN)
return dictFind(db->dict[slot], key);
else if (keyType == DB_EXPIRES)
return dictFind(db->expires[slot], key);
else
serverAssert(0);
}
/*
* This method is used to iterate over the elements of the entire database specifically across slots.
* It's a three pronged approach.
*
* 1. It uses the provided cursor `v` to retrieve the slot from it.
* 2. If the dictionary is in a valid state checked through the provided callback `dictScanValidFunction`,
* it performs a dictScan over the appropriate `keyType` dictionary of `db`.
* 3. If the slot is entirely scanned i.e. the cursor has reached 0, the next non empty slot is discovered.
* The slot information is embedded into the cursor and returned.
*/
unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long v, dictScanFunction *fn, int (dictScanValidFunction)(dict *d), void *privdata) {
dict *d;
unsigned long long cursor = 0;
/* During main dictionary traversal in cluster mode, 48 lower bits in the cursor are used for positioning in the HT.
* Following 14 bits are used for the slot number, ranging from 0 to 2^14-1.
* Slot is always 0 at the start of iteration and can be incremented only in cluster mode. */
int slot = getAndClearSlotIdFromCursor(&v);
if (keyType == DB_MAIN)
d = db->dict[slot];
else if (keyType == DB_EXPIRES)
d = db->expires[slot];
else
serverAssert(0);
int is_dict_valid = (dictScanValidFunction == NULL || dictScanValidFunction(d) == C_OK);
if (is_dict_valid) {
cursor = dictScan(d, v, fn, privdata);
} else {
serverLog(LL_DEBUG, "Slot [%d] not valid for scanning, skipping.", slot);
}
/* scanning done for the current dictionary or if the scanning wasn't possible, move to the next slot. */
if (cursor == 0 || !is_dict_valid) {
slot = dbGetNextNonEmptySlot(db, slot, keyType);
}
if (slot == -1) {
return 0;
}
addSlotIdToCursor(slot, &cursor);
return cursor;
}
void lastsaveCommand(client *c) {
@ -1463,7 +1757,7 @@ void scanDatabaseForReadyKeys(redisDb *db) {
dictIterator *di = dictGetSafeIterator(db->blocking_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
dictEntry *kde = dictFind(db->dict,key->ptr);
dictEntry *kde = dbFind(db, key->ptr, DB_MAIN);
if (kde) {
robj *value = dictGetVal(kde);
signalKeyAsReady(db, key, value->type);
@ -1483,7 +1777,7 @@ void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) {
int existed = 0, exists = 0;
int original_type = -1, curr_type = -1;
dictEntry *kde = dictFind(emptied->dict, key->ptr);
dictEntry *kde = dbFind(emptied, key->ptr, DB_MAIN);
if (kde) {
robj *value = dictGetVal(kde);
original_type = value->type;
@ -1491,7 +1785,7 @@ void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) {
}
if (replaced_with) {
dictEntry *kde = dictFind(replaced_with->dict, key->ptr);
kde = dbFind(replaced_with, key->ptr, DB_MAIN);
if (kde) {
robj *value = dictGetVal(kde);
curr_type = value->type;
@ -1536,11 +1830,23 @@ int dbSwapDatabases(int id1, int id2) {
db1->expires = db2->expires;
db1->avg_ttl = db2->avg_ttl;
db1->expires_cursor = db2->expires_cursor;
db1->dict_count = db2->dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
db1->sub_dict[subdict].key_count = db2->sub_dict[subdict].key_count;
db1->sub_dict[subdict].resize_cursor = db2->sub_dict[subdict].resize_cursor;
db1->sub_dict[subdict].slot_size_index = db2->sub_dict[subdict].slot_size_index;
}
db2->dict = aux.dict;
db2->expires = aux.expires;
db2->avg_ttl = aux.avg_ttl;
db2->expires_cursor = aux.expires_cursor;
db2->dict_count = aux.dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
db2->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
db2->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor;
db2->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index;
}
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
@ -1560,13 +1866,6 @@ int dbSwapDatabases(int id1, int id2) {
* database (temp) as the main (active) database, the actual freeing of old database
* (which will now be placed in the temp one) is done later. */
void swapMainDbWithTempDb(redisDb *tempDb) {
if (server.cluster_enabled) {
/* Swap slots_to_keys from tempdb just loaded with main db slots_to_keys. */
clusterSlotToKeyMapping *aux = server.db->slots_to_keys;
server.db->slots_to_keys = tempDb->slots_to_keys;
tempDb->slots_to_keys = aux;
}
for (int i=0; i<server.dbnum; i++) {
redisDb aux = server.db[i];
redisDb *activedb = &server.db[i], *newdb = &tempDb[i];
@ -1585,12 +1884,23 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
activedb->expires = newdb->expires;
activedb->avg_ttl = newdb->avg_ttl;
activedb->expires_cursor = newdb->expires_cursor;
activedb->dict_count = newdb->dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
activedb->sub_dict[subdict].key_count = newdb->sub_dict[subdict].key_count;
activedb->sub_dict[subdict].resize_cursor = newdb->sub_dict[subdict].resize_cursor;
activedb->sub_dict[subdict].slot_size_index = newdb->sub_dict[subdict].slot_size_index;
}
newdb->dict = aux.dict;
newdb->expires = aux.expires;
newdb->avg_ttl = aux.avg_ttl;
newdb->expires_cursor = aux.expires_cursor;
newdb->dict_count = aux.dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
newdb->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
newdb->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor;
newdb->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index;
}
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
* X in a given DB, may now actually be unblocked if X happens
@ -1643,7 +1953,13 @@ void swapdbCommand(client *c) {
*----------------------------------------------------------------------------*/
int removeExpire(redisDb *db, robj *key) {
return dictDelete(db->expires,key->ptr) == DICT_OK;
if (dictDelete(db->expires[(getKeySlot(key->ptr))],key->ptr) == DICT_OK) {
db->sub_dict[DB_EXPIRES].key_count--;
cumulativeKeyCountAdd(db, getKeySlot(key->ptr), -1, DB_EXPIRES);
return 1;
} else {
return 0;
}
}
/* Set an expire to the specified key. If the expire is set in the context
@ -1651,13 +1967,20 @@ int removeExpire(redisDb *db, robj *key) {
* to NULL. The 'when' parameter is the absolute unix time in milliseconds
* after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
dictEntry *kde, *de, *existing;
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->dict,key->ptr);
kde = dbFind(db, key->ptr, DB_MAIN);
serverAssertWithInfo(NULL,key,kde != NULL);
de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when);
int slot = getKeySlot(key->ptr);
de = dictAddRaw(db->expires[slot], dictGetKey(kde), &existing);
if (existing) {
dictSetSignedIntegerVal(existing, when);
} else {
dictSetSignedIntegerVal(de, when);
db->sub_dict[DB_EXPIRES].key_count++;
cumulativeKeyCountAdd(db, slot, 1, DB_EXPIRES);
}
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
@ -1670,8 +1993,8 @@ long long getExpire(redisDb *db, robj *key) {
dictEntry *de;
/* No expire? return ASAP */
if (dictSize(db->expires) == 0 ||
(de = dictFind(db->expires,key->ptr)) == NULL) return -1;
if (dictSize(db->expires[getKeySlot(key->ptr)]) == 0 ||
(de = dbFind(db,key->ptr, DB_EXPIRES)) == NULL) return -1;
return dictGetSignedIntegerVal(de);
}
@ -1817,6 +2140,54 @@ int expireIfNeeded(redisDb *db, robj *key, int flags) {
return 1;
}
/*
* This functions increases size of the main/expires db to match desired number.
* In cluster mode resizes all individual dictionaries for slots that this node owns.
*
* Based on the parameter `try_expand`, appropriate dict expand API is invoked.
* if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`.
* The return code is either `DICT_OK`/`DICT_ERR` for both the API(s).
* `DICT_OK` response is for successful expansion. However ,`DICT_ERR` response signifies failure in allocation in
* `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed.
*/
int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_expand) {
dict *d;
if (server.cluster_enabled) {
for (int i = 0; i < CLUSTER_SLOTS; i++) {
if (clusterNodeGetSlotBit(server.cluster->myself, i)) {
/* We don't know exact number of keys that would fall into each slot, but we can approximate it, assuming even distribution. */
if (keyType == DB_MAIN) {
d = db->dict[i];
} else {
d = db->expires[i];
}
int result = try_expand ? dictTryExpand(d, db_size) : dictExpand(d, db_size);
if (try_expand && result == DICT_ERR) {
serverLog(LL_WARNING, "Dict expansion failed for type :%s slot: %d",
keyType == DB_MAIN ? "main" : "expires", i);
return C_ERR;
} else if (result == DICT_ERR) {
serverLog(LL_DEBUG, "Dict expansion skipped for type :%s slot: %d",
keyType == DB_MAIN ? "main" : "expires", i);
}
}
}
} else {
if (keyType == DB_MAIN) {
d = db->dict[0];
} else {
d = db->expires[0];
}
int result = try_expand ? dictTryExpand(d, db_size) : dictExpand(d, db_size);
if (try_expand && result == DICT_ERR) {
serverLog(LL_WARNING, "Dict expansion failed for db type: %s",
keyType == DB_MAIN ? "main" : "expires");
return C_ERR;
}
}
return C_OK;
}
/* -----------------------------------------------------------------------------
* API to get key arguments from commands
* ---------------------------------------------------------------------------*/
@ -2556,3 +2927,42 @@ int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResu
}
return 1;
}
void dbGetStats(char *buf, size_t bufsize, redisDb *db, int full, dbKeyType keyType) {
size_t l;
char *orig_buf = buf;
size_t orig_bufsize = bufsize;
dictStats *mainHtStats = NULL;
dictStats *rehashHtStats = NULL;
dict *d;
dbIterator *dbit = dbIteratorInit(db, keyType);
while ((d = dbIteratorNextDict(dbit))) {
dictStats *stats = dictGetStatsHt(d, 0, full);
if (!mainHtStats) {
mainHtStats = stats;
} else {
dictCombineStats(stats, mainHtStats);
dictFreeStats(stats);
}
if (dictIsRehashing(d)) {
stats = dictGetStatsHt(d, 1, full);
if (!rehashHtStats) {
rehashHtStats = stats;
} else {
dictCombineStats(stats, rehashHtStats);
dictFreeStats(stats);
}
}
}
zfree(dbit);
l = dictGetStatsMsg(buf, bufsize, mainHtStats, full);
dictFreeStats(mainHtStats);
buf += l;
bufsize -= l;
if (rehashHtStats && bufsize > 0) {
dictGetStatsMsg(buf, bufsize, rehashHtStats, full);
dictFreeStats(rehashHtStats);
}
/* Make sure there is a NULL term at the end. */
if (orig_bufsize) orig_buf[orig_bufsize - 1] = '\0';
}

View File

@ -76,6 +76,7 @@ void bugReportStart(void);
void printCrashReport(void);
void bugReportEnd(int killViaSignal, int sig);
void logStackTrace(void *eip, int uplevel);
void dbGetStats(char *buf, size_t bufsize, redisDb *db, int full, dbKeyType keyType);
void sigalrmSignalHandler(int sig, siginfo_t *info, void *secret);
/* ================================= Debugging ============================== */
@ -281,7 +282,6 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o)
* a different digest. */
void computeDatasetDigest(unsigned char *final) {
unsigned char digest[20];
dictIterator *di = NULL;
dictEntry *de;
int j;
uint32_t aux;
@ -290,17 +290,15 @@ void computeDatasetDigest(unsigned char *final) {
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
if (dbSize(db, DB_MAIN) == 0) continue;
dbIterator *dbit = dbIteratorInit(db, DB_MAIN);
if (dictSize(db->dict) == 0) continue;
di = dictGetSafeIterator(db->dict);
/* hash the DB id, so the same dataset moved in a different
* DB will lead to a different digest */
/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
aux = htonl(j);
mixDigest(final,&aux,sizeof(aux));
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
while((de = dbIteratorNext(dbit)) != NULL) {
sds key;
robj *keyobj, *o;
@ -317,7 +315,7 @@ void computeDatasetDigest(unsigned char *final) {
xorDigest(final,digest,20);
decrRefCount(keyobj);
}
dictReleaseIterator(di);
zfree(dbit);
}
}
@ -610,7 +608,7 @@ NULL
robj *val;
char *strenc;
if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) {
if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) {
addReplyErrorObject(c,shared.nokeyerr);
return;
}
@ -662,7 +660,7 @@ NULL
robj *val;
sds key;
if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) {
if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) {
addReplyErrorObject(c,shared.nokeyerr);
return;
}
@ -718,7 +716,7 @@ NULL
if (getPositiveLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != C_OK)
return;
if (dictTryExpand(c->db->dict, keys) != DICT_OK) {
if (dbExpand(c->db, keys, DB_MAIN, 1) == C_ERR) {
addReplyError(c, "OOM in dictTryExpand");
return;
}
@ -766,7 +764,7 @@ NULL
/* We don't use lookupKey because a debug command should
* work on logically expired keys */
dictEntry *de;
robj *o = ((de = dictFind(c->db->dict,c->argv[j]->ptr)) == NULL) ? NULL : dictGetVal(de);
robj *o = ((de = dbFind(c->db, c->argv[j]->ptr, DB_MAIN)) == NULL) ? NULL : dictGetVal(de);
if (o) xorObjectDigest(c->db,c->argv[j],digest,o);
sds d = sdsempty();
@ -910,11 +908,11 @@ NULL
full = 1;
stats = sdscatprintf(stats,"[Dictionary HT]\n");
dictGetStats(buf,sizeof(buf),server.db[dbid].dict,full);
dbGetStats(buf, sizeof(buf), &server.db[dbid], full, DB_MAIN);
stats = sdscat(stats,buf);
stats = sdscatprintf(stats,"[Expires HT]\n");
dictGetStats(buf,sizeof(buf),server.db[dbid].expires,full);
dbGetStats(buf, sizeof(buf), &server.db[dbid], full, DB_EXPIRES);
stats = sdscat(stats,buf);
addReplyVerbatim(c,stats,sdslen(stats),"txt");
@ -2046,7 +2044,7 @@ void logCurrentClient(client *cc, const char *title) {
dictEntry *de;
key = getDecodedObject(cc->argv[1]);
de = dictFind(cc->db->dict, key->ptr);
de = dbFind(cc->db, key->ptr, DB_MAIN);
if (de) {
val = dictGetVal(de);
serverLog(LL_WARNING,"key '%s' found in DB containing the following object:", (char*)key->ptr);

View File

@ -40,6 +40,11 @@
#ifdef HAVE_DEFRAG
typedef struct defragCtx {
redisDb *db;
int slot;
} defragCtx;
/* this method was added to jemalloc in order to help us understand which
* pointers are worthwhile moving and which aren't */
int je_get_defrag_hint(void* ptr);
@ -669,30 +674,31 @@ void defragModule(redisDb *db, dictEntry *kde) {
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. Returns a stat of how many pointers were
* moved. */
void defragKey(redisDb *db, dictEntry *de) {
void defragKey(defragCtx *ctx, dictEntry *de) {
sds keysds = dictGetKey(de);
robj *newob, *ob;
unsigned char *newzl;
sds newsds;
redisDb *db = ctx->db;
int slot = ctx->slot;
/* Try to defrag the key name. */
newsds = activeDefragSds(keysds);
if (newsds) {
dictSetKey(db->dict, de, newsds);
if (dictSize(db->expires)) {
dictSetKey(db->dict[slot], de, newsds);
if (dbSize(db, DB_EXPIRES)) {
/* We can't search in db->expires for that key after we've released
* the pointer it holds, since it won't be able to do the string
* compare, but we can find the entry using key hash and pointer. */
uint64_t hash = dictGetHash(db->dict, newsds);
dictEntry *expire_de = dictFindEntryByPtrAndHash(db->expires, keysds, hash);
if (expire_de) dictSetKey(db->expires, expire_de, newsds);
uint64_t hash = dictGetHash(db->dict[slot], newsds);
dictEntry *expire_de = dictFindEntryByPtrAndHash(db->expires[slot], keysds, hash);
if (expire_de) dictSetKey(db->expires[slot], expire_de, newsds);
}
}
/* Try to defrag robj and / or string value. */
ob = dictGetVal(de);
if ((newob = activeDefragStringOb(ob))) {
dictSetVal(db->dict, de, newob);
dictSetVal(db->dict[slot], de, newob);
ob = newob;
}
@ -749,7 +755,7 @@ void defragKey(redisDb *db, dictEntry *de) {
/* Defrag scan callback for the main db dictionary. */
void defragScanCallback(void *privdata, const dictEntry *de) {
long long hits_before = server.stat_active_defrag_hits;
defragKey((redisDb*)privdata, (dictEntry*)de);
defragKey((defragCtx*)privdata, (dictEntry*)de);
if (server.stat_active_defrag_hits != hits_before)
server.stat_active_defrag_key_hits++;
else
@ -820,7 +826,7 @@ static sds defrag_later_current_key = NULL;
static unsigned long defrag_later_cursor = 0;
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
int defragLaterStep(redisDb *db, long long endtime) {
int defragLaterStep(redisDb *db, int slot, long long endtime) {
unsigned int iterations = 0;
unsigned long long prev_defragged = server.stat_active_defrag_hits;
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
@ -850,7 +856,7 @@ int defragLaterStep(redisDb *db, long long endtime) {
}
/* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
dictEntry *de = dictFind(db->dict, defrag_later_current_key);
dictEntry *de = dictFind(db->dict[slot], defrag_later_current_key);
key_defragged = server.stat_active_defrag_hits;
do {
int quit = 0;
@ -918,7 +924,10 @@ void computeDefragCycles(void) {
* This works in a similar way to activeExpireCycle, in the sense that
* we do incremental work across calls. */
void activeDefragCycle(void) {
static defragCtx ctx;
static int slot = -1;
static int current_db = -1;
static int defrag_later_item_in_progress = 0;
static unsigned long cursor = 0;
static unsigned long expires_cursor = 0;
static redisDb *db = NULL;
@ -940,6 +949,9 @@ void activeDefragCycle(void) {
defrag_later_cursor = 0;
current_db = -1;
cursor = 0;
expires_cursor = 0;
slot = -1;
defrag_later_item_in_progress = 0;
db = NULL;
goto update_metrics;
}
@ -967,9 +979,9 @@ void activeDefragCycle(void) {
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
do {
/* if we're not continuing a scan from the last call or loop, start a new one */
if (!cursor && !expires_cursor) {
if (!cursor && !expires_cursor && (slot < 0)) {
/* finish any leftovers from previous db before moving to the next one */
if (db && defragLaterStep(db, endtime)) {
if (db && defragLaterStep(db, slot, endtime)) {
quit = 1; /* time is up, we didn't finish all the work */
break; /* this will exit the function and we'll continue on the next cycle */
}
@ -989,7 +1001,11 @@ void activeDefragCycle(void) {
start_scan = now;
current_db = -1;
cursor = 0;
expires_cursor = 0;
slot = -1;
defrag_later_item_in_progress = 0;
db = NULL;
memset(&ctx, -1, sizeof(ctx));
server.active_defrag_running = 0;
computeDefragCycles(); /* if another scan is needed, start it right away */
@ -1005,32 +1021,47 @@ void activeDefragCycle(void) {
db = &server.db[current_db];
cursor = 0;
expires_cursor = 0;
slot = findSlotByKeyIndex(db, 1, DB_MAIN);
defrag_later_item_in_progress = 0;
ctx.db = db;
ctx.slot = slot;
}
do {
dict *d = db->dict[slot];
/* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */
if (defragLaterStep(db, endtime)) {
if (defragLaterStep(db, slot, endtime)) {
quit = 1; /* time is up, we didn't finish all the work */
break; /* this will exit the function and we'll continue on the next cycle */
}
/* Scan the keyspace dict unless we're scanning the expire dict. */
if (!expires_cursor)
cursor = dictScanDefrag(db->dict, cursor, defragScanCallback,
&defragfns, db);
/* When done scanning the keyspace dict, we scan the expire dict. */
if (!cursor)
expires_cursor = dictScanDefrag(db->expires, expires_cursor,
scanCallbackCountScanned,
&defragfns, NULL);
if (!defrag_later_item_in_progress) {
/* Scan the keyspace dict unless we're scanning the expire dict. */
if (!expires_cursor)
cursor = dictScanDefrag(d, cursor, defragScanCallback,
&defragfns, &ctx);
/* When done scanning the keyspace dict, we scan the expire dict. */
if (!cursor)
expires_cursor = dictScanDefrag(db->expires[slot], expires_cursor,
scanCallbackCountScanned,
&defragfns, NULL);
}
if (!(cursor || expires_cursor)) {
/* Move to the next slot only if regular and large item scanning has been completed. */
if (listLength(db->defrag_later) > 0) {
defrag_later_item_in_progress = 1;
continue;
}
slot = dbGetNextNonEmptySlot(db, slot, DB_MAIN);
ctx.slot = slot;
}
/* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys
* (if we have a lot of pointers in one hash bucket or rehashing),
* check if we reached the time limit.
* But regardless, don't start a new db in this loop, this is because after
* the last db we call defragOtherGlobals, which must be done in one cycle */
if (!(cursor || expires_cursor) ||
if ((!(cursor || expires_cursor) && slot == -1) ||
++iterations > 16 ||
server.stat_active_defrag_hits - prev_defragged > 512 ||
server.stat_active_defrag_scanned - prev_scanned > 64)
@ -1043,7 +1074,7 @@ void activeDefragCycle(void) {
prev_defragged = server.stat_active_defrag_hits;
prev_scanned = server.stat_active_defrag_scanned;
}
} while((cursor || expires_cursor) && !quit);
} while(((cursor || expires_cursor) || slot > 0) && !quit);
} while(!quit);
latencyEndMonitor(latency);

View File

@ -46,6 +46,7 @@
#include "dict.h"
#include "zmalloc.h"
#include "redisassert.h"
#include "monotonic.h"
/* Using dictEnableResize() / dictDisableResize() we make possible to disable
* resizing and rehashing of the hash table as needed. This is very important
@ -59,7 +60,6 @@ static dictResizeEnable dict_can_resize = DICT_RESIZE_ENABLE;
static unsigned int dict_force_resize_ratio = 5;
/* -------------------------- types ----------------------------------------- */
struct dictEntry {
void *key;
union {
@ -69,9 +69,6 @@ struct dictEntry {
double d;
} v;
struct dictEntry *next; /* Next entry in the same hash bucket. */
void *metadata[]; /* An arbitrary number of bytes (starting at a
* pointer-aligned address) of size as returned
* by dictType's dictEntryMetadataBytes(). */
};
typedef struct {
@ -184,16 +181,21 @@ static void _dictReset(dict *d, int htidx)
/* Create a new hash table */
dict *dictCreate(dictType *type)
{
size_t metasize = type->dictMetadataBytes ? type->dictMetadataBytes() : 0;
dict *d = zmalloc(sizeof(*d) + metasize);
if (metasize) {
memset(dictMetadata(d), 0, metasize);
}
dict *d = zmalloc(sizeof(*d));
_dictInit(d,type);
return d;
}
/* Create an array of dictionaries */
dict **dictCreateMultiple(dictType *type, int count)
{
dict **d = zmalloc(sizeof(dict*) * count);
for (int i = 0; i < count; i++) {
d[i] = dictCreate(type);
}
return d;
}
/* Initialize the hash table */
int _dictInit(dict *d, dictType *type)
{
@ -268,6 +270,7 @@ int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
d->ht_used[1] = new_ht_used;
d->ht_table[1] = new_ht_table;
d->rehashidx = 0;
if (d->type->rehashingStarted) d->type->rehashingStarted(d);
return DICT_OK;
}
@ -390,15 +393,16 @@ long long timeInMilliseconds(void) {
/* Rehash in ms+"delta" milliseconds. The value of "delta" is larger
* than 0, and is smaller than 1 in most cases. The exact upper bound
* depends on the running time of dictRehash(d,100).*/
int dictRehashMilliseconds(dict *d, int ms) {
int dictRehashMilliseconds(dict *d, unsigned int ms) {
if (d->pauserehash > 0) return 0;
long long start = timeInMilliseconds();
monotime timer;
elapsedStart(&timer);
int rehashes = 0;
while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
if (elapsedMs(timer) >= ms) break;
}
return rehashes;
}
@ -415,11 +419,6 @@ static void _dictRehashStep(dict *d) {
if (d->pauserehash == 0) dictRehash(d,1);
}
/* Return a pointer to the metadata section within the dict. */
void *dictMetadata(dict *d) {
return &d->metadata;
}
/* Add an element to the target hash table */
int dictAdd(dict *d, void *key, void *val)
{
@ -472,9 +471,7 @@ dictEntry *dictInsertAtPosition(dict *d, void *key, void *position) {
int htidx = dictIsRehashing(d) ? 1 : 0;
assert(bucket >= &d->ht_table[htidx][0] &&
bucket <= &d->ht_table[htidx][DICTHT_SIZE_MASK(d->ht_size_exp[htidx])]);
size_t metasize = dictEntryMetadataSize(d);
if (d->type->no_value) {
assert(!metasize); /* Entry metadata + no value not supported. */
if (d->type->keys_are_odd && !*bucket) {
/* We can store the key directly in the destination bucket without the
* allocated entry.
@ -494,11 +491,8 @@ dictEntry *dictInsertAtPosition(dict *d, void *key, void *position) {
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
entry = zmalloc(sizeof(*entry) + metasize);
entry = zmalloc(sizeof(*entry));
assert(entryIsNormal(entry)); /* Check alignment of allocation */
if (metasize > 0) {
memset(dictEntryMetadata(entry), 0, metasize);
}
entry->key = key;
entry->next = *bucket;
}
@ -788,12 +782,6 @@ double dictIncrDoubleVal(dictEntry *de, double val) {
return de->v.d += val;
}
/* A pointer to the metadata section within the dict entry. */
void *dictEntryMetadata(dictEntry *de) {
assert(entryHasValue(de));
return &de->metadata;
}
void *dictGetKey(const dictEntry *de) {
if (entryIsKey(de)) return (void*)de;
if (entryIsNoValue(de)) return decodeEntryNoValue(de)->key;
@ -856,7 +844,7 @@ static void dictSetNext(dictEntry *de, dictEntry *next) {
* and values. */
size_t dictMemUsage(const dict *d) {
return dictSize(d) * sizeof(dictEntry) +
dictSlots(d) * sizeof(dictEntry*);
dictBuckets(d) * sizeof(dictEntry*);
}
size_t dictEntryMemUsage(void) {
@ -1000,7 +988,7 @@ dictEntry *dictGetRandomKey(dict *d)
do {
/* We are sure there are no elements in indexes from 0
* to rehashidx-1 */
h = d->rehashidx + (randomULong() % (dictSlots(d) - d->rehashidx));
h = d->rehashidx + (randomULong() % (dictBuckets(d) - d->rehashidx));
he = (h >= s0) ? d->ht_table[1][h - s0] : d->ht_table[0][h];
} while(he == NULL);
} else {
@ -1132,7 +1120,7 @@ end:
/* Reallocate the dictEntry, key and value allocations in a bucket using the
* provided allocation functions in order to defrag them. */
static void dictDefragBucket(dict *d, dictEntry **bucketref, dictDefragFunctions *defragfns) {
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns) {
dictDefragAllocFunction *defragalloc = defragfns->defragAlloc;
dictDefragAllocFunction *defragkey = defragfns->defragKey;
dictDefragAllocFunction *defragval = defragfns->defragVal;
@ -1159,8 +1147,6 @@ static void dictDefragBucket(dict *d, dictEntry **bucketref, dictDefragFunctions
}
if (newde) {
*bucketref = newde;
if (d->type->afterReplaceEntry)
d->type->afterReplaceEntry(d, newde);
}
bucketref = dictGetNextRef(*bucketref);
}
@ -1323,7 +1309,7 @@ unsigned long dictScanDefrag(dict *d,
/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(d, &d->ht_table[htidx0][v & m0], defragfns);
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns);
}
de = d->ht_table[htidx0][v & m0];
while (de) {
@ -1356,7 +1342,7 @@ unsigned long dictScanDefrag(dict *d,
/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(d, &d->ht_table[htidx0][v & m0], defragfns);
dictDefragBucket(&d->ht_table[htidx0][v & m0], defragfns);
}
de = d->ht_table[htidx0][v & m0];
while (de) {
@ -1370,7 +1356,7 @@ unsigned long dictScanDefrag(dict *d,
do {
/* Emit entries at cursor */
if (defragfns) {
dictDefragBucket(d, &d->ht_table[htidx1][v & m1], defragfns);
dictDefragBucket(&d->ht_table[htidx1][v & m1], defragfns);
}
de = d->ht_table[htidx1][v & m1];
while (de) {
@ -1519,77 +1505,87 @@ dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash)
}
/* ------------------------------- Debugging ---------------------------------*/
#define DICT_STATS_VECTLEN 50
size_t _dictGetStatsHt(char *buf, size_t bufsize, dict *d, int htidx, int full) {
unsigned long i, slots = 0, chainlen, maxchainlen = 0;
unsigned long totchainlen = 0;
unsigned long clvector[DICT_STATS_VECTLEN];
size_t l = 0;
void dictFreeStats(dictStats *stats) {
zfree(stats->clvector);
zfree(stats);
}
if (d->ht_used[htidx] == 0) {
return snprintf(buf,bufsize,
"Hash table %d stats (%s):\n"
"No stats available for empty dictionaries\n",
htidx, (htidx == 0) ? "main hash table" : "rehashing target");
}
if (!full) {
l += snprintf(buf+l,bufsize-l,
"Hash table %d stats (%s):\n"
" table size: %lu\n"
" number of elements: %lu\n",
htidx, (htidx == 0) ? "main hash table" : "rehashing target",
DICTHT_SIZE(d->ht_size_exp[htidx]), d->ht_used[htidx]);
/* Make sure there is a NULL term at the end. */
buf[bufsize-1] = '\0';
/* Unlike snprintf(), return the number of characters actually written. */
return strlen(buf);
void dictCombineStats(dictStats *from, dictStats *into) {
into->buckets += from->buckets;
into->maxChainLen = (from->maxChainLen > into->maxChainLen) ? from->maxChainLen : into->maxChainLen;
into->totalChainLen += from->totalChainLen;
into->htSize += from->htSize;
into->htUsed += from->htUsed;
for (int i = 0; i < DICT_STATS_VECTLEN; i++) {
into->clvector[i] += from->clvector[i];
}
}
dictStats *dictGetStatsHt(dict *d, int htidx, int full) {
unsigned long *clvector = zcalloc(sizeof(unsigned long) * DICT_STATS_VECTLEN);
dictStats *stats = zcalloc(sizeof(dictStats));
stats->htidx = htidx;
stats->clvector = clvector;
stats->htSize = DICTHT_SIZE(d->ht_size_exp[htidx]);
stats->htUsed = d->ht_used[htidx];
if (!full) return stats;
/* Compute stats. */
for (i = 0; i < DICT_STATS_VECTLEN; i++) clvector[i] = 0;
for (i = 0; i < DICTHT_SIZE(d->ht_size_exp[htidx]); i++) {
for (unsigned long i = 0; i < DICTHT_SIZE(d->ht_size_exp[htidx]); i++) {
dictEntry *he;
if (d->ht_table[htidx][i] == NULL) {
clvector[0]++;
continue;
}
slots++;
stats->buckets++;
/* For each hash entry on this slot... */
chainlen = 0;
unsigned long chainlen = 0;
he = d->ht_table[htidx][i];
while(he) {
chainlen++;
he = dictGetNext(he);
}
clvector[(chainlen < DICT_STATS_VECTLEN) ? chainlen : (DICT_STATS_VECTLEN-1)]++;
if (chainlen > maxchainlen) maxchainlen = chainlen;
totchainlen += chainlen;
if (chainlen > stats->maxChainLen) stats->maxChainLen = chainlen;
stats->totalChainLen += chainlen;
}
/* Generate human readable stats. */
l += snprintf(buf+l,bufsize-l,
"Hash table %d stats (%s):\n"
" table size: %lu\n"
" number of elements: %lu\n"
" different slots: %lu\n"
" max chain length: %lu\n"
" avg chain length (counted): %.02f\n"
" avg chain length (computed): %.02f\n"
" Chain length distribution:\n",
htidx, (htidx == 0) ? "main hash table" : "rehashing target",
DICTHT_SIZE(d->ht_size_exp[htidx]), d->ht_used[htidx], slots, maxchainlen,
(float)totchainlen/slots, (float)d->ht_used[htidx]/slots);
return stats;
}
for (i = 0; i < DICT_STATS_VECTLEN-1; i++) {
if (clvector[i] == 0) continue;
if (l >= bufsize) break;
l += snprintf(buf+l,bufsize-l,
" %ld: %ld (%.02f%%)\n",
i, clvector[i], ((float)clvector[i]/DICTHT_SIZE(d->ht_size_exp[htidx]))*100);
/* Generates human readable stats. */
size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full) {
if (stats->htUsed == 0) {
return snprintf(buf,bufsize,
"Hash table %d stats (%s):\n"
"No stats available for empty dictionaries\n",
stats->htidx, (stats->htidx == 0) ? "main hash table" : "rehashing target");
}
size_t l = 0;
l += snprintf(buf + l, bufsize - l,
"Hash table %d stats (%s):\n"
" table size: %lu\n"
" number of elements: %lu\n",
stats->htidx, (stats->htidx == 0) ? "main hash table" : "rehashing target",
stats->htSize, stats->htUsed);
if (full) {
l += snprintf(buf + l, bufsize - l,
" different slots: %lu\n"
" max chain length: %lu\n"
" avg chain length (counted): %.02f\n"
" avg chain length (computed): %.02f\n"
" Chain length distribution:\n",
stats->buckets, stats->maxChainLen,
(float) stats->totalChainLen / stats->buckets, (float) stats->htUsed / stats->buckets);
for (unsigned long i = 0; i < DICT_STATS_VECTLEN - 1; i++) {
if (stats->clvector[i] == 0) continue;
if (l >= bufsize) break;
l += snprintf(buf + l, bufsize - l,
" %ld: %ld (%.02f%%)\n",
i, stats->clvector[i], ((float) stats->clvector[i] / stats->htSize) * 100);
}
}
/* Make sure there is a NULL term at the end. */
@ -1603,11 +1599,15 @@ void dictGetStats(char *buf, size_t bufsize, dict *d, int full) {
char *orig_buf = buf;
size_t orig_bufsize = bufsize;
l = _dictGetStatsHt(buf,bufsize,d,0,full);
if (dictIsRehashing(d) && bufsize > l) {
buf += l;
bufsize -= l;
_dictGetStatsHt(buf,bufsize,d,1,full);
dictStats *mainHtStats = dictGetStatsHt(d, 0, full);
l = dictGetStatsMsg(buf, bufsize, mainHtStats, full);
dictFreeStats(mainHtStats);
buf += l;
bufsize -= l;
if (dictIsRehashing(d) && bufsize > 0) {
dictStats *rehashHtStats = dictGetStatsHt(d, 1, full);
dictGetStatsMsg(buf, bufsize, rehashHtStats, full);
dictFreeStats(rehashHtStats);
}
/* Make sure there is a NULL term at the end. */
orig_buf[orig_bufsize-1] = '\0';

View File

@ -45,7 +45,6 @@
#define DICT_ERR 1
typedef struct dictEntry dictEntry; /* opaque */
typedef struct dict dict;
typedef struct dictType {
@ -56,6 +55,7 @@ typedef struct dictType {
void (*keyDestructor)(dict *d, void *key);
void (*valDestructor)(dict *d, void *obj);
int (*expandAllowed)(size_t moreMem, double usedRatio);
void (*rehashingStarted)(dict *d);
/* Flags */
/* The 'no_value' flag, if set, indicates that values are not used, i.e. the
* dict is a set. When this flag is set, it's not possible to access the
@ -68,14 +68,6 @@ typedef struct dictType {
unsigned int keys_are_odd:1;
/* TODO: Add a 'keys_are_even' flag and use a similar optimization if that
* flag is set. */
/* Allow each dict and dictEntry to carry extra caller-defined metadata. The
* extra memory is initialized to 0 when allocated. */
size_t (*dictEntryMetadataBytes)(dict *d);
size_t (*dictMetadataBytes)(void);
/* Optional callback called after an entry has been reallocated (due to
* active defrag). Only called if the entry has metadata. */
void (*afterReplaceEntry)(dict *d, dictEntry *entry);
} dictType;
#define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1<<(exp))
@ -92,10 +84,6 @@ struct dict {
/* Keep small vars at end for optimal (minimal) struct padding */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
void *metadata[]; /* An arbitrary number of bytes (starting at a
* pointer-aligned address) of size as defined
* by dictType's dictEntryBytes. */
};
/* If safe is set to 1 this is a safe iterator, that means, you can call
@ -111,6 +99,16 @@ typedef struct dictIterator {
unsigned long long fingerprint;
} dictIterator;
typedef struct dictStats {
int htidx;
unsigned long buckets;
unsigned long maxChainLen;
unsigned long totalChainLen;
unsigned long htSize;
unsigned long htUsed;
unsigned long *clvector;
} dictStats;
typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
typedef void *(dictDefragAllocFunction)(void *ptr);
typedef struct {
@ -138,14 +136,10 @@ typedef struct {
(d)->type->keyCompare((d), key1, key2) : \
(key1) == (key2))
#define dictEntryMetadataSize(d) ((d)->type->dictEntryMetadataBytes \
? (d)->type->dictEntryMetadataBytes(d) : 0)
#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \
? (d)->type->dictMetadataBytes() : 0)
#define dictHashKey(d, key) ((d)->type->hashFunction(key))
#define dictSlots(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1]))
#define dictBuckets(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1]))
#define dictSize(d) ((d)->ht_used[0]+(d)->ht_used[1])
#define dictIsEmpty(d) ((d)->ht_used[0] == 0 && (d)->ht_used[1] == 0)
#define dictIsRehashing(d) ((d)->rehashidx != -1)
#define dictPauseRehashing(d) ((d)->pauserehash++)
#define dictResumeRehashing(d) ((d)->pauserehash--)
@ -165,6 +159,7 @@ typedef enum {
/* API */
dict *dictCreate(dictType *type);
dict **dictCreateMultiple(dictType *type, int count);
int dictExpand(dict *d, unsigned long size);
int dictTryExpand(dict *d, unsigned long size);
void *dictMetadata(dict *d);
@ -216,7 +211,7 @@ uint64_t dictGenCaseHashFunction(const unsigned char *buf, size_t len);
void dictEmpty(dict *d, void(callback)(dict*));
void dictSetResizeEnabled(dictResizeEnable enable);
int dictRehash(dict *d, int n);
int dictRehashMilliseconds(dict *d, int ms);
int dictRehashMilliseconds(dict *d, unsigned int ms);
void dictSetHashFunctionSeed(uint8_t *seed);
uint8_t *dictGetHashFunctionSeed(void);
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
@ -224,6 +219,11 @@ unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dic
uint64_t dictGetHash(dict *d, const void *key);
dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full);
dictStats* dictGetStatsHt(dict *d, int htidx, int full);
void dictCombineStats(dictStats *from, dictStats *into);
void dictFreeStats(dictStats *stats);
#ifdef REDIS_TEST
int dictTest(int argc, char *argv[], int flags);
#endif

View File

@ -58,6 +58,7 @@ struct evictionPoolEntry {
sds key; /* Key name. */
sds cached; /* Cached SDS object for key name. */
int dbid; /* Key DB number. */
int slot; /* Slot. */
};
static struct evictionPoolEntry *EvictionPoolLRU;
@ -143,7 +144,7 @@ void evictionPoolAlloc(void) {
* idle time are on the left, and keys with the higher idle time on the
* right. */
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
void evictionPoolPopulate(int dbid, int slot, dict *sampledict, redisDb *db, struct evictionPoolEntry *pool) {
int j, k, count;
dictEntry *samples[server.maxmemory_samples];
@ -161,13 +162,13 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
* dictionary (but the expires one) we need to lookup the key
* again in the key dictionary to obtain the value object. */
if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
if (sampledict != keydict) de = dictFind(keydict, key);
if (!(server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)) de = dictFind(db->dict[calculateKeySlot(key)], key);
o = dictGetVal(de);
}
/* Calculate the idle time according to the policy. This is called
* idle just because the code initially handled LRU, but is in fact
* just a score where an higher score means better candidate. */
* just a score where a higher score means better candidate. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
idle = estimateObjectIdleTime(o);
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
@ -237,6 +238,7 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
}
pool[k].idle = idle;
pool[k].dbid = dbid;
pool[k].slot = slot;
}
}
@ -569,6 +571,7 @@ int performEvictions(void) {
/* Try to smoke-out bugs (server.also_propagate should be empty here) */
serverAssert(server.also_propagate.numops == 0);
/* Evictions are performed on random keys that have nothing to do with the current command slot. */
while (mem_freed < (long long)mem_tofree) {
int j, k, i;
@ -592,12 +595,24 @@ int performEvictions(void) {
* every DB. */
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, dict, db->dict, pool);
total_keys += keys;
}
do {
int slot = 0;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
slot = getFairRandomSlot(db, DB_MAIN);
dict = db->dict[slot];
} else {
slot = getFairRandomSlot(db, DB_EXPIRES);
dict = db->expires[slot];
}
if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, slot, dict, db, pool);
total_keys += keys;
}
/* Since keys are distributed across smaller slot-specific dictionaries in cluster mode, we may need to
* visit more than one dictionary in order to populate required number of samples into eviction pool. */
} while (server.cluster_enabled && keys != 0 && server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS &&
total_keys < (unsigned long) server.maxmemory_samples
);
}
if (!total_keys) break; /* No keys to evict. */
@ -607,11 +622,11 @@ int performEvictions(void) {
bestdbid = pool[k].dbid;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(server.db[bestdbid].dict,
pool[k].key);
de = dictFind(server.db[bestdbid].dict[pool[k].slot],
pool[k].key);
} else {
de = dictFind(server.db[bestdbid].expires,
pool[k].key);
de = dictFind(server.db[bestdbid].expires[pool[k].slot],
pool[k].key);
}
/* Remove the entry from the pool. */
@ -643,7 +658,7 @@ int performEvictions(void) {
j = (++next_db) % server.dbnum;
db = server.db+j;
dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->dict : db->expires;
db->dict[getFairRandomSlot(db, DB_MAIN)] : db->expires[getFairRandomSlot(db, DB_EXPIRES)];
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);

View File

@ -137,6 +137,18 @@ void expireScanCallback(void *privdata, const dictEntry *const_de) {
data->sampled++;
}
static inline int isExpiryDictValidForSamplingCb(dict *d) {
long long numkeys = dictSize(d);
unsigned long buckets = dictBuckets(d);
/* When there are less than 1% filled buckets, sampling the key
* space is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (buckets > DICT_HT_INITIAL_SIZE && (numkeys * 100/buckets < 1)) {
return C_ERR;
}
return C_OK;
}
void activeExpireCycle(int type) {
/* Adjust the running parameters according to the configured expire
* effort. The default effort is 1, and the maximum configurable effort
@ -229,23 +241,16 @@ void activeExpireCycle(int type) {
* we scanned. The percentage, stored in config_cycle_acceptable_stale
* is not fixed, but depends on the Redis configured "expire effort". */
do {
unsigned long num, slots;
unsigned long num;
iteration++;
/* If there is nothing to expire try next DB ASAP. */
if ((num = dictSize(db->expires)) == 0) {
if ((num = dbSize(db, DB_EXPIRES)) == 0) {
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
data.now = mstime();
/* When there are less than 1% filled slots, sampling the key
* space is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;
/* The main collection cycle. Scan through keys among keys
* with an expire set, checking for expired ones. */
data.sampled = 0;
@ -270,8 +275,10 @@ void activeExpireCycle(int type) {
long checked_buckets = 0;
while (data.sampled < num && checked_buckets < max_buckets) {
db->expires_cursor = dictScan(db->expires, db->expires_cursor,
expireScanCallback, &data);
db->expires_cursor = dbScan(db, DB_EXPIRES, db->expires_cursor, expireScanCallback, isExpiryDictValidForSamplingCb, &data);
if (db->expires_cursor == 0) {
break;
}
checked_buckets++;
}
total_expired += data.expired;
@ -378,7 +385,7 @@ void expireSlaveKeys(void) {
while(dbids && dbid < server.dbnum) {
if ((dbids & 1) != 0) {
redisDb *db = server.db+dbid;
dictEntry *expire = dictFind(db->expires,keyname);
dictEntry *expire = dictFind(db->expires[getKeySlot(keyname)],keyname);
int expired = 0;
if (expire &&

View File

@ -19,14 +19,19 @@ void lazyfreeFreeObject(void *args[]) {
* database which was substituted with a fresh one in the main thread
* when the database was logically deleted. */
void lazyfreeFreeDatabase(void *args[]) {
dict *ht1 = (dict *) args[0];
dict *ht2 = (dict *) args[1];
size_t numkeys = dictSize(ht1);
dictRelease(ht1);
dictRelease(ht2);
atomicDecr(lazyfree_objects,numkeys);
atomicIncr(lazyfreed_objects,numkeys);
dict **ht1 = (dict **) args[0];
dict **ht2 = (dict **) args[1];
int *dictCount = (int *) args[2];
for (int i=0; i<*dictCount; i++) {
size_t numkeys = dictSize(ht1[i]);
dictRelease(ht1[i]);
dictRelease(ht2[i]);
atomicDecr(lazyfree_objects,numkeys);
atomicIncr(lazyfreed_objects,numkeys);
}
zfree(ht1);
zfree(ht2);
zfree(dictCount);
}
/* Release the key tracking table. */
@ -174,11 +179,14 @@ void freeObjAsync(robj *key, robj *obj, int dbid) {
* create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */
void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->dict, *oldht2 = db->expires;
db->dict = dictCreate(&dbDictType);
db->expires = dictCreate(&dbExpiresDictType);
atomicIncr(lazyfree_objects,dictSize(oldht1));
bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,oldht2);
dict **oldDict = db->dict;
dict **oldExpires = db->expires;
atomicIncr(lazyfree_objects,dbSize(db, DB_MAIN));
db->dict = dictCreateMultiple(&dbDictType, db->dict_count);
db->expires = dictCreateMultiple(&dbExpiresDictType, db->dict_count);
int *count = zmalloc(sizeof(int));
*count = db->dict_count;
bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, oldDict, oldExpires, count);
}
/* Free the key tracking table.

View File

@ -4243,7 +4243,7 @@ void RM_ResetDataset(int restart_aof, int async) {
/* Returns the number of keys in the current db. */
unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
return dictSize(ctx->client->db->dict);
return dbSize(ctx->client->db, DB_MAIN);
}
/* Returns a name of a random key, or NULL if current db is empty. */
@ -10879,7 +10879,7 @@ typedef struct {
} ScanCBData;
typedef struct RedisModuleScanCursor{
unsigned long cursor;
unsigned long long cursor;
int done;
}RedisModuleScanCursor;
@ -10981,7 +10981,7 @@ int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanC
}
int ret = 1;
ScanCBData data = { ctx, privdata, fn };
cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, &data);
cursor->cursor = dbScan(ctx->client->db, DB_MAIN, cursor->cursor, moduleScanCallback, NULL, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;

View File

@ -394,7 +394,7 @@ void touchWatchedKey(redisDb *db, robj *key) {
/* The key was already expired when WATCH was called. */
if (db == wk->db &&
equalStringObjects(key, wk->key) &&
dictFind(db->dict, key->ptr) == NULL)
dictFind(db->dict[calculateKeySlot(key->ptr)], key->ptr) == NULL)
{
/* Already expired key is deleted, so logically no change. Clear
* the flag. Deleted keys are not flagged as expired. */
@ -432,9 +432,9 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) {
dictIterator *di = dictGetSafeIterator(emptied->watched_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int exists_in_emptied = dictFind(emptied->dict, key->ptr) != NULL;
int exists_in_emptied = dictFind(emptied->dict[calculateKeySlot(key->ptr)], key->ptr) != NULL;
if (exists_in_emptied ||
(replaced_with && dictFind(replaced_with->dict, key->ptr)))
(replaced_with && dictFind(replaced_with->dict[calculateKeySlot(key->ptr)], key->ptr)))
{
list *clients = dictGetVal(de);
if (!clients) continue;
@ -442,7 +442,7 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) {
while((ln = listNext(&li))) {
watchedKey *wk = redis_member2struct(watchedKey, node, ln);
if (wk->expired) {
if (!replaced_with || !dictFind(replaced_with->dict, key->ptr)) {
if (!replaced_with || !dictFind(replaced_with->dict[calculateKeySlot(key->ptr)], key->ptr)) {
/* Expired key now deleted. No logical change. Clear the
* flag. Deleted keys are not flagged as expired. */
wk->expired = 0;

View File

@ -1035,7 +1035,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
if (o->encoding == OBJ_ENCODING_HT) {
d = o->ptr;
di = dictGetIterator(d);
asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictBuckets(d));
while((de = dictNext(di)) != NULL && samples < sample_size) {
ele = dictGetKey(de);
elesize += dictEntryMemUsage() + sdsZmallocSize(ele);
@ -1058,7 +1058,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
zskiplist *zsl = ((zset*)o->ptr)->zsl;
zskiplistNode *znode = zsl->header->level[0].forward;
asize = sizeof(*o)+sizeof(zset)+sizeof(zskiplist)+sizeof(dict)+
(sizeof(struct dictEntry*)*dictSlots(d))+
(sizeof(struct dictEntry*)*dictBuckets(d))+
zmalloc_size(zsl->header);
while(znode != NULL && samples < sample_size) {
elesize += sdsZmallocSize(znode->ele);
@ -1076,7 +1076,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
} else if (o->encoding == OBJ_ENCODING_HT) {
d = o->ptr;
di = dictGetIterator(d);
asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictBuckets(d));
while((de = dictNext(di)) != NULL && samples < sample_size) {
ele = dictGetKey(de);
ele2 = dictGetVal(de);
@ -1246,28 +1246,21 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
long long keyscount = dictSize(db->dict);
if (keyscount==0) continue;
unsigned long long keyscount = dbSize(db, DB_MAIN);
if (keyscount == 0) continue;
mh->total_keys += keyscount;
mh->db = zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1));
mh->db[mh->num_dbs].dbid = j;
mem = dictMemUsage(db->dict) +
dictSize(db->dict) * sizeof(robj);
mem = dbMemUsage(db, DB_MAIN);
mh->db[mh->num_dbs].overhead_ht_main = mem;
mem_total+=mem;
mem = dictMemUsage(db->expires);
mem = dbMemUsage(db, DB_EXPIRES);
mh->db[mh->num_dbs].overhead_ht_expires = mem;
mem_total+=mem;
/* Account for the slot to keys map in cluster mode */
mem = dictSize(db->dict) * dictEntryMetadataSize(db->dict) +
dictMetadataSize(db->dict);
mh->db[mh->num_dbs].overhead_ht_slot_to_keys = mem;
mem_total+=mem;
mh->num_dbs++;
}
@ -1551,14 +1544,13 @@ NULL
return;
}
}
if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) {
if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) {
addReplyNull(c);
return;
}
size_t usage = objectComputeSize(c->argv[2],dictGetVal(de),samples,c->db->id);
usage += sdsZmallocSize(dictGetKey(de));
usage += dictEntryMemUsage();
usage += dictMetadataSize(c->db->dict);
addReplyLongLong(c,usage);
} else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) {
struct redisMemOverhead *mh = getMemoryOverheadData();
@ -1599,16 +1591,13 @@ NULL
char dbname[32];
snprintf(dbname,sizeof(dbname),"db.%zd",mh->db[j].dbid);
addReplyBulkCString(c,dbname);
addReplyMapLen(c,3);
addReplyMapLen(c,2);
addReplyBulkCString(c,"overhead.hashtable.main");
addReplyLongLong(c,mh->db[j].overhead_ht_main);
addReplyBulkCString(c,"overhead.hashtable.expires");
addReplyLongLong(c,mh->db[j].overhead_ht_expires);
addReplyBulkCString(c,"overhead.hashtable.slot-to-keys");
addReplyLongLong(c,mh->db[j].overhead_ht_slot_to_keys);
}

View File

@ -1298,17 +1298,16 @@ werr:
}
ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
dictIterator *di;
dictEntry *de;
ssize_t written = 0;
ssize_t res;
dbIterator *dbit = NULL;
static long long info_updated_time = 0;
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
redisDb *db = server.db + dbid;
dict *d = db->dict;
if (dictSize(d) == 0) return 0;
di = dictGetSafeIterator(d);
unsigned long long int db_size = dbSize(db, DB_MAIN);
if (db_size == 0) return 0;
/* Write the SELECT DB opcode */
if ((res = rdbSaveType(rdb,RDB_OPCODE_SELECTDB)) < 0) goto werr;
@ -1317,9 +1316,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
written += res;
/* Write the RESIZE DB opcode. */
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
unsigned long long expires_size = dbSize(db, DB_EXPIRES);
if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr;
@ -1327,8 +1324,23 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr;
written += res;
dbit = dbIteratorInit(db, DB_MAIN);
int last_slot = -1;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
while ((de = dbIteratorNext(dbit)) != NULL) {
int curr_slot = dbIteratorGetCurrentSlot(dbit);
/* Save slot info. */
if (server.cluster_enabled && curr_slot != last_slot) {
if ((res = rdbSaveType(rdb, RDB_OPCODE_SLOT_INFO)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, curr_slot)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, dictSize(db->dict[curr_slot]))) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, dictSize(db->expires[curr_slot]))) < 0) goto werr;
written += res;
last_slot = curr_slot;
}
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
@ -1356,12 +1368,11 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
}
}
}
dictReleaseIterator(di);
zfree(dbit);
return written;
werr:
dictReleaseIterator(di);
if (dbit) zfree(dbit);
return -1;
}
@ -3023,6 +3034,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx) {
uint64_t dbid = 0;
int type, rdbver;
uint64_t db_size = 0, expires_size = 0;
int should_expand_db = 1;
redisDb *db = rdb_loading_ctx->dbarray+0;
char buf[1024];
int error;
@ -3098,13 +3111,26 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently
* selected data base, in order to avoid useless rehashing. */
uint64_t db_size, expires_size;
if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
dictExpand(db->dict,db_size);
dictExpand(db->expires,expires_size);
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_SLOT_INFO) {
uint64_t slot_id, slot_size, expires_slot_size;
if ((slot_id = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
if ((slot_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
if ((expires_slot_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
if (!server.cluster_enabled) {
continue; /* Ignore gracefully. */
}
/* In cluster mode we resize individual slot specific dictionaries based on the number of keys that slot holds. */
dictExpand(db->dict[slot_id], slot_size);
dictExpand(db->expires[slot_id], expires_slot_size);
should_expand_db = 0;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB
@ -3234,6 +3260,20 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
continue;
}
/* If there is no slot info, it means that it's either not cluster mode or we are trying to load legacy RDB file.
* In this case we want to estimate number of keys per slot and resize accordingly. */
if (should_expand_db) {
if (dbExpand(db, db_size, DB_MAIN, 0) == C_ERR) {
serverLog(LL_WARNING, "OOM in dict expand of main dict");
return C_ERR;
}
if (dbExpand(db, expires_size, DB_EXPIRES, 0) == C_ERR) {
serverLog(LL_WARNING, "OOM in dict expand of expire dict");
return C_ERR;
}
should_expand_db = 0;
}
/* Read key */
if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
goto eoferr;

View File

@ -38,7 +38,7 @@
/* The current RDB version. When the format changes in a way that is no longer
* backward compatible this number gets incremented. */
#define RDB_VERSION 11
#define RDB_VERSION 12
/* Defines related to the dump file format. To store 32 bits lengths for short
* keys requires a lot of space, so we check the most significant 2 bits of
@ -103,6 +103,7 @@
#define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 21))
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_SLOT_INFO 244 /* Individual slot info, such as slot id and size (cluster mode only). */
#define RDB_OPCODE_FUNCTION2 245 /* function library data */
#define RDB_OPCODE_FUNCTION_PRE_GA 246 /* old function library data for 7.0 rc1 and rc2 */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */

View File

@ -276,6 +276,13 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
if ((expires_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
goto eoferr;
continue; /* Read type again. */
} else if (type == RDB_OPCODE_SLOT_INFO) {
uint64_t slot_id, slot_size;
if ((slot_id = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
goto eoferr;
if ((slot_size = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
goto eoferr;
continue; /* Read type again. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB
* which is backward compatible. Implementations of RDB loading

View File

@ -400,25 +400,17 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) {
}
}
/* Returns the size of the DB dict entry metadata in bytes. In cluster mode, the
* metadata is used for constructing a doubly linked list of the dict entries
* belonging to the same cluster slot. See the Slot to Key API in cluster.c. */
size_t dbDictEntryMetadataSize(dict *d) {
UNUSED(d);
/* NOTICE: this also affects overhead_ht_slot_to_keys in getMemoryOverheadData.
* If we ever add non-cluster related data here, that code must be modified too. */
return server.cluster_enabled ? sizeof(clusterDictEntryMetadata) : 0;
/* Adds dictionary to the rehashing list in cluster mode, which allows us
* to quickly find rehash targets during incremental rehashing.
* In non-cluster mode, we don't need this list as there is only one dictionary per DB. */
void dictRehashingStarted(dict *d) {
if (!server.cluster_enabled || !server.activerehashing) return;
listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d);
}
/* Returns the size of the DB dict metadata in bytes. In cluster mode, we store
* a pointer to the db in the main db dict, used for updating the slot-to-key
* mapping when a dictEntry is reallocated. */
size_t dbDictMetadataSize(void) {
return server.cluster_enabled ? sizeof(clusterDictMetadata) : 0;
}
void dbDictAfterReplaceEntry(dict *d, dictEntry *de) {
if (server.cluster_enabled) slotToKeyReplaceEntry(d, de);
void dictRehashingStartedForExpires(dict *d) {
if (!server.cluster_enabled || !server.activerehashing) return;
listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d);
}
/* Generic hash table type where keys are Redis Objects, Values
@ -476,9 +468,7 @@ dictType dbDictType = {
dictSdsDestructor, /* key destructor */
dictObjectDestructor, /* val destructor */
dictExpandAllowed, /* allow to expand */
.dictEntryMetadataBytes = dbDictEntryMetadataSize,
.dictMetadataBytes = dbDictMetadataSize,
.afterReplaceEntry = dbDictAfterReplaceEntry
dictRehashingStarted,
};
/* Db->expires */
@ -489,7 +479,8 @@ dictType dbExpiresDictType = {
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
dictExpandAllowed /* allow to expand */
dictExpandAllowed, /* allow to expand */
dictRehashingStartedForExpires,
};
/* Command table. sds string -> command struct pointer. */
@ -600,19 +591,33 @@ dictType sdsHashDictType = {
int htNeedsResize(dict *dict) {
long long size, used;
size = dictSlots(dict);
size = dictBuckets(dict);
used = dictSize(dict);
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}
/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
* we resize the hash table to save memory */
/* In cluster-enabled setup, this method traverses through all main/expires dictionaries (CLUSTER_SLOTS)
* and triggers a resize if the percentage of used buckets in the HT reaches HASHTABLE_MIN_FILL
* we resize the hash table to save memory.
*
* In non cluster-enabled setup, it resize main/expires dictionary based on the same condition described above. */
void tryResizeHashTables(int dbid) {
if (htNeedsResize(server.db[dbid].dict))
dictResize(server.db[dbid].dict);
if (htNeedsResize(server.db[dbid].expires))
dictResize(server.db[dbid].expires);
redisDb *db = &server.db[dbid];
int slot = 0;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
dbIterator *dbit = dbIteratorInitFromSlot(db, subdict, db->sub_dict[subdict].resize_cursor);
for (int i = 0; i < CRON_DBS_PER_CALL; i++) {
dict *d = dbIteratorNextDict(dbit);
slot = dbIteratorGetCurrentSlot(dbit);
if (!d) break;
if (htNeedsResize(d))
dictResize(d);
}
/* Save current iterator position in the resize_cursor. */
db->sub_dict[subdict].resize_cursor = slot;
zfree(dbit);
}
}
/* Our hash table implementation performs rehashing incrementally while
@ -623,15 +628,42 @@ void tryResizeHashTables(int dbid) {
* The function returns 1 if some rehashing was performed, otherwise 0
* is returned. */
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
/* Rehash main and expire dictionary . */
if (server.cluster_enabled) {
listNode *node, *nextNode;
monotime timer;
elapsedStart(&timer);
/* Our goal is to rehash as many slot specific dictionaries as we can before reaching predefined threshold,
* while removing those that already finished rehashing from the queue. */
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
serverLog(LL_DEBUG,"Rehashing list length: %lu", listLength(server.db[dbid].sub_dict[subdict].rehashing));
while ((node = listFirst(server.db[dbid].sub_dict[subdict].rehashing))) {
if (dictIsRehashing((dict *) listNodeValue(node))) {
dictRehashMilliseconds(listNodeValue(node), INCREMENTAL_REHASHING_THRESHOLD_MS);
if (elapsedMs(timer) >= INCREMENTAL_REHASHING_THRESHOLD_MS) {
return 1; /* Reached the time limit. */
}
} else { /* It is possible that rehashing has already completed for this dictionary, simply remove it from the queue. */
nextNode = listNextNode(node);
listDelNode(server.db[dbid].sub_dict[subdict].rehashing, node);
node = nextNode;
}
}
}
/* When cluster mode is disabled, only one dict is used for the entire DB and rehashing list isn't populated. */
} else {
/* Rehash main dict. */
dict *main_dict = server.db[dbid].dict[0];
if (main_dict) {
dictRehashMilliseconds(main_dict, INCREMENTAL_REHASHING_THRESHOLD_MS);
return 1; /* already used our millisecond for this loop... */
}
/* Rehash expires. */
dict *expires_dict = server.db[dbid].expires[0];
if (expires_dict) {
dictRehashMilliseconds(expires_dict, INCREMENTAL_REHASHING_THRESHOLD_MS);
return 1; /* already used our millisecond for this loop... */
}
}
return 0;
}
@ -1356,9 +1388,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
size = dbBuckets(&server.db[j], DB_MAIN);
used = dbSize(&server.db[j], DB_MAIN);
vkeys = dbSize(&server.db[j], DB_EXPIRES);
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
}
@ -2564,6 +2596,15 @@ void makeThreadKillable(void) {
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
}
void initDbState(redisDb *db){
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
db->sub_dict[subdict].rehashing = listCreate();
db->sub_dict[subdict].key_count = 0;
db->sub_dict[subdict].resize_cursor = 0;
db->sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL;
}
}
void initServer(void) {
int j;
@ -2640,8 +2681,9 @@ void initServer(void) {
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType);
server.db[j].expires = dictCreate(&dbExpiresDictType);
int slotCount = (server.cluster_enabled) ? CLUSTER_SLOTS : 1;
server.db[j].dict = dictCreateMultiple(&dbDictType, slotCount);
server.db[j].expires = dictCreateMultiple(&dbExpiresDictType,slotCount);
server.db[j].expires_cursor = 0;
server.db[j].blocking_keys = dictCreate(&keylistDictType);
server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType);
@ -2650,7 +2692,8 @@ void initServer(void) {
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
server.db[j].slots_to_keys = NULL; /* Set by clusterInit later on if necessary. */
server.db[j].dict_count = slotCount;
initDbState(&server.db[j]);
listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
@ -4171,7 +4214,6 @@ int processCommand(client *c) {
if (listLength(server.ready_keys) && !isInsideYieldingLongCommand())
handleClientsBlockedOnKeys();
}
return C_OK;
}
@ -6013,8 +6055,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
for (j = 0; j < server.dbnum; j++) {
long long keys, vkeys;
keys = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
keys = dbSize(&server.db[j], DB_MAIN);
vkeys = dbSize(&server.db[j], DB_EXPIRES);
if (keys || vkeys) {
info = sdscatprintf(info,
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
@ -6805,6 +6847,7 @@ int main(int argc, char **argv) {
char config_from_stdin = 0;
#ifdef REDIS_TEST
monotonicInit(); /* Required for dict tests, that are relying on monotime during dict rehashing. */
if (argc >= 3 && !strcasecmp(argv[1], "test")) {
int flags = 0;
for (j = 3; j < argc; j++) {

View File

@ -137,6 +137,7 @@ struct hdr_histogram;
#define CONFIG_BINDADDR_MAX 16
#define CONFIG_MIN_RESERVED_FDS 32
#define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}"
#define INCREMENTAL_REHASHING_THRESHOLD_MS 1
/* Bucket sizes for client eviction pools. Each bucket stores clients with
* memory usage of up to twice the size of the bucket below it. */
@ -959,15 +960,24 @@ typedef struct replBufBlock {
char buf[];
} replBufBlock;
/* Opaque type for the Slot to Key API. */
typedef struct clusterSlotToKeyMapping clusterSlotToKeyMapping;
typedef struct dbDictState {
list *rehashing; /* List of dictionaries in this DB that are currently rehashing. */
int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries. */
unsigned long long key_count; /* Total number of keys in this DB. */
unsigned long long *slot_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given slot. */
} dbDictState;
typedef enum dbKeyType {
DB_MAIN,
DB_EXPIRES
} dbKeyType;
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict **dict; /* The keyspace for this DB */
dict **expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *blocking_keys_unblock_on_nokey; /* Keys with clients waiting for
* data, and should be unblocked if key is deleted (XREADEDGROUP).
@ -978,7 +988,8 @@ typedef struct redisDb {
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
int dict_count; /* Indicates total number of dictionaires owned by this DB, 1 dict per slot in cluster mode. */
dbDictState sub_dict[2]; /* Metadata for main and expires dictionaries */
} redisDb;
/* forward declaration for functions ctx */
@ -1418,7 +1429,6 @@ struct redisMemOverhead {
size_t dbid;
size_t overhead_ht_main;
size_t overhead_ht_expires;
size_t overhead_ht_slot_to_keys;
} *db;
};
@ -2412,6 +2422,19 @@ typedef struct {
unsigned char *lpi; /* listpack iterator */
} setTypeIterator;
typedef struct dbIterator dbIterator;
/* DB iterator specific functions */
dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType);
dbIterator *dbIteratorInitFromSlot(redisDb *db, dbKeyType keyType, int slot);
dict *dbIteratorNextDict(dbIterator *dbit);
int dbIteratorGetCurrentSlot(dbIterator *dbit);
dictEntry *dbIteratorNext(dbIterator *iter);
/* SCAN specific commands for easy cursor manipulation, shared between main code and modules. */
int getAndClearSlotIdFromCursor(unsigned long long *cursor);
void addSlotIdToCursor(int slot, unsigned long long *cursor);
/* Structure to hold hash iteration abstraction. Note that iteration over
* hashes involves both fields and values. Because it is possible that
* not both are required, store pointers in the iterator to avoid
@ -3089,6 +3112,18 @@ void dismissMemoryInChild(void);
#define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */
#define RESTART_SERVER_CONFIG_REWRITE (1<<1) /* CONFIG REWRITE before restart.*/
int restartServer(int flags, mstime_t delay);
unsigned long long int dbSize(redisDb *db, dbKeyType keyType);
int getKeySlot(sds key);
int calculateKeySlot(sds key);
unsigned long dbBuckets(redisDb *db, dbKeyType keyType);
size_t dbMemUsage(redisDb *db, dbKeyType keyType);
dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType);
unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long cursor, dictScanFunction *fn, int (dictScanValidFunction)(dict *d), void *privdata);
int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_expand);
unsigned long long cumulativeKeyCountRead(redisDb *db, int idx, dbKeyType keyType);
int getFairRandomSlot(redisDb *db, dbKeyType keyType);
int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType);
int findSlotByKeyIndex(redisDb *db, unsigned long target, dbKeyType keyType);
/* Set data type */
robj *setTypeCreate(sds value, size_t size_hint);
@ -3278,8 +3313,8 @@ void discardTempDb(redisDb *tempDb, void(callback)(dict*));
int selectDb(client *c, int id);
void signalModifiedKey(client *c, redisDb *db, robj *key);
void signalFlushedDb(int dbid, int async);
void scanGenericCommand(client *c, robj *o, unsigned long cursor);
int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor);
void scanGenericCommand(client *c, robj *o, unsigned long long cursor);
int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor);
int dbAsyncDelete(redisDb *db, robj *key);
void emptyDbAsync(redisDb *db);
size_t lazyfreeGetPendingObjectsCount(void);

View File

@ -888,7 +888,7 @@ void hexistsCommand(client *c) {
void hscanCommand(client *c) {
robj *o;
unsigned long cursor;
unsigned long long cursor;
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||

View File

@ -1671,7 +1671,7 @@ void sdiffstoreCommand(client *c) {
void sscanCommand(client *c) {
robj *set;
unsigned long cursor;
unsigned long long cursor;
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||

View File

@ -3901,7 +3901,7 @@ void zrevrankCommand(client *c) {
void zscanCommand(client *c) {
robj *o;
unsigned long cursor;
unsigned long long cursor;
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||

View File

@ -602,16 +602,24 @@ proc stop_bg_complex_data {handle} {
# Write num keys with the given key prefix and value size (in bytes). If idx is
# given, it's the index (AKA level) used with the srv procedure and it specifies
# to which Redis instance to write the keys.
proc populate {num {prefix key:} {size 3} {idx 0} {prints false}} {
proc populate {num {prefix key:} {size 3} {idx 0} {prints false} {expires 0}} {
r $idx deferred 1
if {$num > 16} {set pipeline 16} else {set pipeline $num}
set val [string repeat A $size]
for {set j 0} {$j < $pipeline} {incr j} {
r $idx set $prefix$j $val
if {$expires > 0} {
r $idx set $prefix$j $val ex $expires
} else {
r $idx set $prefix$j $val
}
if {$prints} {puts $j}
}
for {} {$j < $num} {incr j} {
r $idx set $prefix$j $val
if {$expires > 0} {
r $idx set $prefix$j $val ex $expires
} else {
r $idx set $prefix$j $val
}
r $idx read
if {$prints} {puts $j}
}

View File

@ -30,6 +30,13 @@ start_cluster 1 0 {tags {external:skip cluster}} {
redis.call('set', 'foo', 'bar'); redis.call('set', 'bar', 'foo')
} 0
# Retrieve data from different slot to verify data has been stored in the correct dictionary in cluster-enabled setup
# during cross-slot operation from the above lua script.
assert_equal "bar" [r 0 get foo]
assert_equal "foo" [r 0 get bar]
r 0 del foo
r 0 del bar
# Functions with allow-cross-slot-keys flag are allowed
r 0 function load REPLACE {#!lua name=crossslot
local function test_cross_slot(keys, args)
@ -40,6 +47,11 @@ start_cluster 1 0 {tags {external:skip cluster}} {
redis.register_function{function_name='test_cross_slot', callback=test_cross_slot, flags={ 'allow-cross-slot-keys' }}}
r FCALL test_cross_slot 0
# Retrieve data from different slot to verify data has been stored in the correct dictionary in cluster-enabled setup
# during cross-slot operation from the above lua function.
assert_equal "bar" [r 0 get foo]
assert_equal "foo" [r 0 get bar]
}
test {Cross slot commands are also blocked if they disagree with pre-declared keys} {

View File

@ -192,8 +192,8 @@ start_server {tags {"expire"}} {
# two seconds.
wait_for_condition 20 100 {
[r dbsize] eq 0
} fail {
"Keys did not actively expire."
} else {
fail "Keys did not actively expire."
}
}
@ -378,8 +378,8 @@ start_server {tags {"expire"}} {
{set foo15 bar}
{pexpireat foo15 *}
{set foo16 bar}
{restore foo17 * {*} ABSTTL}
{restore foo18 * {*} absttl}
{restore foo17 * * ABSTTL}
{restore foo18 * * absttl}
}
# Remember the absolute TTLs of all the keys
@ -507,8 +507,8 @@ start_server {tags {"expire"}} {
{pexpireat foo4 *}
{pexpireat foo4 *}
{set foo5 bar}
{restore foo6 * {*} ABSTTL}
{restore foo7 * {*} absttl}
{restore foo6 * * ABSTTL}
{restore foo7 * * absttl}
}
close_replication_stream $repl
} {} {needs:repl}
@ -833,3 +833,75 @@ start_server {tags {"expire"}} {
assert_equal [r debug set-active-expire 1] {OK}
} {} {needs:debug}
}
start_cluster 1 0 {tags {"expire external:skip cluster"}} {
test "expire scan should skip dictionaries with lot's of empty buckets" {
# Collect two slots to help determine the expiry scan logic is able
# to go past certain slots which aren't valid for scanning at the given point of time.
# And the next non empyt slot after that still gets scanned and expiration happens.
# hashslot(alice) is 749
r psetex alice 500 val
# hashslot(foo) is 12182
# fill data across different slots with expiration
for {set j 1} {$j <= 100} {incr j} {
r psetex "{foo}$j" 500 a
}
# hashslot(key) is 12539
r psetex key 500 val
assert_equal 102 [r dbsize]
# disable resizing
r config set rdb-key-save-delay 10000000
r bgsave
# delete data to have lot's (99%) of empty buckets (slot 12182 should be skipped)
for {set j 1} {$j <= 99} {incr j} {
r del "{foo}$j"
}
# Verify {foo}5 still exists and remaining got cleaned up
wait_for_condition 20 100 {
[r dbsize] eq 1
} else {
if {[r dbsize] eq 0} {
fail "scan didn't handle slot skipping logic."
} else {
fail "scan didn't process all valid slots."
}
}
# Enable resizing
r config set rdb-key-save-delay 0
catch {exec kill -9 [get_child_pid 0]}
wait_for_condition 1000 10 {
[s rdb_bgsave_in_progress] eq 0
} else {
fail "bgsave did not stop in time."
}
# Verify dict is under rehashing
set htstats [r debug HTSTATS 0]
assert_match {*rehashing target*} $htstats
# put some data into slot 12182 and trigger the resize
r psetex "{foo}0" 500 a
# Verify dict rehashing has completed
set htstats [r debug HTSTATS 0]
wait_for_condition 20 100 {
![string match {*rehashing target*} $htstats]
} else {
fail "rehashing didn't complete"
}
# Verify all keys have expired
wait_for_condition 20 100 {
[r dbsize] eq 0
} else {
fail "Keys did not actively expire."
}
}
}

View File

@ -37,9 +37,9 @@ start_server {tags {"memefficiency external:skip"}} {
}
run_solo {defrag} {
start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save ""}} {
proc test_active_defrag {type} {
if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} {
test "Active defrag" {
test "Active defrag main dictionary: $type" {
r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
@ -50,7 +50,11 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
r config set maxmemory-policy allkeys-lru
populate 700000 asdf1 150
populate 100 asdf1 150 0 false 1000
populate 170000 asdf2 300
populate 100 asdf2 300 0 false 1000
assert {[scan [regexp -inline {expires\=([\d]*)} [r info keyspace]] expires=%d] > 0}
after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
if {$::verbose} {
@ -115,7 +119,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
r save ;# saving an rdb iterates over all the data / pointers
# if defrag is supported, test AOF loading too
if {[r config get activedefrag] eq "activedefrag yes"} {
if {[r config get activedefrag] eq "activedefrag yes" && $type eq "standalone"} {
test "Active defrag - AOF loading" {
# reset stats and load the AOF file
r config resetstat
@ -159,8 +163,8 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
}
r config set appendonly no
r config set key-load-delay 0
test "Active defrag eval scripts" {
test "Active defrag eval scripts: $type" {
r flushdb
r script flush sync
r config resetstat
@ -171,7 +175,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
r config set active-defrag-cycle-max 75
r config set active-defrag-ignore-bytes 1500kb
r config set maxmemory 0
set n 50000
# Populate memory with interleaving script-key pattern of same size
@ -192,9 +196,9 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
puts "rss [s allocator_active]"
puts "frag [s allocator_frag_ratio]"
puts "frag_bytes [s allocator_frag_bytes]"
}
}
assert_lessthan [s allocator_frag_ratio] 1.05
# Delete all the keys to create fragmentation
for {set j 0} {$j < $n} {incr j} { $rd del k$j }
for {set j 0} {$j < $n} {incr j} { $rd read } ; # Discard del replies
@ -205,7 +209,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
puts "rss [s allocator_active]"
puts "frag [s allocator_frag_ratio]"
puts "frag_bytes [s allocator_frag_bytes]"
}
}
assert_morethan [s allocator_frag_ratio] 1.4
catch {r config set activedefrag yes} e
@ -235,14 +239,14 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
puts "rss [s allocator_active]"
puts "frag [s allocator_frag_ratio]"
puts "frag_bytes [s allocator_frag_bytes]"
}
}
assert_lessthan_equal [s allocator_frag_ratio] 1.05
}
}
# Flush all script to make sure we don't crash after defragging them
r script flush sync
} {OK}
test "Active defrag big keys" {
test "Active defrag big keys: $type" {
r flushdb
r config resetstat
r config set hz 100
@ -277,6 +281,14 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
$rd read ; # Discard replies
}
# create some small items (effective in cluster-enabled)
r set "{bighash}smallitem" val
r set "{biglist}smallitem" val
r set "{bigzset}smallitem" val
r set "{bigset}smallitem" val
r set "{bigstream}smallitem" val
set expected_frag 1.7
if {$::accurate} {
# scale the hash to 1m fields in order to have a measurable the latency
@ -297,7 +309,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
for {set j 0} {$j < 500000} {incr j} {
$rd read ; # Discard replies
}
assert_equal [r dbsize] 500010
assert_equal [r dbsize] 500015
# create some fragmentation
for {set j 0} {$j < 500000} {incr j 2} {
@ -306,7 +318,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
for {set j 0} {$j < 500000} {incr j 2} {
$rd read ; # Discard replies
}
assert_equal [r dbsize] 250010
assert_equal [r dbsize] 250015
# start defrag
after 120 ;# serverCron only updates the info once in 100ms
@ -371,7 +383,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
r save ;# saving an rdb iterates over all the data / pointers
} {OK}
test "Active defrag big list" {
test "Active defrag big list: $type" {
r flushdb
r config resetstat
r config set hz 100
@ -473,7 +485,7 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
r del biglist1 ;# coverage for quicklistBookmarksClear
} {1}
test "Active defrag edge case" {
test "Active defrag edge case: $type" {
# there was an edge case in defrag where all the slabs of a certain bin are exact the same
# % utilization, with the exception of the current slab from which new allocations are made
# if the current slab is lower in utilization the defragger would have ended up in stagnation,
@ -576,5 +588,13 @@ start_server {tags {"defrag external:skip"} overrides {appendonly yes auto-aof-r
}
}
}
}
}
start_cluster 1 0 {tags {"defrag external:skip cluster"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save ""}} {
test_active_defrag "cluster"
}
start_server {tags {"defrag external:skip standalone"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save ""}} {
test_active_defrag "standalone"
}
} ;# run_solo

View File

@ -1,5 +1,5 @@
start_server {tags {"scan network"}} {
test "SCAN basic" {
proc test_scan {type} {
test "{$type} SCAN basic" {
r flushdb
populate 1000
@ -17,7 +17,7 @@ start_server {tags {"scan network"}} {
assert_equal 1000 [llength $keys]
}
test "SCAN COUNT" {
test "{$type} SCAN COUNT" {
r flushdb
populate 1000
@ -35,7 +35,7 @@ start_server {tags {"scan network"}} {
assert_equal 1000 [llength $keys]
}
test "SCAN MATCH" {
test "{$type} SCAN MATCH" {
r flushdb
populate 1000
@ -53,7 +53,7 @@ start_server {tags {"scan network"}} {
assert_equal 100 [llength $keys]
}
test "SCAN TYPE" {
test "{$type} SCAN TYPE" {
r flushdb
# populate only creates strings
populate 1000
@ -98,7 +98,7 @@ start_server {tags {"scan network"}} {
assert_equal 1000 [llength $keys]
}
test "SCAN unknown type" {
test "{$type} SCAN unknown type" {
r flushdb
# make sure that passive expiration is triggered by the scan
r debug set-active-expire 0
@ -131,7 +131,7 @@ start_server {tags {"scan network"}} {
r debug set-active-expire 1
} {OK} {needs:debug}
test "SCAN with expired keys" {
test "{$type} SCAN with expired keys" {
r flushdb
# make sure that passive expiration is triggered by the scan
r debug set-active-expire 0
@ -164,7 +164,7 @@ start_server {tags {"scan network"}} {
r debug set-active-expire 1
} {OK} {needs:debug}
test "SCAN with expired keys with TYPE filter" {
test "{$type} SCAN with expired keys with TYPE filter" {
r flushdb
# make sure that passive expiration is triggered by the scan
r debug set-active-expire 0
@ -201,7 +201,7 @@ start_server {tags {"scan network"}} {
} {OK} {needs:debug}
foreach enc {intset listpack hashtable} {
test "SSCAN with encoding $enc" {
test "{$type} SSCAN with encoding $enc" {
# Create the Set
r del set
if {$enc eq {intset}} {
@ -236,7 +236,7 @@ start_server {tags {"scan network"}} {
}
foreach enc {listpack hashtable} {
test "HSCAN with encoding $enc" {
test "{$type} HSCAN with encoding $enc" {
# Create the Hash
r del hash
if {$enc eq {listpack}} {
@ -276,7 +276,7 @@ start_server {tags {"scan network"}} {
}
foreach enc {listpack skiplist} {
test "ZSCAN with encoding $enc" {
test "{$type} ZSCAN with encoding $enc" {
# Create the Sorted Set
r del zset
if {$enc eq {listpack}} {
@ -315,7 +315,7 @@ start_server {tags {"scan network"}} {
}
}
test "SCAN guarantees check under write load" {
test "{$type} SCAN guarantees check under write load" {
r flushdb
populate 100
@ -344,7 +344,7 @@ start_server {tags {"scan network"}} {
assert_equal 100 [llength $keys2]
}
test "SSCAN with integer encoded object (issue #1345)" {
test "{$type} SSCAN with integer encoded object (issue #1345)" {
set objects {1 a}
r del set
r sadd set {*}$objects
@ -354,28 +354,28 @@ start_server {tags {"scan network"}} {
assert_equal [lsort -unique [lindex $res 1]] {1}
}
test "SSCAN with PATTERN" {
test "{$type} SSCAN with PATTERN" {
r del mykey
r sadd mykey foo fab fiz foobar 1 2 3 4
set res [r sscan mykey 0 MATCH foo* COUNT 10000]
lsort -unique [lindex $res 1]
} {foo foobar}
test "HSCAN with PATTERN" {
test "{$type} HSCAN with PATTERN" {
r del mykey
r hmset mykey foo 1 fab 2 fiz 3 foobar 10 1 a 2 b 3 c 4 d
set res [r hscan mykey 0 MATCH foo* COUNT 10000]
lsort -unique [lindex $res 1]
} {1 10 foo foobar}
test "ZSCAN with PATTERN" {
test "{$type} ZSCAN with PATTERN" {
r del mykey
r zadd mykey 1 foo 2 fab 3 fiz 10 foobar
set res [r zscan mykey 0 MATCH foo* COUNT 10000]
lsort -unique [lindex $res 1]
}
test "ZSCAN scores: regression test for issue #2175" {
test "{$type} ZSCAN scores: regression test for issue #2175" {
r del mykey
for {set j 0} {$j < 500} {incr j} {
r zadd mykey 9.8813129168249309e-323 $j
@ -385,7 +385,7 @@ start_server {tags {"scan network"}} {
assert {$first_score != 0}
}
test "SCAN regression test for issue #4906" {
test "{$type} SCAN regression test for issue #4906" {
for {set k 0} {$k < 100} {incr k} {
r del set
r sadd set x; # Make sure it's not intset encoded
@ -431,3 +431,11 @@ start_server {tags {"scan network"}} {
}
}
}
start_server {tags {"scan network standalone"}} {
test_scan "standalone"
}
start_cluster 1 0 {tags {"external:skip cluster scan"}} {
test_scan "cluster"
}