Reduce dbBuckets operation time complexity from O(N) to O(1) ()

As part of  independent dictionaries were introduced per slot.
Time complexity to discover total no. of buckets across all dictionaries
increased to O(N) with straightforward implementation of iterating over
all dictionaries and adding dictBuckets of each.

To optimize the time complexity, we could maintain a global counter at
db level to keep track of the count of buckets and update it on the start
and end of rehashing.

---------

Co-authored-by: Roshan Khatri <rvkhatri@amazon.com>
This commit is contained in:
Harkrishn Patro 2023-10-27 12:05:40 -07:00 committed by GitHub
parent 7d68208a6e
commit 4145d628b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 18 deletions

@ -65,7 +65,7 @@ dict *dbGetDictFromIterator(dbIterator *dbit) {
else if (dbit->keyType == DB_EXPIRES)
return dbit->db->expires[dbit->slot];
else
serverAssert(0);
serverPanic("Unknown keyType");
}
/* Returns next dictionary from the iterator, or NULL if iteration is complete. */
@ -503,7 +503,7 @@ static inline unsigned long dictSizebySlot(redisDb *db, int slot, dbKeyType keyT
else if (keyType == DB_EXPIRES)
return dictSize(db->expires[slot]);
else
serverAssert(0);
serverPanic("Unknown keyType");
}
/* Finds a slot containing target element in a key space ordered by slot id.
@ -1390,14 +1390,16 @@ unsigned long long int dbSize(redisDb *db, dbKeyType keyType) {
/* This method proivdes the cumulative sum of all the dictionary buckets
* across dictionaries in a database. */
unsigned long dbBuckets(redisDb *db, dbKeyType keyType) {
unsigned long buckets = 0;
dict *d;
dbIterator *dbit = dbIteratorInit(db, keyType);
while ((d = dbIteratorNextDict(dbit))) {
buckets += dictBuckets(d);
if (server.cluster_enabled) {
return db->sub_dict[keyType].bucket_count;
} else {
if (keyType == DB_MAIN)
return dictBuckets(db->dict[0]);
else if (keyType == DB_EXPIRES)
return dictBuckets(db->expires[0]);
else
serverPanic("Unknown keyType");
}
zfree(dbit);
return buckets;
}
size_t dbMemUsage(redisDb *db, dbKeyType keyType) {
@ -1419,7 +1421,7 @@ dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType){
else if (keyType == DB_EXPIRES)
return dictFind(db->expires[slot], key);
else
serverAssert(0);
serverPanic("Unknown keyType");
}
/*
@ -1444,7 +1446,7 @@ unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long v,
else if (keyType == DB_EXPIRES)
d = db->expires[slot];
else
serverAssert(0);
serverPanic("Unknown keyType");
int is_dict_valid = (dictScanValidFunction == NULL || dictScanValidFunction(d) == C_OK);
if (is_dict_valid) {

@ -259,6 +259,8 @@ int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht_table[0] == NULL) {
if (d->type->rehashingStarted) d->type->rehashingStarted(d);
if (d->type->rehashingCompleted) d->type->rehashingCompleted(d);
d->ht_size_exp[0] = new_ht_size_exp;
d->ht_used[0] = new_ht_used;
d->ht_table[0] = new_ht_table;
@ -369,6 +371,7 @@ int dictRehash(dict *d, int n) {
/* Check if we already rehashed the whole table... */
if (d->ht_used[0] == 0) {
if (d->type->rehashingCompleted) d->type->rehashingCompleted(d);
zfree(d->ht_table[0]);
/* Copy the new ht onto the old one */
d->ht_table[0] = d->ht_table[1];
@ -1504,6 +1507,21 @@ dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash)
return NULL;
}
/* Provides the old and new ht size for a given dictionary during rehashing. This method
* should only be invoked during initialization/rehashing. */
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size) {
/* Expansion during initialization. */
if (d->ht_size_exp[0] == -1) {
*from_size = DICTHT_SIZE(d->ht_size_exp[0]);
*to_size = DICTHT_SIZE(DICT_HT_INITIAL_EXP);
return;
}
/* Invalid method usage if rehashing isn't ongoing. */
assert(dictIsRehashing(d));
*from_size = DICTHT_SIZE(d->ht_size_exp[0]);
*to_size = DICTHT_SIZE(d->ht_size_exp[1]);
}
/* ------------------------------- Debugging ---------------------------------*/
#define DICT_STATS_VECTLEN 50
void dictFreeStats(dictStats *stats) {

@ -55,7 +55,11 @@ typedef struct dictType {
void (*keyDestructor)(dict *d, void *key);
void (*valDestructor)(dict *d, void *obj);
int (*expandAllowed)(size_t moreMem, double usedRatio);
/* Invoked at the start of dict initialization/rehashing (old and new ht are already created) */
void (*rehashingStarted)(dict *d);
/* Invoked at the end of dict initialization/rehashing of all the entries from old to new ht. Both ht still exists
* and are cleaned up after this callback. */
void (*rehashingCompleted)(dict *d);
/* Flags */
/* The 'no_value' flag, if set, indicates that values are not used, i.e. the
* dict is a set. When this flag is set, it's not possible to access the
@ -218,6 +222,7 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri
unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
void dictRehashingInfo(dict *d, unsigned long long *from, unsigned long long *to);
size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full);
dictStats* dictGetStatsHt(dict *d, int htidx, int full);

@ -400,17 +400,52 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) {
}
}
/* Adds dictionary to the rehashing list in cluster mode, which allows us
/* Updates the bucket count in cluster-mode for the given dictionary in a DB. bucket count
* incremented with the new ht size during the rehashing phase.
* And also adds dictionary to the rehashing list in cluster mode, which allows us
* to quickly find rehash targets during incremental rehashing.
* In non-cluster mode, we don't need this list as there is only one dictionary per DB. */
*
* In non-cluster mode, bucket count can be retrieved directly from single dict bucket and
* we don't need this list as there is only one dictionary per DB. */
void dictRehashingStarted(dict *d) {
if (!server.cluster_enabled || !server.activerehashing) return;
listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d);
if (!server.cluster_enabled) return;
unsigned long long from, to;
dictRehashingInfo(d, &from, &to);
server.db[0].sub_dict[DB_MAIN].bucket_count += to; /* Started rehashing (Add the new ht size) */
if (from == 0) return; /* No entries are to be moved. */
if (server.activerehashing) {
listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d);
}
}
/* Updates the bucket count for the given dictionary in a DB. It removes
* the old ht size of the dictionary from the total sum of buckets for a DB. */
void dictRehashingCompleted(dict *d) {
if (!server.cluster_enabled) return;
unsigned long long from, to;
dictRehashingInfo(d, &from, &to);
server.db[0].sub_dict[DB_MAIN].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
}
void dictRehashingStartedForExpires(dict *d) {
if (!server.cluster_enabled || !server.activerehashing) return;
listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d);
if (!server.cluster_enabled) return;
unsigned long long from, to;
dictRehashingInfo(d, &from, &to);
server.db[0].sub_dict[DB_EXPIRES].bucket_count += to; /* Started rehashing (Add the new ht size) */
if (from == 0) return; /* No entries are to be moved. */
if (server.activerehashing) {
listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d);
}
}
void dictRehashingCompletedForExpires(dict *d) {
if (!server.cluster_enabled) return;
unsigned long long from, to;
dictRehashingInfo(d, &from, &to);
server.db[0].sub_dict[DB_EXPIRES].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
}
/* Generic hash table type where keys are Redis Objects, Values
@ -469,6 +504,7 @@ dictType dbDictType = {
dictObjectDestructor, /* val destructor */
dictExpandAllowed, /* allow to expand */
dictRehashingStarted,
dictRehashingCompleted,
};
/* Db->expires */
@ -481,6 +517,7 @@ dictType dbExpiresDictType = {
NULL, /* val destructor */
dictExpandAllowed, /* allow to expand */
dictRehashingStartedForExpires,
dictRehashingCompletedForExpires,
};
/* Command table. sds string -> command struct pointer. */
@ -2603,6 +2640,7 @@ void initDbState(redisDb *db){
db->sub_dict[subdict].key_count = 0;
db->sub_dict[subdict].resize_cursor = 0;
db->sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL;
db->sub_dict[subdict].bucket_count = 0;
}
}

@ -962,8 +962,9 @@ typedef struct replBufBlock {
typedef struct dbDictState {
list *rehashing; /* List of dictionaries in this DB that are currently rehashing. */
int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries. */
int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used for cluster-enabled). */
unsigned long long key_count; /* Total number of keys in this DB. */
unsigned long long bucket_count; /* Total number of buckets in this DB across dictionaries (only used for cluster-enabled). */
unsigned long long *slot_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given slot. */
} dbDictState;