diff --git a/src/db.c b/src/db.c index 3d0a5ebca..1de2f2601 100644 --- a/src/db.c +++ b/src/db.c @@ -284,7 +284,6 @@ static void dbAddInternal(redisDb *db, robj *key, robj *val, int update_if_exist dictSetKey(d, de, sdsdup(key->ptr)); initObjectLRUOrLFU(val); dictSetVal(d, de, val); - db->sub_dict[DB_MAIN].key_count++; cumulativeKeyCountAdd(db, slot, 1, DB_MAIN); signalKeyAsReady(db, key, val->type); notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id); @@ -335,7 +334,6 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) { if (de == NULL) return 0; initObjectLRUOrLFU(val); dictSetVal(d, de, val); - db->sub_dict[DB_MAIN].key_count++; cumulativeKeyCountAdd(db, slot, 1, DB_MAIN); return 1; } @@ -468,6 +466,14 @@ robj *dbRandomKey(redisDb *db) { * You can read more about this data structure here https://en.wikipedia.org/wiki/Fenwick_tree * Time complexity is O(log(CLUSTER_SLOTS)). */ void cumulativeKeyCountAdd(redisDb *db, int slot, long delta, dbKeyType keyType) { + db->sub_dict[keyType].key_count += delta; + dict *d = (keyType == DB_MAIN ? db->dict[slot] : db->expires[slot]); + /* Count only real transitions: an insert that creates the slot's first key, or a + * delete that removes its last one (a delete going from 2 keys to 1 must not increment). */ + if (delta > 0 && dictSize(d) == 1) db->sub_dict[keyType].non_empty_slots++; + else if (delta < 0 && dictSize(d) == 0) db->sub_dict[keyType].non_empty_slots--; + + /* BIT does not need to be calculated when the cluster is turned off. */ if (!server.cluster_enabled) return; int idx = slot + 1; /* Unlike slots, BIT is 1-based, so we need to add 1. */ while (idx <= CLUSTER_SLOTS) { @@ -570,16 +576,14 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { dictSetVal(d, de, NULL); } /* Deleting an entry from the expires dict will not free the sds of - * the key, because it is shared with the main dictionary. */ + * the key, because it is shared with the main dictionary. 
*/ if (dictSize(db->expires[slot]) > 0) { if (dictDelete(db->expires[slot],key->ptr) == DICT_OK) { cumulativeKeyCountAdd(db, slot, -1, DB_EXPIRES); - db->sub_dict[DB_EXPIRES].key_count--; } - } + } dictTwoPhaseUnlinkFree(d,de,plink,table); cumulativeKeyCountAdd(db, slot, -1, DB_MAIN); - db->sub_dict[DB_MAIN].key_count--; return 1; } else { return 0; @@ -674,6 +678,7 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, dbarray[j].avg_ttl = 0; dbarray[j].expires_cursor = 0; for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { + dbarray[j].sub_dict[subdict].non_empty_slots = 0; dbarray[j].sub_dict[subdict].key_count = 0; dbarray[j].sub_dict[subdict].resize_cursor = -1; if (server.cluster_enabled) { @@ -1414,6 +1419,10 @@ unsigned long long int dbSize(redisDb *db, dbKeyType keyType) { return db->sub_dict[keyType].key_count; } +int dbNonEmptySlots(redisDb *db, dbKeyType keyType) { /* Returns the non_empty_slots counter kept for the sub-dictionary selected by keyType (DB_MAIN or DB_EXPIRES). */ + return db->sub_dict[keyType].non_empty_slots; +} + /* This method provides the cumulative sum of all the dictionary buckets * across dictionaries in a database. 
*/ unsigned long dbBuckets(redisDb *db, dbKeyType keyType) { @@ -1880,6 +1889,7 @@ int dbSwapDatabases(int id1, int id2) { db1->dict_count = db2->dict_count; for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { db1->sub_dict[subdict].key_count = db2->sub_dict[subdict].key_count; + db1->sub_dict[subdict].non_empty_slots = db2->sub_dict[subdict].non_empty_slots; db1->sub_dict[subdict].resize_cursor = db2->sub_dict[subdict].resize_cursor; db1->sub_dict[subdict].slot_size_index = db2->sub_dict[subdict].slot_size_index; } @@ -1891,6 +1901,7 @@ int dbSwapDatabases(int id1, int id2) { db2->dict_count = aux.dict_count; for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { db2->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count; + db2->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots; db2->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor; db2->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index; } @@ -1934,6 +1945,7 @@ void swapMainDbWithTempDb(redisDb *tempDb) { activedb->dict_count = newdb->dict_count; for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { activedb->sub_dict[subdict].key_count = newdb->sub_dict[subdict].key_count; + activedb->sub_dict[subdict].non_empty_slots = newdb->sub_dict[subdict].non_empty_slots; activedb->sub_dict[subdict].resize_cursor = newdb->sub_dict[subdict].resize_cursor; activedb->sub_dict[subdict].slot_size_index = newdb->sub_dict[subdict].slot_size_index; } @@ -1945,6 +1957,7 @@ void swapMainDbWithTempDb(redisDb *tempDb) { newdb->dict_count = aux.dict_count; for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { newdb->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count; + newdb->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots; newdb->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor; newdb->sub_dict[subdict].slot_size_index = 
aux.sub_dict[subdict].slot_size_index; } @@ -2000,9 +2013,9 @@ void swapdbCommand(client *c) { *----------------------------------------------------------------------------*/ int removeExpire(redisDb *db, robj *key) { - if (dictDelete(db->expires[(getKeySlot(key->ptr))],key->ptr) == DICT_OK) { - db->sub_dict[DB_EXPIRES].key_count--; - cumulativeKeyCountAdd(db, getKeySlot(key->ptr), -1, DB_EXPIRES); + int slot = getKeySlot(key->ptr); /* Computed once and reused for both the delete and the counter update. */ + if (dictDelete(db->expires[slot],key->ptr) == DICT_OK) { + cumulativeKeyCountAdd(db, slot, -1, DB_EXPIRES); return 1; } else { return 0; @@ -2025,7 +2038,6 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) { dictSetSignedIntegerVal(existing, when); } else { dictSetSignedIntegerVal(de, when); - db->sub_dict[DB_EXPIRES].key_count++; cumulativeKeyCountAdd(db, slot, 1, DB_EXPIRES); } diff --git a/src/evict.c b/src/evict.c index a9d93c5b7..809de3661 100644 --- a/src/evict.c +++ b/src/evict.c @@ -143,8 +143,7 @@ void evictionPoolAlloc(void) { * We insert keys on place in ascending order, so keys with the smaller * idle time are on the left, and keys with the higher idle time on the * right. */ - -void evictionPoolPopulate(int dbid, int slot, dict *sampledict, redisDb *db, struct evictionPoolEntry *pool) { +int evictionPoolPopulate(int dbid, int slot, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) { /* keydict is the dict searched for the value object when sampledict is a different dict; the function returns its local 'count', which performEvictions() adds to sampled_keys. */ int j, k, count; dictEntry *samples[server.maxmemory_samples]; @@ -162,7 +161,7 @@ void evictionPoolPopulate(int dbid, int slot, dict *sampledict, redisDb *db, str * dictionary (but the expires one) we need to lookup the key * again in the key dictionary to obtain the value object. 
*/ if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) { - if (!(server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)) de = dictFind(db->dict[slot], key); + if (sampledict != keydict) de = dictFind(keydict, key); o = dictGetVal(de); } @@ -240,6 +239,8 @@ void evictionPoolPopulate(int dbid, int slot, dict *sampledict, redisDb *db, str pool[k].dbid = dbid; pool[k].slot = slot; } + + return count; } /* ---------------------------------------------------------------------------- @@ -586,33 +587,37 @@ int performEvictions(void) { server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { struct evictionPoolEntry *pool = EvictionPoolLRU; + dbKeyType keyType = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS ? + DB_MAIN : DB_EXPIRES); while (bestkey == NULL) { - unsigned long total_keys = 0, keys; + unsigned long total_keys = 0; /* We don't want to make local-db choices when expiring keys, * so to start populate the eviction pool sampling keys from * every DB. */ for (i = 0; i < server.dbnum; i++) { db = server.db+i; - do { - int slot = 0; - if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { - slot = getFairRandomSlot(db, DB_MAIN); - dict = db->dict[slot]; - } else { - slot = getFairRandomSlot(db, DB_EXPIRES); - dict = db->expires[slot]; - } - if ((keys = dictSize(dict)) != 0) { - evictionPoolPopulate(i, slot, dict, db, pool); - total_keys += keys; - } - /* Since keys are distributed across smaller slot-specific dictionaries in cluster mode, we may need to - * visit more than one dictionary in order to populate required number of samples into eviction pool. 
*/ - } while (server.cluster_enabled && keys != 0 && server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS && - total_keys < (unsigned long) server.maxmemory_samples - ); + unsigned long sampled_keys = 0; + unsigned long current_db_keys = dbSize(db, keyType); + if (current_db_keys == 0) continue; + + total_keys += current_db_keys; + int l = dbNonEmptySlots(db, keyType); + /* Do not exceed the number of non-empty slots when looping. */ + while (l--) { + int slot = getFairRandomSlot(db, keyType); + dict = (keyType == DB_MAIN ? db->dict[slot] : db->expires[slot]); + sampled_keys += evictionPoolPopulate(i, slot, dict, db->dict[slot], pool); + /* We have sampled enough keys in the current db, exit the loop. */ + if (sampled_keys >= (unsigned long) server.maxmemory_samples) + break; + /* If there are not a lot of keys in the current db, dict/s may be very + * sparsely populated, exit the loop without meeting the sampling + * requirement. */ + if (current_db_keys < (unsigned long) server.maxmemory_samples*10) + break; + } } if (!total_keys) break; /* No keys to evict. */ diff --git a/src/server.c b/src/server.c index a7033b433..7bc0ffa51 100644 --- a/src/server.c +++ b/src/server.c @@ -2654,6 +2654,7 @@ void makeThreadKillable(void) { void initDbState(redisDb *db){ for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) { db->sub_dict[subdict].rehashing = listCreate(); + db->sub_dict[subdict].non_empty_slots = 0; db->sub_dict[subdict].key_count = 0; db->sub_dict[subdict].resize_cursor = -1; db->sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL; diff --git a/src/server.h b/src/server.h index a816a0c70..ee408fb24 100644 --- a/src/server.h +++ b/src/server.h @@ -972,6 +972,7 @@ typedef struct replBufBlock { typedef struct dbDictState { list *rehashing; /* List of dictionaries in this DB that are currently rehashing. 
*/ int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used for cluster-enabled). */ + int non_empty_slots; /* The number of non-empty slots. */ unsigned long long key_count; /* Total number of keys in this DB. */ unsigned long long bucket_count; /* Total number of buckets in this DB across dictionaries (only used for cluster-enabled). */ unsigned long long *slot_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given slot. */ @@ -3128,6 +3129,7 @@ void dismissMemoryInChild(void); #define RESTART_SERVER_CONFIG_REWRITE (1<<1) /* CONFIG REWRITE before restart.*/ int restartServer(int flags, mstime_t delay); unsigned long long int dbSize(redisDb *db, dbKeyType keyType); +int dbNonEmptySlots(redisDb *db, dbKeyType keyType); int getKeySlot(sds key); int calculateKeySlot(sds key); unsigned long dbBuckets(redisDb *db, dbKeyType keyType);