call aeThreadOnline() earlier

This commit is contained in:
Vivek Saini 2022-04-20 20:46:48 +00:00
parent 12ce8bcd2c
commit d103046e17
4 changed files with 3 additions and 7 deletions

View File

@ -28,6 +28,7 @@ void AsyncWorkQueue::WorkerThreadMain()
if (m_workqueue.empty()) if (m_workqueue.empty())
m_cvWakeup.wait(lock); m_cvWakeup.wait(lock);
aeThreadOnline();
while (!m_workqueue.empty()) while (!m_workqueue.empty())
{ {
WorkItem task = std::move(m_workqueue.front()); WorkItem task = std::move(m_workqueue.front());
@ -43,14 +44,13 @@ void AsyncWorkQueue::WorkerThreadMain()
lock.unlock(); lock.unlock();
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
if (listLength(serverTL->clients_pending_asyncwrite)) { if (listLength(serverTL->clients_pending_asyncwrite)) {
aeThreadOnline();
aeAcquireLock(); aeAcquireLock();
ProcessPendingAsyncWrites(); ProcessPendingAsyncWrites();
aeReleaseLock(); aeReleaseLock();
aeThreadOffline();
} }
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch.reset(); serverTL->gcEpoch.reset();
aeThreadOffline();
} }
listRelease(vars.clients_pending_asyncwrite); listRelease(vars.clients_pending_asyncwrite);

View File

@ -1068,9 +1068,7 @@ void keysCommand(client *c) {
blockClient(c, BLOCKED_ASYNC); blockClient(c, BLOCKED_ASYNC);
redisDb *db = c->db; redisDb *db = c->db;
g_pserver->asyncworkqueue->AddWorkFunction([el, c, db, patternCopy, snapshot]{ g_pserver->asyncworkqueue->AddWorkFunction([el, c, db, patternCopy, snapshot]{
aeThreadOnline();
keysCommandCore(c, snapshot, patternCopy); keysCommandCore(c, snapshot, patternCopy);
aeThreadOffline();
sdsfree(patternCopy); sdsfree(patternCopy);
aePostFunction(el, [c, db, snapshot]{ aePostFunction(el, [c, db, snapshot]{
aeReleaseLock(); // we need to lock with coordination of the client aeReleaseLock(); // we need to lock with coordination of the client

View File

@ -3706,12 +3706,12 @@ void *rdbSaveToSlavesSocketsThread(void *vargs)
int retval; int retval;
rio rdb; rio rdb;
aeThreadOnline();
serverAssert(serverTL == nullptr); serverAssert(serverTL == nullptr);
redisServerThreadVars vars; redisServerThreadVars vars;
serverTL = &vars; serverTL = &vars;
vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
aeThreadOnline();
rioInitWithFd(&rdb,args->rdb_pipe_write); rioInitWithFd(&rdb,args->rdb_pipe_write);
retval = rdbSaveRioWithEOFMark(&rdb,args->rgpdb,NULL,&args->rsi); retval = rdbSaveRioWithEOFMark(&rdb,args->rgpdb,NULL,&args->rsi);

View File

@ -1196,7 +1196,6 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
size_t cbData = 0; size_t cbData = 0;
size_t cbLastUpdate = 0; size_t cbLastUpdate = 0;
auto &replBuf = *spreplBuf; auto &replBuf = *spreplBuf;
aeThreadOnline();
// Databases // Databases
replBuf.addArrayLen(cserver.dbnum); replBuf.addArrayLen(cserver.dbnum);
for (int idb = 0; idb < cserver.dbnum; ++idb) { for (int idb = 0; idb < cserver.dbnum; ++idb) {
@ -1244,7 +1243,6 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
replBuf.putSlavesOnline(); replBuf.putSlavesOnline();
aeReleaseLock(); aeReleaseLock();
} }
aeThreadOffline();
}); });
return retval; return retval;