Merge branch 'unstable' into keydbpro
Former-commit-id: e2140793f2bf565972ada799af73bf4457e2718d
This commit is contained in:
commit
078abba316
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@ -20,7 +20,7 @@ jobs:
|
||||
run: ./utils/gen-test-certs.sh
|
||||
- name: test-tls
|
||||
run: |
|
||||
sudo apt-get -y install tcl8.5 tcl-tls
|
||||
sudo apt-get -y install tcl tcl-tls
|
||||
./runtest --clients 2 --verbose --tls
|
||||
- name: cluster-test
|
||||
run: |
|
||||
|
79
src/ae.cpp
79
src/ae.cpp
@ -109,13 +109,6 @@ enum class AE_ASYNC_OP
|
||||
CreateFileEvent,
|
||||
};
|
||||
|
||||
struct aeCommandControl
|
||||
{
|
||||
std::condition_variable cv;
|
||||
std::atomic<int> rval;
|
||||
std::mutex mutexcv;
|
||||
};
|
||||
|
||||
struct aeCommand
|
||||
{
|
||||
AE_ASYNC_OP op;
|
||||
@ -128,7 +121,6 @@ struct aeCommand
|
||||
std::function<void()> *pfn;
|
||||
};
|
||||
void *clientData;
|
||||
aeCommandControl *pctl;
|
||||
};
|
||||
#ifdef PIPE_BUF
|
||||
static_assert(sizeof(aeCommand) <= PIPE_BUF, "aeCommand must be small enough to send atomically through a pipe");
|
||||
@ -152,19 +144,7 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
|
||||
break;
|
||||
|
||||
case AE_ASYNC_OP::CreateFileEvent:
|
||||
{
|
||||
if (cmd.pctl != nullptr)
|
||||
{
|
||||
cmd.pctl->mutexcv.lock();
|
||||
std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData));
|
||||
cmd.pctl->cv.notify_all();
|
||||
cmd.pctl->mutexcv.unlock();
|
||||
}
|
||||
else
|
||||
{
|
||||
aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
|
||||
}
|
||||
}
|
||||
aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
|
||||
break;
|
||||
|
||||
case AE_ASYNC_OP::PostFunction:
|
||||
@ -178,19 +158,11 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
|
||||
|
||||
case AE_ASYNC_OP::PostCppFunction:
|
||||
{
|
||||
if (cmd.pctl != nullptr)
|
||||
cmd.pctl->mutexcv.lock();
|
||||
|
||||
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
|
||||
if (cmd.fLock)
|
||||
ulock.lock();
|
||||
(*cmd.pfn)();
|
||||
|
||||
if (cmd.pctl != nullptr)
|
||||
{
|
||||
cmd.pctl->cv.notify_all();
|
||||
cmd.pctl->mutexcv.unlock();
|
||||
}
|
||||
|
||||
delete cmd.pfn;
|
||||
}
|
||||
break;
|
||||
@ -229,7 +201,7 @@ ssize_t safe_write(int fd, const void *pv, size_t cb)
|
||||
}
|
||||
|
||||
int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||
aeFileProc *proc, void *clientData, int fSynchronous)
|
||||
aeFileProc *proc, void *clientData)
|
||||
{
|
||||
if (eventLoop == g_eventLoopThisThread)
|
||||
return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData);
|
||||
@ -242,13 +214,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||
cmd.mask = mask;
|
||||
cmd.fproc = proc;
|
||||
cmd.clientData = clientData;
|
||||
cmd.pctl = nullptr;
|
||||
cmd.fLock = true;
|
||||
if (fSynchronous)
|
||||
{
|
||||
cmd.pctl = new aeCommandControl();
|
||||
cmd.pctl->mutexcv.lock();
|
||||
}
|
||||
|
||||
auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||
if (size != sizeof(cmd))
|
||||
@ -257,16 +223,6 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||
serverAssert(errno == EAGAIN);
|
||||
ret = AE_ERR;
|
||||
}
|
||||
|
||||
if (fSynchronous)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
|
||||
cmd.pctl->cv.wait(ulock);
|
||||
ret = cmd.pctl->rval;
|
||||
}
|
||||
delete cmd.pctl;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -289,7 +245,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
|
||||
return AE_OK;
|
||||
}
|
||||
|
||||
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock, bool fForceQueue)
|
||||
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock, bool fForceQueue)
|
||||
{
|
||||
if (eventLoop == g_eventLoopThisThread && !fForceQueue)
|
||||
{
|
||||
@ -300,13 +256,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
|
||||
aeCommand cmd = {};
|
||||
cmd.op = AE_ASYNC_OP::PostCppFunction;
|
||||
cmd.pfn = new std::function<void()>(fn);
|
||||
cmd.pctl = nullptr;
|
||||
cmd.fLock = fLock;
|
||||
if (fSynchronous)
|
||||
{
|
||||
cmd.pctl = new aeCommandControl();
|
||||
cmd.pctl->mutexcv.lock();
|
||||
}
|
||||
|
||||
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||
if (!(!size || size == sizeof(cmd))) {
|
||||
@ -316,17 +266,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
|
||||
|
||||
if (size == 0)
|
||||
return AE_ERR;
|
||||
int ret = AE_OK;
|
||||
if (fSynchronous)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
|
||||
cmd.pctl->cv.wait(ulock);
|
||||
ret = cmd.pctl->rval;
|
||||
}
|
||||
delete cmd.pctl;
|
||||
}
|
||||
return ret;
|
||||
|
||||
return AE_OK;
|
||||
}
|
||||
|
||||
aeEventLoop *aeCreateEventLoop(int setsize) {
|
||||
@ -904,9 +845,15 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep,
|
||||
eventLoop->aftersleepFlags = flags;
|
||||
}
|
||||
|
||||
thread_local spin_worker tl_worker = nullptr;
|
||||
void setAeLockSetThreadSpinWorker(spin_worker worker)
|
||||
{
|
||||
tl_worker = worker;
|
||||
}
|
||||
|
||||
void aeAcquireLock()
|
||||
{
|
||||
g_lock.lock();
|
||||
g_lock.lock(tl_worker);
|
||||
}
|
||||
|
||||
int aeTryAcquireLock(int fWeak)
|
||||
|
5
src/ae.h
5
src/ae.h
@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
|
||||
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
|
||||
#ifdef __cplusplus
|
||||
} // EXTERN C
|
||||
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false);
|
||||
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock = true, bool fForceQueue = false);
|
||||
extern "C" {
|
||||
#endif
|
||||
void aeDeleteEventLoop(aeEventLoop *eventLoop);
|
||||
@ -144,7 +144,7 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||
aeFileProc *proc, void *clientData);
|
||||
|
||||
int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||
aeFileProc *proc, void *clientData, int fSynchronous);
|
||||
aeFileProc *proc, void *clientData);
|
||||
|
||||
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
|
||||
void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask);
|
||||
@ -164,6 +164,7 @@ aeEventLoop *aeGetCurrentEventLoop();
|
||||
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
|
||||
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
|
||||
|
||||
void setAeLockSetThreadSpinWorker(spin_worker worker);
|
||||
void aeAcquireLock();
|
||||
int aeTryAcquireLock(int fWeak);
|
||||
void aeReleaseLock();
|
||||
|
178
src/dict.cpp
178
src/dict.cpp
@ -128,6 +128,7 @@ int _dictInit(dict *d, dictType *type,
|
||||
d->privdata = privDataPtr;
|
||||
d->rehashidx = -1;
|
||||
d->iterators = 0;
|
||||
d->asyncdata = nullptr;
|
||||
return DICT_OK;
|
||||
}
|
||||
|
||||
@ -328,13 +329,13 @@ int dictMerge(dict *dst, dict *src)
|
||||
int dictRehash(dict *d, int n) {
|
||||
int empty_visits = n*10; /* Max number of empty buckets to visit. */
|
||||
if (!dictIsRehashing(d)) return 0;
|
||||
if (d->asyncdata) return 0;
|
||||
|
||||
while(n-- && d->ht[0].used != 0) {
|
||||
dictEntry *de, *nextde;
|
||||
|
||||
/* Note that rehashidx can't overflow as we are sure there are more
|
||||
* elements because ht[0].used != 0 */
|
||||
assert(d->ht[0].size > (unsigned long)d->rehashidx);
|
||||
while(d->ht[0].table[d->rehashidx] == NULL) {
|
||||
d->rehashidx++;
|
||||
if (--empty_visits == 0) return 1;
|
||||
@ -370,6 +371,144 @@ int dictRehash(dict *d, int n) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
|
||||
if (!dictIsRehashing(d)) return 0;
|
||||
|
||||
d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);
|
||||
|
||||
int empty_visits = buckets * 10;
|
||||
|
||||
while (d->asyncdata->queue.size() < (size_t)buckets && d->rehashidx < d->ht[0].size) {
|
||||
dictEntry *de;
|
||||
|
||||
/* Note that rehashidx can't overflow as we are sure there are more
|
||||
* elements because ht[0].used != 0 */
|
||||
while(d->ht[0].table[d->rehashidx] == NULL) {
|
||||
d->rehashidx++;
|
||||
if (--empty_visits == 0) goto LDone;
|
||||
if (d->rehashidx >= d->ht[0].size) goto LDone;
|
||||
}
|
||||
|
||||
de = d->ht[0].table[d->rehashidx];
|
||||
// We have to queue every node in the bucket, even if we go over our target size
|
||||
while (de != nullptr) {
|
||||
d->asyncdata->queue.emplace_back(de);
|
||||
de = de->next;
|
||||
}
|
||||
d->rehashidx++;
|
||||
}
|
||||
|
||||
LDone:
|
||||
if (d->asyncdata->queue.empty()) {
|
||||
// We didn't find any work to do (can happen if there is a large gap in the hash table)
|
||||
auto asyncT = d->asyncdata;
|
||||
d->asyncdata = d->asyncdata->next;
|
||||
delete asyncT;
|
||||
return nullptr;
|
||||
}
|
||||
return d->asyncdata;
|
||||
}
|
||||
|
||||
void dictRehashAsync(dictAsyncRehashCtl *ctl) {
|
||||
for (size_t idx = ctl->hashIdx; idx < ctl->queue.size(); ++idx) {
|
||||
auto &wi = ctl->queue[idx];
|
||||
wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
|
||||
}
|
||||
ctl->hashIdx = ctl->queue.size();
|
||||
ctl->done = true;
|
||||
}
|
||||
|
||||
bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
|
||||
size_t max = std::min(ctl->hashIdx + hashes, ctl->queue.size());
|
||||
for (; ctl->hashIdx < max; ++ctl->hashIdx) {
|
||||
auto &wi = ctl->queue[ctl->hashIdx];
|
||||
wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
|
||||
}
|
||||
|
||||
if (ctl->hashIdx == ctl->queue.size()) ctl->done = true;
|
||||
return ctl->hashIdx < ctl->queue.size();
|
||||
}
|
||||
|
||||
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
|
||||
dict *d = ctl->dict;
|
||||
assert(ctl->done);
|
||||
|
||||
// Unlink ourselves
|
||||
bool fUnlinked = false;
|
||||
dictAsyncRehashCtl **pprev = &d->asyncdata;
|
||||
|
||||
while (*pprev != nullptr) {
|
||||
if (*pprev == ctl) {
|
||||
*pprev = ctl->next;
|
||||
fUnlinked = true;
|
||||
break;
|
||||
}
|
||||
pprev = &((*pprev)->next);
|
||||
}
|
||||
|
||||
if (fUnlinked) {
|
||||
if (ctl->next != nullptr && ctl->deGCList != nullptr) {
|
||||
// An older work item may depend on our free list, so pass our free list to them
|
||||
dictEntry **deGC = &ctl->next->deGCList;
|
||||
while (*deGC != nullptr) deGC = &((*deGC)->next);
|
||||
*deGC = ctl->deGCList;
|
||||
ctl->deGCList = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
if (fUnlinked && !ctl->release) {
|
||||
if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash
|
||||
for (auto &wi : ctl->queue) {
|
||||
// We need to remove it from the source hash table, and store it in the dest.
|
||||
// Note that it may have been deleted in the meantime and therefore not exist.
|
||||
// In this case it will be in the garbage collection list
|
||||
|
||||
dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
|
||||
while (*pdePrev != nullptr && *pdePrev != wi.de) {
|
||||
pdePrev = &((*pdePrev)->next);
|
||||
}
|
||||
if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list
|
||||
assert(*pdePrev == wi.de);
|
||||
*pdePrev = wi.de->next;
|
||||
// Now link it to the dest hash table
|
||||
wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
|
||||
d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
|
||||
d->ht[0].used--;
|
||||
d->ht[1].used++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Check if we already rehashed the whole table... */
|
||||
if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
|
||||
zfree(d->ht[0].table);
|
||||
d->ht[0] = d->ht[1];
|
||||
_dictReset(&d->ht[1]);
|
||||
d->rehashidx = -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (fFree) {
|
||||
while (ctl->deGCList != nullptr) {
|
||||
auto next = ctl->deGCList->next;
|
||||
dictFreeKey(d, ctl->deGCList);
|
||||
dictFreeVal(d, ctl->deGCList);
|
||||
zfree(ctl->deGCList);
|
||||
ctl->deGCList = next;
|
||||
}
|
||||
|
||||
// Was the dictionary free'd while we were in flight?
|
||||
if (ctl->release) {
|
||||
if (d->asyncdata != nullptr)
|
||||
d->asyncdata->release = true;
|
||||
else
|
||||
dictRelease(d);
|
||||
}
|
||||
|
||||
delete ctl;
|
||||
}
|
||||
}
|
||||
|
||||
long long timeInMilliseconds(void) {
|
||||
struct timeval tv;
|
||||
|
||||
@ -527,9 +666,14 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
|
||||
else
|
||||
d->ht[table].table[idx] = he->next;
|
||||
if (!nofree) {
|
||||
dictFreeKey(d, he);
|
||||
dictFreeVal(d, he);
|
||||
zfree(he);
|
||||
if (table == 0 && d->asyncdata != nullptr && idx < d->rehashidx) {
|
||||
he->next = d->asyncdata->deGCList;
|
||||
d->asyncdata->deGCList = he->next;
|
||||
} else {
|
||||
dictFreeKey(d, he);
|
||||
dictFreeVal(d, he);
|
||||
zfree(he);
|
||||
}
|
||||
}
|
||||
d->ht[table].used--;
|
||||
return he;
|
||||
@ -577,9 +721,14 @@ dictEntry *dictUnlink(dict *ht, const void *key) {
|
||||
* to dictUnlink(). It's safe to call this function with 'he' = NULL. */
|
||||
void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
|
||||
if (he == NULL) return;
|
||||
dictFreeKey(d, he);
|
||||
dictFreeVal(d, he);
|
||||
zfree(he);
|
||||
if (d->asyncdata) {
|
||||
he->next = d->asyncdata->deGCList;
|
||||
d->asyncdata->deGCList = he;
|
||||
} else {
|
||||
dictFreeKey(d, he);
|
||||
dictFreeVal(d, he);
|
||||
zfree(he);
|
||||
}
|
||||
}
|
||||
|
||||
/* Destroy an entire dictionary */
|
||||
@ -595,9 +744,14 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
|
||||
if ((he = ht->table[i]) == NULL) continue;
|
||||
while(he) {
|
||||
nextHe = he->next;
|
||||
dictFreeKey(d, he);
|
||||
dictFreeVal(d, he);
|
||||
zfree(he);
|
||||
if (d->asyncdata && i < d->rehashidx) {
|
||||
he->next = d->asyncdata->deGCList;
|
||||
d->asyncdata->deGCList = he;
|
||||
} else {
|
||||
dictFreeKey(d, he);
|
||||
dictFreeVal(d, he);
|
||||
zfree(he);
|
||||
}
|
||||
ht->used--;
|
||||
he = nextHe;
|
||||
}
|
||||
@ -612,6 +766,10 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
|
||||
/* Clear & Release the hash table */
|
||||
void dictRelease(dict *d)
|
||||
{
|
||||
if (d->asyncdata) {
|
||||
d->asyncdata->release = true;
|
||||
return;
|
||||
}
|
||||
_dictClear(d,&d->ht[0],NULL);
|
||||
_dictClear(d,&d->ht[1],NULL);
|
||||
zfree(d);
|
||||
|
36
src/dict.h
36
src/dict.h
@ -36,6 +36,8 @@
|
||||
#include <stdint.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
#include <vector>
|
||||
#include <atomic>
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
@ -81,12 +83,40 @@ typedef struct dictht {
|
||||
unsigned long used;
|
||||
} dictht;
|
||||
|
||||
#ifdef __cplusplus
|
||||
struct dictAsyncRehashCtl {
|
||||
struct workItem {
|
||||
dictEntry *de;
|
||||
uint64_t hash;
|
||||
workItem(dictEntry *de) {
|
||||
this->de = de;
|
||||
}
|
||||
};
|
||||
|
||||
static const int c_targetQueueSize = 512;
|
||||
dictEntry *deGCList = nullptr;
|
||||
struct dict *dict = nullptr;
|
||||
std::vector<workItem> queue;
|
||||
size_t hashIdx = 0;
|
||||
bool release = false;
|
||||
dictAsyncRehashCtl *next = nullptr;
|
||||
std::atomic<bool> done { false };
|
||||
|
||||
dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
|
||||
queue.reserve(c_targetQueueSize);
|
||||
}
|
||||
};
|
||||
#else
|
||||
struct dictAsyncRehashCtl;
|
||||
#endif
|
||||
|
||||
typedef struct dict {
|
||||
dictType *type;
|
||||
void *privdata;
|
||||
dictht ht[2];
|
||||
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
|
||||
unsigned long iterators; /* number of iterators currently running */
|
||||
dictAsyncRehashCtl *asyncdata;
|
||||
} dict;
|
||||
|
||||
/* If safe is set to 1 this is a safe iterator, that means, you can call
|
||||
@ -193,6 +223,12 @@ dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t h
|
||||
void dictForceRehash(dict *d);
|
||||
int dictMerge(dict *dst, dict *src);
|
||||
|
||||
/* Async Rehash Functions */
|
||||
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets = dictAsyncRehashCtl::c_targetQueueSize);
|
||||
void dictRehashAsync(dictAsyncRehashCtl *ctl);
|
||||
bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes);
|
||||
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree);
|
||||
|
||||
/* Hash table types */
|
||||
extern dictType dictTypeHeapStringCopyKey;
|
||||
extern dictType dictTypeHeapStrings;
|
||||
|
@ -137,7 +137,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
||||
executeCronJobExpireHook(keyCopy, val);
|
||||
sdsfree(keyCopy);
|
||||
decrRefCount(val);
|
||||
}, false, true /*fLock*/, true /*fForceQueue*/);
|
||||
}, true /*fLock*/, true /*fForceQueue*/);
|
||||
}
|
||||
return;
|
||||
|
||||
|
@ -338,7 +338,7 @@ extern "C" void fastlock_init(struct fastlock *lock, const char *name)
|
||||
}
|
||||
|
||||
#ifndef ASM_SPINLOCK
|
||||
extern "C" void fastlock_lock(struct fastlock *lock)
|
||||
extern "C" void fastlock_lock(struct fastlock *lock, spin_worker worker)
|
||||
{
|
||||
int pidOwner;
|
||||
__atomic_load(&lock->m_pidOwner, &pidOwner, __ATOMIC_ACQUIRE);
|
||||
@ -356,20 +356,32 @@ extern "C" void fastlock_lock(struct fastlock *lock)
|
||||
__atomic_load(&g_fHighCpuPressure, &fHighPressure, __ATOMIC_RELAXED);
|
||||
unsigned loopLimit = fHighPressure ? 0x10000 : 0x100000;
|
||||
|
||||
for (;;)
|
||||
{
|
||||
__atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
|
||||
if ((ticketT.u & 0xffff) == myticket)
|
||||
break;
|
||||
if (worker != nullptr) {
|
||||
for (;;) {
|
||||
__atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
|
||||
if ((ticketT.u & 0xffff) == myticket)
|
||||
break;
|
||||
if (!worker())
|
||||
goto LNormalLoop;
|
||||
}
|
||||
} else {
|
||||
LNormalLoop:
|
||||
for (;;)
|
||||
{
|
||||
__atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
|
||||
if ((ticketT.u & 0xffff) == myticket)
|
||||
break;
|
||||
|
||||
#if defined(__i386__) || defined(__amd64__)
|
||||
__asm__ __volatile__ ("pause");
|
||||
__asm__ __volatile__ ("pause");
|
||||
#elif defined(__aarch64__)
|
||||
__asm__ __volatile__ ("yield");
|
||||
__asm__ __volatile__ ("yield");
|
||||
#endif
|
||||
if ((++cloops % loopLimit) == 0)
|
||||
{
|
||||
fastlock_sleep(lock, tid, ticketT.u, myticket);
|
||||
|
||||
if ((++cloops % loopLimit) == 0)
|
||||
{
|
||||
fastlock_sleep(lock, tid, ticketT.u, myticket);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,10 +6,16 @@
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef int (*spin_worker)();
|
||||
|
||||
/* Begin C API */
|
||||
struct fastlock;
|
||||
void fastlock_init(struct fastlock *lock, const char *name);
|
||||
void fastlock_lock(struct fastlock *lock);
|
||||
#ifdef __cplusplus
|
||||
void fastlock_lock(struct fastlock *lock, spin_worker worker = nullptr);
|
||||
#else
|
||||
void fastlock_lock(struct fastlock *lock, spin_worker worker);
|
||||
#endif
|
||||
int fastlock_trylock(struct fastlock *lock, int fWeak);
|
||||
void fastlock_unlock(struct fastlock *lock);
|
||||
void fastlock_free(struct fastlock *lock);
|
||||
@ -56,9 +62,9 @@ struct fastlock
|
||||
fastlock_init(this, name);
|
||||
}
|
||||
|
||||
inline void lock()
|
||||
inline void lock(spin_worker worker = nullptr)
|
||||
{
|
||||
fastlock_lock(this);
|
||||
fastlock_lock(this, worker);
|
||||
}
|
||||
|
||||
inline bool try_lock(bool fWeak = false)
|
||||
|
@ -22,14 +22,22 @@ fastlock_lock:
|
||||
# [rdi+64] ...
|
||||
# uint16_t active
|
||||
# uint16_t avail
|
||||
#
|
||||
# RSI points to a spin function to call, or NULL
|
||||
|
||||
# First get our TID and put it in ecx
|
||||
push rdi # we need our struct pointer (also balance the stack for the call)
|
||||
.cfi_adjust_cfa_offset 8
|
||||
sub rsp, 24 # We only use 16 bytes, but we also need the stack aligned
|
||||
.cfi_adjust_cfa_offset 24
|
||||
mov [rsp], rdi # we need our struct pointer (also balance the stack for the call)
|
||||
mov [rsp+8], rsi # backup the spin function
|
||||
|
||||
call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining)
|
||||
mov esi, eax # back it up in esi
|
||||
pop rdi # get our pointer back
|
||||
.cfi_adjust_cfa_offset -8
|
||||
|
||||
mov rdi, [rsp] # Restore spin struct
|
||||
mov r8, [rsp+8] # restore the function (in a different register)
|
||||
add rsp, 24
|
||||
.cfi_adjust_cfa_offset -24
|
||||
|
||||
cmp [rdi], esi # Is the TID we got back the owner of the lock?
|
||||
je .LLocked # Don't spin in that case
|
||||
@ -47,6 +55,8 @@ fastlock_lock:
|
||||
# ax now contains the ticket
|
||||
# OK Start the wait loop
|
||||
xor ecx, ecx
|
||||
test r8, r8
|
||||
jnz .LLoopFunction
|
||||
.ALIGN 16
|
||||
.LLoop:
|
||||
mov edx, [rdi+64]
|
||||
@ -83,6 +93,40 @@ fastlock_lock:
|
||||
mov [rdi], esi # lock->m_pidOwner = gettid()
|
||||
inc dword ptr [rdi+4] # lock->m_depth++
|
||||
ret
|
||||
|
||||
.LLoopFunction:
|
||||
sub rsp, 40
|
||||
.cfi_adjust_cfa_offset 40
|
||||
xor ecx, ecx
|
||||
mov [rsp], rcx
|
||||
mov [rsp+8], r8
|
||||
mov [rsp+16], rdi
|
||||
mov [rsp+24], rsi
|
||||
mov [rsp+32], eax
|
||||
.LLoopFunctionCore:
|
||||
mov edx, [rdi+64]
|
||||
cmp dx, ax
|
||||
je .LExitLoopFunction
|
||||
mov r8, [rsp+8]
|
||||
call r8
|
||||
test eax, eax
|
||||
jz .LExitLoopFunctionForNormal
|
||||
mov eax, [rsp+32] # restore clobbered eax
|
||||
mov rdi, [rsp+16]
|
||||
jmp .LLoopFunctionCore
|
||||
.LExitLoopFunction:
|
||||
mov rsi, [rsp+24]
|
||||
add rsp, 40
|
||||
.cfi_adjust_cfa_offset -40
|
||||
jmp .LLocked
|
||||
.LExitLoopFunctionForNormal:
|
||||
xor ecx, ecx
|
||||
mov rdi, [rsp+16]
|
||||
mov rsi, [rsp+24]
|
||||
mov eax, [rsp+32]
|
||||
add rsp, 40
|
||||
.cfi_adjust_cfa_offset -40
|
||||
jmp .LLoop
|
||||
.cfi_endproc
|
||||
|
||||
.ALIGN 16
|
||||
|
@ -5046,7 +5046,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
|
||||
zfree(ctx);
|
||||
}
|
||||
|
||||
static bool g_fModuleThread = false;
|
||||
thread_local bool g_fModuleThread = false;
|
||||
/* Acquire the server lock before executing a thread safe API call.
|
||||
* This is not needed for `RedisModule_Reply*` calls when there is
|
||||
* a blocked client connected to the thread safe context. */
|
||||
@ -5105,7 +5105,14 @@ void moduleAcquireGIL(int fServerThread) {
|
||||
}
|
||||
else
|
||||
{
|
||||
s_mutexModule.lock();
|
||||
// It is possible that another module thread holds the GIL (and s_mutexModule as a result).
|
||||
// When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
|
||||
// This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
|
||||
// As a result, a deadlock has occured.
|
||||
// We release the lock on s_mutex and wait until we are able to safely acquire the GIL
|
||||
// in order to prevent this deadlock from occuring.
|
||||
while (!s_mutexModule.try_lock())
|
||||
s_cv.wait(lock);
|
||||
++s_cAcquisitionsModule;
|
||||
fModuleGILWlocked++;
|
||||
}
|
||||
@ -5644,6 +5651,9 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client
|
||||
* (If the time it takes to execute 'callback' is negligible the two
|
||||
* statements above mean the same) */
|
||||
RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) {
|
||||
static uint64_t pending_key;
|
||||
static mstime_t pending_period = -1;
|
||||
|
||||
RedisModuleTimer *timer = (RedisModuleTimer*)zmalloc(sizeof(*timer), MALLOC_LOCAL);
|
||||
timer->module = ctx->module;
|
||||
timer->callback = callback;
|
||||
@ -5662,32 +5672,40 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
|
||||
}
|
||||
}
|
||||
|
||||
bool fNeedPost = (pending_period < 0); // If pending_period is already set, then a PostFunction is in flight and we don't need to set a new one
|
||||
if (pending_period < 0 || period < pending_period) {
|
||||
pending_period = period;
|
||||
pending_key = key;
|
||||
}
|
||||
|
||||
/* We need to install the main event loop timer if it's not already
|
||||
* installed, or we may need to refresh its period if we just installed
|
||||
* a timer that will expire sooner than any other else (i.e. the timer
|
||||
* we just installed is the first timer in the Timers rax). */
|
||||
if (aeTimer != -1) {
|
||||
raxIterator ri;
|
||||
raxStart(&ri,Timers);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
raxNext(&ri);
|
||||
if (memcmp(ri.key,&key,sizeof(key)) == 0) {
|
||||
/* This is the first key, we need to re-install the timer according
|
||||
* to the just added event. */
|
||||
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
|
||||
aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
|
||||
}, true /* synchronous */, false /* fLock */);
|
||||
aeTimer = -1;
|
||||
}
|
||||
raxStop(&ri);
|
||||
}
|
||||
if (fNeedPost) {
|
||||
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
|
||||
if (aeTimer != -1) {
|
||||
raxIterator ri;
|
||||
raxStart(&ri,Timers);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
raxNext(&ri);
|
||||
if (memcmp(ri.key,&pending_key,sizeof(key)) == 0) {
|
||||
/* This is the first key, we need to re-install the timer according
|
||||
* to the just added event. */
|
||||
aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
|
||||
aeTimer = -1;
|
||||
}
|
||||
raxStop(&ri);
|
||||
}
|
||||
|
||||
/* If we have no main timer (the old one was invalidated, or this is the
|
||||
* first module timer we have), install one. */
|
||||
if (aeTimer == -1) {
|
||||
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
|
||||
aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL);
|
||||
}, true /* synchronous */, false /* fLock */);
|
||||
/* If we have no main timer (the old one was invalidated, or this is the
|
||||
* first module timer we have), install one. */
|
||||
if (aeTimer == -1) {
|
||||
aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,pending_period,moduleTimerHandler,NULL,NULL);
|
||||
}
|
||||
|
||||
pending_period = -1;
|
||||
});
|
||||
}
|
||||
|
||||
return key;
|
||||
|
@ -495,7 +495,7 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
|
||||
}
|
||||
|
||||
/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
|
||||
void afterErrorReply(client *c, const char *s, size_t len) {
|
||||
void afterErrorReply(client *c, const char *s, size_t len, int severity = ERR_CRITICAL) {
|
||||
/* Sometimes it could be normal that a replica replies to a master with
|
||||
* an error and this function gets called. Actually the error will never
|
||||
* be sent because addReply*() against master clients has no effect...
|
||||
@ -523,9 +523,30 @@ void afterErrorReply(client *c, const char *s, size_t len) {
|
||||
|
||||
if (len > 4096) len = 4096;
|
||||
const char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
|
||||
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
|
||||
"to its %s: '%.*s' after processing the command "
|
||||
"'%s'", from, to, (int)len, s, cmdname);
|
||||
switch (severity) {
|
||||
case ERR_NOTICE:
|
||||
serverLog(LL_NOTICE,"== NOTICE == This %s is rejecting a command "
|
||||
"from its %s: '%.*s' after processing the command "
|
||||
"'%s'", from, to, (int)len, s, cmdname);
|
||||
break;
|
||||
case ERR_WARNING:
|
||||
serverLog(LL_WARNING,"== WARNING == This %s is rejecting a command "
|
||||
"from its %s: '%.*s' after processing the command "
|
||||
"'%s'", from, to, (int)len, s, cmdname);
|
||||
break;
|
||||
case ERR_ERROR:
|
||||
serverLog(LL_WARNING,"== ERROR == This %s is sending an error "
|
||||
"to its %s: '%.*s' after processing the command "
|
||||
"'%s'", from, to, (int)len, s, cmdname);
|
||||
break;
|
||||
case ERR_CRITICAL:
|
||||
default:
|
||||
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
|
||||
"to its %s: '%.*s' after processing the command "
|
||||
"'%s'", from, to, (int)len, s, cmdname);
|
||||
break;
|
||||
}
|
||||
|
||||
if (ctype == CLIENT_TYPE_MASTER && g_pserver->repl_backlog &&
|
||||
g_pserver->repl_backlog_histlen > 0)
|
||||
{
|
||||
@ -537,9 +558,9 @@ void afterErrorReply(client *c, const char *s, size_t len) {
|
||||
|
||||
/* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
|
||||
* Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
|
||||
void addReplyErrorObject(client *c, robj *err) {
|
||||
void addReplyErrorObject(client *c, robj *err, int severity) {
|
||||
addReply(c, err);
|
||||
afterErrorReply(c, szFromObj(err), sdslen(szFromObj(err))-2); /* Ignore trailing \r\n */
|
||||
afterErrorReply(c, szFromObj(err), sdslen(szFromObj(err))-2, severity); /* Ignore trailing \r\n */
|
||||
}
|
||||
|
||||
void addReplyError(client *c, const char *err) {
|
||||
@ -3302,6 +3323,8 @@ void flushSlavesOutputBuffers(void) {
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
||||
flushReplBacklogToClients();
|
||||
|
||||
listRewind(g_pserver->slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *replica = (client*)listNodeValue(ln);
|
||||
@ -3470,6 +3493,16 @@ void processEventsWhileBlocked(int iel) {
|
||||
locker.arm(nullptr);
|
||||
locker.release();
|
||||
|
||||
// Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here)
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||
redisDb *db = g_pserver->db[idb];
|
||||
while (db->dictUnsafeKeyOnly()->asyncdata != nullptr) {
|
||||
if (!db->dictUnsafeKeyOnly()->asyncdata->done)
|
||||
break;
|
||||
dictCompleteRehashAsync(db->dictUnsafeKeyOnly()->asyncdata, false /*fFree*/);
|
||||
}
|
||||
}
|
||||
|
||||
// Restore it so the calling code is not confused
|
||||
if (fReplBacklog && !serverTL->el->stop) {
|
||||
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
|
||||
|
126
src/server.cpp
126
src/server.cpp
@ -690,7 +690,7 @@ struct redisCommand redisCommandTable[] = {
|
||||
* failure detection, and a loading server is considered to be
|
||||
* not available. */
|
||||
{"ping",pingCommand,-1,
|
||||
"ok-stale fast @connection @replication",
|
||||
"ok-stale ok-loading fast @connection @replication",
|
||||
0,NULL,0,0,0,0,0,0},
|
||||
|
||||
{"echo",echoCommand,2,
|
||||
@ -1558,14 +1558,14 @@ void tryResizeHashTables(int dbid) {
|
||||
* table will use two tables for a long time. So we try to use 1 millisecond
|
||||
* of CPU time at every call of this function to perform some rehashing.
|
||||
*
|
||||
* The function returns 1 if some rehashing was performed, otherwise 0
|
||||
* The function returns the number of rehashes if some rehashing was performed, otherwise 0
|
||||
* is returned. */
|
||||
int redisDbPersistentData::incrementallyRehash() {
|
||||
/* Keys dictionary */
|
||||
if (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) {
|
||||
dictRehashMilliseconds(m_pdict,1);
|
||||
dictRehashMilliseconds(m_pdictTombstone,1);
|
||||
return 1; /* already used our millisecond for this loop... */
|
||||
int result = dictRehashMilliseconds(m_pdict,1);
|
||||
result += dictRehashMilliseconds(m_pdictTombstone,1);
|
||||
return result; /* already used our millisecond for this loop... */
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@ -1884,22 +1884,32 @@ bool expireOwnKeys()
|
||||
return false;
|
||||
}
|
||||
|
||||
int hash_spin_worker() {
|
||||
auto ctl = serverTL->rehashCtl;
|
||||
return dictRehashSomeAsync(ctl, 1);
|
||||
}
|
||||
|
||||
/* This function handles 'background' operations we are required to do
|
||||
* incrementally in Redis databases, such as active key expiring, resizing,
|
||||
* rehashing. */
|
||||
void databasesCron(void) {
|
||||
/* Expire keys by random sampling. Not required for slaves
|
||||
* as master will synthesize DELs for us. */
|
||||
if (g_pserver->active_expire_enabled) {
|
||||
if (expireOwnKeys()) {
|
||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
|
||||
} else {
|
||||
expireSlaveKeys();
|
||||
void databasesCron(bool fMainThread) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
static int rehashes_per_ms = 0;
|
||||
static int async_rehashes = 0;
|
||||
if (fMainThread) {
|
||||
/* Expire keys by random sampling. Not required for slaves
|
||||
* as master will synthesize DELs for us. */
|
||||
if (g_pserver->active_expire_enabled) {
|
||||
if (expireOwnKeys()) {
|
||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
|
||||
} else {
|
||||
expireSlaveKeys();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Defrag keys gradually. */
|
||||
activeDefragCycle();
|
||||
/* Defrag keys gradually. */
|
||||
activeDefragCycle();
|
||||
}
|
||||
|
||||
/* Perform hash tables rehashing if needed, but only if there are no
|
||||
* other processes saving the DB on disk. Otherwise rehashing is bad
|
||||
@ -1916,28 +1926,70 @@ void databasesCron(void) {
|
||||
/* Don't test more DBs than we have. */
|
||||
if (dbs_per_call > cserver.dbnum) dbs_per_call = cserver.dbnum;
|
||||
|
||||
/* Resize */
|
||||
for (j = 0; j < dbs_per_call; j++) {
|
||||
tryResizeHashTables(resize_db % cserver.dbnum);
|
||||
resize_db++;
|
||||
if (fMainThread) {
|
||||
/* Resize */
|
||||
for (j = 0; j < dbs_per_call; j++) {
|
||||
tryResizeHashTables(resize_db % cserver.dbnum);
|
||||
resize_db++;
|
||||
}
|
||||
}
|
||||
|
||||
/* Rehash */
|
||||
if (g_pserver->activerehashing) {
|
||||
for (j = 0; j < dbs_per_call; j++) {
|
||||
int work_done = g_pserver->db[rehash_db]->incrementallyRehash();
|
||||
if (work_done) {
|
||||
/* If the function did some work, stop here, we'll do
|
||||
* more at the next cron loop. */
|
||||
if (serverTL->rehashCtl != nullptr) {
|
||||
if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) {
|
||||
break;
|
||||
} else {
|
||||
dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
|
||||
serverTL->rehashCtl = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
serverAssert(serverTL->rehashCtl == nullptr);
|
||||
::dict *dict = g_pserver->db[rehash_db]->dictUnsafeKeyOnly();
|
||||
/* Are we async rehashing? And if so is it time to re-calibrate? */
|
||||
/* The recalibration limit is a prime number to ensure balancing across threads */
|
||||
if (rehashes_per_ms > 0 && async_rehashes < 131 && cserver.active_defrag_enabled) {
|
||||
serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms);
|
||||
++async_rehashes;
|
||||
}
|
||||
if (serverTL->rehashCtl)
|
||||
break;
|
||||
} else {
|
||||
/* If this db didn't need rehash, we'll try the next one. */
|
||||
|
||||
// Before starting anything new, can we end the rehash of a blocked thread?
|
||||
if (dict->asyncdata != nullptr) {
|
||||
auto asyncdata = dict->asyncdata;
|
||||
if (asyncdata->done) {
|
||||
dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer
|
||||
serverAssert(dict->asyncdata != asyncdata);
|
||||
break; // completion can be expensive, don't do anything else
|
||||
}
|
||||
}
|
||||
|
||||
rehashes_per_ms = g_pserver->db[rehash_db]->incrementallyRehash();
|
||||
async_rehashes = 0;
|
||||
if (rehashes_per_ms > 0) {
|
||||
/* If the function did some work, stop here, we'll do
|
||||
* more at the next cron loop. */
|
||||
if (!cserver.active_defrag_enabled) {
|
||||
serverLog(LL_VERBOSE, "Calibrated rehashes per ms: %d", rehashes_per_ms);
|
||||
}
|
||||
break;
|
||||
} else if (dict->asyncdata == nullptr) {
|
||||
/* If this db didn't need rehash and we have none in flight, we'll try the next one. */
|
||||
rehash_db++;
|
||||
rehash_db %= cserver.dbnum;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (serverTL->rehashCtl) {
|
||||
setAeLockSetThreadSpinWorker(hash_spin_worker);
|
||||
} else {
|
||||
setAeLockSetThreadSpinWorker(nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
/* We take a cached value of the unix time in the global state because with
|
||||
@ -2232,7 +2284,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
clientsCron(IDX_EVENT_LOOP_MAIN);
|
||||
|
||||
/* Handle background operations on Redis databases. */
|
||||
databasesCron();
|
||||
databasesCron(true /* fMainThread */);
|
||||
|
||||
/* Start a scheduled AOF rewrite if this was requested by the user while
|
||||
* a BGSAVE was in progress. */
|
||||
@ -2413,6 +2465,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
|
||||
processUnblockedClients(iel);
|
||||
}
|
||||
|
||||
/* Handle background operations on Redis databases. */
|
||||
databasesCron(false /* fMainThread */);
|
||||
|
||||
/* Unpause clients if enough time has elapsed */
|
||||
unpauseClientsIfNecessary();
|
||||
|
||||
@ -3002,7 +3057,7 @@ int setOOMScoreAdj(int process_class) {
|
||||
int val;
|
||||
char buf[64];
|
||||
|
||||
val = g_pserver->oom_score_adj_base + g_pserver->oom_score_adj_values[process_class];
|
||||
val = g_pserver->oom_score_adj_values[process_class];
|
||||
if (g_pserver->oom_score_adj == OOM_SCORE_RELATIVE)
|
||||
val += g_pserver->oom_score_adj_base;
|
||||
if (val > 1000) val = 1000;
|
||||
@ -4028,13 +4083,14 @@ void call(client *c, int flags) {
|
||||
* If there's a transaction is flags it as dirty, and if the command is EXEC,
|
||||
* it aborts the transaction.
|
||||
* Note: 'reply' is expected to end with \r\n */
|
||||
void rejectCommand(client *c, robj *reply) {
|
||||
void rejectCommand(client *c, robj *reply, int severity = ERR_CRITICAL) {
|
||||
flagTransaction(c);
|
||||
if (c->cmd && c->cmd->proc == execCommand) {
|
||||
execCommandAbort(c, szFromObj(reply));
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
/* using addReplyError* rather than addReply so that the error can be logged. */
|
||||
addReplyErrorObject(c, reply);
|
||||
addReplyErrorObject(c, reply, severity);
|
||||
}
|
||||
}
|
||||
|
||||
@ -4283,7 +4339,7 @@ int processCommand(client *c, int callFlags) {
|
||||
/* Active Replicas can execute read only commands, and optionally write commands */
|
||||
if (!(g_pserver->loading == LOADING_REPLICATION && g_pserver->fActiveReplica && ((c->cmd->flags & CMD_READONLY) || g_pserver->fWriteDuringActiveLoad)))
|
||||
{
|
||||
rejectCommand(c, shared.loadingerr);
|
||||
rejectCommand(c, shared.loadingerr, ERR_WARNING);
|
||||
return C_OK;
|
||||
}
|
||||
}
|
||||
@ -4345,7 +4401,7 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
|
||||
std::lock_guard<decltype(this->lock)> lock(this->lock);
|
||||
fn(this);
|
||||
--casyncOpsPending;
|
||||
}, false, fLock) == AE_OK;
|
||||
}, fLock) == AE_OK;
|
||||
}
|
||||
|
||||
/*================================== Shutdown =============================== */
|
||||
@ -4474,6 +4530,8 @@ int prepareForShutdown(int flags) {
|
||||
/* Best effort flush of replica output buffers, so that we hopefully
|
||||
* send them pending writes. */
|
||||
flushSlavesOutputBuffers();
|
||||
g_pserver->repl_batch_idxStart = -1;
|
||||
g_pserver->repl_batch_offStart = -1;
|
||||
|
||||
/* Close the listening sockets. Apparently this allows faster restarts. */
|
||||
closeListeningSockets(1);
|
||||
@ -5484,7 +5542,7 @@ int linuxMadvFreeForkBugCheck(void) {
|
||||
const long map_size = 3 * 4096;
|
||||
|
||||
/* Create a memory map that's in our full control (not one used by the allocator). */
|
||||
p = mmap(NULL, map_size, PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
|
||||
p = (char*)mmap(NULL, map_size, PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
|
||||
serverAssert(p != MAP_FAILED);
|
||||
|
||||
q = p + 4096;
|
||||
|
@ -611,6 +611,12 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
||||
#define LL_WARNING 3
|
||||
#define LL_RAW (1<<10) /* Modifier to log without timestamp */
|
||||
|
||||
/* Error severity levels */
|
||||
#define ERR_CRITICAL 0
|
||||
#define ERR_ERROR 1
|
||||
#define ERR_WARNING 2
|
||||
#define ERR_NOTICE 3
|
||||
|
||||
/* Supervision options */
|
||||
#define SUPERVISED_NONE 0
|
||||
#define SUPERVISED_AUTODETECT 1
|
||||
@ -1822,6 +1828,7 @@ struct redisServerThreadVars {
|
||||
const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
|
||||
bool fRetrySetAofEvent = false;
|
||||
std::vector<client*> vecclientsProcess;
|
||||
dictAsyncRehashCtl *rehashCtl = nullptr;
|
||||
|
||||
int getRdbKeySaveDelay();
|
||||
private:
|
||||
@ -2529,7 +2536,7 @@ void addReplyBulkLongLong(client *c, long long ll);
|
||||
void addReply(client *c, robj_roptr obj);
|
||||
void addReplySds(client *c, sds s);
|
||||
void addReplyBulkSds(client *c, sds s);
|
||||
void addReplyErrorObject(client *c, robj *err);
|
||||
void addReplyErrorObject(client *c, robj *err, int severity);
|
||||
void addReplyErrorSds(client *c, sds err);
|
||||
void addReplyError(client *c, const char *err);
|
||||
void addReplyStatus(client *c, const char *status);
|
||||
|
@ -246,7 +246,7 @@ test_slave_buffers {slave buffer are counted correctly} 1000000 10 0 1
|
||||
# test again with fewer (and bigger) commands without pipeline, but with eviction
|
||||
test_slave_buffers "replica buffer don't induce eviction" 100000 100 1 0
|
||||
|
||||
start_server {tags {"maxmemory"}} {
|
||||
start_server {tags {"maxmemory"} overrides {server-threads 1}} {
|
||||
test {client tracking don't cause eviction feedback loop} {
|
||||
r config set maxmemory 0
|
||||
r config set maxmemory-policy allkeys-lru
|
||||
@ -308,4 +308,4 @@ start_server {tags {"maxmemory"}} {
|
||||
if {$::verbose} { puts "evicted: $evicted" }
|
||||
}
|
||||
}
|
||||
}; #run_solo
|
||||
}; #run_solo
|
||||
|
@ -132,6 +132,7 @@ tags "modules" {
|
||||
}
|
||||
|
||||
$replica replicaof no one
|
||||
after 300
|
||||
|
||||
test {Test role-master hook} {
|
||||
assert_equal [r hooks.event_count role-replica] 1
|
||||
|
@ -1,4 +1,4 @@
|
||||
start_server {tags {"obuf-limits"}} {
|
||||
start_server {tags {"obuf-limits"} overrides { server-threads 1 }} {
|
||||
test {Client output buffer hard limit is enforced} {
|
||||
r config set client-output-buffer-limit {pubsub 100000 0 0}
|
||||
set rd1 [redis_deferring_client]
|
||||
|
@ -448,8 +448,8 @@ start_server {tags {"scripting"}} {
|
||||
set start [clock clicks -milliseconds]
|
||||
$rd eval {redis.call('set',KEYS[1],'y'); for i=1,1500000 do redis.call('ping') end return 'ok'} 1 x
|
||||
$rd flush
|
||||
after 100
|
||||
catch {r ping} err
|
||||
after 200
|
||||
catch {r echo "foo"} err
|
||||
assert_match {BUSY*} $err
|
||||
$rd read
|
||||
set elapsed [expr [clock clicks -milliseconds]-$start]
|
||||
@ -457,8 +457,8 @@ start_server {tags {"scripting"}} {
|
||||
set start [clock clicks -milliseconds]
|
||||
$rd debug loadaof
|
||||
$rd flush
|
||||
after 100
|
||||
catch {r ping} err
|
||||
after 200
|
||||
catch {r echo "foo"} err
|
||||
assert_match {LOADING*} $err
|
||||
$rd read
|
||||
set elapsed [expr [clock clicks -milliseconds]-$start]
|
||||
|
Loading…
x
Reference in New Issue
Block a user