Fix incorrect locking in replication error paths
Former-commit-id: 1c6b57314747787958ad215826296627f9050f59
This commit is contained in:
parent
51976e2be8
commit
cc080de221
@ -33,6 +33,7 @@
|
|||||||
#include "server.h"
|
#include "server.h"
|
||||||
#include "cluster.h"
|
#include "cluster.h"
|
||||||
#include "bio.h"
|
#include "bio.h"
|
||||||
|
#include "aelocker.h"
|
||||||
|
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
@ -1284,6 +1285,7 @@ void sendBulkToSlave(connection *conn) {
|
|||||||
serverAssert(FCorrectThread(replica));
|
serverAssert(FCorrectThread(replica));
|
||||||
char buf[PROTO_IOBUF_LEN];
|
char buf[PROTO_IOBUF_LEN];
|
||||||
ssize_t nwritten, buflen;
|
ssize_t nwritten, buflen;
|
||||||
|
AeLocker aeLock;
|
||||||
std::unique_lock<fastlock> ul(replica->lock);
|
std::unique_lock<fastlock> ul(replica->lock);
|
||||||
|
|
||||||
/* Before sending the RDB file, we send the preamble as configured by the
|
/* Before sending the RDB file, we send the preamble as configured by the
|
||||||
@ -1295,6 +1297,8 @@ void sendBulkToSlave(connection *conn) {
|
|||||||
serverLog(LL_VERBOSE,
|
serverLog(LL_VERBOSE,
|
||||||
"Write error sending RDB preamble to replica: %s",
|
"Write error sending RDB preamble to replica: %s",
|
||||||
connGetLastError(conn));
|
connGetLastError(conn));
|
||||||
|
ul.unlock();
|
||||||
|
aeLock.arm(nullptr);
|
||||||
freeClient(replica);
|
freeClient(replica);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -1315,6 +1319,8 @@ void sendBulkToSlave(connection *conn) {
|
|||||||
if (buflen <= 0) {
|
if (buflen <= 0) {
|
||||||
serverLog(LL_WARNING,"Read error sending DB to replica: %s",
|
serverLog(LL_WARNING,"Read error sending DB to replica: %s",
|
||||||
(buflen == 0) ? "premature EOF" : strerror(errno));
|
(buflen == 0) ? "premature EOF" : strerror(errno));
|
||||||
|
ul.unlock();
|
||||||
|
aeLock.arm(nullptr);
|
||||||
freeClient(replica);
|
freeClient(replica);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -1322,6 +1328,8 @@ void sendBulkToSlave(connection *conn) {
|
|||||||
if (connGetState(conn) != CONN_STATE_CONNECTED) {
|
if (connGetState(conn) != CONN_STATE_CONNECTED) {
|
||||||
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
|
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
|
||||||
connGetLastError(conn));
|
connGetLastError(conn));
|
||||||
|
ul.unlock();
|
||||||
|
aeLock.arm(nullptr);
|
||||||
freeClient(replica);
|
freeClient(replica);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
@ -1516,6 +1524,8 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
|
|||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
client *replica = (client*)ln->value;
|
client *replica = (client*)ln->value;
|
||||||
|
|
||||||
|
std::unique_lock<fastlock> ul(replica->lock);
|
||||||
|
|
||||||
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
|
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
|
||||||
startbgsave = 1;
|
startbgsave = 1;
|
||||||
mincapa = (mincapa == -1) ? replica->slave_capa :
|
mincapa = (mincapa == -1) ? replica->slave_capa :
|
||||||
@ -1562,6 +1572,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
|
|||||||
replica->repl_ack_time = g_pserver->unixtime; /* Timeout otherwise. */
|
replica->repl_ack_time = g_pserver->unixtime; /* Timeout otherwise. */
|
||||||
} else {
|
} else {
|
||||||
if (bgsaveerr != C_OK) {
|
if (bgsaveerr != C_OK) {
|
||||||
|
ul.unlock();
|
||||||
if (FCorrectThread(replica))
|
if (FCorrectThread(replica))
|
||||||
freeClient(replica);
|
freeClient(replica);
|
||||||
else
|
else
|
||||||
@ -1571,6 +1582,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
|
|||||||
}
|
}
|
||||||
if ((replica->repldbfd = open(g_pserver->rdb_filename,O_RDONLY)) == -1 ||
|
if ((replica->repldbfd = open(g_pserver->rdb_filename,O_RDONLY)) == -1 ||
|
||||||
redis_fstat(replica->repldbfd,&buf) == -1) {
|
redis_fstat(replica->repldbfd,&buf) == -1) {
|
||||||
|
ul.unlock();
|
||||||
if (FCorrectThread(replica))
|
if (FCorrectThread(replica))
|
||||||
freeClient(replica);
|
freeClient(replica);
|
||||||
else
|
else
|
||||||
@ -1588,6 +1600,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
|
|||||||
{
|
{
|
||||||
connSetWriteHandler(replica->conn,NULL);
|
connSetWriteHandler(replica->conn,NULL);
|
||||||
if (connSetWriteHandler(replica->conn,sendBulkToSlave) == C_ERR) {
|
if (connSetWriteHandler(replica->conn,sendBulkToSlave) == C_ERR) {
|
||||||
|
ul.unlock();
|
||||||
freeClient(replica);
|
freeClient(replica);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -3808,6 +3821,7 @@ void replicationCron(void) {
|
|||||||
listRewind(g_pserver->slaves,&li);
|
listRewind(g_pserver->slaves,&li);
|
||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
client *replica = (client*)ln->value;
|
client *replica = (client*)ln->value;
|
||||||
|
std::unique_lock<fastlock> ul(replica->lock);
|
||||||
|
|
||||||
if (replica->replstate != SLAVE_STATE_ONLINE) continue;
|
if (replica->replstate != SLAVE_STATE_ONLINE) continue;
|
||||||
if (replica->flags & CLIENT_PRE_PSYNC) continue;
|
if (replica->flags & CLIENT_PRE_PSYNC) continue;
|
||||||
@ -3816,9 +3830,15 @@ void replicationCron(void) {
|
|||||||
serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
|
serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
|
||||||
replicationGetSlaveName(replica));
|
replicationGetSlaveName(replica));
|
||||||
if (FCorrectThread(replica))
|
if (FCorrectThread(replica))
|
||||||
freeClient(replica);
|
{
|
||||||
|
ul.release();
|
||||||
|
if (!freeClient(replica))
|
||||||
|
replica->lock.unlock(); // client was not freed: ul.release() only gave up ownership (the lock is still held), so unlock it manually
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
freeClientAsync(replica);
|
freeClientAsync(replica);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user