diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d908a5fcc..0327be059 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -20,7 +20,7 @@ jobs:
         run: ./utils/gen-test-certs.sh
       - name: test-tls
         run: |
-          sudo apt-get -y install tcl8.5 tcl-tls
+          sudo apt-get -y install tcl tcl-tls
           ./runtest --clients 2 --verbose --tls
       - name: cluster-test
         run: |
diff --git a/src/ae.cpp b/src/ae.cpp
index 7459ea373..d88328ded 100644
--- a/src/ae.cpp
+++ b/src/ae.cpp
@@ -109,13 +109,6 @@ enum class AE_ASYNC_OP
     CreateFileEvent,
 };
 
-struct aeCommandControl
-{
-    std::condition_variable cv;
-    std::atomic<int> rval;
-    std::mutex mutexcv;
-};
-
 struct aeCommand
 {
     AE_ASYNC_OP op;
@@ -128,7 +121,6 @@ struct aeCommand
         std::function<void()> *pfn;
     };
     void *clientData;
-    aeCommandControl *pctl;
 };
 #ifdef PIPE_BUF
 static_assert(sizeof(aeCommand) <= PIPE_BUF, "aeCommand must be small enough to send atomically through a pipe");
@@ -152,19 +144,7 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
         break;
 
     case AE_ASYNC_OP::CreateFileEvent:
-    {
-        if (cmd.pctl != nullptr)
-        {
-            cmd.pctl->mutexcv.lock();
-            std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData));
-            cmd.pctl->cv.notify_all();
-            cmd.pctl->mutexcv.unlock();
-        }
-        else
-        {
-            aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
-        }
-    }
+        aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
         break;
 
     case AE_ASYNC_OP::PostFunction:
@@ -178,19 +158,11 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
 
     case AE_ASYNC_OP::PostCppFunction:
     {
-        if (cmd.pctl != nullptr)
-            cmd.pctl->mutexcv.lock();
-
         std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
         if (cmd.fLock)
             ulock.lock();
         (*cmd.pfn)();
-
-        if (cmd.pctl != nullptr)
-        {
-            cmd.pctl->cv.notify_all();
-            cmd.pctl->mutexcv.unlock();
-        }
+        delete cmd.pfn;
     }
     break;
 
@@ -229,7 +201,7 @@ ssize_t safe_write(int fd, const void *pv, size_t cb)
 }
 
 int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
-        aeFileProc *proc, void *clientData, int fSynchronous)
+        aeFileProc *proc, void *clientData)
 {
     if (eventLoop == g_eventLoopThisThread)
         return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData);
@@ -242,13 +214,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
     cmd.mask = mask;
     cmd.fproc = proc;
     cmd.clientData = clientData;
-    cmd.pctl = nullptr;
     cmd.fLock = true;
-    if (fSynchronous)
-    {
-        cmd.pctl = new aeCommandControl();
-        cmd.pctl->mutexcv.lock();
-    }
 
     auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
     if (size != sizeof(cmd))
@@ -257,16 +223,6 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
         serverAssert(errno == EAGAIN);
         ret = AE_ERR;
     }
-
-    if (fSynchronous)
-    {
-        {
-            std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
-            cmd.pctl->cv.wait(ulock);
-            ret = cmd.pctl->rval;
-        }
-        delete cmd.pctl;
-    }
 
     return ret;
 }
@@ -289,7 +245,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
     return AE_OK;
 }
 
-int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock, bool fForceQueue)
+int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock, bool fForceQueue)
 {
     if (eventLoop == g_eventLoopThisThread && !fForceQueue)
     {
@@ -300,13 +256,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
     aeCommand cmd = {};
     cmd.op = AE_ASYNC_OP::PostCppFunction;
     cmd.pfn = new std::function<void()>(fn);
-    cmd.pctl = nullptr;
     cmd.fLock = fLock;
-    if (fSynchronous)
-    {
-        cmd.pctl = new aeCommandControl();
-        cmd.pctl->mutexcv.lock();
-    }
 
     auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
     if (!(!size || size == sizeof(cmd)))
     {
@@ -316,17 +266,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
     if (size == 0)
         return AE_ERR;
 
-    int ret = AE_OK;
-    if (fSynchronous)
-    {
-        {
-            std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
-            cmd.pctl->cv.wait(ulock);
-            ret = cmd.pctl->rval;
-        }
-        delete cmd.pctl;
-    }
-    return ret;
+
+    return AE_OK;
 }
 
 aeEventLoop *aeCreateEventLoop(int setsize) {
@@ -904,9 +845,15 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep,
     eventLoop->aftersleepFlags = flags;
 }
 
+thread_local spin_worker tl_worker = nullptr;
+void setAeLockSetThreadSpinWorker(spin_worker worker)
+{
+    tl_worker = worker;
+}
+
 void aeAcquireLock()
 {
-    g_lock.lock();
+    g_lock.lock(tl_worker);
 }
 
 int aeTryAcquireLock(int fWeak)
diff --git a/src/ae.h b/src/ae.h
index e77abb01f..3a240877d 100644
--- a/src/ae.h
+++ b/src/ae.h
@@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
 int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
 #ifdef __cplusplus
 } // EXTERN C
-int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false);
+int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock = true, bool fForceQueue = false);
 extern "C" {
 #endif
 void aeDeleteEventLoop(aeEventLoop *eventLoop);
@@ -144,7 +144,7 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
         aeFileProc *proc, void *clientData);
 
 int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
-        aeFileProc *proc, void *clientData, int fSynchronous);
+        aeFileProc *proc, void *clientData);
 
 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
 void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask);
@@ -164,6 +164,7 @@ aeEventLoop *aeGetCurrentEventLoop();
 int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
 void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
 
+void setAeLockSetThreadSpinWorker(spin_worker worker);
 void aeAcquireLock();
 int aeTryAcquireLock(int fWeak);
 void aeReleaseLock();
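Reviewer note on the ae changes above: with fSynchronous gone, aePostFunction is strictly fire-and-forget, and the event loop now deletes the posted std::function after running it. A caller that still needs to observe completion must supply its own synchronization. A minimal sketch under that assumption (postAndWait and its promise/future plumbing are illustrative, not part of this patch):

    #include <future>

    // Post fn to an event loop and block until it has run there.
    // If 'el' is the calling thread's own loop, aePostFunction runs fn inline
    // (fForceQueue=false), so the future is already satisfied before the wait.
    int postAndWait(aeEventLoop *el, std::function<void()> fn)
    {
        std::promise<void> done;
        std::future<void> fut = done.get_future();
        int ret = aePostFunction(el, [&done, fn] {
            fn();
            done.set_value();       // wake the posting thread
        }, true /*fLock*/, false /*fForceQueue*/);
        if (ret == AE_OK)
            fut.wait();             // 'done' outlives the wait, so this is safe
        return ret;
    }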
diff --git a/src/dict.cpp b/src/dict.cpp
index 8c111e183..819921dbb 100644
--- a/src/dict.cpp
+++ b/src/dict.cpp
@@ -128,6 +128,7 @@ int _dictInit(dict *d, dictType *type,
     d->privdata = privDataPtr;
     d->rehashidx = -1;
     d->iterators = 0;
+    d->asyncdata = nullptr;
     return DICT_OK;
 }
 
@@ -328,13 +329,13 @@ int dictMerge(dict *dst, dict *src)
 int dictRehash(dict *d, int n) {
     int empty_visits = n*10; /* Max number of empty buckets to visit. */
     if (!dictIsRehashing(d)) return 0;
+    if (d->asyncdata) return 0;
 
     while(n-- && d->ht[0].used != 0) {
         dictEntry *de, *nextde;
 
         /* Note that rehashidx can't overflow as we are sure there are more
          * elements because ht[0].used != 0 */
-        assert(d->ht[0].size > (unsigned long)d->rehashidx);
         while(d->ht[0].table[d->rehashidx] == NULL) {
             d->rehashidx++;
             if (--empty_visits == 0) return 1;
@@ -370,6 +371,144 @@ int dictRehash(dict *d, int n) {
     return 1;
 }
 
+dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
+    if (!dictIsRehashing(d)) return 0;
+
+    d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);
+
+    int empty_visits = buckets * 10;
+
+    while (d->asyncdata->queue.size() < (size_t)buckets && d->rehashidx < d->ht[0].size) {
+        dictEntry *de;
+
+        /* Note that rehashidx can't overflow as we are sure there are more
+         * elements because ht[0].used != 0 */
+        while(d->ht[0].table[d->rehashidx] == NULL) {
+            d->rehashidx++;
+            if (--empty_visits == 0) goto LDone;
+            if (d->rehashidx >= d->ht[0].size) goto LDone;
+        }
+
+        de = d->ht[0].table[d->rehashidx];
+        // We have to queue every node in the bucket, even if we go over our target size
+        while (de != nullptr) {
+            d->asyncdata->queue.emplace_back(de);
+            de = de->next;
+        }
+        d->rehashidx++;
+    }
+
+LDone:
+    if (d->asyncdata->queue.empty()) {
+        // We didn't find any work to do (can happen if there is a large gap in the hash table)
+        auto asyncT = d->asyncdata;
+        d->asyncdata = d->asyncdata->next;
+        delete asyncT;
+        return nullptr;
+    }
+    return d->asyncdata;
+}
+
+void dictRehashAsync(dictAsyncRehashCtl *ctl) {
+    for (size_t idx = ctl->hashIdx; idx < ctl->queue.size(); ++idx) {
+        auto &wi = ctl->queue[idx];
+        wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
+    }
+    ctl->hashIdx = ctl->queue.size();
+    ctl->done = true;
+}
+
+bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
+    size_t max = std::min(ctl->hashIdx + hashes, ctl->queue.size());
+    for (; ctl->hashIdx < max; ++ctl->hashIdx) {
+        auto &wi = ctl->queue[ctl->hashIdx];
+        wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
+    }
+
+    if (ctl->hashIdx == ctl->queue.size()) ctl->done = true;
+    return ctl->hashIdx < ctl->queue.size();
+}
+
+void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
+    dict *d = ctl->dict;
+    assert(ctl->done);
+
+    // Unlink ourselves
+    bool fUnlinked = false;
+    dictAsyncRehashCtl **pprev = &d->asyncdata;
+
+    while (*pprev != nullptr) {
+        if (*pprev == ctl) {
+            *pprev = ctl->next;
+            fUnlinked = true;
+            break;
+        }
+        pprev = &((*pprev)->next);
+    }
+
+    if (fUnlinked) {
+        if (ctl->next != nullptr && ctl->deGCList != nullptr) {
+            // An older work item may depend on our free list, so pass our free list to them
+            dictEntry **deGC = &ctl->next->deGCList;
+            while (*deGC != nullptr) deGC = &((*deGC)->next);
+            *deGC = ctl->deGCList;
+            ctl->deGCList = nullptr;
+        }
+    }
+
+    if (fUnlinked && !ctl->release) {
+        if (d->ht[0].table != nullptr) {    // can be null if we're cleared during the rehash
+            for (auto &wi : ctl->queue) {
+                // We need to remove it from the source hash table, and store it in the dest.
+                // Note that it may have been deleted in the meantime and therefore not exist.
+                // In this case it will be in the garbage collection list
+
+                dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
+                while (*pdePrev != nullptr && *pdePrev != wi.de) {
+                    pdePrev = &((*pdePrev)->next);
+                }
+                if (*pdePrev != nullptr) {  // The element may be NULL if it's in the GC list
+                    assert(*pdePrev == wi.de);
+                    *pdePrev = wi.de->next;
+                    // Now link it to the dest hash table
+                    wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
+                    d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
+                    d->ht[0].used--;
+                    d->ht[1].used++;
+                }
+            }
+        }
+
+        /* Check if we already rehashed the whole table... */
+        if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
+            zfree(d->ht[0].table);
+            d->ht[0] = d->ht[1];
+            _dictReset(&d->ht[1]);
+            d->rehashidx = -1;
+        }
+    }
+
+    if (fFree) {
+        while (ctl->deGCList != nullptr) {
+            auto next = ctl->deGCList->next;
+            dictFreeKey(d, ctl->deGCList);
+            dictFreeVal(d, ctl->deGCList);
+            zfree(ctl->deGCList);
+            ctl->deGCList = next;
+        }
+
+        // Was the dictionary free'd while we were in flight?
+        if (ctl->release) {
+            if (d->asyncdata != nullptr)
+                d->asyncdata->release = true;
+            else
+                dictRelease(d);
+        }
+
+        delete ctl;
+    }
+}
+
 long long timeInMilliseconds(void) {
     struct timeval tv;
 
@@ -527,9 +666,14 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
             else
                 d->ht[table].table[idx] = he->next;
             if (!nofree) {
-                dictFreeKey(d, he);
-                dictFreeVal(d, he);
-                zfree(he);
+                if (table == 0 && d->asyncdata != nullptr && idx < d->rehashidx) {
+                    he->next = d->asyncdata->deGCList;
+                    d->asyncdata->deGCList = he;
+                } else {
+                    dictFreeKey(d, he);
+                    dictFreeVal(d, he);
+                    zfree(he);
+                }
             }
             d->ht[table].used--;
             return he;
@@ -577,9 +721,14 @@ dictEntry *dictUnlink(dict *ht, const void *key) {
  * to dictUnlink(). It's safe to call this function with 'he' = NULL. */
 void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
     if (he == NULL) return;
-    dictFreeKey(d, he);
-    dictFreeVal(d, he);
-    zfree(he);
+    if (d->asyncdata) {
+        he->next = d->asyncdata->deGCList;
+        d->asyncdata->deGCList = he;
+    } else {
+        dictFreeKey(d, he);
+        dictFreeVal(d, he);
+        zfree(he);
+    }
 }
 
 /* Destroy an entire dictionary */
@@ -595,9 +744,14 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
         if ((he = ht->table[i]) == NULL) continue;
         while(he) {
             nextHe = he->next;
-            dictFreeKey(d, he);
-            dictFreeVal(d, he);
-            zfree(he);
+            if (d->asyncdata && i < d->rehashidx) {
+                he->next = d->asyncdata->deGCList;
+                d->asyncdata->deGCList = he;
+            } else {
+                dictFreeKey(d, he);
+                dictFreeVal(d, he);
+                zfree(he);
+            }
             ht->used--;
             he = nextHe;
         }
@@ -612,6 +766,10 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
 
 /* Clear & Release the hash table */
 void dictRelease(dict *d) {
+    if (d->asyncdata) {
+        d->asyncdata->release = true;
+        return;
+    }
     _dictClear(d,&d->ht[0],NULL);
     _dictClear(d,&d->ht[1],NULL);
     zfree(d);
diff --git a/src/dict.h b/src/dict.h
index 13f9d2aaa..0be8e1c53 100644
--- a/src/dict.h
+++ b/src/dict.h
@@ -36,6 +36,8 @@
 #include <stdint.h>
 
 #ifdef __cplusplus
+#include <vector>
+#include <atomic>
 extern "C" {
 #endif
 
@@ -81,12 +83,40 @@ typedef struct dictht {
     unsigned long used;
 } dictht;
 
+#ifdef __cplusplus
+struct dictAsyncRehashCtl {
+    struct workItem {
+        dictEntry *de;
+        uint64_t hash;
+        workItem(dictEntry *de) {
+            this->de = de;
+        }
+    };
+
+    static const int c_targetQueueSize = 512;
+    dictEntry *deGCList = nullptr;
+    struct dict *dict = nullptr;
+    std::vector<workItem> queue;
+    size_t hashIdx = 0;
+    bool release = false;
+    dictAsyncRehashCtl *next = nullptr;
+    std::atomic<bool> done { false };
+
+    dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
+        queue.reserve(c_targetQueueSize);
+    }
+};
+#else
+struct dictAsyncRehashCtl;
+#endif
+
 typedef struct dict {
     dictType *type;
     void *privdata;
     dictht ht[2];
     long rehashidx; /* rehashing not in progress if rehashidx == -1 */
     unsigned long iterators; /* number of iterators currently running */
+    dictAsyncRehashCtl *asyncdata;
 } dict;
 
 /* If safe is set to 1 this is a safe iterator, that means, you can call
@@ -193,6 +223,12 @@ dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t h
 void dictForceRehash(dict *d);
 int dictMerge(dict *dst, dict *src);
 
+/* Async Rehash Functions */
+dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets = dictAsyncRehashCtl::c_targetQueueSize);
+void dictRehashAsync(dictAsyncRehashCtl *ctl);
+bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes);
+void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree);
+
 /* Hash table types */
 extern dictType dictTypeHeapStringCopyKey;
 extern dictType dictTypeHeapStrings;
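Reviewer note on the dict changes above: the invariant that makes lock-free hashing safe is that while any dictAsyncRehashCtl is linked on d->asyncdata, deletions divert entries onto deGCList instead of freeing them, so queued dictEntry pointers stay valid for the worker. A sketch of the intended call sequence; the thread annotations are assumptions drawn from the call sites added later in this patch (databasesCron, processEventsWhileBlocked):

    void exampleAsyncRehash(dict *d) {
        // Owning thread, global lock held: snapshot a batch of buckets.
        dictAsyncRehashCtl *ctl = dictRehashAsyncStart(d, 512 /* target buckets */);
        if (ctl == nullptr) return;             // nothing to queue

        // Worker thread, no lock: compute hashes; concurrent deletes land on
        // ctl->deGCList, so the queued entries cannot be freed under us.
        dictRehashAsync(ctl);                   // or dictRehashSomeAsync(ctl, n) in slices

        // Owning thread, lock re-held, once ctl->done is set: splice entries
        // into ht[1] and free the deferred-GC list.
        dictCompleteRehashAsync(ctl, true /*fFree*/);
    }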
diff --git a/src/expire.cpp b/src/expire.cpp
index 21c1c39d0..7144d857c 100644
--- a/src/expire.cpp
+++ b/src/expire.cpp
@@ -137,7 +137,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
             executeCronJobExpireHook(keyCopy, val);
             sdsfree(keyCopy);
             decrRefCount(val);
-        }, false, true /*fLock*/, true /*fForceQueue*/);
+        }, true /*fLock*/, true /*fForceQueue*/);
     }
 
     return;
diff --git a/src/fastlock.cpp b/src/fastlock.cpp
index 4cada72cc..6e6b6e4d2 100644
--- a/src/fastlock.cpp
+++ b/src/fastlock.cpp
@@ -338,7 +338,7 @@ extern "C" void fastlock_init(struct fastlock *lock, const char *name)
 }
 
 #ifndef ASM_SPINLOCK
-extern "C" void fastlock_lock(struct fastlock *lock)
+extern "C" void fastlock_lock(struct fastlock *lock, spin_worker worker)
 {
     int pidOwner;
     __atomic_load(&lock->m_pidOwner, &pidOwner, __ATOMIC_ACQUIRE);
@@ -356,20 +356,32 @@ extern "C" void fastlock_lock(struct fastlock *lock)
     __atomic_load(&g_fHighCpuPressure, &fHighPressure, __ATOMIC_RELAXED);
     unsigned loopLimit = fHighPressure ? 0x10000 : 0x100000;
 
-    for (;;)
-    {
-        __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
-        if ((ticketT.u & 0xffff) == myticket)
-            break;
+    if (worker != nullptr) {
+        for (;;) {
+            __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
+            if ((ticketT.u & 0xffff) == myticket)
+                break;
+            if (!worker())
+                goto LNormalLoop;
+        }
+    } else {
+LNormalLoop:
+        for (;;)
+        {
+            __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
+            if ((ticketT.u & 0xffff) == myticket)
+                break;
 
 #if defined(__i386__) || defined(__amd64__)
-        __asm__ __volatile__ ("pause");
+            __asm__ __volatile__ ("pause");
 #elif defined(__aarch64__)
-        __asm__ __volatile__ ("yield");
+            __asm__ __volatile__ ("yield");
 #endif
-        if ((++cloops % loopLimit) == 0)
-        {
-            fastlock_sleep(lock, tid, ticketT.u, myticket);
+
+            if ((++cloops % loopLimit) == 0)
+            {
+                fastlock_sleep(lock, tid, ticketT.u, myticket);
+            }
         }
     }
diff --git a/src/fastlock.h b/src/fastlock.h
index 0809c8bcd..ede6d4e16 100644
--- a/src/fastlock.h
+++ b/src/fastlock.h
@@ -6,10 +6,16 @@
 extern "C" {
 #endif
 
+typedef int (*spin_worker)();
+
 /* Begin C API */
 struct fastlock;
 void fastlock_init(struct fastlock *lock, const char *name);
-void fastlock_lock(struct fastlock *lock);
+#ifdef __cplusplus
+void fastlock_lock(struct fastlock *lock, spin_worker worker = nullptr);
+#else
+void fastlock_lock(struct fastlock *lock, spin_worker worker);
+#endif
 int fastlock_trylock(struct fastlock *lock, int fWeak);
 void fastlock_unlock(struct fastlock *lock);
 void fastlock_free(struct fastlock *lock);
@@ -56,9 +62,9 @@ struct fastlock
         fastlock_init(this, name);
     }
 
-    inline void lock()
+    inline void lock(spin_worker worker = nullptr)
     {
-        fastlock_lock(this);
+        fastlock_lock(this, worker);
     }
 
     inline bool try_lock(bool fWeak = false)
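Reviewer note on the spin_worker contract above: the worker is polled between ticket checks; a nonzero return means "call me again", zero demotes the caller to the ordinary pause/sleep loop. A self-contained hypothetical example (the atomic counter is a stand-in for a real work queue; the actual worker added by this patch is hash_spin_worker in server.cpp):

    #include <atomic>

    static std::atomic<int> pendingWork {64};   // stand-in for a real work queue

    static int drainOneWorkItem()
    {
        // One small, lock-free unit of work per spin iteration;
        // return 1 while more may remain, 0 to stop being polled.
        return pendingWork.fetch_sub(1) > 1 ? 1 : 0;
    }

    struct fastlock lck;
    fastlock_init(&lck, "example");
    fastlock_lock(&lck, drainOneWorkItem);      // spin cycles drain work instead of pausing
    /* ... critical section ... */
    fastlock_unlock(&lck);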
diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm
index 26c302434..9e17f8995 100644
--- a/src/fastlock_x64.asm
+++ b/src/fastlock_x64.asm
@@ -22,14 +22,22 @@ fastlock_lock:
     # [rdi+64] ...
     #     uint16_t active
     #     uint16_t avail
+    #
+    # RSI points to a spin function to call, or NULL
 
     # First get our TID and put it in ecx
-    push rdi                 # we need our struct pointer (also balance the stack for the call)
-    .cfi_adjust_cfa_offset 8
+    sub rsp, 24              # We only use 16 bytes, but we also need the stack aligned
+    .cfi_adjust_cfa_offset 24
+    mov [rsp], rdi           # we need our struct pointer (also balance the stack for the call)
+    mov [rsp+8], rsi         # backup the spin function
+
     call gettid              # get our thread ID (TLS is nasty in ASM so don't bother inlining)
     mov esi, eax             # back it up in esi
-    pop rdi                  # get our pointer back
-    .cfi_adjust_cfa_offset -8
+
+    mov rdi, [rsp]           # Restore spin struct
+    mov r8, [rsp+8]          # restore the function (in a different register)
+    add rsp, 24
+    .cfi_adjust_cfa_offset -24
 
     cmp [rdi], esi           # Is the TID we got back the owner of the lock?
     je .LLocked              # Don't spin in that case
@@ -47,6 +55,8 @@ fastlock_lock:
     # ax now contains the ticket
     # OK Start the wait loop
     xor ecx, ecx
+    test r8, r8
+    jnz .LLoopFunction
 .ALIGN 16
 .LLoop:
     mov edx, [rdi+64]
@@ -83,6 +93,40 @@ fastlock_lock:
     mov [rdi], esi           # lock->m_pidOwner = gettid()
     inc dword ptr [rdi+4]    # lock->m_depth++
     ret
+
+.LLoopFunction:
+    sub rsp, 40
+    .cfi_adjust_cfa_offset 40
+    xor ecx, ecx
+    mov [rsp], rcx
+    mov [rsp+8], r8
+    mov [rsp+16], rdi
+    mov [rsp+24], rsi
+    mov [rsp+32], eax
+.LLoopFunctionCore:
+    mov edx, [rdi+64]
+    cmp dx, ax
+    je .LExitLoopFunction
+    mov r8, [rsp+8]
+    call r8
+    test eax, eax
+    jz .LExitLoopFunctionForNormal
+    mov eax, [rsp+32]        # restore clobbered eax
+    mov rdi, [rsp+16]
+    jmp .LLoopFunctionCore
+.LExitLoopFunction:
+    mov rsi, [rsp+24]
+    add rsp, 40
+    .cfi_adjust_cfa_offset -40
+    jmp .LLocked
+.LExitLoopFunctionForNormal:
+    xor ecx, ecx
+    mov rdi, [rsp+16]
+    mov rsi, [rsp+24]
+    mov eax, [rsp+32]
+    add rsp, 40
+    .cfi_adjust_cfa_offset -40
+    jmp .LLoop
     .cfi_endproc
 
 .ALIGN 16
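For review, the new assembly path is meant to mirror the C fallback added in fastlock.cpp above; this sketch restates .LLoopFunction's control flow with the register spills and restores omitted (ticketT/myticket/worker as in the C version):

    for (;;) {
        __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);  // mov edx, [rdi+64]
        if ((ticketT.u & 0xffff) == myticket)
            break;                  // cmp dx, ax ; je .LExitLoopFunction -> .LLocked
        if (!worker())              // call r8 ; test eax, eax
            goto LNormalLoop;       // jz .LExitLoopFunctionForNormal -> .LLoop
    }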
diff --git a/src/module.cpp b/src/module.cpp
index 62a11102a..ac064ac2b 100644
--- a/src/module.cpp
+++ b/src/module.cpp
@@ -5046,7 +5046,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
     zfree(ctx);
 }
 
-static bool g_fModuleThread = false;
+thread_local bool g_fModuleThread = false;
 /* Acquire the server lock before executing a thread safe API call.
  * This is not needed for `RedisModule_Reply*` calls when there is
  * a blocked client connected to the thread safe context. */
@@ -5105,7 +5105,14 @@ void moduleAcquireGIL(int fServerThread) {
     }
     else
     {
-        s_mutexModule.lock();
+        // It is possible that another module thread holds the GIL (and s_mutexModule as a result).
+        // When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
+        // This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
+        // As a result, a deadlock has occurred.
+        // We release the lock on s_mutex and wait until we are able to safely acquire the GIL
+        // in order to prevent this deadlock from occurring.
+        while (!s_mutexModule.try_lock())
+            s_cv.wait(lock);
         ++s_cAcquisitionsModule;
         fModuleGILWlocked++;
     }
@@ -5644,6 +5651,9 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client
  * (If the time it takes to execute 'callback' is negligible the two
  * statements above mean the same) */
 RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) {
+    static uint64_t pending_key;
+    static mstime_t pending_period = -1;
+
     RedisModuleTimer *timer = (RedisModuleTimer*)zmalloc(sizeof(*timer), MALLOC_LOCAL);
     timer->module = ctx->module;
     timer->callback = callback;
@@ -5662,32 +5672,40 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
         }
     }
 
+    bool fNeedPost = (pending_period < 0);  // If pending_period is already set, then a PostFunction is in flight and we don't need to set a new one
+    if (pending_period < 0 || period < pending_period) {
+        pending_period = period;
+        pending_key = key;
+    }
+
     /* We need to install the main event loop timer if it's not already
      * installed, or we may need to refresh its period if we just installed
      * a timer that will expire sooner than any other else (i.e. the timer
      * we just installed is the first timer in the Timers rax). */
-    if (aeTimer != -1) {
-        raxIterator ri;
-        raxStart(&ri,Timers);
-        raxSeek(&ri,"^",NULL,0);
-        raxNext(&ri);
-        if (memcmp(ri.key,&key,sizeof(key)) == 0) {
-            /* This is the first key, we need to re-install the timer according
-             * to the just added event. */
-            aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
-                aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
-            }, true /* synchronous */, false /* fLock */);
-            aeTimer = -1;
-        }
-        raxStop(&ri);
-    }
+    if (fNeedPost) {
+        aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
+            if (aeTimer != -1) {
+                raxIterator ri;
+                raxStart(&ri,Timers);
+                raxSeek(&ri,"^",NULL,0);
+                raxNext(&ri);
+                if (memcmp(ri.key,&pending_key,sizeof(key)) == 0) {
+                    /* This is the first key, we need to re-install the timer according
+                     * to the just added event. */
+                    aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
+                    aeTimer = -1;
+                }
+                raxStop(&ri);
+            }
 
-    /* If we have no main timer (the old one was invalidated, or this is the
-     * first module timer we have), install one. */
-    if (aeTimer == -1) {
-        aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
-            aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL);
-        }, true /* synchronous */, false /* fLock */);
+            /* If we have no main timer (the old one was invalidated, or this is the
+             * first module timer we have), install one. */
+            if (aeTimer == -1) {
+                aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,pending_period,moduleTimerHandler,NULL,NULL);
+            }
+
+            pending_period = -1;
+        });
     }
 
     return key;
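Two reviewer notes on the module.cpp changes above. First, moduleAcquireGIL must not block on s_mutexModule while holding s_mutex, since the GIL holder needs s_mutex to release; the try_lock/wait loop is the standard way to take a second lock against the usual ordering (sketch; s_mutex, s_cv and lock come from the surrounding function, largely outside this hunk):

    std::unique_lock<std::mutex> lock(s_mutex);   // held on entry
    while (!s_mutexModule.try_lock())             // never block while holding s_mutex
        s_cv.wait(lock);                          // let the holder finish releasing, then retry

Second, RM_CreateTimer previously issued up to two synchronous posts per call; with pending_key/pending_period it coalesces any burst of CreateTimer calls into at most one queued function, which re-reads the soonest deadline when it finally runs on the main event loop.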
diff --git a/src/networking.cpp b/src/networking.cpp
index 6a5f4d392..80214963d 100644
--- a/src/networking.cpp
+++ b/src/networking.cpp
@@ -495,7 +495,7 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
 }
 
 /* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
-void afterErrorReply(client *c, const char *s, size_t len) {
+void afterErrorReply(client *c, const char *s, size_t len, int severity = ERR_CRITICAL) {
     /* Sometimes it could be normal that a replica replies to a master with
      * an error and this function gets called. Actually the error will never
      * be sent because addReply*() against master clients has no effect...
@@ -523,9 +523,30 @@ void afterErrorReply(client *c, const char *s, size_t len) {
         if (len > 4096) len = 4096;
         const char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
-        serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
-                             "to its %s: '%.*s' after processing the command "
-                             "'%s'", from, to, (int)len, s, cmdname);
+        switch (severity) {
+            case ERR_NOTICE:
+                serverLog(LL_NOTICE,"== NOTICE == This %s is rejecting a command "
+                                    "from its %s: '%.*s' after processing the command "
+                                    "'%s'", from, to, (int)len, s, cmdname);
+                break;
+            case ERR_WARNING:
+                serverLog(LL_WARNING,"== WARNING == This %s is rejecting a command "
+                                     "from its %s: '%.*s' after processing the command "
+                                     "'%s'", from, to, (int)len, s, cmdname);
+                break;
+            case ERR_ERROR:
+                serverLog(LL_WARNING,"== ERROR == This %s is sending an error "
+                                     "to its %s: '%.*s' after processing the command "
+                                     "'%s'", from, to, (int)len, s, cmdname);
+                break;
+            case ERR_CRITICAL:
+            default:
+                serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
+                                     "to its %s: '%.*s' after processing the command "
+                                     "'%s'", from, to, (int)len, s, cmdname);
+                break;
+        }
+
         if (ctype == CLIENT_TYPE_MASTER && g_pserver->repl_backlog &&
             g_pserver->repl_backlog_histlen > 0)
         {
@@ -537,9 +558,9 @@ void afterErrorReply(client *c, const char *s, size_t len) {
 
 /* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
  * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
-void addReplyErrorObject(client *c, robj *err) {
+void addReplyErrorObject(client *c, robj *err, int severity) {
     addReply(c, err);
-    afterErrorReply(c, szFromObj(err), sdslen(szFromObj(err))-2); /* Ignore trailing \r\n */
+    afterErrorReply(c, szFromObj(err), sdslen(szFromObj(err))-2, severity); /* Ignore trailing \r\n */
 }
 
 void addReplyError(client *c, const char *err) {
@@ -3302,6 +3323,8 @@ void flushSlavesOutputBuffers(void) {
     listIter li;
     listNode *ln;
 
+    flushReplBacklogToClients();
+
     listRewind(g_pserver->slaves,&li);
     while((ln = listNext(&li))) {
         client *replica = (client*)listNodeValue(ln);
@@ -3470,6 +3493,16 @@ void processEventsWhileBlocked(int iel) {
         locker.arm(nullptr);
     locker.release();
 
+    // Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here)
+    for (int idb = 0; idb < cserver.dbnum; ++idb) {
+        redisDb *db = g_pserver->db[idb];
+        while (db->dictUnsafeKeyOnly()->asyncdata != nullptr) {
+            if (!db->dictUnsafeKeyOnly()->asyncdata->done)
+                break;
+            dictCompleteRehashAsync(db->dictUnsafeKeyOnly()->asyncdata, false /*fFree*/);
+        }
+    }
+
     // Restore it so the calling code is not confused
     if (fReplBacklog && !serverTL->el->stop) {
         g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
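With the severity parameter, expected rejections can be kept out of the CRITICAL log band, and the server.h change below makes the argument to addReplyErrorObject mandatory so every call site chooses explicitly. Usage, for illustration (the ERR_WARNING call is real and appears in server.cpp below; the ERR_NOTICE line is hypothetical):

    rejectCommand(c, shared.loadingerr, ERR_WARNING);       /* "== WARNING ==" at LL_WARNING */
    addReplyErrorObject(c, shared.loadingerr, ERR_NOTICE);  /* "== NOTICE ==" at LL_NOTICE */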
diff --git a/src/server.cpp b/src/server.cpp
index de74548ae..137976437 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -690,7 +690,7 @@ struct redisCommand redisCommandTable[] = {
      * failure detection, and a loading server is considered to be
      * not available. */
     {"ping",pingCommand,-1,
-     "ok-stale fast @connection @replication",
+     "ok-stale ok-loading fast @connection @replication",
      0,NULL,0,0,0,0,0,0},
 
     {"echo",echoCommand,2,
@@ -1558,14 +1558,14 @@ void tryResizeHashTables(int dbid) {
  * table will use two tables for a long time. So we try to use 1 millisecond
  * of CPU time at every call of this function to perform some rehashing.
  *
- * The function returns 1 if some rehashing was performed, otherwise 0
+ * The function returns the number of rehashes if some rehashing was performed, otherwise 0
  * is returned. */
 int redisDbPersistentData::incrementallyRehash() {
     /* Keys dictionary */
     if (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) {
-        dictRehashMilliseconds(m_pdict,1);
-        dictRehashMilliseconds(m_pdictTombstone,1);
-        return 1; /* already used our millisecond for this loop... */
+        int result = dictRehashMilliseconds(m_pdict,1);
+        result += dictRehashMilliseconds(m_pdictTombstone,1);
+        return result; /* already used our millisecond for this loop... */
     }
     return 0;
 }
@@ -1884,22 +1884,32 @@ bool expireOwnKeys()
     return false;
 }
 
+int hash_spin_worker() {
+    auto ctl = serverTL->rehashCtl;
+    return dictRehashSomeAsync(ctl, 1);
+}
+
 /* This function handles 'background' operations we are required to do
  * incrementally in Redis databases, such as active key expiring, resizing,
  * rehashing. */
-void databasesCron(void) {
-    /* Expire keys by random sampling. Not required for slaves
-     * as master will synthesize DELs for us. */
-    if (g_pserver->active_expire_enabled) {
-        if (expireOwnKeys()) {
-            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
-        } else {
-            expireSlaveKeys();
+void databasesCron(bool fMainThread) {
+    serverAssert(GlobalLocksAcquired());
+    static int rehashes_per_ms = 0;
+    static int async_rehashes = 0;
+    if (fMainThread) {
+        /* Expire keys by random sampling. Not required for slaves
+         * as master will synthesize DELs for us. */
+        if (g_pserver->active_expire_enabled) {
+            if (expireOwnKeys()) {
+                activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
+            } else {
+                expireSlaveKeys();
+            }
         }
-    }
 
-    /* Defrag keys gradually. */
-    activeDefragCycle();
+        /* Defrag keys gradually. */
+        activeDefragCycle();
+    }
 
     /* Perform hash tables rehashing if needed, but only if there are no
      * other processes saving the DB on disk. Otherwise rehashing is bad
@@ -1916,28 +1926,70 @@ void databasesCron(void) {
         /* Don't test more DBs than we have. */
         if (dbs_per_call > cserver.dbnum) dbs_per_call = cserver.dbnum;
 
-        /* Resize */
-        for (j = 0; j < dbs_per_call; j++) {
-            tryResizeHashTables(resize_db % cserver.dbnum);
-            resize_db++;
+        if (fMainThread) {
+            /* Resize */
+            for (j = 0; j < dbs_per_call; j++) {
+                tryResizeHashTables(resize_db % cserver.dbnum);
+                resize_db++;
+            }
         }
 
         /* Rehash */
         if (g_pserver->activerehashing) {
             for (j = 0; j < dbs_per_call; j++) {
-                int work_done = g_pserver->db[rehash_db]->incrementallyRehash();
-                if (work_done) {
-                    /* If the function did some work, stop here, we'll do
-                     * more at the next cron loop. */
+                if (serverTL->rehashCtl != nullptr) {
+                    if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) {
+                        break;
+                    } else {
+                        dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
+                        serverTL->rehashCtl = nullptr;
+                    }
+                }
+
+                serverAssert(serverTL->rehashCtl == nullptr);
+                ::dict *dict = g_pserver->db[rehash_db]->dictUnsafeKeyOnly();
+                /* Are we async rehashing? And if so is it time to re-calibrate? */
+                /* The recalibration limit is a prime number to ensure balancing across threads */
+                if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled) {
+                    serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms);
+                    ++async_rehashes;
+                }
+                if (serverTL->rehashCtl)
                     break;
-                } else {
-                    /* If this db didn't need rehash, we'll try the next one. */
+
+                // Before starting anything new, can we end the rehash of a blocked thread?
+                if (dict->asyncdata != nullptr) {
+                    auto asyncdata = dict->asyncdata;
+                    if (asyncdata->done) {
+                        dictCompleteRehashAsync(asyncdata, false /*fFree*/);    // Don't free because we don't own the pointer
+                        serverAssert(dict->asyncdata != asyncdata);
+                        break;  // completion can be expensive, don't do anything else
+                    }
+                }
+
+                rehashes_per_ms = g_pserver->db[rehash_db]->incrementallyRehash();
+                async_rehashes = 0;
+                if (rehashes_per_ms > 0) {
+                    /* If the function did some work, stop here, we'll do
+                     * more at the next cron loop. */
+                    if (!cserver.active_defrag_enabled) {
+                        serverLog(LL_VERBOSE, "Calibrated rehashes per ms: %d", rehashes_per_ms);
+                    }
+                    break;
+                } else if (dict->asyncdata == nullptr) {
+                    /* If this db didn't need rehash and we have none in flight, we'll try the next one. */
                     rehash_db++;
                     rehash_db %= cserver.dbnum;
                 }
             }
         }
     }
+
+    if (serverTL->rehashCtl) {
+        setAeLockSetThreadSpinWorker(hash_spin_worker);
+    } else {
+        setAeLockSetThreadSpinWorker(nullptr);
+    }
 }
 
 /* We take a cached value of the unix time in the global state because with
@@ -2232,7 +2284,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     clientsCron(IDX_EVENT_LOOP_MAIN);
 
     /* Handle background operations on Redis databases. */
-    databasesCron();
+    databasesCron(true /* fMainThread */);
 
     /* Start a scheduled AOF rewrite if this was requested by the user while
      * a BGSAVE was in progress. */
@@ -2413,6 +2465,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
         processUnblockedClients(iel);
     }
 
+    /* Handle background operations on Redis databases. */
+    databasesCron(false /* fMainThread */);
+
     /* Unpause clients if enough time has elapsed */
     unpauseClientsIfNecessary();
 
@@ -3002,7 +3057,7 @@ int setOOMScoreAdj(int process_class) {
         int val;
         char buf[64];
 
-        val = g_pserver->oom_score_adj_base + g_pserver->oom_score_adj_values[process_class];
+        val = g_pserver->oom_score_adj_values[process_class];
+        if (g_pserver->oom_score_adj == OOM_SCORE_RELATIVE) val += g_pserver->oom_score_adj_base;
         if (val > 1000) val = 1000;
@@ -4028,13 +4083,14 @@ void call(client *c, int flags) {
  * If there's a transaction is flags it as dirty, and if the command is EXEC,
  * it aborts the transaction.
  * Note: 'reply' is expected to end with \r\n */
-void rejectCommand(client *c, robj *reply) {
+void rejectCommand(client *c, robj *reply, int severity = ERR_CRITICAL) {
     flagTransaction(c);
     if (c->cmd && c->cmd->proc == execCommand) {
         execCommandAbort(c, szFromObj(reply));
-    } else {
+    }
+    else {
         /* using addReplyError* rather than addReply so that the error can be logged. */
-        addReplyErrorObject(c, reply);
+        addReplyErrorObject(c, reply, severity);
     }
 }
 
@@ -4283,7 +4339,7 @@ int processCommand(client *c, int callFlags) {
         /* Active Replicas can execute read only commands, and optionally write commands */
         if (!(g_pserver->loading == LOADING_REPLICATION && g_pserver->fActiveReplica && ((c->cmd->flags & CMD_READONLY) || g_pserver->fWriteDuringActiveLoad)))
         {
-            rejectCommand(c, shared.loadingerr);
+            rejectCommand(c, shared.loadingerr, ERR_WARNING);
             return C_OK;
         }
     }
@@ -4345,7 +4401,7 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
         std::lock_guard<decltype(this->lock)> lock(this->lock);
         fn(this);
         --casyncOpsPending;
-    }, false, fLock) == AE_OK;
+    }, fLock) == AE_OK;
 }
 
 /*================================== Shutdown =============================== */
@@ -4474,6 +4530,8 @@ int prepareForShutdown(int flags) {
     /* Best effort flush of replica output buffers, so that we hopefully
      * send them pending writes. */
     flushSlavesOutputBuffers();
+    g_pserver->repl_batch_idxStart = -1;
+    g_pserver->repl_batch_offStart = -1;
 
     /* Close the listening sockets. Apparently this allows faster restarts. */
     closeListeningSockets(1);
@@ -5484,7 +5542,7 @@ int linuxMadvFreeForkBugCheck(void) {
     const long map_size = 3 * 4096;
 
     /* Create a memory map that's in our full control (not one used by the allocator). */
-    p = mmap(NULL, map_size, PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+    p = (char*)mmap(NULL, map_size, PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
     serverAssert(p != MAP_FAILED);
 
     q = p + 4096;
diff --git a/src/server.h b/src/server.h
index e6371f8d0..3f06c9f8b 100644
--- a/src/server.h
+++ b/src/server.h
@@ -611,6 +611,12 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
 #define LL_WARNING 3
 #define LL_RAW (1<<10) /* Modifier to log without timestamp */
 
+/* Error severity levels */
+#define ERR_CRITICAL 0
+#define ERR_ERROR 1
+#define ERR_WARNING 2
+#define ERR_NOTICE 3
+
 /* Supervision options */
 #define SUPERVISED_NONE 0
 #define SUPERVISED_AUTODETECT 1
@@ -1822,6 +1828,7 @@ struct redisServerThreadVars {
     const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
     bool fRetrySetAofEvent = false;
     std::vector<client*> vecclientsProcess;
+    dictAsyncRehashCtl *rehashCtl = nullptr;
 
     int getRdbKeySaveDelay();
 private:
@@ -2529,7 +2536,7 @@ void addReplyBulkLongLong(client *c, long long ll);
 void addReply(client *c, robj_roptr obj);
 void addReplySds(client *c, sds s);
 void addReplyBulkSds(client *c, sds s);
-void addReplyErrorObject(client *c, robj *err);
+void addReplyErrorObject(client *c, robj *err, int severity);
 void addReplyErrorSds(client *c, sds err);
 void addReplyError(client *c, const char *err);
 void addReplyStatus(client *c, const char *status);
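How the rehash pieces in server.cpp fit together, with assumed numbers for concreteness: a cron pass calls incrementallyRehash(), which spends its 1ms budget and now reports how much it accomplished, say rehashes_per_ms = 400. Later passes hand batches of that size to dictRehashAsyncStart, so each async batch costs roughly the same millisecond, and after 131 batches (a prime, to stagger re-calibration across threads) the loop measures again. While a batch is in flight, the thread registers the spin worker so that time spent waiting on the global lock hashes queued entries instead of executing pause:

    // simplified from databasesCron above; numbers are illustrative
    rehashes_per_ms = g_pserver->db[rehash_db]->incrementallyRehash();   // calibrate: e.g. 400
    serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms);   // ~1ms of hashing work
    setAeLockSetThreadSpinWorker(hash_spin_worker);   // each lock spin does dictRehashSomeAsync(ctl, 1)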
{tags {"scripting"}} { set start [clock clicks -milliseconds] $rd eval {redis.call('set',KEYS[1],'y'); for i=1,1500000 do redis.call('ping') end return 'ok'} 1 x $rd flush - after 100 - catch {r ping} err + after 200 + catch {r echo "foo"} err assert_match {BUSY*} $err $rd read set elapsed [expr [clock clicks -milliseconds]-$start] @@ -457,8 +457,8 @@ start_server {tags {"scripting"}} { set start [clock clicks -milliseconds] $rd debug loadaof $rd flush - after 100 - catch {r ping} err + after 200 + catch {r echo "foo"} err assert_match {LOADING*} $err $rd read set elapsed [expr [clock clicks -milliseconds]-$start]