diff --git a/src/replication.cpp b/src/replication.cpp index a0ca766fc..36a6cb109 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -33,6 +33,7 @@ #include "server.h" #include "cluster.h" #include "bio.h" +#include "aelocker.h" #include #include @@ -1284,6 +1285,7 @@ void sendBulkToSlave(connection *conn) { serverAssert(FCorrectThread(replica)); char buf[PROTO_IOBUF_LEN]; ssize_t nwritten, buflen; + AeLocker aeLock; std::unique_lock ul(replica->lock); /* Before sending the RDB file, we send the preamble as configured by the @@ -1295,6 +1297,8 @@ void sendBulkToSlave(connection *conn) { serverLog(LL_VERBOSE, "Write error sending RDB preamble to replica: %s", connGetLastError(conn)); + ul.unlock(); + aeLock.arm(nullptr); freeClient(replica); return; } @@ -1315,6 +1319,8 @@ void sendBulkToSlave(connection *conn) { if (buflen <= 0) { serverLog(LL_WARNING,"Read error sending DB to replica: %s", (buflen == 0) ? "premature EOF" : strerror(errno)); + ul.unlock(); + aeLock.arm(nullptr); freeClient(replica); return; } @@ -1322,6 +1328,8 @@ void sendBulkToSlave(connection *conn) { if (connGetState(conn) != CONN_STATE_CONNECTED) { serverLog(LL_WARNING,"Write error sending DB to replica: %s", connGetLastError(conn)); + ul.unlock(); + aeLock.arm(nullptr); freeClient(replica); } return; @@ -1516,6 +1524,8 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) while((ln = listNext(&li))) { client *replica = (client*)ln->value; + std::unique_lock ul(replica->lock); + if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { startbgsave = 1; mincapa = (mincapa == -1) ? replica->slave_capa : @@ -1562,6 +1572,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) replica->repl_ack_time = g_pserver->unixtime; /* Timeout otherwise. */ } else { if (bgsaveerr != C_OK) { + ul.unlock(); if (FCorrectThread(replica)) freeClient(replica); else @@ -1571,6 +1582,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) } if ((replica->repldbfd = open(g_pserver->rdb_filename,O_RDONLY)) == -1 || redis_fstat(replica->repldbfd,&buf) == -1) { + ul.unlock(); if (FCorrectThread(replica)) freeClient(replica); else @@ -1588,6 +1600,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { connSetWriteHandler(replica->conn,NULL); if (connSetWriteHandler(replica->conn,sendBulkToSlave) == C_ERR) { + ul.unlock(); freeClient(replica); continue; } @@ -3808,6 +3821,7 @@ void replicationCron(void) { listRewind(g_pserver->slaves,&li); while((ln = listNext(&li))) { client *replica = (client*)ln->value; + std::unique_lock ul(replica->lock); if (replica->replstate != SLAVE_STATE_ONLINE) continue; if (replica->flags & CLIENT_PRE_PSYNC) continue; @@ -3816,9 +3830,15 @@ void replicationCron(void) { serverLog(LL_WARNING, "Disconnecting timedout replica: %s", replicationGetSlaveName(replica)); if (FCorrectThread(replica)) - freeClient(replica); + { + ul.release(); + if (!freeClient(replica)) + replica->lock.unlock(); // we didn't free so we have undo the lock we just released + } else + { freeClientAsync(replica); + } } } }