Add standardized way to post client dependent functions

Former-commit-id: fb50163b47cf660911dc7f67809c15ba1394613a
This commit is contained in:
John Sully 2020-05-24 02:46:39 -04:00
parent f517eaf182
commit b892d3deff
3 changed files with 14 additions and 16 deletions

View File

@ -1478,7 +1478,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
* setup write handler (and disable pipe read handler, below) */ * setup write handler (and disable pipe read handler, below) */
if (nwritten != g_pserver->rdb_pipe_bufflen) { if (nwritten != g_pserver->rdb_pipe_bufflen) {
g_pserver->rdb_pipe_numconns_writing++; g_pserver->rdb_pipe_numconns_writing++;
aePostFunction(g_pserver->rgthreadvar[slave->iel].el, [conn] { slave->postFunction([conn](client *) {
connSetWriteHandler(conn, rdbPipeWriteHandler); connSetWriteHandler(conn, rdbPipeWriteHandler);
}); });
} }
@ -1607,21 +1607,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
} }
else else
{ {
aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] { replica->postFunction([](client *replica) {
// Because the client could have been closed while the lambda waited to run we need to
// verify the replica is still connected
listIter li;
listNode *ln;
listRewind(g_pserver->slaves,&li);
bool fFound = false;
while ((ln = listNext(&li))) {
if (listNodeValue(ln) == replica) {
fFound = true;
break;
}
}
if (!fFound)
return;
connSetWriteHandler(replica->conn,NULL); connSetWriteHandler(replica->conn,NULL);
if (connSetWriteHandler(replica->conn,sendBulkToSlave) == C_ERR) { if (connSetWriteHandler(replica->conn,sendBulkToSlave) == C_ERR) {
freeClient(replica); freeClient(replica);

View File

@ -3887,6 +3887,15 @@ int processCommand(client *c, int callFlags) {
return C_OK; return C_OK;
} }
bool client::postFunction(std::function<void(client *)> fn) {
this->casyncOpsPending++;
return aePostFunction(g_pserver->rgthreadvar[this->iel].el, [this, fn]{
std::lock_guard<decltype(this->lock)> lock(this->lock);
--casyncOpsPending;
fn(this);
}) == AE_OK;
}
/*================================== Shutdown =============================== */ /*================================== Shutdown =============================== */
/* Close listening sockets. Also unlink the unix domain socket if /* Close listening sockets. Also unlink the unix domain socket if

View File

@ -1335,6 +1335,9 @@ typedef struct client {
int iel; /* the event loop index we're registered with */ int iel; /* the event loop index we're registered with */
struct fastlock lock; struct fastlock lock;
int master_error; int master_error;
// post a function from a non-client thread to run on its client thread
bool postFunction(std::function<void(client *)> fn);
} client; } client;
struct saveparam { struct saveparam {