Integrate readwritelock with Pro Code

This commit is contained in:
Vivek Saini 2022-04-14 17:15:19 +00:00
parent 9486f16857
commit a4a886428f
6 changed files with 22 additions and 5 deletions

View File

@ -43,9 +43,11 @@ void AsyncWorkQueue::WorkerThreadMain()
lock.unlock();
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
if (listLength(serverTL->clients_pending_asyncwrite)) {
aeThreadOnline();
aeAcquireLock();
ProcessPendingAsyncWrites();
aeReleaseLock();
aeThreadOffline();
}
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch.reset();

View File

@ -1068,7 +1068,9 @@ void keysCommand(client *c) {
blockClient(c, BLOCKED_ASYNC);
redisDb *db = c->db;
g_pserver->asyncworkqueue->AddWorkFunction([el, c, db, patternCopy, snapshot]{
aeThreadOnline();
keysCommandCore(c, snapshot, patternCopy);
aeThreadOffline();
sdsfree(patternCopy);
aePostFunction(el, [c, db, snapshot]{
aeReleaseLock(); // we need to lock with coordination of the client

View File

@ -1552,6 +1552,7 @@ struct rdbSaveThreadArgs
void *rdbSaveThread(void *vargs)
{
aeThreadOnline();
serverAssert(!g_pserver->rdbThreadVars.fDone);
rdbSaveThreadArgs *args = reinterpret_cast<rdbSaveThreadArgs*>(vargs);
serverAssert(serverTL == nullptr);
@ -1577,7 +1578,7 @@ void *rdbSaveThread(void *vargs)
"%s: %zd MB of memory used by copy-on-write",
"RDB",cbDiff/(1024*1024));
}
aeThreadOffline();
g_pserver->rdbThreadVars.fDone = true;
return (retval == C_OK) ? (void*)0 : (void*)1;
}
@ -3659,6 +3660,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs)
serverTL = &vars;
vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
aeThreadOnline();
rioInitWithFd(&rdb,args->rdb_pipe_write);
retval = rdbSaveRioWithEOFMark(&rdb,args->rgpdb,NULL,&args->rsi);
@ -3684,7 +3686,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs)
g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]);
g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
aeThreadOffline();
close(args->safe_to_exit_pipe);
zfree(args);

View File

@ -1017,7 +1017,11 @@ static void benchmark(const char *title, const char *cmd, int len) {
createMissingClients(c);
config.start = mstime();
if (!config.num_threads) aeMain(config.el);
if (!config.num_threads) {
aeThreadOnline();
aeMain(config.el);
aeThreadOffline();
}
else startBenchmarkThreads();
config.totlatency = mstime()-config.start;
@ -1057,7 +1061,9 @@ static void freeBenchmarkThreads() {
static void *execBenchmarkThread(void *ptr) {
benchmarkThread *thread = (benchmarkThread *) ptr;
aeThreadOnline();
aeMain(thread->el);
aeThreadOffline();
return NULL;
}
@ -1696,7 +1702,7 @@ int main(int argc, const char **argv) {
int len;
client c;
aeThreadOnline();
storage_init(NULL, 0);
srandom(time(NULL) ^ getpid());
@ -1749,6 +1755,7 @@ int main(int argc, const char **argv) {
cliSecureInit();
}
#endif
aeThreadOffline();
if (config.cluster_mode) {
// We only include the slot placeholder {tag} if cluster mode is enabled

View File

@ -1196,7 +1196,7 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
size_t cbData = 0;
size_t cbLastUpdate = 0;
auto &replBuf = *spreplBuf;
aeThreadOnline();
// Databases
replBuf.addArrayLen(cserver.dbnum);
for (int idb = 0; idb < cserver.dbnum; ++idb) {
@ -1244,6 +1244,7 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
replBuf.putSlavesOnline();
aeReleaseLock();
}
aeThreadOffline();
});
return retval;

View File

@ -6855,6 +6855,7 @@ int redisFork(int purpose) {
latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000);
if ((childpid = fork()) == 0) {
/* Child */
aeReleaseForkLock();
g_pserver->in_fork_child = purpose;
setOOMScoreAdj(CONFIG_OOM_BGCHILD);
setupChildSignalHandlers();
@ -7258,9 +7259,11 @@ void *workerThreadMain(void *parg)
if (iel != IDX_EVENT_LOOP_MAIN)
{
aeThreadOnline();
aeAcquireLock();
initNetworkingThread(iel, cserver.cthreads > 1);
aeReleaseLock();
aeThreadOffline();
}
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run