Merge branch 'keydbpro' into PRO_RELEASE_6

Former-commit-id: 7609673e282b46a760a3d0e0b42d685af9b4d056
Commit: 0cba72ba14

Changed files:

.github/workflows/ci.yml (vendored, 2 changes)
@@ -20,7 +20,7 @@ jobs:
       run: ./utils/gen-test-certs.sh
     - name: test-tls
       run: |
-        sudo apt-get -y install tcl8.5 tcl-tls
+        sudo apt-get -y install tcl tcl-tls
        ./runtest --clients 2 --verbose --tls
     - name: cluster-test
       run: |
src/ae.cpp (79 changes)

@@ -109,13 +109,6 @@ enum class AE_ASYNC_OP
     CreateFileEvent,
 };

-struct aeCommandControl
-{
-    std::condition_variable cv;
-    std::atomic<int> rval;
-    std::mutex mutexcv;
-};
-
 struct aeCommand
 {
     AE_ASYNC_OP op;
@@ -128,7 +121,6 @@ struct aeCommand
     std::function<void()> *pfn;
     };
     void *clientData;
-    aeCommandControl *pctl;
 };
 #ifdef PIPE_BUF
 static_assert(sizeof(aeCommand) <= PIPE_BUF, "aeCommand must be small enough to send atomically through a pipe");
@@ -152,19 +144,7 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
         break;

     case AE_ASYNC_OP::CreateFileEvent:
-    {
-        if (cmd.pctl != nullptr)
-        {
-            cmd.pctl->mutexcv.lock();
-            std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData));
-            cmd.pctl->cv.notify_all();
-            cmd.pctl->mutexcv.unlock();
-        }
-        else
-        {
-            aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
-        }
-    }
+        aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
         break;

     case AE_ASYNC_OP::PostFunction:
@@ -178,19 +158,11 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )

     case AE_ASYNC_OP::PostCppFunction:
     {
-        if (cmd.pctl != nullptr)
-            cmd.pctl->mutexcv.lock();
-
         std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
         if (cmd.fLock)
             ulock.lock();
         (*cmd.pfn)();

-        if (cmd.pctl != nullptr)
-        {
-            cmd.pctl->cv.notify_all();
-            cmd.pctl->mutexcv.unlock();
-        }
         delete cmd.pfn;
     }
         break;
@@ -229,7 +201,7 @@ ssize_t safe_write(int fd, const void *pv, size_t cb)
 }

 int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
-        aeFileProc *proc, void *clientData, int fSynchronous)
+        aeFileProc *proc, void *clientData)
 {
     if (eventLoop == g_eventLoopThisThread)
         return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData);
@@ -242,13 +214,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
     cmd.mask = mask;
     cmd.fproc = proc;
     cmd.clientData = clientData;
-    cmd.pctl = nullptr;
     cmd.fLock = true;
-    if (fSynchronous)
-    {
-        cmd.pctl = new aeCommandControl();
-        cmd.pctl->mutexcv.lock();
-    }

     auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
     if (size != sizeof(cmd))
@@ -257,16 +223,6 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
         serverAssert(errno == EAGAIN);
         ret = AE_ERR;
     }

-    if (fSynchronous)
-    {
-        {
-        std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
-        cmd.pctl->cv.wait(ulock);
-        ret = cmd.pctl->rval;
-        }
-        delete cmd.pctl;
-    }
-
     return ret;
 }
@@ -289,7 +245,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
     return AE_OK;
 }

-int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock, bool fForceQueue)
+int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock, bool fForceQueue)
 {
     if (eventLoop == g_eventLoopThisThread && !fForceQueue)
     {
@@ -300,13 +256,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
     aeCommand cmd = {};
     cmd.op = AE_ASYNC_OP::PostCppFunction;
     cmd.pfn = new std::function<void()>(fn);
-    cmd.pctl = nullptr;
     cmd.fLock = fLock;
-    if (fSynchronous)
-    {
-        cmd.pctl = new aeCommandControl();
-        cmd.pctl->mutexcv.lock();
-    }

     auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
     if (!(!size || size == sizeof(cmd))) {
@@ -316,17 +266,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch

     if (size == 0)
         return AE_ERR;
-    int ret = AE_OK;
-    if (fSynchronous)
-    {
-        {
-        std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
-        cmd.pctl->cv.wait(ulock);
-        ret = cmd.pctl->rval;
-        }
-        delete cmd.pctl;
-    }
-    return ret;
+    return AE_OK;
 }

 aeEventLoop *aeCreateEventLoop(int setsize) {
@@ -904,9 +845,15 @@ void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep,
     eventLoop->aftersleepFlags = flags;
 }

+thread_local spin_worker tl_worker = nullptr;
+void setAeLockSetThreadSpinWorker(spin_worker worker)
+{
+    tl_worker = worker;
+}
+
 void aeAcquireLock()
 {
-    g_lock.lock();
+    g_lock.lock(tl_worker);
 }

 int aeTryAcquireLock(int fWeak)
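Note on the ae.cpp/ae.h change above: the fSynchronous path (and the aeCommandControl condition-variable plumbing) is gone, so aePostFunction() and aeCreateRemoteFileEvent() are now fire-and-forget. A caller that still needs to block until the posted work has run must bring its own synchronization. A minimal sketch, using only the post-change signature shown above; postAndWait is a hypothetical helper, not part of this commit, and assumes "ae.h" is included:

    #include <functional>
    #include <future>

    // Hypothetical helper: block until an aePostFunction() callback has executed,
    // now that the fSynchronous flag no longer exists.
    int postAndWait(aeEventLoop *el, std::function<void()> fn)
    {
        std::promise<void> done;
        std::future<void> fut = done.get_future();
        int ret = aePostFunction(el, [fn, &done]() {
            fn();              // runs on the target event-loop thread
            done.set_value();  // wake the posting thread
        }, true /*fLock*/, true /*fForceQueue*/);
        if (ret == AE_OK)
            fut.wait();        // only wait if the command was actually queued
        return ret;
    }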
src/ae.h (5 changes)

@@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
 int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
 #ifdef __cplusplus
 } // EXTERN C
-int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false);
+int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock = true, bool fForceQueue = false);
 extern "C" {
 #endif
 void aeDeleteEventLoop(aeEventLoop *eventLoop);
@@ -144,7 +144,7 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
         aeFileProc *proc, void *clientData);

 int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
-        aeFileProc *proc, void *clientData, int fSynchronous);
+        aeFileProc *proc, void *clientData);

 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
 void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask);
@@ -164,6 +164,7 @@ aeEventLoop *aeGetCurrentEventLoop();
 int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
 void aeSetDontWait(aeEventLoop *eventLoop, int noWait);

+void setAeLockSetThreadSpinWorker(spin_worker worker);
 void aeAcquireLock();
 int aeTryAcquireLock(int fWeak);
 void aeReleaseLock();
src/aof.cpp

@@ -741,7 +741,7 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
 /* In Redis commands are always executed in the context of a client, so in
  * order to load the append only file we need to create a fake client. */
 struct client *createAOFClient(void) {
-    struct client *c =(client*) zmalloc(sizeof(*c), MALLOC_LOCAL);
+    struct client *c = new client();

     selectDb(c,0);
     c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */
@@ -752,7 +752,6 @@ struct client *createAOFClient(void) {
     c->querybuf_peak = 0;
     c->argc = 0;
     c->argv = NULL;
-    c->argv_len_sum = 0;
     c->bufpos = 0;
     c->flags = 0;
     c->fPendingAsyncWrite = FALSE;
@@ -783,7 +782,7 @@ void freeFakeClientArgv(struct client *c) {
     for (j = 0; j < c->argc; j++)
         decrRefCount(c->argv[j]);
     zfree(c->argv);
-    c->argv_len_sum = 0;
+    c->argv_len_sumActive = 0;
 }

 void freeFakeClient(struct client *c) {
@@ -793,7 +792,7 @@ void freeFakeClient(struct client *c) {
     freeClientMultiState(c);
     fastlock_unlock(&c->lock);
     fastlock_free(&c->lock);
-    zfree(c);
+    delete c;
 }

 /* Replay the append log file. On success C_OK is returned. On non fatal
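The switch from zmalloc()/zfree() to new/delete for the fake AOF client (and for regular clients in networking.cpp further down) follows from client gaining non-trivial C++ members, such as the vecqueuedcmd vector used by the parser changes below: raw allocation would skip their constructors and destructors. Illustrative sketch only; demo_client is a made-up stand-in, not the real struct:

    #include <vector>

    struct demo_client {
        std::vector<int> vecqueuedcmd;   // non-trivial member: needs its ctor/dtor
    };

    void demo()
    {
        demo_client *c = new demo_client();  // constructor runs, the vector is valid
        c->vecqueuedcmd.push_back(1);
        delete c;                            // destructor releases the vector's storage
        // A malloc()'d demo_client would leave vecqueuedcmd unconstructed (UB to use),
        // and free() would leak whatever it had allocated.
    }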
src/blocked.cpp

@@ -121,7 +121,7 @@ void processUnblockedClients(int iel) {
          * the code is conceptually more correct this way. */
         if (!(c->flags & CLIENT_BLOCKED)) {
             if (c->querybuf && sdslen(c->querybuf) > 0) {
-                processInputBuffer(c, CMD_CALL_FULL);
+                processInputBuffer(c, true /*fParse*/, CMD_CALL_FULL);
             }
         }
         fastlock_unlock(&c->lock);
@@ -525,7 +525,6 @@ void handleClientsBlockedOnKeys(void) {
              * lookup, invalidating the first one.
              * See https://github.com/antirez/redis/pull/6554. */
             serverTL->fixed_time_expire++;
-            updateCachedTime(0);

             /* Serve clients blocked on the key. */
             robj *o = lookupKeyWrite(rl->db,rl->key);
src/config.cpp

@@ -2619,6 +2619,7 @@ standardConfig configs[] = {

     /* Unsigned Long Long configs */
     createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),
+    createULongLongConfig("maxstorage", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxstorage, 0, MEMORY_CONFIG, NULL, NULL),

     /* Size_t configs */
     createSizeTConfig("hash-max-ziplist-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, g_pserver->hash_max_ziplist_entries, 512, INTEGER_CONFIG, NULL, NULL),
src/db.cpp (17 changes)

@@ -2834,9 +2834,9 @@ bool redisDbPersistentData::removeCachedValue(const char *key)

 void redisDbPersistentData::trackChanges(bool fBulk)
 {
-    m_fTrackingChanges++;
+    m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed);
     if (fBulk)
-        m_fAllChanged++;
+        m_fAllChanged.fetch_add(1, std::memory_order_acq_rel);
 }

 void redisDbPersistentData::removeAllCachedValues()
@@ -2959,24 +2959,27 @@ int dbnumFromDb(redisDb *db)
     serverPanic("invalid database pointer");
 }

-void redisDbPersistentData::prefetchKeysAsync(AeLocker &lock, client *c)
+void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command)
 {
     if (m_spstorage == nullptr)
         return;

+    AeLocker lock;
+
     std::vector<robj*> veckeys;
     lock.arm(c);
-    getKeysResult* result = nullptr;
-    int numkeys = getKeysFromCommand(c->cmd, c->argv, c->argc, result);
+    getKeysResult result = GETKEYS_RESULT_INIT;
+    auto cmd = lookupCommand(szFromObj(command.argv[0]));
+    int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
     for (int ikey = 0; ikey < numkeys; ++ikey)
     {
-        robj *objKey = c->argv[result->keys[ikey]];
+        robj *objKey = command.argv[result.keys[ikey]];
         if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr)
             veckeys.push_back(objKey);
     }
     lock.disarm();

-    getKeysFreeResult(result);
+    getKeysFreeResult(&result);

     std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts;
     for (robj *objKey : veckeys)
src/dict.cpp (197 changes)

@@ -128,6 +128,7 @@ int _dictInit(dict *d, dictType *type,
     d->privdata = privDataPtr;
     d->rehashidx = -1;
     d->iterators = 0;
+    d->asyncdata = nullptr;
     return DICT_OK;
 }

@@ -141,15 +142,15 @@ int dictResize(dict *d)
     minimal = d->ht[0].used;
     if (minimal < DICT_HT_INITIAL_SIZE)
         minimal = DICT_HT_INITIAL_SIZE;
-    return dictExpand(d, minimal);
+    return dictExpand(d, minimal, false /*fShirnk*/);
 }

 /* Expand or create the hash table */
-int dictExpand(dict *d, unsigned long size)
+int dictExpand(dict *d, unsigned long size, bool fShrink)
 {
     /* the size is invalid if it is smaller than the number of
      * elements already inside the hash table */
-    if (dictIsRehashing(d) || d->ht[0].used > size)
+    if (dictIsRehashing(d) || (d->ht[0].used > size && !fShrink) || size == 0)
         return DICT_ERR;

     dictht n; /* the new hash table */
@@ -263,7 +264,7 @@ int dictMerge(dict *dst, dict *src)
         return DICT_OK;
     }

-    dictExpand(dst, dictSize(dst)+dictSize(src)); // start dst rehashing if necessary
+    dictExpand(dst, dictSize(dst)+dictSize(src), false /* fShrink */); // start dst rehashing if necessary
     auto &htDst = dictIsRehashing(dst) ? dst->ht[1] : dst->ht[0];
     for (int iht = 0; iht < 2; ++iht)
     {
@@ -328,13 +329,13 @@ int dictMerge(dict *dst, dict *src)
 int dictRehash(dict *d, int n) {
     int empty_visits = n*10; /* Max number of empty buckets to visit. */
     if (!dictIsRehashing(d)) return 0;
+    if (d->asyncdata) return 0;

     while(n-- && d->ht[0].used != 0) {
         dictEntry *de, *nextde;

         /* Note that rehashidx can't overflow as we are sure there are more
          * elements because ht[0].used != 0 */
-        assert(d->ht[0].size > (unsigned long)d->rehashidx);
         while(d->ht[0].table[d->rehashidx] == NULL) {
             d->rehashidx++;
             if (--empty_visits == 0) return 1;
@@ -370,6 +371,144 @@ int dictRehash(dict *d, int n) {
     return 1;
 }

+dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
+    if (!dictIsRehashing(d)) return 0;
+
+    d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);
+
+    int empty_visits = buckets * 10;
+
+    while (d->asyncdata->queue.size() < (size_t)buckets && d->rehashidx < d->ht[0].size) {
+        dictEntry *de;
+
+        /* Note that rehashidx can't overflow as we are sure there are more
+         * elements because ht[0].used != 0 */
+        while(d->ht[0].table[d->rehashidx] == NULL) {
+            d->rehashidx++;
+            if (--empty_visits == 0) goto LDone;
+            if (d->rehashidx >= d->ht[0].size) goto LDone;
+        }
+
+        de = d->ht[0].table[d->rehashidx];
+        // We have to queue every node in the bucket, even if we go over our target size
+        while (de != nullptr) {
+            d->asyncdata->queue.emplace_back(de);
+            de = de->next;
+        }
+        d->rehashidx++;
+    }
+
+LDone:
+    if (d->asyncdata->queue.empty()) {
+        // We didn't find any work to do (can happen if there is a large gap in the hash table)
+        auto asyncT = d->asyncdata;
+        d->asyncdata = d->asyncdata->next;
+        delete asyncT;
+        return nullptr;
+    }
+    return d->asyncdata;
+}
+
+void dictRehashAsync(dictAsyncRehashCtl *ctl) {
+    for (size_t idx = ctl->hashIdx; idx < ctl->queue.size(); ++idx) {
+        auto &wi = ctl->queue[idx];
+        wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
+    }
+    ctl->hashIdx = ctl->queue.size();
+    ctl->done = true;
+}
+
+bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
+    size_t max = std::min(ctl->hashIdx + hashes, ctl->queue.size());
+    for (; ctl->hashIdx < max; ++ctl->hashIdx) {
+        auto &wi = ctl->queue[ctl->hashIdx];
+        wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
+    }
+
+    if (ctl->hashIdx == ctl->queue.size()) ctl->done = true;
+    return ctl->hashIdx < ctl->queue.size();
+}
+
+void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
+    dict *d = ctl->dict;
+    assert(ctl->done);
+
+    // Unlink ourselves
+    bool fUnlinked = false;
+    dictAsyncRehashCtl **pprev = &d->asyncdata;
+
+    while (*pprev != nullptr) {
+        if (*pprev == ctl) {
+            *pprev = ctl->next;
+            fUnlinked = true;
+            break;
+        }
+        pprev = &((*pprev)->next);
+    }
+
+    if (fUnlinked) {
+        if (ctl->next != nullptr && ctl->deGCList != nullptr) {
+            // An older work item may depend on our free list, so pass our free list to them
+            dictEntry **deGC = &ctl->next->deGCList;
+            while (*deGC != nullptr) deGC = &((*deGC)->next);
+            *deGC = ctl->deGCList;
+            ctl->deGCList = nullptr;
+        }
+    }
+
+    if (fUnlinked && !ctl->release) {
+        if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash
+            for (auto &wi : ctl->queue) {
+                // We need to remove it from the source hash table, and store it in the dest.
+                // Note that it may have been deleted in the meantime and therefore not exist.
+                // In this case it will be in the garbage collection list
+
+                dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
+                while (*pdePrev != nullptr && *pdePrev != wi.de) {
+                    pdePrev = &((*pdePrev)->next);
+                }
+                if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list
+                    assert(*pdePrev == wi.de);
+                    *pdePrev = wi.de->next;
+                    // Now link it to the dest hash table
+                    wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
+                    d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
+                    d->ht[0].used--;
+                    d->ht[1].used++;
+                }
+            }
+        }
+
+        /* Check if we already rehashed the whole table... */
+        if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
+            zfree(d->ht[0].table);
+            d->ht[0] = d->ht[1];
+            _dictReset(&d->ht[1]);
+            d->rehashidx = -1;
+        }
+    }
+
+    if (fFree) {
+        while (ctl->deGCList != nullptr) {
+            auto next = ctl->deGCList->next;
+            dictFreeKey(d, ctl->deGCList);
+            dictFreeVal(d, ctl->deGCList);
+            zfree(ctl->deGCList);
+            ctl->deGCList = next;
+        }
+
+        // Was the dictionary free'd while we were in flight?
+        if (ctl->release) {
+            if (d->asyncdata != nullptr)
+                d->asyncdata->release = true;
+            else
+                dictRelease(d);
+        }
+
+        delete ctl;
+    }
+}
+
 long long timeInMilliseconds(void) {
     struct timeval tv;

@@ -527,9 +666,14 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
             else
                 d->ht[table].table[idx] = he->next;
             if (!nofree) {
-                dictFreeKey(d, he);
-                dictFreeVal(d, he);
-                zfree(he);
+                if (table == 0 && d->asyncdata != nullptr && idx < d->rehashidx) {
+                    he->next = d->asyncdata->deGCList;
+                    d->asyncdata->deGCList = he->next;
+                } else {
+                    dictFreeKey(d, he);
+                    dictFreeVal(d, he);
+                    zfree(he);
+                }
             }
             d->ht[table].used--;
             return he;
@@ -539,6 +683,8 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
         }
         if (!dictIsRehashing(d)) break;
     }
+
+    _dictExpandIfNeeded(d);
     return NULL; /* not found */
 }

@@ -577,9 +723,14 @@ dictEntry *dictUnlink(dict *ht, const void *key) {
  * to dictUnlink(). It's safe to call this function with 'he' = NULL. */
 void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
     if (he == NULL) return;
-    dictFreeKey(d, he);
-    dictFreeVal(d, he);
-    zfree(he);
+    if (d->asyncdata) {
+        he->next = d->asyncdata->deGCList;
+        d->asyncdata->deGCList = he;
+    } else {
+        dictFreeKey(d, he);
+        dictFreeVal(d, he);
+        zfree(he);
+    }
 }

 /* Destroy an entire dictionary */
@@ -595,9 +746,14 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
         if ((he = ht->table[i]) == NULL) continue;
         while(he) {
             nextHe = he->next;
-            dictFreeKey(d, he);
-            dictFreeVal(d, he);
-            zfree(he);
+            if (d->asyncdata && i < d->rehashidx) {
+                he->next = d->asyncdata->deGCList;
+                d->asyncdata->deGCList = he;
+            } else {
+                dictFreeKey(d, he);
+                dictFreeVal(d, he);
+                zfree(he);
+            }
             ht->used--;
             he = nextHe;
         }
@@ -612,6 +768,10 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
 /* Clear & Release the hash table */
 void dictRelease(dict *d)
 {
+    if (d->asyncdata) {
+        d->asyncdata->release = true;
+        return;
+    }
     _dictClear(d,&d->ht[0],NULL);
     _dictClear(d,&d->ht[1],NULL);
     zfree(d);
@@ -1111,7 +1271,7 @@ static int _dictExpandIfNeeded(dict *d)
     if (dictIsRehashing(d)) return DICT_OK;

     /* If the hash table is empty expand it to the initial size. */
-    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
+    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE, false /*fShrink*/);

     /* If we reached the 1:1 ratio, and we are allowed to resize the hash
      * table (global setting) or we should avoid it but the ratio between
@@ -1121,7 +1281,12 @@ static int _dictExpandIfNeeded(dict *d)
         (dict_can_resize ||
          d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
     {
-        return dictExpand(d, d->ht[0].used*2);
+        return dictExpand(d, d->ht[0].used*2, false /*fShrink*/);
+    }
+    else if (d->ht[0].used > 0 && d->ht[0].used * 16 < d->ht[0].size && dict_can_resize)
+    {
+        // If the dictionary has shurnk a lot we'll need to shrink the hash table instead
+        return dictExpand(d, d->ht[0].used*2, true /*fShrink*/);
     }
     return DICT_OK;
 }
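The new dict functions above split incremental rehashing into three phases: the owning thread snapshots up to `buckets` buckets into a dictAsyncRehashCtl work queue (dictRehashAsyncStart), a background thread computes the hashes without touching the table (dictRehashAsync / dictRehashSomeAsync), and the owning thread relinks the entries and frees the GC list (dictCompleteRehashAsync). A minimal usage sketch, assuming the caller supplies its own worker thread and holds whatever lock protects the dict on the owning thread; none of this orchestration is part of the commit itself:

    #include <thread>

    void rehashOneBatchAsync(dict *d)
    {
        // Phase 1 (owning thread): queue a batch of buckets. Returns null if the
        // dict is not rehashing or no work was found.
        dictAsyncRehashCtl *ctl = dictRehashAsyncStart(d);
        if (ctl == nullptr)
            return;

        // Phase 2 (worker thread): hash the queued entries; the table itself is
        // not modified here, so the owning thread can keep serving requests.
        std::thread worker([ctl] { dictRehashAsync(ctl); });
        worker.join();

        // Phase 3 (owning thread again): move the entries into ht[1] and free
        // anything that was unlinked into the GC list while the batch was in flight.
        dictCompleteRehashAsync(ctl, true /*fFree*/);
    }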
src/dict.h (38 changes)

@@ -36,6 +36,8 @@
 #include <stdint.h>

 #ifdef __cplusplus
+#include <vector>
+#include <atomic>
 extern "C" {
 #endif

@@ -81,12 +83,40 @@ typedef struct dictht {
     unsigned long used;
 } dictht;

+#ifdef __cplusplus
+struct dictAsyncRehashCtl {
+    struct workItem {
+        dictEntry *de;
+        uint64_t hash;
+        workItem(dictEntry *de) {
+            this->de = de;
+        }
+    };
+
+    static const int c_targetQueueSize = 512;
+    dictEntry *deGCList = nullptr;
+    struct dict *dict = nullptr;
+    std::vector<workItem> queue;
+    size_t hashIdx = 0;
+    bool release = false;
+    dictAsyncRehashCtl *next = nullptr;
+    std::atomic<bool> done { false };
+
+    dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
+        queue.reserve(c_targetQueueSize);
+    }
+};
+#else
+struct dictAsyncRehashCtl;
+#endif
+
 typedef struct dict {
     dictType *type;
     void *privdata;
     dictht ht[2];
     long rehashidx; /* rehashing not in progress if rehashidx == -1 */
     unsigned long iterators; /* number of iterators currently running */
+    dictAsyncRehashCtl *asyncdata;
 } dict;

 /* If safe is set to 1 this is a safe iterator, that means, you can call
@@ -157,7 +187,7 @@ typedef void (dictScanBucketFunction)(void *privdata, dictEntry **bucketref);

 /* API */
 dict *dictCreate(dictType *type, void *privDataPtr);
-int dictExpand(dict *d, unsigned long size);
+int dictExpand(dict *d, unsigned long size, bool fShrink = false);
 int dictAdd(dict *d, void *key, void *val);
 dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing);
 dictEntry *dictAddOrFind(dict *d, void *key);
@@ -193,6 +223,12 @@ dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t h
 void dictForceRehash(dict *d);
 int dictMerge(dict *dst, dict *src);

+/* Async Rehash Functions */
+dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets = dictAsyncRehashCtl::c_targetQueueSize);
+void dictRehashAsync(dictAsyncRehashCtl *ctl);
+bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes);
+void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree);
+
 /* Hash table types */
 extern dictType dictTypeHeapStringCopyKey;
 extern dictType dictTypeHeapStrings;
src/evict.cpp

@@ -500,6 +500,9 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
     int ckeysFailed = 0;
     int keys_freed = 0;

+    if (g_pserver->maxstorage && g_pserver->m_pstorageFactory != nullptr && g_pserver->m_pstorageFactory->totalDiskspaceUsed() >= g_pserver->maxstorage)
+        goto cant_free_storage;
+
     /* When clients are paused the dataset should be static not just from the
      * POV of clients not being able to write, but also from the POV of
      * expires and evictions of keys not being performed. */
@@ -726,8 +729,10 @@ cant_free:
         latencyEndMonitor(lazyfree_latency);
         latencyAddSampleIfNeeded("eviction-lazyfree",lazyfree_latency);
     }

     latencyEndMonitor(latency);
     latencyAddSampleIfNeeded("eviction-cycle",latency);
+
+cant_free_storage:
     return result;
 }
src/expire.cpp

@@ -137,7 +137,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
             executeCronJobExpireHook(keyCopy, val);
             sdsfree(keyCopy);
             decrRefCount(val);
-        }, false, true /*fLock*/, true /*fForceQueue*/);
+        }, true /*fLock*/, true /*fForceQueue*/);
     }
     return;
src/fastlock.cpp

@@ -338,7 +338,7 @@ extern "C" void fastlock_init(struct fastlock *lock, const char *name)
 }

 #ifndef ASM_SPINLOCK
-extern "C" void fastlock_lock(struct fastlock *lock)
+extern "C" void fastlock_lock(struct fastlock *lock, spin_worker worker)
 {
     int pidOwner;
     __atomic_load(&lock->m_pidOwner, &pidOwner, __ATOMIC_ACQUIRE);
@@ -356,20 +356,32 @@ extern "C" void fastlock_lock(struct fastlock *lock)
     __atomic_load(&g_fHighCpuPressure, &fHighPressure, __ATOMIC_RELAXED);
     unsigned loopLimit = fHighPressure ? 0x10000 : 0x100000;

-    for (;;)
-    {
+    if (worker != nullptr) {
+        for (;;) {
             __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
             if ((ticketT.u & 0xffff) == myticket)
                 break;
+            if (!worker())
+                goto LNormalLoop;
+        }
+    } else {
+LNormalLoop:
+        for (;;)
+        {
+            __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
+            if ((ticketT.u & 0xffff) == myticket)
+                break;
+
 #if defined(__i386__) || defined(__amd64__)
             __asm__ __volatile__ ("pause");
 #elif defined(__aarch64__)
             __asm__ __volatile__ ("yield");
 #endif
-        if ((++cloops % loopLimit) == 0)
-        {
-            fastlock_sleep(lock, tid, ticketT.u, myticket);
-        }
+            if ((++cloops % loopLimit) == 0)
+            {
+                fastlock_sleep(lock, tid, ticketT.u, myticket);
+            }
+        }
     }
 }
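fastlock_lock() now accepts a spin_worker callback: while a thread waits for its ticket it repeatedly invokes the worker, and only drops to the ordinary pause/futex loop once the worker returns 0. Combined with setAeLockSetThreadSpinWorker() in ae.cpp, this lets a server thread do useful work (for example, hashing an async-rehash batch) instead of burning the wait. A sketch of the intended contract; the worker body here is illustrative, not taken from this commit:

    // Returns nonzero while useful work was done, 0 to fall back to the normal
    // spin loop. The signature must match: typedef int (*spin_worker)();
    static int mySpinWorker()
    {
        // e.g. hash a few queued entries of an in-flight async rehash here.
        bool moreWork = false;  // placeholder for "did we make progress?"
        return moreWork ? 1 : 0;
    }

    void installWorkerForThisThread()
    {
        setAeLockSetThreadSpinWorker(mySpinWorker);  // consumed by aeAcquireLock()
        // Direct use is also possible: fastlock_lock(&someLock, mySpinWorker);
    }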
src/fastlock.h

@@ -6,10 +6,16 @@
 extern "C" {
 #endif

+typedef int (*spin_worker)();
+
 /* Begin C API */
 struct fastlock;
 void fastlock_init(struct fastlock *lock, const char *name);
-void fastlock_lock(struct fastlock *lock);
+#ifdef __cplusplus
+void fastlock_lock(struct fastlock *lock, spin_worker worker = nullptr);
+#else
+void fastlock_lock(struct fastlock *lock, spin_worker worker);
+#endif
 int fastlock_trylock(struct fastlock *lock, int fWeak);
 void fastlock_unlock(struct fastlock *lock);
 void fastlock_free(struct fastlock *lock);
@@ -56,9 +62,9 @@ struct fastlock
         fastlock_init(this, name);
     }

-    inline void lock()
+    inline void lock(spin_worker worker = nullptr)
     {
-        fastlock_lock(this);
+        fastlock_lock(this, worker);
     }

     inline bool try_lock(bool fWeak = false)
src/fastlock_x64.asm

@@ -22,14 +22,22 @@ fastlock_lock:
 # [rdi+64] ...
 #     uint16_t active
 #     uint16_t avail
+#
+# RSI points to a spin function to call, or NULL

         # First get our TID and put it in ecx
-        push rdi                    # we need our struct pointer (also balance the stack for the call)
-        .cfi_adjust_cfa_offset 8
+        sub rsp, 24                 # We only use 16 bytes, but we also need the stack aligned
+        .cfi_adjust_cfa_offset 24
+        mov [rsp], rdi              # we need our struct pointer (also balance the stack for the call)
+        mov [rsp+8], rsi            # backup the spin function
+
         call gettid                 # get our thread ID (TLS is nasty in ASM so don't bother inlining)
         mov esi, eax                # back it up in esi
-        pop rdi                     # get our pointer back
-        .cfi_adjust_cfa_offset -8
+        mov rdi, [rsp]              # Restore spin struct
+        mov r8, [rsp+8]             # restore the function (in a different register)
+        add rsp, 24
+        .cfi_adjust_cfa_offset -24
+
         cmp [rdi], esi              # Is the TID we got back the owner of the lock?
         je .LLocked                 # Don't spin in that case
@@ -47,6 +55,8 @@ fastlock_lock:
         # ax now contains the ticket
         # OK Start the wait loop
         xor ecx, ecx
+        test r8, r8
+        jnz .LLoopFunction
 .ALIGN 16
 .LLoop:
         mov edx, [rdi+64]
@@ -83,6 +93,40 @@ fastlock_lock:
         mov [rdi], esi              # lock->m_pidOwner = gettid()
         inc dword ptr [rdi+4]       # lock->m_depth++
         ret

+.LLoopFunction:
+        sub rsp, 40
+        .cfi_adjust_cfa_offset 40
+        xor ecx, ecx
+        mov [rsp], rcx
+        mov [rsp+8], r8
+        mov [rsp+16], rdi
+        mov [rsp+24], rsi
+        mov [rsp+32], eax
+.LLoopFunctionCore:
+        mov edx, [rdi+64]
+        cmp dx, ax
+        je .LExitLoopFunction
+        mov r8, [rsp+8]
+        call r8
+        test eax, eax
+        jz .LExitLoopFunctionForNormal
+        mov eax, [rsp+32]           # restore clobbered eax
+        mov rdi, [rsp+16]
+        jmp .LLoopFunctionCore
+.LExitLoopFunction:
+        mov rsi, [rsp+24]
+        add rsp, 40
+        .cfi_adjust_cfa_offset -40
+        jmp .LLocked
+.LExitLoopFunctionForNormal:
+        xor ecx, ecx
+        mov rdi, [rsp+16]
+        mov rsi, [rsp+24]
+        mov eax, [rsp+32]
+        add rsp, 40
+        .cfi_adjust_cfa_offset -40
+        jmp .LLoop
 .cfi_endproc

 .ALIGN 16
@@ -146,10 +190,11 @@ fastlock_unlock:
         mov dword ptr [rdi], -1     # pidOwner = -1 (we don't own it anymore)
         mov esi, [rdi+64]           # get current active (this one)
         inc esi                     # bump it to the next thread
+        sfence                      # ensure whatever was written in the lock is visible
         mov word ptr [rdi+64], si   # give up our ticket (note: lock is not required here because the spinlock itself guards this variable)
-        mfence                      # sync other threads
         # At this point the lock is removed, however we must wake up any pending futexs
-        mov edx, [rdi+64+4]         # load the futex mask
+        mov rdx, [rdi+64]           # load the futex mask, note we intentionally also read the ticket we just wrote to ensure this is ordered with the above mov
+        shr rdx, 32                 # isolate the mask
         bt edx, esi                 # is the next thread waiting on a futex?
         jc unlock_futex             # unlock the futex if necessary
         ret                         # if not we're done.
src/module.cpp

@@ -5046,7 +5046,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
     zfree(ctx);
 }

-static bool g_fModuleThread = false;
+thread_local bool g_fModuleThread = false;
 /* Acquire the server lock before executing a thread safe API call.
  * This is not needed for `RedisModule_Reply*` calls when there is
  * a blocked client connected to the thread safe context. */
@@ -5105,7 +5105,14 @@ void moduleAcquireGIL(int fServerThread) {
     }
     else
     {
-        s_mutexModule.lock();
+        // It is possible that another module thread holds the GIL (and s_mutexModule as a result).
+        // When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
+        // This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
+        // As a result, a deadlock has occured.
+        // We release the lock on s_mutex and wait until we are able to safely acquire the GIL
+        // in order to prevent this deadlock from occuring.
+        while (!s_mutexModule.try_lock())
+            s_cv.wait(lock);
         ++s_cAcquisitionsModule;
         fModuleGILWlocked++;
     }
@@ -5644,6 +5651,9 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client
  * (If the time it takes to execute 'callback' is negligible the two
  * statements above mean the same) */
 RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) {
+    static uint64_t pending_key;
+    static mstime_t pending_period = -1;
+
     RedisModuleTimer *timer = (RedisModuleTimer*)zmalloc(sizeof(*timer), MALLOC_LOCAL);
     timer->module = ctx->module;
     timer->callback = callback;
@@ -5662,32 +5672,40 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
         }
     }

+    bool fNeedPost = (pending_period < 0); // If pending_period is already set, then a PostFunction is in flight and we don't need to set a new one
+    if (pending_period < 0 || period < pending_period) {
+        pending_period = period;
+        pending_key = key;
+    }
+
     /* We need to install the main event loop timer if it's not already
      * installed, or we may need to refresh its period if we just installed
      * a timer that will expire sooner than any other else (i.e. the timer
      * we just installed is the first timer in the Timers rax). */
-    if (aeTimer != -1) {
-        raxIterator ri;
-        raxStart(&ri,Timers);
-        raxSeek(&ri,"^",NULL,0);
-        raxNext(&ri);
-        if (memcmp(ri.key,&key,sizeof(key)) == 0) {
-            /* This is the first key, we need to re-install the timer according
-             * to the just added event. */
-            aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
-                aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
-            }, true /* synchronous */, false /* fLock */);
-            aeTimer = -1;
-        }
-        raxStop(&ri);
-    }
-
-    /* If we have no main timer (the old one was invalidated, or this is the
-     * first module timer we have), install one. */
-    if (aeTimer == -1) {
-        aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
-            aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL);
-        }, true /* synchronous */, false /* fLock */);
+    if (fNeedPost) {
+        aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
+            if (aeTimer != -1) {
+                raxIterator ri;
+                raxStart(&ri,Timers);
+                raxSeek(&ri,"^",NULL,0);
+                raxNext(&ri);
+                if (memcmp(ri.key,&pending_key,sizeof(key)) == 0) {
+                    /* This is the first key, we need to re-install the timer according
+                     * to the just added event. */
+                    aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
+                    aeTimer = -1;
+                }
+                raxStop(&ri);
+            }
+
+            /* If we have no main timer (the old one was invalidated, or this is the
+             * first module timer we have), install one. */
+            if (aeTimer == -1) {
+                aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,pending_period,moduleTimerHandler,NULL,NULL);
+            }
+
+            pending_period = -1;
+        });
     }

     return key;
|
|||||||
}
|
}
|
||||||
|
|
||||||
client *createClient(connection *conn, int iel) {
|
client *createClient(connection *conn, int iel) {
|
||||||
client *c = (client*)zmalloc(sizeof(client), MALLOC_LOCAL);
|
client *c = new client;
|
||||||
serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar)));
|
serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar)));
|
||||||
|
|
||||||
c->iel = iel;
|
c->iel = iel;
|
||||||
@ -124,7 +124,6 @@ client *createClient(connection *conn, int iel) {
|
|||||||
uint64_t client_id;
|
uint64_t client_id;
|
||||||
client_id = g_pserver->next_client_id.fetch_add(1);
|
client_id = g_pserver->next_client_id.fetch_add(1);
|
||||||
c->iel = iel;
|
c->iel = iel;
|
||||||
fastlock_init(&c->lock, "client");
|
|
||||||
c->id = client_id;
|
c->id = client_id;
|
||||||
c->resp = 2;
|
c->resp = 2;
|
||||||
c->conn = conn;
|
c->conn = conn;
|
||||||
@ -137,7 +136,6 @@ client *createClient(connection *conn, int iel) {
|
|||||||
c->reqtype = 0;
|
c->reqtype = 0;
|
||||||
c->argc = 0;
|
c->argc = 0;
|
||||||
c->argv = NULL;
|
c->argv = NULL;
|
||||||
c->argv_len_sum = 0;
|
|
||||||
c->cmd = c->lastcmd = NULL;
|
c->cmd = c->lastcmd = NULL;
|
||||||
c->puser = DefaultUser;
|
c->puser = DefaultUser;
|
||||||
c->multibulklen = 0;
|
c->multibulklen = 0;
|
||||||
@ -157,6 +155,7 @@ client *createClient(connection *conn, int iel) {
|
|||||||
c->reploff = 0;
|
c->reploff = 0;
|
||||||
c->reploff_skipped = 0;
|
c->reploff_skipped = 0;
|
||||||
c->read_reploff = 0;
|
c->read_reploff = 0;
|
||||||
|
c->reploff_cmd = 0;
|
||||||
c->repl_ack_off = 0;
|
c->repl_ack_off = 0;
|
||||||
c->repl_ack_time = 0;
|
c->repl_ack_time = 0;
|
||||||
c->slave_listening_port = 0;
|
c->slave_listening_port = 0;
|
||||||
@ -203,6 +202,13 @@ client *createClient(connection *conn, int iel) {
|
|||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t client::argv_len_sum() const {
|
||||||
|
size_t sum = 0;
|
||||||
|
for (auto &cmd : vecqueuedcmd)
|
||||||
|
sum += cmd.argv_len_sum;
|
||||||
|
return sum + argv_len_sumActive;
|
||||||
|
}
|
||||||
|
|
||||||
/* This function puts the client in the queue of clients that should write
|
/* This function puts the client in the queue of clients that should write
|
||||||
* their output buffers to the socket. Note that it does not *yet* install
|
* their output buffers to the socket. Note that it does not *yet* install
|
||||||
* the write handler, to start clients are put in a queue of clients that need
|
* the write handler, to start clients are put in a queue of clients that need
|
||||||
@ -495,7 +501,7 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
|
/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
|
||||||
void afterErrorReply(client *c, const char *s, size_t len) {
|
void afterErrorReply(client *c, const char *s, size_t len, int severity = ERR_CRITICAL) {
|
||||||
/* Sometimes it could be normal that a replica replies to a master with
|
/* Sometimes it could be normal that a replica replies to a master with
|
||||||
* an error and this function gets called. Actually the error will never
|
* an error and this function gets called. Actually the error will never
|
||||||
* be sent because addReply*() against master clients has no effect...
|
* be sent because addReply*() against master clients has no effect...
|
||||||
@ -523,9 +529,30 @@ void afterErrorReply(client *c, const char *s, size_t len) {
|
|||||||
|
|
||||||
if (len > 4096) len = 4096;
|
if (len > 4096) len = 4096;
|
||||||
const char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
|
const char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
|
||||||
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
|
switch (severity) {
|
||||||
"to its %s: '%.*s' after processing the command "
|
case ERR_NOTICE:
|
||||||
"'%s'", from, to, (int)len, s, cmdname);
|
serverLog(LL_NOTICE,"== NOTICE == This %s is rejecting a command "
|
||||||
|
"from its %s: '%.*s' after processing the command "
|
||||||
|
"'%s'", from, to, (int)len, s, cmdname);
|
||||||
|
break;
|
||||||
|
case ERR_WARNING:
|
||||||
|
serverLog(LL_WARNING,"== WARNING == This %s is rejecting a command "
|
||||||
|
"from its %s: '%.*s' after processing the command "
|
||||||
|
"'%s'", from, to, (int)len, s, cmdname);
|
||||||
|
break;
|
||||||
|
case ERR_ERROR:
|
||||||
|
serverLog(LL_WARNING,"== ERROR == This %s is sending an error "
|
||||||
|
"to its %s: '%.*s' after processing the command "
|
||||||
|
"'%s'", from, to, (int)len, s, cmdname);
|
||||||
|
break;
|
||||||
|
case ERR_CRITICAL:
|
||||||
|
default:
|
||||||
|
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
|
||||||
|
"to its %s: '%.*s' after processing the command "
|
||||||
|
"'%s'", from, to, (int)len, s, cmdname);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (ctype == CLIENT_TYPE_MASTER && g_pserver->repl_backlog &&
|
if (ctype == CLIENT_TYPE_MASTER && g_pserver->repl_backlog &&
|
||||||
g_pserver->repl_backlog_histlen > 0)
|
g_pserver->repl_backlog_histlen > 0)
|
||||||
{
|
{
|
||||||
@ -537,9 +564,9 @@ void afterErrorReply(client *c, const char *s, size_t len) {
|
|||||||
|
|
||||||
/* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
|
/* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
|
||||||
* Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
|
* Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
|
||||||
void addReplyErrorObject(client *c, robj *err) {
|
void addReplyErrorObject(client *c, robj *err, int severity) {
|
||||||
addReply(c, err);
|
addReply(c, err);
|
||||||
afterErrorReply(c, szFromObj(err), sdslen(szFromObj(err))-2); /* Ignore trailing \r\n */
|
afterErrorReply(c, szFromObj(err), sdslen(szFromObj(err))-2, severity); /* Ignore trailing \r\n */
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyError(client *c, const char *err) {
|
void addReplyError(client *c, const char *err) {
|
||||||
@ -1322,7 +1349,7 @@ static void freeClientArgv(client *c) {
|
|||||||
decrRefCount(c->argv[j]);
|
decrRefCount(c->argv[j]);
|
||||||
c->argc = 0;
|
c->argc = 0;
|
||||||
c->cmd = NULL;
|
c->cmd = NULL;
|
||||||
c->argv_len_sum = 0;
|
c->argv_len_sumActive = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void disconnectSlavesExcept(unsigned char *uuid)
|
void disconnectSlavesExcept(unsigned char *uuid)
|
||||||
@ -1553,12 +1580,12 @@ bool freeClient(client *c) {
|
|||||||
zfree(c->replyAsync);
|
zfree(c->replyAsync);
|
||||||
if (c->name) decrRefCount(c->name);
|
if (c->name) decrRefCount(c->name);
|
||||||
zfree(c->argv);
|
zfree(c->argv);
|
||||||
c->argv_len_sum = 0;
|
c->argv_len_sumActive = 0;
|
||||||
freeClientMultiState(c);
|
freeClientMultiState(c);
|
||||||
sdsfree(c->peerid);
|
sdsfree(c->peerid);
|
||||||
ulock.unlock();
|
ulock.unlock();
|
||||||
fastlock_free(&c->lock);
|
fastlock_free(&c->lock);
|
||||||
zfree(c);
|
delete c;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1897,9 +1924,6 @@ void resetClient(client *c) {
|
|||||||
redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
|
redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
|
||||||
|
|
||||||
freeClientArgv(c);
|
freeClientArgv(c);
|
||||||
c->reqtype = 0;
|
|
||||||
c->multibulklen = 0;
|
|
||||||
c->bulklen = -1;
|
|
||||||
|
|
||||||
/* We clear the ASKING flag as well if we are not inside a MULTI, and
|
/* We clear the ASKING flag as well if we are not inside a MULTI, and
|
||||||
* if what we just executed is not the ASKING command itself. */
|
* if what we just executed is not the ASKING command itself. */
|
||||||
@@ -2021,16 +2045,13 @@ int processInlineBuffer(client *c) {
 
     /* Setup argv array on client structure */
     if (argc) {
-        if (c->argv) zfree(c->argv);
-        c->argv = (robj**)zmalloc(sizeof(robj*)*argc, MALLOC_LOCAL);
-        c->argv_len_sum = 0;
-    }
-
-    /* Create redis objects for all arguments. */
-    for (c->argc = 0, j = 0; j < argc; j++) {
-        c->argv[c->argc] = createObject(OBJ_STRING,argv[j]);
-        c->argc++;
-        c->argv_len_sum += sdslen(argv[j]);
+        /* Create redis objects for all arguments. */
+        c->vecqueuedcmd.emplace_back(argc);
+        auto &cmd = c->vecqueuedcmd.back();
+        for (cmd.argc = 0, j = 0; j < argc; j++) {
+            cmd.argv[cmd.argc++] = createObject(OBJ_STRING,argv[j]);
+            cmd.argv_len_sum += sdslen(argv[j]);
+        }
     }
     sds_free(argv);
     return C_OK;
@@ -2120,9 +2141,7 @@ int processMultibulkBuffer(client *c) {
         c->multibulklen = ll;
 
         /* Setup argv array on client structure */
-        if (c->argv) zfree(c->argv);
-        c->argv = (robj**)zmalloc(sizeof(robj*)*c->multibulklen, MALLOC_LOCAL);
-        c->argv_len_sum = 0;
+        c->vecqueuedcmd.emplace_back(c->multibulklen);
     }
 
     serverAssertWithInfo(c,NULL,c->multibulklen > 0);
@@ -2190,21 +2209,22 @@ int processMultibulkBuffer(client *c) {
             /* Optimization: if the buffer contains JUST our bulk element
              * instead of creating a new object by *copying* the sds we
              * just use the current sds string. */
+            auto &cmd = c->vecqueuedcmd.back();
             if (c->qb_pos == 0 &&
                 c->bulklen >= PROTO_MBULK_BIG_ARG &&
                 sdslen(c->querybuf) == (size_t)(c->bulklen+2))
             {
-                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
-                c->argv_len_sum += c->bulklen;
+                cmd.argv[cmd.argc++] = createObject(OBJ_STRING,c->querybuf);
+                cmd.argv_len_sum += c->bulklen;
                 sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                 /* Assume that if we saw a fat argument we'll see another one
                  * likely... */
                 c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                 sdsclear(c->querybuf);
             } else {
-                c->argv[c->argc++] =
+                cmd.argv[cmd.argc++] =
                     createStringObject(c->querybuf+c->qb_pos,c->bulklen);
-                c->argv_len_sum += c->bulklen;
+                cmd.argv_len_sum += c->bulklen;
                 c->qb_pos += c->bulklen+2;
             }
             c->bulklen = -1;
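Both parsers now append decoded arguments into the tail element of c->vecqueuedcmd instead of writing the client's argv directly. A minimal standalone model of that accumulation pattern, using plain strings rather than the actual robj*/sds types:

#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for parsed_command: the real one stores robj* and a reploff.
struct ParsedCommand {
    std::vector<std::string> argv;
    size_t argcMax = 0;        // how many arguments the protocol header promised
    size_t argv_len_sum = 0;   // bytes across all arguments, used for memory accounting

    explicit ParsedCommand(size_t maxargs) : argcMax(maxargs) { argv.reserve(maxargs); }
    bool complete() const { return argv.size() == argcMax; }
};

// A parser appends one decoded bulk argument to the command currently being built.
void appendArg(std::vector<ParsedCommand> &queue, std::string arg) {
    ParsedCommand &cmd = queue.back();
    cmd.argv_len_sum += arg.size();
    cmd.argv.push_back(std::move(arg));
}

int main() {
    std::vector<ParsedCommand> queue;
    queue.emplace_back(3);              // "*3" header seen: SET key value
    appendArg(queue, "SET");
    appendArg(queue, "key");
    appendArg(queue, "value");
    std::cout << queue.back().complete() << " " << queue.back().argv_len_sum << "\n";
}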
@@ -2228,7 +2248,8 @@ void commandProcessed(client *c, int flags) {
     long long prev_offset = c->reploff;
     if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
         /* Update the applied replication offset of our master. */
-        c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
+        serverAssert(c->reploff <= c->reploff_cmd);
+        c->reploff = c->reploff_cmd;
     }
 
     /* Don't reset the client structure for clients blocked in a
@@ -2285,34 +2306,34 @@ int processCommandAndResetClient(client *c, int flags) {
     return deadclient ? C_ERR : C_OK;
 }
 
-/* This function is called every time, in the client structure 'c', there is
- * more query buffer to process, because we read more data from the socket
- * or because a client was blocked and later reactivated, so there could be
- * pending query buffer, already representing a full command, to process. */
-void processInputBuffer(client *c, int callFlags) {
-    AssertCorrectThread(c);
-
-    /* Keep processing while there is something in the input buffer */
-    while(c->qb_pos < sdslen(c->querybuf)) {
-        /* Return if clients are paused. */
-        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
-
-        /* Immediately abort if the client is in the middle of something. */
-        if (c->flags & CLIENT_BLOCKED) break;
-
-        /* Don't process input from the master while there is a busy script
-         * condition on the replica. We want just to accumulate the replication
-         * stream (instead of replying -BUSY like we do with other clients) and
-         * later resume the processing. */
-        if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) break;
-
-        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
-         * written to the client. Make sure to not let the reply grow after
-         * this flag has been set (i.e. don't process more commands).
-         *
-         * The same applies for clients we want to terminate ASAP. */
-        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
-
+static bool FClientReady(client *c) {
+    /* Return if clients are paused. */
+    if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) return false;
+
+    /* Immediately abort if the client is in the middle of something. */
+    if (c->flags & CLIENT_BLOCKED) return false;
+
+    /* Don't process input from the master while there is a busy script
+     * condition on the replica. We want just to accumulate the replication
+     * stream (instead of replying -BUSY like we do with other clients) and
+     * later resume the processing. */
+    if (g_pserver->lua_timedout && c->flags & CLIENT_MASTER) return false;
+
+    /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
+     * written to the client. Make sure to not let the reply grow after
+     * this flag has been set (i.e. don't process more commands).
+     *
+     * The same applies for clients we want to terminate ASAP. */
+    if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) return false;
+
+    return true;
+}
+
+void parseClientCommandBuffer(client *c) {
+    if (!FClientReady(c))
+        return;
+
+    while(c->qb_pos < sdslen(c->querybuf)) {
         /* Determine request type when unknown. */
         if (!c->reqtype) {
             if (c->querybuf[c->qb_pos] == '*') {
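Pulling the per-iteration checks out of the read loop into a single FClientReady predicate makes it cheap to ask "may this client run anything at all?" from both the parser and the executor. A standalone sketch of the same gate pattern with hypothetical flag bits (not the real CLIENT_* flags):

#include <cstdint>
#include <cstdio>

// Hypothetical flag bits standing in for the client flags the predicate checks.
constexpr uint32_t FLAG_SLAVE   = 1 << 0;
constexpr uint32_t FLAG_BLOCKED = 1 << 1;
constexpr uint32_t FLAG_CLOSING = 1 << 2;

struct FakeClient { uint32_t flags = 0; };

// Stub; a real server would consult its pause state here.
static bool clientsArePausedStub() { return false; }

// Same shape as FClientReady(): every early-out reason lives in one place.
bool clientReady(const FakeClient &c) {
    if (!(c.flags & FLAG_SLAVE) && clientsArePausedStub()) return false;
    if (c.flags & FLAG_BLOCKED) return false;
    if (c.flags & FLAG_CLOSING) return false;
    return true;
}

int main() {
    FakeClient c;
    c.flags |= FLAG_BLOCKED;
    std::printf("ready=%d\n", clientReady(c));  // prints ready=0
}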
@@ -2322,6 +2343,7 @@ void processInputBuffer(client *c, int callFlags) {
             }
         }
 
+        size_t cqueries = c->vecqueuedcmd.size();
         if (c->reqtype == PROTO_REQ_INLINE) {
             if (processInlineBuffer(c) != C_OK) break;
         } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
@@ -2329,6 +2351,61 @@ void processInputBuffer(client *c, int callFlags) {
         } else {
             serverPanic("Unknown request type");
         }
+        if (!c->vecqueuedcmd.empty() && (c->vecqueuedcmd.back().argc <= 0 || c->vecqueuedcmd.back().argv == nullptr)) {
+            c->vecqueuedcmd.pop_back();
+        } else if (!c->vecqueuedcmd.empty()) {
+            if (c->flags & CLIENT_MASTER) c->vecqueuedcmd.back().reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
+            serverAssert(c->vecqueuedcmd.back().reploff >= 0);
+        }
+
+        /* Prefetch if we have a storage provider and we're not in the global lock */
+        if (cqueries < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) {
+            auto &query = c->vecqueuedcmd.back();
+            if (query.argc > 0 && query.argc == query.argcMax) {
+                c->db->prefetchKeysAsync(c, query);
+            }
+        }
+        c->reqtype = 0;
+        c->multibulklen = 0;
+        c->bulklen = -1;
+        c->reqtype = 0;
+    }
+
+    /* Trim to pos */
+    if (c->qb_pos) {
+        sdsrange(c->querybuf,c->qb_pos,-1);
+        c->qb_pos = 0;
+    }
+}
+
+/* This function is called every time, in the client structure 'c', there is
+ * more query buffer to process, because we read more data from the socket
+ * or because a client was blocked and later reactivated, so there could be
+ * pending query buffer, already representing a full command, to process. */
+void processInputBuffer(client *c, bool fParse, int callFlags) {
+    AssertCorrectThread(c);
+
+    if (fParse)
+        parseClientCommandBuffer(c);
+
+    /* Keep processing while there is something in the input buffer */
+    while (!c->vecqueuedcmd.empty()) {
+        /* Return if we're still parsing this command */
+        auto &cmd = c->vecqueuedcmd.front();
+        if (cmd.argc != cmd.argcMax) break;
+
+        if (!FClientReady(c)) break;
+
+        zfree(c->argv);
+        c->argc = cmd.argc;
+        c->argv = cmd.argv;
+        cmd.argv = nullptr;
+        c->argv_len_sumActive = cmd.argv_len_sum;
+        cmd.argv_len_sum = 0;
+        c->reploff_cmd = cmd.reploff;
+        serverAssert(c->argv != nullptr);
+
+        c->vecqueuedcmd.erase(c->vecqueuedcmd.begin());
 
         /* Multibulk processing could see a <= 0 length. */
         if (c->argc == 0) {
@@ -2343,12 +2420,6 @@ void processInputBuffer(client *c, int callFlags) {
             }
         }
     }
-
-    /* Trim to pos */
-    if (c->qb_pos) {
-        sdsrange(c->querybuf,c->qb_pos,-1);
-        c->qb_pos = 0;
-    }
 }
 
 void readQueryFromClient(connection *conn) {
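processInputBuffer now drains fully parsed commands from the front of the queue and promotes each one into the client's active argv before executing it, stopping at the first command that is still mid-parse. A standalone model of that hand-off using simplified types (strings instead of robj*, a lambda instead of the command dispatcher):

#include <deque>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct QueuedCmd {
    std::vector<std::string> argv;
    size_t argcMax = 0;
    bool complete() const { return argv.size() == argcMax; }
};

struct ClientModel {
    std::deque<QueuedCmd> queued;        // filled by the parser
    std::vector<std::string> activeArgv; // what the executing command sees
};

// Promote parsed commands to the active slot one at a time, stopping at a partial parse.
template <typename Exec>
void drainQueued(ClientModel &c, Exec execute) {
    while (!c.queued.empty()) {
        QueuedCmd &front = c.queued.front();
        if (!front.complete()) break;    // still being parsed; try again after more reads
        c.activeArgv = std::move(front.argv);
        c.queued.pop_front();
        execute(c.activeArgv);
    }
}

int main() {
    ClientModel c;
    c.queued.push_back({{"PING"}, 1});
    drainQueued(c, [](const std::vector<std::string> &argv) { std::cout << argv[0] << "\n"; });
}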
@@ -2429,6 +2500,8 @@ void readQueryFromClient(connection *conn) {
         return;
     }
 
+    parseClientCommandBuffer(c);
+
     serverTL->vecclientsProcess.push_back(c);
 }
 
@@ -2440,7 +2513,7 @@ void processClients()
         /* There is more data in the client input buffer, continue parsing it
          * in case to check if there is a full command to execute. */
         std::unique_lock<fastlock> ul(c->lock);
-        processInputBuffer(c, CMD_CALL_FULL);
+        processInputBuffer(c, false /*fParse*/, CMD_CALL_FULL);
     }
 
     if (listLength(serverTL->clients_pending_asyncwrite))
@@ -2547,7 +2620,7 @@ sds catClientInfoString(sds s, client *client) {
     /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
      * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
      * spot problematic clients. */
-    total_mem += client->argv_len_sum;
+    total_mem += client->argv_len_sum();
     if (client->argv)
         total_mem += zmalloc_size(client->argv);
 
@@ -2566,7 +2639,7 @@ sds catClientInfoString(sds s, client *client) {
         (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
         (unsigned long long) sdslen(client->querybuf),
         (unsigned long long) sdsavail(client->querybuf),
-        (unsigned long long) client->argv_len_sum,
+        (unsigned long long) client->argv_len_sum(),
         (unsigned long long) client->bufpos,
         (unsigned long long) listLength(client->reply),
         (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
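With arguments now split between the active command and the queued ones, the memory reports call an accessor instead of reading a single field. The accessor's declaration appears later in this diff; its body is not shown, so the following is only a plausible standalone sketch of what such a total could look like (simplified types, assumed implementation):

#include <cstddef>
#include <iostream>
#include <vector>

struct QueuedCmd { size_t argv_len_sum = 0; };

struct ClientModel {
    size_t argv_len_sumActive = 0;        // bytes held by the currently executing command
    std::vector<QueuedCmd> vecqueuedcmd;  // bytes held by commands still waiting to run

    // Assumed implementation: total reported to CLIENT LIST style accounting.
    size_t argv_len_sum() const {
        size_t total = argv_len_sumActive;
        for (const QueuedCmd &cmd : vecqueuedcmd) total += cmd.argv_len_sum;
        return total;
    }
};

int main() {
    ClientModel c;
    c.argv_len_sumActive = 10;
    c.vecqueuedcmd.push_back({32});
    std::cout << c.argv_len_sum() << "\n";  // 42
}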
@@ -3123,10 +3196,10 @@ void rewriteClientCommandVector(client *c, int argc, ...) {
     /* Replace argv and argc with our new versions. */
     c->argv = argv;
     c->argc = argc;
-    c->argv_len_sum = 0;
+    c->argv_len_sumActive = 0;
     for (j = 0; j < c->argc; j++)
         if (c->argv[j])
-            c->argv_len_sum += getStringObjectLen(c->argv[j]);
+            c->argv_len_sumActive += getStringObjectLen(c->argv[j]);
     c->cmd = lookupCommandOrOriginal((sds)ptrFromObj(c->argv[0]));
     serverAssertWithInfo(c,NULL,c->cmd != NULL);
     va_end(ap);
@@ -3139,10 +3212,10 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) {
     zfree(c->argv);
     c->argv = argv;
     c->argc = argc;
-    c->argv_len_sum = 0;
+    c->argv_len_sumActive = 0;
     for (j = 0; j < c->argc; j++)
         if (c->argv[j])
-            c->argv_len_sum += getStringObjectLen(c->argv[j]);
+            c->argv_len_sumActive += getStringObjectLen(c->argv[j]);
     c->cmd = lookupCommandOrOriginal((sds)ptrFromObj(c->argv[0]));
     serverAssertWithInfo(c,NULL,c->cmd != NULL);
 }
@@ -3167,8 +3240,8 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
         c->argv[i] = NULL;
     }
     oldval = c->argv[i];
-    if (oldval) c->argv_len_sum -= getStringObjectLen(oldval);
-    if (newval) c->argv_len_sum += getStringObjectLen(newval);
+    if (oldval) c->argv_len_sumActive -= getStringObjectLen(oldval);
+    if (newval) c->argv_len_sumActive += getStringObjectLen(newval);
     c->argv[i] = newval;
     incrRefCount(newval);
     if (oldval) decrRefCount(oldval);
@@ -3302,6 +3375,8 @@ void flushSlavesOutputBuffers(void) {
     listIter li;
     listNode *ln;
 
+    flushReplBacklogToClients();
+
     listRewind(g_pserver->slaves,&li);
     while((ln = listNext(&li))) {
         client *replica = (client*)listNodeValue(ln);
@@ -3470,6 +3545,16 @@ void processEventsWhileBlocked(int iel) {
     locker.arm(nullptr);
     locker.release();
 
+    // Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here)
+    for (int idb = 0; idb < cserver.dbnum; ++idb) {
+        redisDb *db = g_pserver->db[idb];
+        while (db->dictUnsafeKeyOnly()->asyncdata != nullptr) {
+            if (!db->dictUnsafeKeyOnly()->asyncdata->done)
+                break;
+            dictCompleteRehashAsync(db->dictUnsafeKeyOnly()->asyncdata, false /*fFree*/);
+        }
+    }
+
     // Restore it so the calling code is not confused
     if (fReplBacklog && !serverTL->el->stop) {
         g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
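The new loop in processEventsWhileBlocked opportunistically finishes any asynchronous rehash whose helper has signalled completion, since the cron that would normally fold the work back in cannot run while the thread is blocked. A standalone sketch of that "poll the done flag, then apply the results" pattern (the types are stand-ins, not the dict API):

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

// Minimal stand-in for an async rehash control block: a worker sets `done`, the owner completes it.
struct AsyncRehash {
    std::atomic<bool> done{false};
    int bucketsMoved = 0;
};

// Owner-side completion: only fold in the work if the helper already finished.
bool tryComplete(AsyncRehash &ctl, int &tableSize) {
    if (!ctl.done.load(std::memory_order_acquire)) return false;
    tableSize += ctl.bucketsMoved;   // apply the migrated buckets
    return true;
}

int main() {
    AsyncRehash ctl;
    int tableSize = 0;
    std::thread worker([&] {
        ctl.bucketsMoved = 128;
        ctl.done.store(true, std::memory_order_release);
    });
    while (!tryComplete(ctl, tableSize))
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    worker.join();
    std::cout << tableSize << "\n";
}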
@@ -2341,10 +2341,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
         (g_pserver->loading_process_events_interval_keys &&
          (r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
     {
-        /* The DB can take some non trivial amount of time to load. Update
-         * our cached time since it is used to create and update the last
-         * interaction time with clients and for other important things. */
-        updateCachedTime(0);
         listIter li;
         listNode *ln;
         listRewind(g_pserver->masters, &li);
@@ -1902,6 +1902,7 @@ void readSyncBulkPayload(connection *conn) {
     int use_diskless_load = useDisklessLoad();
     const dbBackup *diskless_load_backup = NULL;
     rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
+    rsi.fForceSetKey = !!g_pserver->fActiveReplica;
     int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC :
                                                             EMPTYDB_NO_FLAGS;
     off_t left;
@@ -3432,6 +3433,13 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
      * offsets, including pending transactions, already populated arguments,
      * pending outputs to the master. */
     sdsclear(mi->master->querybuf);
+    if (!mi->master->vecqueuedcmd.empty()) {
+        // Clear out everything except for partially parsed commands (which we'll cache)
+        auto cmd = std::move(mi->master->vecqueuedcmd.front());
+        mi->master->vecqueuedcmd.clear();
+        if (cmd.argc != cmd.argcMax)
+            mi->master->vecqueuedcmd.emplace_back(std::move(cmd));
+    }
     sdsclear(mi->master->pending_querybuf);
     mi->master->read_reploff = mi->master->reploff;
     if (c->flags & CLIENT_MULTI) discardTransaction(c);
@@ -4307,10 +4315,12 @@ void replicaReplayCommand(client *c)
     cFake->authenticated = c->authenticated;
     cFake->puser = c->puser;
     cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2]));
+    cFake->read_reploff = sdslen(cFake->querybuf);
+    cFake->reploff = 0;
     selectDb(cFake, c->db->id);
     auto ccmdPrev = serverTL->commandsExecuted;
     cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP;
-    processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
+    processInputBuffer(cFake, true /*fParse*/, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
     cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP);
     bool fExec = ccmdPrev != serverTL->commandsExecuted;
     cFake->lock.unlock();
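When the master link is cached, replicationCacheMaster clears the command queue except for a command that is still mid-parse, so its already-consumed bytes survive the reconnect. A standalone sketch of that "keep only the incomplete tail" move with simplified types:

#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct Queued {
    std::vector<std::string> argv;
    size_t argcMax = 0;
    bool complete() const { return argv.size() == argcMax; }
};

// Drop every fully parsed command, but carry a partially parsed one forward.
void keepOnlyPartial(std::vector<Queued> &queue) {
    if (queue.empty()) return;
    Queued front = std::move(queue.front());  // mirrors taking vecqueuedcmd.front()
    queue.clear();
    if (!front.complete())
        queue.emplace_back(std::move(front));
}

int main() {
    std::vector<Queued> q;
    q.push_back({{"SET", "k"}, 3});   // promised 3 args, only 2 parsed so far
    keepOnlyPartial(q);
    std::cout << q.size() << "\n";    // 1: the partial command survives
}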
src/server.cpp (186 changed lines)

@@ -690,7 +690,7 @@ struct redisCommand redisCommandTable[] = {
      * failure detection, and a loading server is considered to be
      * not available. */
     {"ping",pingCommand,-1,
-     "ok-stale fast @connection @replication",
+     "ok-stale ok-loading fast @connection @replication",
      0,NULL,0,0,0,0,0,0},
 
     {"echo",echoCommand,2,
@@ -1558,14 +1558,14 @@ void tryResizeHashTables(int dbid) {
  * table will use two tables for a long time. So we try to use 1 millisecond
  * of CPU time at every call of this function to perform some rehashing.
  *
- * The function returns 1 if some rehashing was performed, otherwise 0
+ * The function returns the number of rehashes if some rehashing was performed, otherwise 0
  * is returned. */
 int redisDbPersistentData::incrementallyRehash() {
     /* Keys dictionary */
     if (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) {
-        dictRehashMilliseconds(m_pdict,1);
-        dictRehashMilliseconds(m_pdictTombstone,1);
-        return 1; /* already used our millisecond for this loop... */
+        int result = dictRehashMilliseconds(m_pdict,1);
+        result += dictRehashMilliseconds(m_pdictTombstone,1);
+        return result; /* already used our millisecond for this loop... */
     }
     return 0;
 }
@@ -1749,7 +1749,7 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS];
 size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS];
 
 int clientsCronTrackExpansiveClients(client *c) {
-    size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum;
+    size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum();
     size_t out_usage = getClientOutputBufferMemoryUsage(c);
     int i = g_pserver->unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
     int zeroidx = (i+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
@@ -1786,7 +1786,7 @@ int clientsCronTrackClientsMemUsage(client *c) {
     mem += getClientOutputBufferMemoryUsage(c);
     mem += sdsZmallocSize(c->querybuf);
     mem += zmalloc_size(c);
-    mem += c->argv_len_sum;
+    mem += c->argv_len_sum();
     if (c->argv) mem += zmalloc_size(c->argv);
     /* Now that we have the memory used by the client, remove the old
      * value from the old category, and add it back. */
@@ -1884,22 +1884,32 @@ bool expireOwnKeys()
     return false;
 }
 
+int hash_spin_worker() {
+    auto ctl = serverTL->rehashCtl;
+    return dictRehashSomeAsync(ctl, 1);
+}
+
 /* This function handles 'background' operations we are required to do
  * incrementally in Redis databases, such as active key expiring, resizing,
  * rehashing. */
-void databasesCron(void) {
-    /* Expire keys by random sampling. Not required for slaves
-     * as master will synthesize DELs for us. */
-    if (g_pserver->active_expire_enabled) {
-        if (expireOwnKeys()) {
-            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
-        } else {
-            expireSlaveKeys();
-        }
-    }
+void databasesCron(bool fMainThread) {
+    serverAssert(GlobalLocksAcquired());
+    static int rehashes_per_ms = 0;
+    static int async_rehashes = 0;
+    if (fMainThread) {
+        /* Expire keys by random sampling. Not required for slaves
+         * as master will synthesize DELs for us. */
+        if (g_pserver->active_expire_enabled) {
+            if (expireOwnKeys()) {
+                activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
+            } else {
+                expireSlaveKeys();
+            }
+        }
 
-    /* Defrag keys gradually. */
-    activeDefragCycle();
+        /* Defrag keys gradually. */
+        activeDefragCycle();
+    }
 
     /* Perform hash tables rehashing if needed, but only if there are no
      * other processes saving the DB on disk. Otherwise rehashing is bad
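incrementallyRehash now reports how many rehash steps fit into its one-millisecond budget, and the next hunk reuses that figure as a rehashes-per-millisecond calibration. A standalone sketch of measuring such a rate (the step itself is a placeholder, not the dict bucket move):

#include <chrono>
#include <iostream>

// Pretend rehash step; the real one migrates one bucket of a dict.
static void rehashStep(volatile unsigned long long &sink) { sink += 1; }

// Run steps for roughly one millisecond and report how many fit, in the spirit of dictRehashMilliseconds.
int rehashForOneMs() {
    using clock = std::chrono::steady_clock;
    volatile unsigned long long sink = 0;
    auto deadline = clock::now() + std::chrono::milliseconds(1);
    int steps = 0;
    while (clock::now() < deadline) {
        rehashStep(sink);
        ++steps;
    }
    return steps;  // caller can reuse this as a rehashes-per-ms calibration
}

int main() {
    std::cout << "calibrated steps/ms: " << rehashForOneMs() << "\n";
}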
@@ -1916,28 +1926,70 @@ void databasesCron(void) {
     /* Don't test more DBs than we have. */
     if (dbs_per_call > cserver.dbnum) dbs_per_call = cserver.dbnum;
 
-    /* Resize */
-    for (j = 0; j < dbs_per_call; j++) {
-        tryResizeHashTables(resize_db % cserver.dbnum);
-        resize_db++;
+    if (fMainThread) {
+        /* Resize */
+        for (j = 0; j < dbs_per_call; j++) {
+            tryResizeHashTables(resize_db % cserver.dbnum);
+            resize_db++;
+        }
     }
 
     /* Rehash */
     if (g_pserver->activerehashing) {
         for (j = 0; j < dbs_per_call; j++) {
-            int work_done = g_pserver->db[rehash_db]->incrementallyRehash();
-            if (work_done) {
-                /* If the function did some work, stop here, we'll do
-                 * more at the next cron loop. */
+            if (serverTL->rehashCtl != nullptr) {
+                if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) {
+                    break;
+                } else {
+                    dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
+                    serverTL->rehashCtl = nullptr;
+                }
+            }
+
+            serverAssert(serverTL->rehashCtl == nullptr);
+            ::dict *dict = g_pserver->db[rehash_db]->dictUnsafeKeyOnly();
+            /* Are we async rehashing? And if so is it time to re-calibrate? */
+            /* The recalibration limit is a prime number to ensure balancing across threads */
+            if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1) {
+                serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms);
+                ++async_rehashes;
+            }
+            if (serverTL->rehashCtl)
                 break;
-            } else {
-                /* If this db didn't need rehash, we'll try the next one. */
+
+            // Before starting anything new, can we end the rehash of a blocked thread?
+            if (dict->asyncdata != nullptr) {
+                auto asyncdata = dict->asyncdata;
+                if (asyncdata->done) {
+                    dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer
+                    serverAssert(dict->asyncdata != asyncdata);
+                    break; // completion can be expensive, don't do anything else
+                }
+            }
+
+            rehashes_per_ms = g_pserver->db[rehash_db]->incrementallyRehash();
+            async_rehashes = 0;
+            if (rehashes_per_ms > 0) {
+                /* If the function did some work, stop here, we'll do
+                 * more at the next cron loop. */
+                if (!cserver.active_defrag_enabled) {
+                    serverLog(LL_VERBOSE, "Calibrated rehashes per ms: %d", rehashes_per_ms);
+                }
+                break;
+            } else if (dict->asyncdata == nullptr) {
+                /* If this db didn't need rehash and we have none in flight, we'll try the next one. */
                 rehash_db++;
                 rehash_db %= cserver.dbnum;
             }
         }
     }
+
+    if (serverTL->rehashCtl) {
+        setAeLockSetThreadSpinWorker(hash_spin_worker);
+    } else {
+        setAeLockSetThreadSpinWorker(nullptr);
+    }
 }
 
 /* We take a cached value of the unix time in the global state because with
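Each cron pass above either advances an async rehash this thread already owns, starts a new one sized by the calibrated rate, or falls back to a synchronous pass that re-calibrates. A condensed standalone sketch of that three-way decision; the RehashCtl type and helper functions here are stand-ins, not the real dict async-rehash API:

#include <cstdio>

// Stand-ins for the async-rehash control used in the hunk above; not the real signatures.
struct RehashCtl { int remaining; };
bool rehashSomeAsync(RehashCtl *ctl, int steps) { ctl->remaining -= steps; return ctl->remaining > 0; }
void completeRehashAsync(RehashCtl *ctl) { delete ctl; }
RehashCtl *rehashAsyncStart(int budget) { return new RehashCtl{budget}; }

void cronRehashPass(RehashCtl *&ctl, int &rehashesPerMs, int &asyncRehashes) {
    if (ctl != nullptr) {                              // 1) keep pushing the rehash we already own
        if (!rehashSomeAsync(ctl, 5)) {
            completeRehashAsync(ctl);
            ctl = nullptr;
        }
        return;
    }
    if (rehashesPerMs > 0 && asyncRehashes < 131) {    // 2) start a new async pass, sized by the rate
        ctl = rehashAsyncStart(rehashesPerMs);
        ++asyncRehashes;
        return;
    }
    rehashesPerMs = 7;                                 // 3) synchronous fallback re-calibrates the rate
    asyncRehashes = 0;
}

int main() {
    RehashCtl *ctl = nullptr; int rate = 0, async_rehashes = 0;
    for (int i = 0; i < 4; ++i) cronRehashPass(ctl, rate, async_rehashes);
    if (ctl) completeRehashAsync(ctl);                 // clean up if a pass is still outstanding
    std::printf("rate=%d async=%d\n", rate, async_rehashes);
}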
@@ -1950,7 +2002,7 @@ void databasesCron(void) {
  * info or not using the 'update_daylight_info' argument. Normally we update
  * such info only when calling this function from serverCron() but not when
  * calling it from call(). */
-void updateCachedTime(int update_daylight_info) {
+void updateCachedTime() {
     long long t = ustime();
     __atomic_store(&g_pserver->ustime, &t, __ATOMIC_RELAXED);
     t /= 1000;
@@ -1963,12 +2015,10 @@ void updateCachedTime(int update_daylight_info) {
      * context is safe since we will never fork() while here, in the main
      * thread. The logging function will call a thread safe version of
      * localtime that has no locks. */
-    if (update_daylight_info) {
-        struct tm tm;
-        time_t ut = g_pserver->unixtime;
-        localtime_r(&ut,&tm);
-        __atomic_store(&g_pserver->daylight_active, &tm.tm_isdst, __ATOMIC_RELAXED);
-    }
+    struct tm tm;
+    time_t ut = g_pserver->unixtime;
+    localtime_r(&ut,&tm);
+    __atomic_store(&g_pserver->daylight_active, &tm.tm_isdst, __ATOMIC_RELAXED);
 }
 
 void checkChildrenDone(void) {
@@ -2120,9 +2170,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * handler if we don't return here fast enough. */
     if (g_pserver->watchdog_period) watchdogScheduleSignal(g_pserver->watchdog_period);
 
-    /* Update the time cache. */
-    updateCachedTime(1);
-
     /* Unpause clients if enough time has elapsed */
     unpauseClientsIfNecessary();
 
@@ -2232,7 +2279,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     clientsCron(IDX_EVENT_LOOP_MAIN);
 
     /* Handle background operations on Redis databases. */
-    databasesCron();
+    databasesCron(true /* fMainThread */);
 
     /* Start a scheduled AOF rewrite if this was requested by the user while
      * a BGSAVE was in progress. */
@@ -2413,6 +2460,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
         processUnblockedClients(iel);
     }
 
+    /* Handle background operations on Redis databases. */
+    databasesCron(false /* fMainThread */);
+
     /* Unpause clients if enough time has elapsed */
     unpauseClientsIfNecessary();
 
@@ -2537,6 +2587,18 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
 
     int aof_state = g_pserver->aof_state;
 
+    mstime_t commit_latency;
+    latencyStartMonitor(commit_latency);
+    if (g_pserver->m_pstorageFactory != nullptr)
+    {
+        locker.disarm();
+        for (redisDb *db : vecdb)
+            db->commitChanges();
+        locker.arm();
+    }
+    latencyEndMonitor(commit_latency);
+    latencyAddSampleIfNeeded("storage-commit", commit_latency);
+
     /* We try to handle writes at the end so we don't have to reacquire the lock,
        but if there is a pending async close we need to ensure the writes happen
        first so perform it here */
@@ -2547,16 +2609,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
         locker.arm();
         fSentReplies = true;
     }
-
-    mstime_t commit_latency;
-    latencyStartMonitor(commit_latency);
-    if (g_pserver->m_pstorageFactory != nullptr)
-    {
-        for (redisDb *db : vecdb)
-            db->commitChanges();
-    }
-    latencyEndMonitor(commit_latency);
-    latencyAddSampleIfNeeded("storage-commit", commit_latency);
 
     if (!serverTL->gcEpoch.isReset())
         g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
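The storage-provider commit moves earlier in beforeSleep and now runs with the lock released, still wrapped in a latency sample. A minimal standalone sketch of timing a commit-like section the same way (the latency monitor here is just a steady_clock measurement, not the KeyDB latency framework):

#include <chrono>
#include <iostream>
#include <thread>

int main() {
    using clock = std::chrono::steady_clock;
    auto start = clock::now();                                      // latencyStartMonitor(commit_latency)
    std::this_thread::sleep_for(std::chrono::milliseconds(2));      // stand-in for db->commitChanges()
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(clock::now() - start);
    // latencyAddSampleIfNeeded("storage-commit", ...): here we just print the sample.
    std::cout << "storage-commit latency: " << elapsed.count() << " ms\n";
}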
@@ -2598,10 +2650,8 @@ void afterSleep(struct aeEventLoop *eventLoop) {
 
         serverAssert(serverTL->gcEpoch.isReset());
         serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
-        aeAcquireLock();
         for (int idb = 0; idb < cserver.dbnum; ++idb)
             g_pserver->db[idb]->trackChanges(false);
-        aeReleaseLock();
     }
 
 /* =========================== Server initialization ======================== */
@@ -2755,7 +2805,7 @@ void initMasterInfo(redisMaster *master)
 void initServerConfig(void) {
     int j;
 
-    updateCachedTime(true);
+    updateCachedTime();
     getRandomHexChars(g_pserver->runid,CONFIG_RUN_ID_SIZE);
     g_pserver->runid[CONFIG_RUN_ID_SIZE] = '\0';
     changeReplicationId();
@@ -3002,7 +3052,7 @@ int setOOMScoreAdj(int process_class) {
     int val;
     char buf[64];
 
-    val = g_pserver->oom_score_adj_base + g_pserver->oom_score_adj_values[process_class];
+    val = g_pserver->oom_score_adj_values[process_class];
     if (g_pserver->oom_score_adj == OOM_SCORE_RELATIVE)
         val += g_pserver->oom_score_adj_base;
     if (val > 1000) val = 1000;
@@ -3858,8 +3908,8 @@ void call(client *c, int flags) {
 
     /* Call the command. */
     dirty = g_pserver->dirty;
-    updateCachedTime(0);
     incrementMvccTstamp();
+    __atomic_load(&g_pserver->ustime, &start, __ATOMIC_SEQ_CST);
     start = g_pserver->ustime;
     try {
         c->cmd->proc(c);
@@ -3871,7 +3921,9 @@ void call(client *c, int flags) {
         addReplyError(c, sz);
     }
     serverTL->commandsExecuted++;
-    duration = ustime()-start;
+    ustime_t end;
+    __atomic_load(&g_pserver->ustime, &end, __ATOMIC_SEQ_CST);
+    duration = end-start;
     dirty = g_pserver->dirty-dirty;
     if (dirty < 0) dirty = 0;
 
@@ -4028,13 +4080,14 @@ void call(client *c, int flags) {
  * If there's a transaction is flags it as dirty, and if the command is EXEC,
  * it aborts the transaction.
  * Note: 'reply' is expected to end with \r\n */
-void rejectCommand(client *c, robj *reply) {
+void rejectCommand(client *c, robj *reply, int severity = ERR_CRITICAL) {
     flagTransaction(c);
     if (c->cmd && c->cmd->proc == execCommand) {
         execCommandAbort(c, szFromObj(reply));
-    } else {
+    }
+    else {
         /* using addReplyError* rather than addReply so that the error can be logged. */
-        addReplyErrorObject(c, reply);
+        addReplyErrorObject(c, reply, severity);
     }
 }
 
@@ -4283,7 +4336,7 @@ int processCommand(client *c, int callFlags) {
         /* Active Replicas can execute read only commands, and optionally write commands */
         if (!(g_pserver->loading == LOADING_REPLICATION && g_pserver->fActiveReplica && ((c->cmd->flags & CMD_READONLY) || g_pserver->fWriteDuringActiveLoad)))
         {
-            rejectCommand(c, shared.loadingerr);
+            rejectCommand(c, shared.loadingerr, ERR_WARNING);
            return C_OK;
         }
     }
@@ -4345,7 +4398,7 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
         std::lock_guard<decltype(this->lock)> lock(this->lock);
         fn(this);
         --casyncOpsPending;
-    }, false, fLock) == AE_OK;
+    }, fLock) == AE_OK;
 }
 
 /*================================== Shutdown =============================== */
@@ -4474,6 +4527,8 @@ int prepareForShutdown(int flags) {
     /* Best effort flush of replica output buffers, so that we hopefully
      * send them pending writes. */
     flushSlavesOutputBuffers();
+    g_pserver->repl_batch_idxStart = -1;
+    g_pserver->repl_batch_offStart = -1;
 
     /* Close the listening sockets. Apparently this allows faster restarts. */
     closeListeningSockets(1);
@@ -5484,7 +5539,7 @@ int linuxMadvFreeForkBugCheck(void) {
     const long map_size = 3 * 4096;
 
     /* Create a memory map that's in our full control (not one used by the allocator). */
-    p = mmap(NULL, map_size, PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+    p = (char*)mmap(NULL, map_size, PROT_READ, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
     serverAssert(p != MAP_FAILED);
 
     q = p + 4096;
@@ -6012,6 +6067,13 @@ void OnTerminate()
     serverPanic("std::teminate() called");
 }
 
+void *timeThreadMain(void*) {
+    while (true) {
+        updateCachedTime();
+        usleep(1);
+    }
+}
+
 void *workerThreadMain(void *parg)
 {
     int iel = (int)((int64_t)parg);
@@ -6359,6 +6421,8 @@ int main(int argc, char **argv) {
     setOOMScoreAdj(-1);
     serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
 
+    pthread_create(&cserver.time_thread_id, nullptr, timeThreadMain, nullptr);
+
     pthread_attr_t tattr;
     pthread_attr_init(&tattr);
     pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
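Instead of refreshing the cached clock from serverCron and call(), a dedicated time thread now updates it continuously and readers load it atomically. A standalone sketch of that pattern with std::atomic standing in for the __atomic_store/__atomic_load pair:

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

std::atomic<long long> g_cached_ustime{0};   // stands in for the server's cached microsecond clock

void timeThread(std::atomic<bool> &stop) {
    while (!stop.load(std::memory_order_relaxed)) {
        auto now = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::system_clock::now().time_since_epoch()).count();
        g_cached_ustime.store(now, std::memory_order_relaxed);  // analogous to updateCachedTime()
        std::this_thread::sleep_for(std::chrono::microseconds(1));
    }
}

int main() {
    std::atomic<bool> stop{false};
    std::thread t(timeThread, std::ref(stop));
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    std::cout << "cached ustime: " << g_cached_ustime.load() << "\n";  // readers just load the cache
    stop = true;
    t.join();
}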
src/server.h (82 changed lines)

@@ -611,6 +611,12 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
 #define LL_WARNING 3
 #define LL_RAW (1<<10) /* Modifier to log without timestamp */
 
+/* Error severity levels */
+#define ERR_CRITICAL 0
+#define ERR_ERROR 1
+#define ERR_WARNING 2
+#define ERR_NOTICE 3
+
 /* Supervision options */
 #define SUPERVISED_NONE 0
 #define SUPERVISED_AUTODETECT 1
@@ -1130,7 +1136,7 @@ public:
     bool removeCachedValue(const char *key);
     void removeAllCachedValues();
 
-    void prefetchKeysAsync(class AeLocker &locker, client *c);
+    void prefetchKeysAsync(client *c, struct parsed_command &command);
 
     bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
 
@@ -1166,8 +1172,8 @@ private:
     // Keyspace
     dict *m_pdict = nullptr;            /* The keyspace for this DB */
     dict *m_pdictTombstone = nullptr;   /* Track deletes when we have a snapshot */
-    int m_fTrackingChanges = 0;         // Note: Stack based
-    int m_fAllChanged = 0;
+    std::atomic<int> m_fTrackingChanges {0};    // Note: Stack based
+    std::atomic<int> m_fAllChanged {0};
     std::set<changedesc, changedescCmp> m_setchanged;
     size_t m_cnewKeysPending = 0;
     std::shared_ptr<StorageCache> m_spstorage = nullptr;
@@ -1437,7 +1443,53 @@ typedef struct {
     need more reserved IDs use UINT64_MAX-1,
     -2, ... and so forth. */
 
-typedef struct client {
+struct parsed_command {
+    robj** argv = nullptr;
+    int argc = 0;
+    int argcMax;
+    long long reploff = 0;
+    size_t argv_len_sum = 0;    /* Sum of lengths of objects in argv list. */
+
+    parsed_command(int maxargs) {
+        argv = (robj**)zmalloc(sizeof(robj*)*maxargs);
+        argcMax = maxargs;
+    }
+
+    parsed_command &operator=(parsed_command &&o) {
+        argv = o.argv;
+        argc = o.argc;
+        argcMax = o.argcMax;
+        reploff = o.reploff;
+        o.argv = nullptr;
+        o.argc = 0;
+        o.argcMax = 0;
+        o.reploff = 0;
+        return *this;
+    }
+
+    parsed_command(parsed_command &o) = delete;
+    parsed_command(parsed_command &&o) {
+        argv = o.argv;
+        argc = o.argc;
+        argcMax = o.argcMax;
+        reploff = o.reploff;
+        o.argv = nullptr;
+        o.argc = 0;
+        o.argcMax = 0;
+        o.reploff = 0;
+    }
+
+    ~parsed_command() {
+        if (argv != nullptr) {
+            for (int i = 0; i < argc; ++i) {
+                decrRefCount(argv[i]);
+            }
+            zfree(argv);
+        }
+    }
+};
+
+struct client {
     uint64_t id;            /* Client incremental unique ID. */
     connection *conn;
     int resp;               /* RESP protocol version. Can be 2 or 3. */
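parsed_command owns the argv array it allocates, is move-only, and releases whatever references it still holds on destruction, so a half-parsed command can be dropped or carried across a reconnect without leaking. A short standalone sketch of the same ownership rules with a simplified element type (heap buffers instead of ref-counted robj*):

#include <cstdlib>
#include <iostream>
#include <utility>
#include <vector>

// Simplified move-only holder mirroring parsed_command's ownership rules:
// raw array allocated up front, freed in the destructor, moves steal the pointer.
struct OwnedArgs {
    char **argv = nullptr;
    int argc = 0, argcMax = 0;

    explicit OwnedArgs(int maxargs) : argcMax(maxargs) {
        argv = static_cast<char**>(std::malloc(sizeof(char*) * maxargs));
    }
    OwnedArgs(const OwnedArgs &) = delete;
    OwnedArgs(OwnedArgs &&o) noexcept : argv(o.argv), argc(o.argc), argcMax(o.argcMax) {
        o.argv = nullptr; o.argc = 0; o.argcMax = 0;
    }
    ~OwnedArgs() {
        if (!argv) return;                              // moved-from objects free nothing
        for (int i = 0; i < argc; ++i) std::free(argv[i]);
        std::free(argv);
    }
};

int main() {
    std::vector<OwnedArgs> queue;
    queue.emplace_back(2);
    queue.back().argv[queue.back().argc++] = static_cast<char*>(std::calloc(8, 1));  // one argument slot
    OwnedArgs taken = std::move(queue.back());   // hand-off leaves the queued copy empty
    queue.pop_back();
    std::cout << taken.argc << "\n";
}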
@@ -1450,9 +1502,6 @@ typedef struct client {
                                replication stream that we are receiving from
                                the master. */
     size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
-    int argc;               /* Num of arguments of current command. */
-    robj **argv;            /* Arguments of current command. */
-    size_t argv_len_sum;    /* Sum of lengths of objects in argv list. */
     struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
     user *puser;            /* User associated with this connection. If the
                                user is set to NULL the connection can do
@@ -1482,6 +1531,7 @@ typedef struct client {
     long long read_reploff; /* Read replication offset if this is a master. */
     long long reploff;      /* Applied replication offset if this is a master. */
     long long reploff_skipped;  /* Repl backlog we did not send to this client */
+    long long reploff_cmd;  /* The replication offset of the executing command, reploff gets set to this after the execution completes */
     long long repl_ack_off; /* Replication ack offset, if this is a replica. */
     long long repl_ack_time;/* Replication ack time, if this is a replica. */
     long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
@@ -1539,12 +1589,17 @@ typedef struct client {
     uint64_t mvccCheckpoint = 0;    // the MVCC checkpoint of our last write
 
     int iel; /* the event loop index we're registered with */
-    struct fastlock lock;
+    struct fastlock lock {"client"};
     int master_error;
+    std::vector<parsed_command> vecqueuedcmd;
+    int argc;
+    robj **argv;
+    size_t argv_len_sumActive = 0;
+
     // post a function from a non-client thread to run on its client thread
     bool postFunction(std::function<void(client *)> fn, bool fLock = true);
-} client;
+    size_t argv_len_sum() const;
+};
 
 struct saveparam {
     time_t seconds;
@@ -1822,6 +1877,7 @@ struct redisServerThreadVars {
     const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
     bool fRetrySetAofEvent = false;
     std::vector<client*> vecclientsProcess;
+    dictAsyncRehashCtl *rehashCtl = nullptr;
 
     int getRdbKeySaveDelay();
 private:
@@ -1864,6 +1920,7 @@ struct redisServerConst {
     pid_t pid;                  /* Main process pid. */
     time_t stat_starttime;      /* Server start time */
     pthread_t main_thread_id;   /* Main thread id */
+    pthread_t time_thread_id;
     char *configfile;           /* Absolute config file path, or NULL */
     char *executable;           /* Absolute executable file path. */
     char **exec_argv;           /* Executable argv vector (copy). */
@@ -2174,6 +2231,7 @@ struct redisServer {
     /* Limits */
     unsigned int maxclients;        /* Max number of simultaneous clients */
     unsigned long long maxmemory;   /* Max number of memory bytes to use */
+    unsigned long long maxstorage;  /* Max number of bytes to use in a storage provider */
     int maxmemory_policy;           /* Policy for key eviction */
     int maxmemory_samples;          /* Precision of random sampling */
     int lfu_log_factor;             /* LFU logarithmic counter factor. */
@@ -2509,7 +2567,7 @@ void setDeferredMapLen(client *c, void *node, long length);
 void setDeferredSetLen(client *c, void *node, long length);
 void setDeferredAttributeLen(client *c, void *node, long length);
 void setDeferredPushLen(client *c, void *node, long length);
-void processInputBuffer(client *c, int callFlags);
+void processInputBuffer(client *c, bool fParse, int callFlags);
 void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask);
@@ -2529,7 +2587,7 @@ void addReplyBulkLongLong(client *c, long long ll);
 void addReply(client *c, robj_roptr obj);
 void addReplySds(client *c, sds s);
 void addReplyBulkSds(client *c, sds s);
-void addReplyErrorObject(client *c, robj *err);
+void addReplyErrorObject(client *c, robj *err, int severity);
 void addReplyErrorSds(client *c, sds err);
 void addReplyError(client *c, const char *err);
 void addReplyStatus(client *c, const char *status);
@@ -2908,7 +2966,7 @@ void populateCommandTable(void);
 void resetCommandTableStats(void);
 void adjustOpenFilesLimit(void);
 void closeListeningSockets(int unlink_unix_socket);
-void updateCachedTime(int update_daylight_info);
+void updateCachedTime();
 void resetServerStats(void);
 void activeDefragCycle(void);
 unsigned int getLRUClock(void);
@@ -1,7 +1,7 @@
 start_server {tags {"introspection"}} {
     test {CLIENT LIST} {
         r client list
-    } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*}
+    } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*}
 
     test {MONITOR can log executed commands} {
         set rd [redis_deferring_client]
@@ -246,7 +246,7 @@ test_slave_buffers {slave buffer are counted correctly} 1000000 10 0 1
 # test again with fewer (and bigger) commands without pipeline, but with eviction
 test_slave_buffers "replica buffer don't induce eviction" 100000 100 1 0
 
-start_server {tags {"maxmemory"}} {
+start_server {tags {"maxmemory"} overrides {server-threads 1}} {
     test {client tracking don't cause eviction feedback loop} {
         r config set maxmemory 0
         r config set maxmemory-policy allkeys-lru
@@ -308,4 +308,4 @@ start_server {tags {"maxmemory"}} {
         if {$::verbose} { puts "evicted: $evicted" }
     }
 }
 }; #run_solo
@@ -132,6 +132,7 @@ tags "modules" {
     }
 
     $replica replicaof no one
+    after 300
 
     test {Test role-master hook} {
         assert_equal [r hooks.event_count role-replica] 1
@@ -1,4 +1,4 @@
-start_server {tags {"obuf-limits"}} {
+start_server {tags {"obuf-limits"} overrides { server-threads 1 }} {
     test {Client output buffer hard limit is enforced} {
         r config set client-output-buffer-limit {pubsub 100000 0 0}
         set rd1 [redis_deferring_client]
@@ -448,8 +448,8 @@ start_server {tags {"scripting"}} {
         set start [clock clicks -milliseconds]
         $rd eval {redis.call('set',KEYS[1],'y'); for i=1,1500000 do redis.call('ping') end return 'ok'} 1 x
         $rd flush
-        after 100
-        catch {r ping} err
+        after 200
+        catch {r echo "foo"} err
         assert_match {BUSY*} $err
         $rd read
         set elapsed [expr [clock clicks -milliseconds]-$start]
@@ -457,8 +457,8 @@ start_server {tags {"scripting"}} {
         set start [clock clicks -milliseconds]
         $rd debug loadaof
         $rd flush
-        after 100
-        catch {r ping} err
+        after 200
-        catch {r echo "foo"} err
         assert_match {LOADING*} $err
         $rd read
         set elapsed [expr [clock clicks -milliseconds]-$start]