Yet more multithreading fixes

Former-commit-id: 9f23062ebdf389f0c95e1f4ab22c36ca96060e1f
This commit is contained in:
John Sully 2020-01-28 21:42:55 -05:00
parent 5da19e0206
commit 9440c60954
4 changed files with 56 additions and 16 deletions

View File

@ -29,6 +29,7 @@
#include "server.h" #include "server.h"
#include "connhelpers.h" #include "connhelpers.h"
#include "aelocker.h"
/* The connections module provides a lean abstraction of network connections /* The connections module provides a lean abstraction of network connections
* to avoid direct socket and async event management across the Redis code base. * to avoid direct socket and async event management across the Redis code base.
@ -206,14 +207,15 @@ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc fu
else else
conn->flags &= ~CONN_FLAG_WRITE_BARRIER; conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
int flags = AE_WRITABLE;
if (fThreadSafe) if (fThreadSafe)
flags |= AE_WRITE_THREADSAFE; conn->flags |= CONN_FLAG_WRITE_THREADSAFE;
else
conn->flags &= ~CONN_FLAG_WRITE_THREADSAFE;
if (!conn->write_handler) if (!conn->write_handler)
aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE); aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE);
else else
if (aeCreateFileEvent(serverTL->el,conn->fd,flags, if (aeCreateFileEvent(serverTL->el,conn->fd,AE_WRITABLE|AE_WRITE_THREADSAFE,
conn->type->ae_handler,conn) == AE_ERR) return C_ERR; conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK; return C_OK;
} }
@ -224,16 +226,17 @@ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc fu
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func, bool fThreadSafe) { static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func, bool fThreadSafe) {
if (func == conn->read_handler) return C_OK; if (func == conn->read_handler) return C_OK;
int flags = AE_READABLE;
if (fThreadSafe) if (fThreadSafe)
flags |= AE_READ_THREADSAFE; conn->flags |= CONN_FLAG_READ_THREADSAFE;
else
conn->flags &= ~CONN_FLAG_READ_THREADSAFE;
conn->read_handler = func; conn->read_handler = func;
if (!conn->read_handler) if (!conn->read_handler)
aeDeleteFileEvent(serverTL->el,conn->fd,AE_READABLE); aeDeleteFileEvent(serverTL->el,conn->fd,AE_READABLE);
else else
if (aeCreateFileEvent(serverTL->el,conn->fd, if (aeCreateFileEvent(serverTL->el,conn->fd,
flags,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; AE_READABLE|AE_READ_THREADSAFE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK; return C_OK;
} }
@ -281,15 +284,27 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD
/* Handle normal I/O flows */ /* Handle normal I/O flows */
if (!invert && call_read) { if (!invert && call_read) {
AeLocker lock;
if (!(conn->flags & CONN_FLAG_READ_THREADSAFE))
lock.arm(nullptr);
if (!callHandler(conn, conn->read_handler)) return; if (!callHandler(conn, conn->read_handler)) return;
} }
/* Fire the writable event. */ /* Fire the writable event. */
if (call_write) { if (call_write) {
AeLocker lock;
if (!(conn->flags & CONN_FLAG_WRITE_THREADSAFE))
lock.arm(nullptr);
if (!callHandler(conn, conn->write_handler)) return; if (!callHandler(conn, conn->write_handler)) return;
} }
/* If we have to invert the call, fire the readable event now /* If we have to invert the call, fire the readable event now
* after the writable one. */ * after the writable one. */
if (invert && call_read) { if (invert && call_read) {
AeLocker lock;
if (!(conn->flags & CONN_FLAG_READ_THREADSAFE))
lock.arm(nullptr);
if (!callHandler(conn, conn->read_handler)) return; if (!callHandler(conn, conn->read_handler)) return;
} }
} }

View File

@ -2641,9 +2641,11 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
g_pserver->rdb_child_pid = childpid; g_pserver->rdb_child_pid = childpid;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET; g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET;
close(g_pserver->rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */ close(g_pserver->rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
} }
});
} }
return (childpid == -1) ? C_ERR : C_OK; return (childpid == -1) ? C_ERR : C_OK;
} }

View File

