Make the sampling logic in eviction clearer (#12781)
Additional optimizations for the eviction logic in #11695: To make the eviction logic clearer and decouple the number of sampled keys from the running mode (cluster or standalone). * When sampling in each database, we only care about the number of keys in the current database (not the dicts we sampled from). * If there is an insufficient number of keys in the current database (e.g. fewer than 10 times the value of `maxmemory_samples`), we can break out sooner (to avoid looping on a sparse database). * We'll never try to sample the db dicts more times than the number of non-empty dicts in the db (max 1 in non-cluster mode). And it also ensures that each database has a sufficient amount of sampled keys, so even if unsharded-cluster supports multiple databases, there won't be any issues. other changes: 1. keep track of the number of non-empty dicts in each database. 2. move key_count tracking into cumulativeKeyCountAdd rather than all its callers --------- Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
991aff1c0f
commit
9ee1cc33a3
32
src/db.c
32
src/db.c
@ -284,7 +284,6 @@ static void dbAddInternal(redisDb *db, robj *key, robj *val, int update_if_exist
|
||||
dictSetKey(d, de, sdsdup(key->ptr));
|
||||
initObjectLRUOrLFU(val);
|
||||
dictSetVal(d, de, val);
|
||||
db->sub_dict[DB_MAIN].key_count++;
|
||||
cumulativeKeyCountAdd(db, slot, 1, DB_MAIN);
|
||||
signalKeyAsReady(db, key, val->type);
|
||||
notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id);
|
||||
@ -335,7 +334,6 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
|
||||
if (de == NULL) return 0;
|
||||
initObjectLRUOrLFU(val);
|
||||
dictSetVal(d, de, val);
|
||||
db->sub_dict[DB_MAIN].key_count++;
|
||||
cumulativeKeyCountAdd(db, slot, 1, DB_MAIN);
|
||||
return 1;
|
||||
}
|
||||
@ -468,6 +466,14 @@ robj *dbRandomKey(redisDb *db) {
|
||||
* You can read more about this data structure here https://en.wikipedia.org/wiki/Fenwick_tree
|
||||
* Time complexity is O(log(CLUSTER_SLOTS)). */
|
||||
void cumulativeKeyCountAdd(redisDb *db, int slot, long delta, dbKeyType keyType) {
|
||||
db->sub_dict[keyType].key_count += delta;
|
||||
dict *d = (keyType == DB_MAIN ? db->dict[slot] : db->expires[slot]);
|
||||
if (dictSize(d) == 1)
|
||||
db->sub_dict[keyType].non_empty_slots++;
|
||||
if (dictSize(d) == 0)
|
||||
db->sub_dict[keyType].non_empty_slots--;
|
||||
|
||||
/* BIT does not need to be calculated when the cluster is turned off. */
|
||||
if (!server.cluster_enabled) return;
|
||||
int idx = slot + 1; /* Unlike slots, BIT is 1-based, so we need to add 1. */
|
||||
while (idx <= CLUSTER_SLOTS) {
|
||||
@ -570,16 +576,14 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
|
||||
dictSetVal(d, de, NULL);
|
||||
}
|
||||
/* Deleting an entry from the expires dict will not free the sds of
|
||||
* the key, because it is shared with the main dictionary. */
|
||||
* the key, because it is shared with the main dictionary. */
|
||||
if (dictSize(db->expires[slot]) > 0) {
|
||||
if (dictDelete(db->expires[slot],key->ptr) == DICT_OK) {
|
||||
cumulativeKeyCountAdd(db, slot, -1, DB_EXPIRES);
|
||||
db->sub_dict[DB_EXPIRES].key_count--;
|
||||
}
|
||||
}
|
||||
}
|
||||
dictTwoPhaseUnlinkFree(d,de,plink,table);
|
||||
cumulativeKeyCountAdd(db, slot, -1, DB_MAIN);
|
||||
db->sub_dict[DB_MAIN].key_count--;
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
@ -674,6 +678,7 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
|
||||
dbarray[j].avg_ttl = 0;
|
||||
dbarray[j].expires_cursor = 0;
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
dbarray[j].sub_dict[subdict].non_empty_slots = 0;
|
||||
dbarray[j].sub_dict[subdict].key_count = 0;
|
||||
dbarray[j].sub_dict[subdict].resize_cursor = -1;
|
||||
if (server.cluster_enabled) {
|
||||
@ -1414,6 +1419,10 @@ unsigned long long int dbSize(redisDb *db, dbKeyType keyType) {
|
||||
return db->sub_dict[keyType].key_count;
|
||||
}
|
||||
|
||||
int dbNonEmptySlots(redisDb *db, dbKeyType keyType) {
|
||||
return db->sub_dict[keyType].non_empty_slots;
|
||||
}
|
||||
|
||||
/* This method provides the cumulative sum of all the dictionary buckets
|
||||
* across dictionaries in a database. */
|
||||
unsigned long dbBuckets(redisDb *db, dbKeyType keyType) {
|
||||
@ -1880,6 +1889,7 @@ int dbSwapDatabases(int id1, int id2) {
|
||||
db1->dict_count = db2->dict_count;
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
db1->sub_dict[subdict].key_count = db2->sub_dict[subdict].key_count;
|
||||
db1->sub_dict[subdict].non_empty_slots = db2->sub_dict[subdict].non_empty_slots;
|
||||
db1->sub_dict[subdict].resize_cursor = db2->sub_dict[subdict].resize_cursor;
|
||||
db1->sub_dict[subdict].slot_size_index = db2->sub_dict[subdict].slot_size_index;
|
||||
}
|
||||
@ -1891,6 +1901,7 @@ int dbSwapDatabases(int id1, int id2) {
|
||||
db2->dict_count = aux.dict_count;
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
db2->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
|
||||
db2->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots;
|
||||
db2->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor;
|
||||
db2->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index;
|
||||
}
|
||||
@ -1934,6 +1945,7 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
|
||||
activedb->dict_count = newdb->dict_count;
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
activedb->sub_dict[subdict].key_count = newdb->sub_dict[subdict].key_count;
|
||||
activedb->sub_dict[subdict].non_empty_slots = newdb->sub_dict[subdict].non_empty_slots;
|
||||
activedb->sub_dict[subdict].resize_cursor = newdb->sub_dict[subdict].resize_cursor;
|
||||
activedb->sub_dict[subdict].slot_size_index = newdb->sub_dict[subdict].slot_size_index;
|
||||
}
|
||||
@ -1945,6 +1957,7 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
|
||||
newdb->dict_count = aux.dict_count;
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
newdb->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
|
||||
newdb->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots;
|
||||
newdb->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor;
|
||||
newdb->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index;
|
||||
}
|
||||
@ -2000,9 +2013,9 @@ void swapdbCommand(client *c) {
|
||||
*----------------------------------------------------------------------------*/
|
||||
|
||||
int removeExpire(redisDb *db, robj *key) {
|
||||
if (dictDelete(db->expires[(getKeySlot(key->ptr))],key->ptr) == DICT_OK) {
|
||||
db->sub_dict[DB_EXPIRES].key_count--;
|
||||
cumulativeKeyCountAdd(db, getKeySlot(key->ptr), -1, DB_EXPIRES);
|
||||
int slot = getKeySlot(key->ptr);
|
||||
if (dictDelete(db->expires[slot],key->ptr) == DICT_OK) {
|
||||
cumulativeKeyCountAdd(db, slot, -1, DB_EXPIRES);
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
@ -2025,7 +2038,6 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) {
|
||||
dictSetSignedIntegerVal(existing, when);
|
||||
} else {
|
||||
dictSetSignedIntegerVal(de, when);
|
||||
db->sub_dict[DB_EXPIRES].key_count++;
|
||||
cumulativeKeyCountAdd(db, slot, 1, DB_EXPIRES);
|
||||
}
|
||||
|
||||
|
49
src/evict.c
49
src/evict.c
@ -143,8 +143,7 @@ void evictionPoolAlloc(void) {
|
||||
* We insert keys on place in ascending order, so keys with the smaller
|
||||
* idle time are on the left, and keys with the higher idle time on the
|
||||
* right. */
|
||||
|
||||
void evictionPoolPopulate(int dbid, int slot, dict *sampledict, redisDb *db, struct evictionPoolEntry *pool) {
|
||||
int evictionPoolPopulate(int dbid, int slot, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
|
||||
int j, k, count;
|
||||
dictEntry *samples[server.maxmemory_samples];
|
||||
|
||||
@ -162,7 +161,7 @@ void evictionPoolPopulate(int dbid, int slot, dict *sampledict, redisDb *db, str
|
||||
* dictionary (but the expires one) we need to lookup the key
|
||||
* again in the key dictionary to obtain the value object. */
|
||||
if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
|
||||
if (!(server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)) de = dictFind(db->dict[slot], key);
|
||||
if (sampledict != keydict) de = dictFind(keydict, key);
|
||||
o = dictGetVal(de);
|
||||
}
|
||||
|
||||
@ -240,6 +239,8 @@ void evictionPoolPopulate(int dbid, int slot, dict *sampledict, redisDb *db, str
|
||||
pool[k].dbid = dbid;
|
||||
pool[k].slot = slot;
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------------------------
|
||||
@ -586,33 +587,37 @@ int performEvictions(void) {
|
||||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
|
||||
{
|
||||
struct evictionPoolEntry *pool = EvictionPoolLRU;
|
||||
dbKeyType keyType = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS ?
|
||||
DB_MAIN : DB_EXPIRES);
|
||||
|
||||
while (bestkey == NULL) {
|
||||
unsigned long total_keys = 0, keys;
|
||||
unsigned long total_keys = 0;
|
||||
|
||||
/* We don't want to make local-db choices when expiring keys,
|
||||
* so to start populate the eviction pool sampling keys from
|
||||
* every DB. */
|
||||
for (i = 0; i < server.dbnum; i++) {
|
||||
db = server.db+i;
|
||||
do {
|
||||
int slot = 0;
|
||||
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
|
||||
slot = getFairRandomSlot(db, DB_MAIN);
|
||||
dict = db->dict[slot];
|
||||
} else {
|
||||
slot = getFairRandomSlot(db, DB_EXPIRES);
|
||||
dict = db->expires[slot];
|
||||
}
|
||||
if ((keys = dictSize(dict)) != 0) {
|
||||
evictionPoolPopulate(i, slot, dict, db, pool);
|
||||
total_keys += keys;
|
||||
}
|
||||
/* Since keys are distributed across smaller slot-specific dictionaries in cluster mode, we may need to
|
||||
* visit more than one dictionary in order to populate required number of samples into eviction pool. */
|
||||
} while (server.cluster_enabled && keys != 0 && server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS &&
|
||||
total_keys < (unsigned long) server.maxmemory_samples
|
||||
);
|
||||
unsigned long sampled_keys = 0;
|
||||
unsigned long current_db_keys = dbSize(db, keyType);
|
||||
if (current_db_keys == 0) continue;
|
||||
|
||||
total_keys += current_db_keys;
|
||||
int l = dbNonEmptySlots(db, keyType);
|
||||
/* Do not exceed the number of non-empty slots when looping. */
|
||||
while (l--) {
|
||||
int slot = getFairRandomSlot(db, keyType);
|
||||
dict = (keyType == DB_MAIN ? db->dict[slot] : db->expires[slot]);
|
||||
sampled_keys += evictionPoolPopulate(i, slot, dict, db->dict[slot], pool);
|
||||
/* We have sampled enough keys in the current db, exit the loop. */
|
||||
if (sampled_keys >= (unsigned long) server.maxmemory_samples)
|
||||
break;
|
||||
/* If there are not a lot of keys in the current db, dict/s may be very
|
||||
* sparsely populated, exit the loop without meeting the sampling
|
||||
* requirement. */
|
||||
if (current_db_keys < (unsigned long) server.maxmemory_samples*10)
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!total_keys) break; /* No keys to evict. */
|
||||
|
||||
|
@ -2654,6 +2654,7 @@ void makeThreadKillable(void) {
|
||||
void initDbState(redisDb *db){
|
||||
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
|
||||
db->sub_dict[subdict].rehashing = listCreate();
|
||||
db->sub_dict[subdict].non_empty_slots = 0;
|
||||
db->sub_dict[subdict].key_count = 0;
|
||||
db->sub_dict[subdict].resize_cursor = -1;
|
||||
db->sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL;
|
||||
|
@ -972,6 +972,7 @@ typedef struct replBufBlock {
|
||||
typedef struct dbDictState {
|
||||
list *rehashing; /* List of dictionaries in this DB that are currently rehashing. */
|
||||
int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used for cluster-enabled). */
|
||||
int non_empty_slots; /* The number of non-empty slots. */
|
||||
unsigned long long key_count; /* Total number of keys in this DB. */
|
||||
unsigned long long bucket_count; /* Total number of buckets in this DB across dictionaries (only used for cluster-enabled). */
|
||||
unsigned long long *slot_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given slot. */
|
||||
@ -3128,6 +3129,7 @@ void dismissMemoryInChild(void);
|
||||
#define RESTART_SERVER_CONFIG_REWRITE (1<<1) /* CONFIG REWRITE before restart.*/
|
||||
int restartServer(int flags, mstime_t delay);
|
||||
unsigned long long int dbSize(redisDb *db, dbKeyType keyType);
|
||||
int dbNonEmptySlots(redisDb *db, dbKeyType keyType);
|
||||
int getKeySlot(sds key);
|
||||
int calculateKeySlot(sds key);
|
||||
unsigned long dbBuckets(redisDb *db, dbKeyType keyType);
|
||||
|
Loading…
x
Reference in New Issue
Block a user