More threading fixes from merge

Former-commit-id: 4a980f4ddbebe3f62703aa3de67c93cdffb6b4b8
This commit is contained in:
John Sully 2020-01-28 17:54:00 -05:00
parent d9c070408b
commit 1adc5e9832
4 changed files with 13 additions and 5 deletions

View File

@ -92,6 +92,7 @@ void linkClient(client *c) {
client *createClient(connection *conn, int iel) { client *createClient(connection *conn, int iel) {
client *c = (client*)zmalloc(sizeof(client), MALLOC_LOCAL); client *c = (client*)zmalloc(sizeof(client), MALLOC_LOCAL);
serverAssert(conn == nullptr || (iel == (serverTL - g_pserver->rgthreadvar)));
c->iel = iel; c->iel = iel;
/* passing NULL as conn it is possible to create a non connected client. /* passing NULL as conn it is possible to create a non connected client.
@ -1080,6 +1081,8 @@ void clientAcceptHandler(connection *conn) {
static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) { static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) {
client *c; client *c;
UNUSED(ip); UNUSED(ip);
AeLocker locker;
locker.arm(nullptr);
/* Admission control will happen before a client is created and connAccept() /* Admission control will happen before a client is created and connAccept()
* called, because we don't want to even start transport-level negotiation * called, because we don't want to even start transport-level negotiation

View File

@ -1203,6 +1203,7 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
void rdbPipeWriteHandler(struct connection *conn) { void rdbPipeWriteHandler(struct connection *conn) {
serverAssert(g_pserver->rdb_pipe_bufflen>0); serverAssert(g_pserver->rdb_pipe_bufflen>0);
client *slave = (client*)connGetPrivateData(conn); client *slave = (client*)connGetPrivateData(conn);
AssertCorrectThread(slave);
int nwritten; int nwritten;
if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff + slave->repldboff, if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff + slave->repldboff,
g_pserver->rdb_pipe_bufflen - slave->repldboff)) == -1) g_pserver->rdb_pipe_bufflen - slave->repldboff)) == -1)
@ -1211,7 +1212,7 @@ void rdbPipeWriteHandler(struct connection *conn) {
return; /* equivalent to EAGAIN */ return; /* equivalent to EAGAIN */
serverLog(LL_WARNING,"Write error sending DB to replica: %s", serverLog(LL_WARNING,"Write error sending DB to replica: %s",
connGetLastError(conn)); connGetLastError(conn));
freeClient(slave); freeClientAsync(slave);
return; return;
} else { } else {
slave->repldboff += nwritten; slave->repldboff += nwritten;
@ -1293,11 +1294,13 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
continue; continue;
client *slave = (client*)connGetPrivateData(conn); client *slave = (client*)connGetPrivateData(conn);
// Normally it would be bug to talk a client conn from a different thread, but here we know nobody else will
// be sending anything while in this replication state so it is OK
if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) { if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) { if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s", serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s",
connGetLastError(conn)); connGetLastError(conn));
freeClient(slave); freeClientAsync(slave);
g_pserver->rdb_pipe_conns[i] = NULL; g_pserver->rdb_pipe_conns[i] = NULL;
continue; continue;
} }
@ -1311,7 +1314,9 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
* setup write handler (and disable pipe read handler, below) */ * setup write handler (and disable pipe read handler, below) */
if (nwritten != g_pserver->rdb_pipe_bufflen) { if (nwritten != g_pserver->rdb_pipe_bufflen) {
g_pserver->rdb_pipe_numconns_writing++; g_pserver->rdb_pipe_numconns_writing++;
connSetWriteHandler(conn, rdbPipeWriteHandler); aePostFunction(g_pserver->rgthreadvar[slave->iel].el, [conn] {
connSetWriteHandler(conn, rdbPipeWriteHandler);
});
} }
stillAlive++; stillAlive++;
} }

View File

@ -241,7 +241,7 @@ public:
#define C_ERR -1 #define C_ERR -1
/* Static server configuration */ /* Static server configuration */
#define CONFIG_DEFAULT_HZ 10 /* Time interrupt calls/sec. */ #define CONFIG_DEFAULT_HZ 20 /* Time interrupt calls/sec. */
#define CONFIG_MIN_HZ 1 #define CONFIG_MIN_HZ 1
#define CONFIG_MAX_HZ 500 #define CONFIG_MAX_HZ 500
#define MAX_CLIENTS_PER_CLOCK_TICK 200 /* HZ is adapted based on that. */ #define MAX_CLIENTS_PER_CLOCK_TICK 200 /* HZ is adapted based on that. */

View File

@ -5,7 +5,7 @@ proc log_file_matches {log pattern} {
string match $pattern $content string match $pattern $content
} }
start_server {tags {"repl"}} { start_server {tags {"repl"} overrides {hz 100}} {
set slave [srv 0 client] set slave [srv 0 client]
set slave_host [srv 0 host] set slave_host [srv 0 host]
set slave_port [srv 0 port] set slave_port [srv 0 port]