diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index fe02e5212..fddfa840b 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -43,9 +43,11 @@ void AsyncWorkQueue::WorkerThreadMain() lock.unlock(); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); if (listLength(serverTL->clients_pending_asyncwrite)) { + aeThreadOnline(); aeAcquireLock(); ProcessPendingAsyncWrites(); aeReleaseLock(); + aeThreadOffline(); } g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); serverTL->gcEpoch.reset(); diff --git a/src/db.cpp b/src/db.cpp index a115c887b..2b1321bcf 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1068,7 +1068,9 @@ void keysCommand(client *c) { blockClient(c, BLOCKED_ASYNC); redisDb *db = c->db; g_pserver->asyncworkqueue->AddWorkFunction([el, c, db, patternCopy, snapshot]{ + aeThreadOnline(); keysCommandCore(c, snapshot, patternCopy); + aeThreadOffline(); sdsfree(patternCopy); aePostFunction(el, [c, db, snapshot]{ aeReleaseLock(); // we need to lock with coordination of the client diff --git a/src/rdb.cpp b/src/rdb.cpp index 2f5f73cd5..8e0bca922 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1552,6 +1552,7 @@ struct rdbSaveThreadArgs void *rdbSaveThread(void *vargs) { + aeThreadOnline(); serverAssert(!g_pserver->rdbThreadVars.fDone); rdbSaveThreadArgs *args = reinterpret_cast(vargs); serverAssert(serverTL == nullptr); @@ -1577,7 +1578,7 @@ void *rdbSaveThread(void *vargs) "%s: %zd MB of memory used by copy-on-write", "RDB",cbDiff/(1024*1024)); } - + aeThreadOffline(); g_pserver->rdbThreadVars.fDone = true; return (retval == C_OK) ? (void*)0 : (void*)1; } @@ -3659,6 +3660,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs) serverTL = &vars; vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); + aeThreadOnline(); rioInitWithFd(&rdb,args->rdb_pipe_write); retval = rdbSaveRioWithEOFMark(&rdb,args->rgpdb,NULL,&args->rsi); @@ -3684,7 +3686,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs) g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]); g_pserver->garbageCollector.endEpoch(vars.gcEpoch); - + aeThreadOffline(); close(args->safe_to_exit_pipe); zfree(args); diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 397b557c6..34e19c372 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -1017,7 +1017,11 @@ static void benchmark(const char *title, const char *cmd, int len) { createMissingClients(c); config.start = mstime(); - if (!config.num_threads) aeMain(config.el); + if (!config.num_threads) { + aeThreadOnline(); + aeMain(config.el); + aeThreadOffline(); + } else startBenchmarkThreads(); config.totlatency = mstime()-config.start; @@ -1057,7 +1061,9 @@ static void freeBenchmarkThreads() { static void *execBenchmarkThread(void *ptr) { benchmarkThread *thread = (benchmarkThread *) ptr; + aeThreadOnline(); aeMain(thread->el); + aeThreadOffline(); return NULL; } @@ -1696,7 +1702,7 @@ int main(int argc, const char **argv) { int len; client c; - + aeThreadOnline(); storage_init(NULL, 0); srandom(time(NULL) ^ getpid()); @@ -1749,6 +1755,7 @@ int main(int argc, const char **argv) { cliSecureInit(); } #endif + aeThreadOffline(); if (config.cluster_mode) { // We only include the slot placeholder {tag} if cluster mode is enabled diff --git a/src/replication.cpp b/src/replication.cpp index 74e77c307..c68e3f09c 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1196,7 +1196,7 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { size_t cbData = 0; size_t cbLastUpdate = 0; auto &replBuf = *spreplBuf; - + aeThreadOnline(); // Databases replBuf.addArrayLen(cserver.dbnum); for (int idb = 0; idb < cserver.dbnum; ++idb) { @@ -1244,6 +1244,7 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { replBuf.putSlavesOnline(); aeReleaseLock(); } + aeThreadOffline(); }); return retval; diff --git a/src/server.cpp b/src/server.cpp index 87213de04..1f94566b4 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6855,6 +6855,7 @@ int redisFork(int purpose) { latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); if ((childpid = fork()) == 0) { /* Child */ + aeReleaseForkLock(); g_pserver->in_fork_child = purpose; setOOMScoreAdj(CONFIG_OOM_BGCHILD); setupChildSignalHandlers(); @@ -7258,9 +7259,11 @@ void *workerThreadMain(void *parg) if (iel != IDX_EVENT_LOOP_MAIN) { + aeThreadOnline(); aeAcquireLock(); initNetworkingThread(iel, cserver.cthreads > 1); aeReleaseLock(); + aeThreadOffline(); } moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run