From 1adc5e9832afbb8fe31562c28fd791d8af3b553f Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 28 Jan 2020 17:54:00 -0500 Subject: [PATCH] More threading fixes from merge Former-commit-id: 4a980f4ddbebe3f62703aa3de67c93cdffb6b4b8 --- src/networking.cpp | 3 +++ src/replication.cpp | 11 ++++++++--- src/server.h | 2 +- tests/integration/replication.tcl | 2 +- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 744c6d6cb..8624c4e2c 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -92,6 +92,7 @@ void linkClient(client *c) { client *createClient(connection *conn, int iel) { client *c = (client*)zmalloc(sizeof(client), MALLOC_LOCAL); + serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar))); c->iel = iel; /* passing NULL as conn it is possible to create a non connected client. @@ -1080,6 +1081,8 @@ void clientAcceptHandler(connection *conn) { static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) { client *c; UNUSED(ip); + AeLocker locker; + locker.arm(nullptr); /* Admission control will happen before a client is created and connAccept() * called, because we don't want to even start transport-level negotiation diff --git a/src/replication.cpp b/src/replication.cpp index f71e5837b..853aaa4a3 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1203,6 +1203,7 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { void rdbPipeWriteHandler(struct connection *conn) { serverAssert(g_pserver->rdb_pipe_bufflen>0); client *slave = (client*)connGetPrivateData(conn); + AssertCorrectThread(slave); int nwritten; if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff + slave->repldboff, g_pserver->rdb_pipe_bufflen - slave->repldboff)) == -1) @@ -1211,7 +1212,7 @@ void rdbPipeWriteHandler(struct connection *conn) { return; /* equivalent to EAGAIN */ serverLog(LL_WARNING,"Write error sending DB to replica: %s", connGetLastError(conn)); - freeClient(slave); + freeClientAsync(slave); return; } else { slave->repldboff += nwritten; @@ -1293,11 +1294,13 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, continue; client *slave = (client*)connGetPrivateData(conn); + // Normally it would be bug to talk a client conn from a different thread, but here we know nobody else will + // be sending anything while in this replication state so it is OK if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) { if (connGetState(conn) != CONN_STATE_CONNECTED) { serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s", connGetLastError(conn)); - freeClient(slave); + freeClientAsync(slave); g_pserver->rdb_pipe_conns[i] = NULL; continue; } @@ -1311,7 +1314,9 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, * setup write handler (and disable pipe read handler, below) */ if (nwritten != g_pserver->rdb_pipe_bufflen) { g_pserver->rdb_pipe_numconns_writing++; - connSetWriteHandler(conn, rdbPipeWriteHandler); + aePostFunction(g_pserver->rgthreadvar[slave->iel].el, [conn] { + connSetWriteHandler(conn, rdbPipeWriteHandler); + }); } stillAlive++; } diff --git a/src/server.h b/src/server.h index aa0cb4cf0..75b2419ac 100644 --- a/src/server.h +++ b/src/server.h @@ -241,7 +241,7 @@ public: #define C_ERR -1 /* Static server configuration */ -#define CONFIG_DEFAULT_HZ 10 /* Time interrupt calls/sec. */ +#define CONFIG_DEFAULT_HZ 20 /* Time interrupt calls/sec. */ #define CONFIG_MIN_HZ 1 #define CONFIG_MAX_HZ 500 #define MAX_CLIENTS_PER_CLOCK_TICK 200 /* HZ is adapted based on that. */ diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index f69002b36..2cbd7284b 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -5,7 +5,7 @@ proc log_file_matches {log pattern} { string match $pattern $content } -start_server {tags {"repl"}} { +start_server {tags {"repl"} overrides {hz 100}} { set slave [srv 0 client] set slave_host [srv 0 host] set slave_port [srv 0 port]