The server should recalibrate periodically. Also reduce log noise
Former-commit-id: 8d8e1810d49da6aa921d2327cb4ea250c2b5b234
This commit is contained in:
parent
27552cb307
commit
1b3dc3d422
@ -3489,10 +3489,10 @@ void processEventsWhileBlocked(int iel) {
|
|||||||
// Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here)
|
// Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here)
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||||
redisDb *db = &g_pserver->db[idb];
|
redisDb *db = &g_pserver->db[idb];
|
||||||
while (db->pdict->asyncdata != nullptr) {
|
while (db->dict->asyncdata != nullptr) {
|
||||||
if (!db->pdict->asyncdata->done)
|
if (!db->dict->asyncdata->done)
|
||||||
break;
|
break;
|
||||||
dictCompleteRehashAsync(db->pdict->asyncdata, false /*fFree*/);
|
dictCompleteRehashAsync(db->dict->asyncdata, false /*fFree*/);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1782,6 +1782,7 @@ int hash_spin_worker() {
|
|||||||
void databasesCron(bool fMainThread) {
|
void databasesCron(bool fMainThread) {
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
static int rehashes_per_ms = 0;
|
static int rehashes_per_ms = 0;
|
||||||
|
static int async_rehashes = 0;
|
||||||
if (fMainThread) {
|
if (fMainThread) {
|
||||||
/* Expire keys by random sampling. Not required for slaves
|
/* Expire keys by random sampling. Not required for slaves
|
||||||
* as master will synthesize DELs for us. */
|
* as master will synthesize DELs for us. */
|
||||||
@ -1833,28 +1834,33 @@ void databasesCron(bool fMainThread) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
serverAssert(serverTL->rehashCtl == nullptr);
|
serverAssert(serverTL->rehashCtl == nullptr);
|
||||||
if (rehashes_per_ms > 0)
|
/* Are we async rehashing? And if so is it time to re-calibrate? */
|
||||||
|
/* The recalibration limit is a prime number to ensure balancing across threads */
|
||||||
|
if (rehashes_per_ms > 0 && async_rehashes < 131) {
|
||||||
serverTL->rehashCtl = dictRehashAsyncStart(g_pserver->db[rehash_db].dict, rehashes_per_ms);
|
serverTL->rehashCtl = dictRehashAsyncStart(g_pserver->db[rehash_db].dict, rehashes_per_ms);
|
||||||
|
++async_rehashes;
|
||||||
|
}
|
||||||
if (serverTL->rehashCtl)
|
if (serverTL->rehashCtl)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
// Before starting anything new, can we end the rehash of a blocked thread?
|
// Before starting anything new, can we end the rehash of a blocked thread?
|
||||||
if (g_pserver->db[rehash_db].pdict->asyncdata != nullptr) {
|
if (g_pserver->db[rehash_db].dict->asyncdata != nullptr) {
|
||||||
auto asyncdata = g_pserver->db[rehash_db].pdict->asyncdata;
|
auto asyncdata = g_pserver->db[rehash_db].dict->asyncdata;
|
||||||
if (asyncdata->done) {
|
if (asyncdata->done) {
|
||||||
dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer
|
dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer
|
||||||
serverAssert(g_pserver->db[rehash_db].pdict->asyncdata != asyncdata);
|
serverAssert(g_pserver->db[rehash_db].dict->asyncdata != asyncdata);
|
||||||
break; // completion can be expensive, don't do anything else
|
break; // completion can be expensive, don't do anything else
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rehashes_per_ms = incrementallyRehash(rehash_db);
|
rehashes_per_ms = incrementallyRehash(rehash_db);
|
||||||
|
async_rehashes = 0;
|
||||||
if (rehashes_per_ms > 0) {
|
if (rehashes_per_ms > 0) {
|
||||||
/* If the function did some work, stop here, we'll do
|
/* If the function did some work, stop here, we'll do
|
||||||
* more at the next cron loop. */
|
* more at the next cron loop. */
|
||||||
serverLog(LL_WARNING, "Rehashes per ms: %d", rehashes_per_ms);
|
serverLog(LL_VERBOSE, "Calibrated rehashes per ms: %d", rehashes_per_ms);
|
||||||
break;
|
break;
|
||||||
} else if (g_pserver->db[rehash_db].pdict->asyncdata == nullptr) {
|
} else if (g_pserver->db[rehash_db].dict->asyncdata == nullptr) {
|
||||||
/* If this db didn't need rehash and we have none in flight, we'll try the next one. */
|
/* If this db didn't need rehash and we have none in flight, we'll try the next one. */
|
||||||
rehash_db++;
|
rehash_db++;
|
||||||
rehash_db %= cserver.dbnum;
|
rehash_db %= cserver.dbnum;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user