From bef72e5f6a2972bf0f654cf5c98cd522537d5ff1 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 25 Jan 2021 16:29:47 +0000 Subject: [PATCH] Implement rehash during spinlock Former-commit-id: f68a26381a35b27948046d46c2c7bcfbdc21143d --- src/ae.cpp | 8 ++++- src/ae.h | 1 + src/dict.cpp | 67 +++++++++++++++++++++++------------- src/dict.h | 4 ++- src/fastlock.cpp | 11 ++++-- src/fastlock.h | 12 +++++-- src/fastlock_x64.asm | 52 +++++++++++++++++++++++++--- src/server.cpp | 81 +++++++++++++++++++++++++------------------- src/server.h | 1 + 9 files changed, 167 insertions(+), 70 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index 89b36beaa..16a2ddb64 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -842,9 +842,15 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, eventLoop->aftersleepFlags = flags; } +thread_local spin_worker tl_worker = nullptr; +void setAeLockSetThreadSpinWorker(spin_worker worker) +{ + tl_worker = worker; +} + void aeAcquireLock() { - g_lock.lock(); + g_lock.lock(tl_worker); } int aeTryAcquireLock(int fWeak) diff --git a/src/ae.h b/src/ae.h index ab7127f8b..3a240877d 100644 --- a/src/ae.h +++ b/src/ae.h @@ -164,6 +164,7 @@ aeEventLoop *aeGetCurrentEventLoop(); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); void aeSetDontWait(aeEventLoop *eventLoop, int noWait); +void setAeLockSetThreadSpinWorker(spin_worker worker); void aeAcquireLock(); int aeTryAcquireLock(int fWeak); void aeReleaseLock(); diff --git a/src/dict.cpp b/src/dict.cpp index 37e6998c4..e882fff2a 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -231,13 +231,14 @@ int dictRehash(dict *d, int n) { return 1; } -dictAsyncRehashCtl *dictRehashAsyncStart(dict *d) { +dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { if (!dictIsRehashing(d)) return 0; + d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata); - int empty_visits = dictAsyncRehashCtl::c_targetQueueSize * 10; + int empty_visits = buckets * 10; - while (d->asyncdata->queue.size() < dictAsyncRehashCtl::c_targetQueueSize && (d->ht[0].used - d->asyncdata->queue.size()) != 0) { + while (d->asyncdata->queue.size() < (size_t)buckets && d->rehashidx < d->ht[0].size) { dictEntry *de; /* Note that rehashidx can't overflow as we are sure there are more @@ -245,11 +246,12 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d) { while(d->ht[0].table[d->rehashidx] == NULL) { d->rehashidx++; if (--empty_visits == 0) goto LDone; + if (d->rehashidx >= d->ht[0].size) goto LDone; } de = d->ht[0].table[d->rehashidx]; // We have to queue every node in the bucket, even if we go over our target size - while (de) { + while (de != nullptr) { d->asyncdata->queue.emplace_back(de); de = de->next; } @@ -268,9 +270,21 @@ LDone: } void dictRehashAsync(dictAsyncRehashCtl *ctl) { - for (auto &wi : ctl->queue) { + for (size_t idx = ctl->hashIdx; idx < ctl->queue.size(); ++idx) { + auto &wi = ctl->queue[idx]; wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de)); } + ctl->hashIdx = ctl->queue.size(); +} + +bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) { + size_t max = std::min(ctl->hashIdx + hashes, ctl->queue.size()); + for (; ctl->hashIdx < max; ++ctl->hashIdx) { + auto &wi = ctl->queue[ctl->hashIdx]; + wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de)); + } + + return ctl->hashIdx < ctl->queue.size(); } void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) { @@ -287,28 +301,33 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) { // Was the dictionary free'd while we were in flight? if (ctl->release) { - dictRelease(d); + if (d->asyncdata != nullptr) + d->asyncdata->release = true; + else + dictRelease(d); delete ctl; return; } - for (auto &wi : ctl->queue) { - // We need to remove it from the source hash table, and store it in the dest. - // Note that it may have been deleted in the meantime and therefore not exist. - // In this case it will be in the garbage collection list - - dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask]; - while (*pdePrev != nullptr && *pdePrev != wi.de) { - pdePrev = &((*pdePrev)->next); - } - if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list - assert(*pdePrev == wi.de); - *pdePrev = wi.de->next; - // Now link it to the dest hash table - wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask]; - d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de; - d->ht[0].used--; - d->ht[1].used++; + if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash + for (auto &wi : ctl->queue) { + // We need to remove it from the source hash table, and store it in the dest. + // Note that it may have been deleted in the meantime and therefore not exist. + // In this case it will be in the garbage collection list + + dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask]; + while (*pdePrev != nullptr && *pdePrev != wi.de) { + pdePrev = &((*pdePrev)->next); + } + if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list + assert(*pdePrev == wi.de); + *pdePrev = wi.de->next; + // Now link it to the dest hash table + wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask]; + d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de; + d->ht[0].used--; + d->ht[1].used++; + } } } @@ -565,7 +584,7 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) { if ((he = ht->table[i]) == NULL) continue; while(he) { nextHe = he->next; - if (i == 0 && d->asyncdata) { + if (d->asyncdata) { he->next = d->asyncdata->deGCList; d->asyncdata->deGCList = he; } else { diff --git a/src/dict.h b/src/dict.h index b5cac21f9..2f6c9677a 100644 --- a/src/dict.h +++ b/src/dict.h @@ -96,6 +96,7 @@ struct dictAsyncRehashCtl { dictEntry *deGCList = nullptr; struct dict *dict = nullptr; std::vector queue; + size_t hashIdx = 0; bool release = false; dictAsyncRehashCtl *next = nullptr; @@ -218,8 +219,9 @@ uint64_t dictGetHash(dict *d, const void *key); dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash); /* Async Rehash Functions */ -dictAsyncRehashCtl *dictRehashAsyncStart(dict *d); +dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets = dictAsyncRehashCtl::c_targetQueueSize); void dictRehashAsync(dictAsyncRehashCtl *ctl); +bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes); void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl); /* Hash table types */ diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 4cada72cc..61cd1752e 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -338,7 +338,7 @@ extern "C" void fastlock_init(struct fastlock *lock, const char *name) } #ifndef ASM_SPINLOCK -extern "C" void fastlock_lock(struct fastlock *lock) +extern "C" void fastlock_lock(struct fastlock *lock, spin_worker worker) { int pidOwner; __atomic_load(&lock->m_pidOwner, &pidOwner, __ATOMIC_ACQUIRE); @@ -362,11 +362,16 @@ extern "C" void fastlock_lock(struct fastlock *lock) if ((ticketT.u & 0xffff) == myticket) break; + if (worker != nullptr) { + worker(); + } else { #if defined(__i386__) || defined(__amd64__) - __asm__ __volatile__ ("pause"); + __asm__ __volatile__ ("pause"); #elif defined(__aarch64__) - __asm__ __volatile__ ("yield"); + __asm__ __volatile__ ("yield"); #endif + } + if ((++cloops % loopLimit) == 0) { fastlock_sleep(lock, tid, ticketT.u, myticket); diff --git a/src/fastlock.h b/src/fastlock.h index 0809c8bcd..ede6d4e16 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -6,10 +6,16 @@ extern "C" { #endif +typedef int (*spin_worker)(); + /* Begin C API */ struct fastlock; void fastlock_init(struct fastlock *lock, const char *name); -void fastlock_lock(struct fastlock *lock); +#ifdef __cplusplus +void fastlock_lock(struct fastlock *lock, spin_worker worker = nullptr); +#else +void fastlock_lock(struct fastlock *lock, spin_worker worker); +#endif int fastlock_trylock(struct fastlock *lock, int fWeak); void fastlock_unlock(struct fastlock *lock); void fastlock_free(struct fastlock *lock); @@ -56,9 +62,9 @@ struct fastlock fastlock_init(this, name); } - inline void lock() + inline void lock(spin_worker worker = nullptr) { - fastlock_lock(this); + fastlock_lock(this, worker); } inline bool try_lock(bool fWeak = false) diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index 26c302434..5fa8e0ade 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -22,14 +22,22 @@ fastlock_lock: # [rdi+64] ... # uint16_t active # uint16_t avail + # + # RSI points to a spin function to call, or NULL # First get our TID and put it in ecx - push rdi # we need our struct pointer (also balance the stack for the call) - .cfi_adjust_cfa_offset 8 + sub rsp, 24 # We only use 16 bytes, but we also need the stack aligned + .cfi_adjust_cfa_offset 24 + mov [rsp], rdi # we need our struct pointer (also balance the stack for the call) + mov [rsp+8], rsi # backup the spin function + call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining) mov esi, eax # back it up in esi - pop rdi # get our pointer back - .cfi_adjust_cfa_offset -8 + + mov rdi, [rsp] # Restore spin struct + mov r8, [rsp+8] # restore the function (in a different register) + add rsp, 24 + .cfi_adjust_cfa_offset -24 cmp [rdi], esi # Is the TID we got back the owner of the lock? je .LLocked # Don't spin in that case @@ -47,6 +55,8 @@ fastlock_lock: # ax now contains the ticket # OK Start the wait loop xor ecx, ecx + test r8, r8 + jnz .LLoopFunction .ALIGN 16 .LLoop: mov edx, [rdi+64] @@ -83,6 +93,40 @@ fastlock_lock: mov [rdi], esi # lock->m_pidOwner = gettid() inc dword ptr [rdi+4] # lock->m_depth++ ret + +.LLoopFunction: + sub rsp, 40 + .cfi_adjust_cfa_offset 40 + xor ecx, ecx + mov [rsp], rcx + mov [rsp+8], r8 + mov [rsp+16], rdi + mov [rsp+24], rsi + mov [rsp+32], eax +.LLoopFunctionCore: + mov edx, [rdi+64] + cmp dx, ax + je .LExitLoopFunction + mov r8, [rsp+8] + call r8 + test eax, eax + jz .LExitLoopFunctionForNormal + mov eax, [rsp+32] # restore clobbered eax + mov rdi, [rsp+16] + jmp .LLoopFunctionCore +.LExitLoopFunction: + mov rsi, [rsp+24] + add rsp, 40 + .cfi_adjust_cfa_offset -40 + jmp .LLocked +.LExitLoopFunctionForNormal: + xor ecx, ecx + mov rdi, [rsp+16] + mov rsi, [rsp+24] + mov eax, [rsp+32] + add rsp, 40 + .cfi_adjust_cfa_offset 40 + jmp .LLoop .cfi_endproc .ALIGN 16 diff --git a/src/server.cpp b/src/server.cpp index fdb8885b1..f1dd30d0d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1508,13 +1508,12 @@ void tryResizeHashTables(int dbid) { * table will use two tables for a long time. So we try to use 1 millisecond * of CPU time at every call of this function to perform some rehashing. * - * The function returns 1 if some rehashing was performed, otherwise 0 + * The function returns the number of rehashes if some rehashing was performed, otherwise 0 * is returned. */ int incrementallyRehash(int dbid) { /* Keys dictionary */ if (dictIsRehashing(g_pserver->db[dbid].dict)) { - dictRehashMilliseconds(g_pserver->db[dbid].dict,1); - return 1; /* already used our millisecond for this loop... */ + return dictRehashMilliseconds(g_pserver->db[dbid].dict,1); } return 0; } @@ -1772,12 +1771,22 @@ bool expireOwnKeys() return false; } +int hash_spin_worker() { + auto ctl = serverTL->rehashCtl; + if (ctl->hashIdx < ctl->queue.size()) { + ctl->queue[ctl->hashIdx].hash = dictHashKey(ctl->dict, dictGetKey(ctl->queue[ctl->hashIdx].de)); + ctl->hashIdx++; + return true; + } else { + return false; + } +} + /* This function handles 'background' operations we are required to do * incrementally in Redis databases, such as active key expiring, resizing, * rehashing. */ -dictAsyncRehashCtl* databasesCron(bool fMainThread) { - dictAsyncRehashCtl *ctl = nullptr; - +void databasesCron(bool fMainThread) { + static int rehashes_per_ms = 0; if (fMainThread) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ @@ -1819,24 +1828,42 @@ dictAsyncRehashCtl* databasesCron(bool fMainThread) { /* Rehash */ if (g_pserver->activerehashing) { for (j = 0; j < dbs_per_call; j++) { - ctl = dictRehashAsyncStart(g_pserver->db[rehash_db].pdict); - if (ctl) + if (serverTL->rehashCtl != nullptr) { + if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) { + break; + } else { + dictCompleteRehashAsync(serverTL->rehashCtl); + serverTL->rehashCtl = nullptr; + } + } + if (rehashes_per_ms > 0) + serverTL->rehashCtl = dictRehashAsyncStart(g_pserver->db[rehash_db].dict, rehashes_per_ms); + if (serverTL->rehashCtl) break; - int work_done = fMainThread && incrementallyRehash(rehash_db); - if (work_done) { - /* If the function did some work, stop here, we'll do - * more at the next cron loop. */ - break; - } else { - /* If this db didn't need rehash, we'll try the next one. */ - rehash_db++; - rehash_db %= cserver.dbnum; + + if (fMainThread) { + // We only synchronously rehash on the main thread, otherwise we'll cause too much latency + rehashes_per_ms = incrementallyRehash(rehash_db); + if (rehashes_per_ms > 0) { + /* If the function did some work, stop here, we'll do + * more at the next cron loop. */ + serverLog(LL_WARNING, "Rehashes per ms: %d", rehashes_per_ms); + break; + } else { + /* If this db didn't need rehash, we'll try the next one. */ + rehash_db++; + rehash_db %= cserver.dbnum; + } } } } } - return ctl; + if (serverTL->rehashCtl) { + setAeLockSetThreadSpinWorker(hash_spin_worker); + } else { + setAeLockSetThreadSpinWorker(nullptr); + } } /* We take a cached value of the unix time in the global state because with @@ -2076,14 +2103,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { clientsCron(IDX_EVENT_LOOP_MAIN); /* Handle background operations on Redis databases. */ - auto asyncRehash = databasesCron(true /* fMainThread */); - - if (asyncRehash) { - aeReleaseLock(); - dictRehashAsync(asyncRehash); - aeAcquireLock(); - dictCompleteRehashAsync(asyncRehash); - } + databasesCron(true /* fMainThread */); /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ @@ -2234,14 +2254,7 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData } /* Handle background operations on Redis databases. */ - auto asyncRehash = databasesCron(false /* fMainThread */); - - if (asyncRehash) { - aeReleaseLock(); - dictRehashAsync(asyncRehash); - aeAcquireLock(); - dictCompleteRehashAsync(asyncRehash); - } + databasesCron(false /* fMainThread */); /* Unpause clients if enough time has elapsed */ unpauseClientsIfNecessary(); diff --git a/src/server.h b/src/server.h index 0da8b84f3..ea1356889 100644 --- a/src/server.h +++ b/src/server.h @@ -1393,6 +1393,7 @@ struct redisServerThreadVars { long unsigned commandsExecuted = 0; bool fRetrySetAofEvent = false; std::vector vecclientsProcess; + dictAsyncRehashCtl *rehashCtl = nullptr; }; struct redisMaster {