@ -1129,6 +1129,8 @@ void putSlaveOnline(client *replica) {
} }
void sendBulkToSlave(connection *conn) { void sendBulkToSlave(connection *conn) {
serverAssert(GlobalLocksAcquired());
client *replica = (client*)connGetPrivateData(conn); client *replica = (client*)connGetPrivateData(conn);
serverAssert(FCorrectThread(replica)); serverAssert(FCorrectThread(replica));
char buf[PROTO_IOBUF_LEN]; char buf[PROTO_IOBUF_LEN];
@ -1192,9 +1194,11 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
g_pserver->rdb_pipe_numconns_writing--; g_pserver->rdb_pipe_numconns_writing--;
/* if there are no more writes for now for this conn, or write error: */ /* if there are no more writes for now for this conn, or write error: */
if (g_pserver->rdb_pipe_numconns_writing == 0) { if (g_pserver->rdb_pipe_numconns_writing == 0) {
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) { if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event."); serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
} }
});
} }
} }
@ -1205,6 +1209,12 @@ void rdbPipeWriteHandler(struct connection *conn) {
client *slave = (client*)connGetPrivateData(conn); client *slave = (client*)connGetPrivateData(conn);
AssertCorrectThread(slave); AssertCorrectThread(slave);
int nwritten; int nwritten;
if (slave->flags & CLIENT_CLOSE_ASAP) {
rdbPipeWriteHandlerConnRemoved(conn);
return;
}
if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff + slave->repldboff, if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff + slave->repldboff,
g_pserver->rdb_pipe_bufflen - slave->repldboff)) == -1) g_pserver->rdb_pipe_bufflen - slave->repldboff)) == -1)
{ {
@ -1246,6 +1256,9 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
UNUSED(mask); UNUSED(mask);
UNUSED(clientData); UNUSED(clientData);
UNUSED(eventLoop); UNUSED(eventLoop);
serverAssert(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el == eventLoop);
int i; int i;
if (!g_pserver->rdb_pipe_buff) if (!g_pserver->rdb_pipe_buff)
g_pserver->rdb_pipe_buff = (char*)zmalloc(PROTO_IOBUF_LEN); g_pserver->rdb_pipe_buff = (char*)zmalloc(PROTO_IOBUF_LEN);
@ -1262,7 +1275,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
if (!conn) if (!conn)
continue; continue;
client *slave = (client*)connGetPrivateData(conn); client *slave = (client*)connGetPrivateData(conn);
freeClient(slave); freeClientAsync(slave);
g_pserver->rdb_pipe_conns[i] = NULL; g_pserver->rdb_pipe_conns[i] = NULL;
} }
killRDBChild(); killRDBChild();
@ -1294,6 +1307,10 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
continue; continue;
client *slave = (client*)connGetPrivateData(conn); client *slave = (client*)connGetPrivateData(conn);
serverAssert(slave->conn == conn);
if (slave->flags & CLIENT_CLOSE_ASAP)
continue;
// Normally it would be bug to talk a client conn from a different thread, but here we know nobody else will // Normally it would be bug to talk a client conn from a different thread, but here we know nobody else will
// be sending anything while in this replication state so it is OK // be sending anything while in this replication state so it is OK
if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) { if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) {
@ -1314,8 +1331,12 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
* setup write handler (and disable pipe read handler, below) */ * setup write handler (and disable pipe read handler, below) */
if (nwritten != g_pserver->rdb_pipe_bufflen) { if (nwritten != g_pserver->rdb_pipe_bufflen) {
g_pserver->rdb_pipe_numconns_writing++; g_pserver->rdb_pipe_numconns_writing++;
aePostFunction(g_pserver->rgthreadvar[slave->iel].el, [conn] { slave->casyncOpsPending++;
connSetWriteHandler(conn, rdbPipeWriteHandler); aePostFunction(g_pserver->rgthreadvar[slave->iel].el, [slave] {
slave->casyncOpsPending--;
if (slave->flags & CLIENT_CLOSE_ASAP)
return;
connSetWriteHandler(slave->conn, rdbPipeWriteHandler);
}); });
} }
stillAlive++; stillAlive++;

View File

@ -2127,6 +2127,8 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
int iel = ielFromEventLoop(eventLoop); int iel = ielFromEventLoop(eventLoop);
serverAssert(iel != IDX_EVENT_LOOP_MAIN); serverAssert(iel != IDX_EVENT_LOOP_MAIN);
updateCachedTime(1);
/* If another threads unblocked one of our clients, and this thread has been idle /* If another threads unblocked one of our clients, and this thread has been idle
then beforeSleep won't have a chance to process the unblocking. So we also then beforeSleep won't have a chance to process the unblocking. So we also
process them here in the cron job to ensure they don't starve. process them here in the cron job to ensure they don't starve.