Implement rehash during spinlock

Former-commit-id: f68a26381a35b27948046d46c2c7bcfbdc21143d
This commit is contained in:
John Sully 2021-01-25 16:29:47 +00:00
parent 5ab1095022
commit bef72e5f6a
9 changed files with 167 additions and 70 deletions

View File

@ -842,9 +842,15 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep,
eventLoop->aftersleepFlags = flags; eventLoop->aftersleepFlags = flags;
} }
thread_local spin_worker tl_worker = nullptr;
void setAeLockSetThreadSpinWorker(spin_worker worker)
{
tl_worker = worker;
}
void aeAcquireLock() void aeAcquireLock()
{ {
g_lock.lock(); g_lock.lock(tl_worker);
} }
int aeTryAcquireLock(int fWeak) int aeTryAcquireLock(int fWeak)

View File

@ -164,6 +164,7 @@ aeEventLoop *aeGetCurrentEventLoop();
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait); void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
void setAeLockSetThreadSpinWorker(spin_worker worker);
void aeAcquireLock(); void aeAcquireLock();
int aeTryAcquireLock(int fWeak); int aeTryAcquireLock(int fWeak);
void aeReleaseLock(); void aeReleaseLock();

View File

@ -231,13 +231,14 @@ int dictRehash(dict *d, int n) {
return 1; return 1;
} }
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d) { dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
if (!dictIsRehashing(d)) return 0; if (!dictIsRehashing(d)) return 0;
d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata); d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);
int empty_visits = dictAsyncRehashCtl::c_targetQueueSize * 10; int empty_visits = buckets * 10;
while (d->asyncdata->queue.size() < dictAsyncRehashCtl::c_targetQueueSize && (d->ht[0].used - d->asyncdata->queue.size()) != 0) { while (d->asyncdata->queue.size() < (size_t)buckets && d->rehashidx < d->ht[0].size) {
dictEntry *de; dictEntry *de;
/* Note that rehashidx can't overflow as we are sure there are more /* Note that rehashidx can't overflow as we are sure there are more
@ -245,11 +246,12 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d) {
while(d->ht[0].table[d->rehashidx] == NULL) { while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++; d->rehashidx++;
if (--empty_visits == 0) goto LDone; if (--empty_visits == 0) goto LDone;
if (d->rehashidx >= d->ht[0].size) goto LDone;
} }
de = d->ht[0].table[d->rehashidx]; de = d->ht[0].table[d->rehashidx];
// We have to queue every node in the bucket, even if we go over our target size // We have to queue every node in the bucket, even if we go over our target size
while (de) { while (de != nullptr) {
d->asyncdata->queue.emplace_back(de); d->asyncdata->queue.emplace_back(de);
de = de->next; de = de->next;
} }
@ -268,9 +270,21 @@ LDone:
} }
void dictRehashAsync(dictAsyncRehashCtl *ctl) { void dictRehashAsync(dictAsyncRehashCtl *ctl) {
for (auto &wi : ctl->queue) { for (size_t idx = ctl->hashIdx; idx < ctl->queue.size(); ++idx) {
auto &wi = ctl->queue[idx];
wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de)); wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
} }
ctl->hashIdx = ctl->queue.size();
}
bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
size_t max = std::min(ctl->hashIdx + hashes, ctl->queue.size());
for (; ctl->hashIdx < max; ++ctl->hashIdx) {
auto &wi = ctl->queue[ctl->hashIdx];
wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
}
return ctl->hashIdx < ctl->queue.size();
} }
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) { void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) {
@ -287,28 +301,33 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) {
// Was the dictionary free'd while we were in flight? // Was the dictionary free'd while we were in flight?
if (ctl->release) { if (ctl->release) {
dictRelease(d); if (d->asyncdata != nullptr)
d->asyncdata->release = true;
else
dictRelease(d);
delete ctl; delete ctl;
return; return;
} }
for (auto &wi : ctl->queue) { if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash
// We need to remove it from the source hash table, and store it in the dest. for (auto &wi : ctl->queue) {
// Note that it may have been deleted in the meantime and therefore not exist. // We need to remove it from the source hash table, and store it in the dest.
// In this case it will be in the garbage collection list // Note that it may have been deleted in the meantime and therefore not exist.
// In this case it will be in the garbage collection list
dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask]; dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
while (*pdePrev != nullptr && *pdePrev != wi.de) { while (*pdePrev != nullptr && *pdePrev != wi.de) {
pdePrev = &((*pdePrev)->next); pdePrev = &((*pdePrev)->next);
} }
if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list
assert(*pdePrev == wi.de); assert(*pdePrev == wi.de);
*pdePrev = wi.de->next; *pdePrev = wi.de->next;
// Now link it to the dest hash table // Now link it to the dest hash table
wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask]; wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de; d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
d->ht[0].used--; d->ht[0].used--;
d->ht[1].used++; d->ht[1].used++;
}
} }
} }
@ -565,7 +584,7 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
if ((he = ht->table[i]) == NULL) continue; if ((he = ht->table[i]) == NULL) continue;
while(he) { while(he) {
nextHe = he->next; nextHe = he->next;
if (i == 0 && d->asyncdata) { if (d->asyncdata) {
he->next = d->asyncdata->deGCList; he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he; d->asyncdata->deGCList = he;
} else { } else {

View File

@ -96,6 +96,7 @@ struct dictAsyncRehashCtl {
dictEntry *deGCList = nullptr; dictEntry *deGCList = nullptr;
struct dict *dict = nullptr; struct dict *dict = nullptr;
std::vector<workItem> queue; std::vector<workItem> queue;
size_t hashIdx = 0;
bool release = false; bool release = false;
dictAsyncRehashCtl *next = nullptr; dictAsyncRehashCtl *next = nullptr;
@ -218,8 +219,9 @@ uint64_t dictGetHash(dict *d, const void *key);
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash); dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
/* Async Rehash Functions */ /* Async Rehash Functions */
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d); dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets = dictAsyncRehashCtl::c_targetQueueSize);
void dictRehashAsync(dictAsyncRehashCtl *ctl); void dictRehashAsync(dictAsyncRehashCtl *ctl);
bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes);
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl); void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl);
/* Hash table types */ /* Hash table types */

View File

@ -338,7 +338,7 @@ extern "C" void fastlock_init(struct fastlock *lock, const char *name)
} }
#ifndef ASM_SPINLOCK #ifndef ASM_SPINLOCK
extern "C" void fastlock_lock(struct fastlock *lock) extern "C" void fastlock_lock(struct fastlock *lock, spin_worker worker)
{ {
int pidOwner; int pidOwner;
__atomic_load(&lock->m_pidOwner, &pidOwner, __ATOMIC_ACQUIRE); __atomic_load(&lock->m_pidOwner, &pidOwner, __ATOMIC_ACQUIRE);
@ -362,11 +362,16 @@ extern "C" void fastlock_lock(struct fastlock *lock)
if ((ticketT.u & 0xffff) == myticket) if ((ticketT.u & 0xffff) == myticket)
break; break;
if (worker != nullptr) {
worker();
} else {
#if defined(__i386__) || defined(__amd64__) #if defined(__i386__) || defined(__amd64__)
__asm__ __volatile__ ("pause"); __asm__ __volatile__ ("pause");
#elif defined(__aarch64__) #elif defined(__aarch64__)
__asm__ __volatile__ ("yield"); __asm__ __volatile__ ("yield");
#endif #endif
}
if ((++cloops % loopLimit) == 0) if ((++cloops % loopLimit) == 0)
{ {
fastlock_sleep(lock, tid, ticketT.u, myticket); fastlock_sleep(lock, tid, ticketT.u, myticket);

View File

@ -6,10 +6,16 @@
extern "C" { extern "C" {
#endif #endif
typedef int (*spin_worker)();
/* Begin C API */ /* Begin C API */
struct fastlock; struct fastlock;
void fastlock_init(struct fastlock *lock, const char *name); void fastlock_init(struct fastlock *lock, const char *name);
void fastlock_lock(struct fastlock *lock); #ifdef __cplusplus
void fastlock_lock(struct fastlock *lock, spin_worker worker = nullptr);
#else
void fastlock_lock(struct fastlock *lock, spin_worker worker);
#endif
int fastlock_trylock(struct fastlock *lock, int fWeak); int fastlock_trylock(struct fastlock *lock, int fWeak);
void fastlock_unlock(struct fastlock *lock); void fastlock_unlock(struct fastlock *lock);
void fastlock_free(struct fastlock *lock); void fastlock_free(struct fastlock *lock);
@ -56,9 +62,9 @@ struct fastlock
fastlock_init(this, name); fastlock_init(this, name);
} }
inline void lock() inline void lock(spin_worker worker = nullptr)
{ {
fastlock_lock(this); fastlock_lock(this, worker);
} }
inline bool try_lock(bool fWeak = false) inline bool try_lock(bool fWeak = false)

View File

@ -22,14 +22,22 @@ fastlock_lock:
# [rdi+64] ... # [rdi+64] ...
# uint16_t active # uint16_t active
# uint16_t avail # uint16_t avail
#
# RSI points to a spin function to call, or NULL
# First get our TID and put it in ecx # First get our TID and put it in ecx
push rdi # we need our struct pointer (also balance the stack for the call) sub rsp, 24 # We only use 16 bytes, but we also need the stack aligned
.cfi_adjust_cfa_offset 8 .cfi_adjust_cfa_offset 24
mov [rsp], rdi # we need our struct pointer (also balance the stack for the call)
mov [rsp+8], rsi # backup the spin function
call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining) call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining)
mov esi, eax # back it up in esi mov esi, eax # back it up in esi
pop rdi # get our pointer back
.cfi_adjust_cfa_offset -8 mov rdi, [rsp] # Restore spin struct
mov r8, [rsp+8] # restore the function (in a different register)
add rsp, 24
.cfi_adjust_cfa_offset -24
cmp [rdi], esi # Is the TID we got back the owner of the lock? cmp [rdi], esi # Is the TID we got back the owner of the lock?
je .LLocked # Don't spin in that case je .LLocked # Don't spin in that case
@ -47,6 +55,8 @@ fastlock_lock:
# ax now contains the ticket # ax now contains the ticket
# OK Start the wait loop # OK Start the wait loop
xor ecx, ecx xor ecx, ecx
test r8, r8
jnz .LLoopFunction
.ALIGN 16 .ALIGN 16
.LLoop: .LLoop:
mov edx, [rdi+64] mov edx, [rdi+64]
@ -83,6 +93,40 @@ fastlock_lock:
mov [rdi], esi # lock->m_pidOwner = gettid() mov [rdi], esi # lock->m_pidOwner = gettid()
inc dword ptr [rdi+4] # lock->m_depth++ inc dword ptr [rdi+4] # lock->m_depth++
ret ret
.LLoopFunction:
sub rsp, 40
.cfi_adjust_cfa_offset 40
xor ecx, ecx
mov [rsp], rcx
mov [rsp+8], r8
mov [rsp+16], rdi
mov [rsp+24], rsi
mov [rsp+32], eax
.LLoopFunctionCore:
mov edx, [rdi+64]
cmp dx, ax
je .LExitLoopFunction
mov r8, [rsp+8]
call r8
test eax, eax
jz .LExitLoopFunctionForNormal
mov eax, [rsp+32] # restore clobbered eax
mov rdi, [rsp+16]
jmp .LLoopFunctionCore
.LExitLoopFunction:
mov rsi, [rsp+24]
add rsp, 40
.cfi_adjust_cfa_offset -40
jmp .LLocked
.LExitLoopFunctionForNormal:
xor ecx, ecx
mov rdi, [rsp+16]
mov rsi, [rsp+24]
mov eax, [rsp+32]
add rsp, 40
.cfi_adjust_cfa_offset -40
jmp .LLoop
.cfi_endproc .cfi_endproc
.ALIGN 16 .ALIGN 16

View File

@ -1508,13 +1508,12 @@ void tryResizeHashTables(int dbid) {
* table will use two tables for a long time. So we try to use 1 millisecond * table will use two tables for a long time. So we try to use 1 millisecond
* of CPU time at every call of this function to perform some rehashing. * of CPU time at every call of this function to perform some rehashing.
* *
* The function returns 1 if some rehashing was performed, otherwise 0 * The function returns the number of rehashes if some rehashing was performed, otherwise 0
* is returned. */ * is returned. */
int incrementallyRehash(int dbid) { int incrementallyRehash(int dbid) {
/* Keys dictionary */ /* Keys dictionary */
if (dictIsRehashing(g_pserver->db[dbid].dict)) { if (dictIsRehashing(g_pserver->db[dbid].dict)) {
dictRehashMilliseconds(g_pserver->db[dbid].dict,1); return dictRehashMilliseconds(g_pserver->db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
} }
return 0; return 0;
} }
@ -1772,12 +1771,22 @@ bool expireOwnKeys()
return false; return false;
} }
int hash_spin_worker() {
auto ctl = serverTL->rehashCtl;
if (ctl->hashIdx < ctl->queue.size()) {
ctl->queue[ctl->hashIdx].hash = dictHashKey(ctl->dict, dictGetKey(ctl->queue[ctl->hashIdx].de));
ctl->hashIdx++;
return true;
} else {
return false;
}
}
/* This function handles 'background' operations we are required to do /* This function handles 'background' operations we are required to do
* incrementally in Redis databases, such as active key expiring, resizing, * incrementally in Redis databases, such as active key expiring, resizing,
* rehashing. */ * rehashing. */
dictAsyncRehashCtl* databasesCron(bool fMainThread) { void databasesCron(bool fMainThread) {
dictAsyncRehashCtl *ctl = nullptr; static int rehashes_per_ms = 0;
if (fMainThread) { if (fMainThread) {
/* Expire keys by random sampling. Not required for slaves /* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */ * as master will synthesize DELs for us. */
@ -1819,24 +1828,42 @@ dictAsyncRehashCtl* databasesCron(bool fMainThread) {
/* Rehash */ /* Rehash */
if (g_pserver->activerehashing) { if (g_pserver->activerehashing) {
for (j = 0; j < dbs_per_call; j++) { for (j = 0; j < dbs_per_call; j++) {
ctl = dictRehashAsyncStart(g_pserver->db[rehash_db].pdict); if (serverTL->rehashCtl != nullptr) {
if (ctl) if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) {
break;
} else {
dictCompleteRehashAsync(serverTL->rehashCtl);
serverTL->rehashCtl = nullptr;
}
}
if (rehashes_per_ms > 0)
serverTL->rehashCtl = dictRehashAsyncStart(g_pserver->db[rehash_db].dict, rehashes_per_ms);
if (serverTL->rehashCtl)
break; break;
int work_done = fMainThread && incrementallyRehash(rehash_db);
if (work_done) { if (fMainThread) {
/* If the function did some work, stop here, we'll do // We only synchronously rehash on the main thread, otherwise we'll cause too much latency
* more at the next cron loop. */ rehashes_per_ms = incrementallyRehash(rehash_db);
break; if (rehashes_per_ms > 0) {
} else { /* If the function did some work, stop here, we'll do
/* If this db didn't need rehash, we'll try the next one. */ * more at the next cron loop. */
rehash_db++; serverLog(LL_WARNING, "Rehashes per ms: %d", rehashes_per_ms);
rehash_db %= cserver.dbnum; break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= cserver.dbnum;
}
} }
} }
} }
} }
return ctl; if (serverTL->rehashCtl) {
setAeLockSetThreadSpinWorker(hash_spin_worker);
} else {
setAeLockSetThreadSpinWorker(nullptr);
}
} }
/* We take a cached value of the unix time in the global state because with /* We take a cached value of the unix time in the global state because with
@ -2076,14 +2103,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
clientsCron(IDX_EVENT_LOOP_MAIN); clientsCron(IDX_EVENT_LOOP_MAIN);
/* Handle background operations on Redis databases. */ /* Handle background operations on Redis databases. */
auto asyncRehash = databasesCron(true /* fMainThread */); databasesCron(true /* fMainThread */);
if (asyncRehash) {
aeReleaseLock();
dictRehashAsync(asyncRehash);
aeAcquireLock();
dictCompleteRehashAsync(asyncRehash);
}
/* Start a scheduled AOF rewrite if this was requested by the user while /* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */ * a BGSAVE was in progress. */
@ -2234,14 +2254,7 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
} }
/* Handle background operations on Redis databases. */ /* Handle background operations on Redis databases. */
auto asyncRehash = databasesCron(false /* fMainThread */); databasesCron(false /* fMainThread */);
if (asyncRehash) {
aeReleaseLock();
dictRehashAsync(asyncRehash);
aeAcquireLock();
dictCompleteRehashAsync(asyncRehash);
}
/* Unpause clients if enough time has elapsed */ /* Unpause clients if enough time has elapsed */
unpauseClientsIfNecessary(); unpauseClientsIfNecessary();

View File

@ -1393,6 +1393,7 @@ struct redisServerThreadVars {
long unsigned commandsExecuted = 0; long unsigned commandsExecuted = 0;
bool fRetrySetAofEvent = false; bool fRetrySetAofEvent = false;
std::vector<client*> vecclientsProcess; std::vector<client*> vecclientsProcess;
dictAsyncRehashCtl *rehashCtl = nullptr;
}; };
struct redisMaster { struct redisMaster {