From e3b2ef962bfdce7e9ec9e9eecaf0740a1826658c Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 27 Jan 2020 19:59:04 -0500 Subject: [PATCH] reenable multithreading after merge Former-commit-id: 9fbb9a551e83ddfc66894fba688dae7c9c3c7ae1 --- src/aelocker.h | 5 +++++ src/blocked.cpp | 16 ++++++++-------- src/connection.cpp | 17 +++++++++++++---- src/connection.h | 18 ++++++++++-------- src/networking.cpp | 26 +++++++++++++++++++------- src/replication.cpp | 8 ++++---- src/server.h | 6 ++++++ 7 files changed, 65 insertions(+), 31 deletions(-) diff --git a/src/aelocker.h b/src/aelocker.h index eca15f491..ef757d2d2 100644 --- a/src/aelocker.h +++ b/src/aelocker.h @@ -11,6 +11,11 @@ public: void arm(client *c) // if a client is passed, then the client is already locked { + if (m_fArmed) + return; + + serverAssertDebug(!GlobalLocksAcquired()); + if (c != nullptr) { serverAssert(!m_fArmed); diff --git a/src/blocked.cpp b/src/blocked.cpp index bf659ba62..a466c9c64 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -92,7 +92,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int } if (tval < 0) { - addReplyError(c,"timeout is negative"); + addReplyErrorAsync(c,"timeout is negative"); return C_ERR; } @@ -210,9 +210,9 @@ void replyToBlockedClientTimedOut(client *c) { if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || c->btype == BLOCKED_STREAM) { - addReplyNullArray(c); + addReplyNullArrayAsync(c); } else if (c->btype == BLOCKED_WAIT) { - addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset)); + addReplyLongLongAsync(c,replicationCountAcksByOffset(c->bpop.reploffset)); } else if (c->btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c); } else { @@ -397,7 +397,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { /* If the group was not found, send an error * to the consumer. */ if (!group) { - addReplyError(receiver, + addReplyErrorAsync(receiver, "-NOGROUP the consumer group this client " "was blocked on no longer exists"); unblockClient(receiver); @@ -427,12 +427,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { * extracted from it. Wrapped in a single-item * array, since we have just one key. */ if (receiver->resp == 2) { - addReplyArrayLen(receiver,1); - addReplyArrayLen(receiver,2); + addReplyArrayLenAsync(receiver,1); + addReplyArrayLenAsync(receiver,2); } else { - addReplyMapLen(receiver,1); + addReplyMapLenAsync(receiver,1); } - addReplyBulk(receiver,rl->key); + addReplyBulkAsync(receiver,rl->key); streamPropInfo pi = { rl->key, diff --git a/src/connection.cpp b/src/connection.cpp index a3dad90f2..b819c982a 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -197,7 +197,7 @@ static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_hand * always called before and not after the read handler in a single event * loop. */ -static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { +static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier, bool fThreadSafe) { if (func == conn->write_handler) return C_OK; conn->write_handler = func; @@ -205,10 +205,15 @@ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc fu conn->flags |= CONN_FLAG_WRITE_BARRIER; else conn->flags &= ~CONN_FLAG_WRITE_BARRIER; + + int flags = AE_WRITABLE; + if (fThreadSafe) + flags |= AE_WRITE_THREADSAFE; + if (!conn->write_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE); else - if (aeCreateFileEvent(serverTL->el,conn->fd,AE_WRITABLE, + if (aeCreateFileEvent(serverTL->el,conn->fd,flags, conn->type->ae_handler,conn) == AE_ERR) return C_ERR; return C_OK; } @@ -216,15 +221,19 @@ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc fu /* Register a read handler, to be called when the connection is readable. * If NULL, the existing handler is removed. */ -static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) { +static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func, bool fThreadSafe) { if (func == conn->read_handler) return C_OK; + int flags = AE_READABLE; + if (fThreadSafe) + flags |= AE_READ_THREADSAFE; + conn->read_handler = func; if (!conn->read_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_READABLE); else if (aeCreateFileEvent(serverTL->el,conn->fd, - AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; + flags,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; return C_OK; } diff --git a/src/connection.h b/src/connection.h index f211ad7e8..651df06bd 100644 --- a/src/connection.h +++ b/src/connection.h @@ -48,6 +48,8 @@ typedef enum { #define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */ #define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */ #define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */ +#define CONN_FLAG_READ_THREADSAFE (1<<3) +#define CONN_FLAG_WRITE_THREADSAFE (1<<4) typedef void (*ConnectionCallbackFunc)(struct connection *conn); @@ -58,8 +60,8 @@ typedef struct ConnectionType { int (*read)(struct connection *conn, void *buf, size_t buf_len); void (*close)(struct connection *conn); int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler); - int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier); - int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler); + int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier, bool fThreadSafe); + int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler, bool fThreadSafe); const char *(*get_last_error)(struct connection *conn); int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout); ssize_t (*sync_write)(struct connection *conn, const char *ptr, ssize_t size, long long timeout); @@ -144,15 +146,15 @@ static inline int connRead(connection *conn, void *buf, size_t buf_len) { /* Register a write handler, to be called when the connection is writable. * If NULL, the existing handler is removed. */ -static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) { - return conn->type->set_write_handler(conn, func, 0); +static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func, bool fThreadSafe = false) { + return conn->type->set_write_handler(conn, func, 0, fThreadSafe); } /* Register a read handler, to be called when the connection is readable. * If NULL, the existing handler is removed. */ -static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) { - return conn->type->set_read_handler(conn, func); +static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func, bool fThreadSafe = false) { + return conn->type->set_read_handler(conn, func, fThreadSafe); } /* Set a write handler, and possibly enable a write barrier, this flag is @@ -160,8 +162,8 @@ static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc fu * With barroer enabled, we never fire the event if the read handler already * fired in the same event loop iteration. Useful when you want to persist * things to disk before sending replies, and want to do that in a group fashion. */ -static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCallbackFunc func, int barrier) { - return conn->type->set_write_handler(conn, func, barrier); +static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCallbackFunc func, int barrier, bool fThreadSafe = false) { + return conn->type->set_write_handler(conn, func, barrier, fThreadSafe); } static inline void connClose(connection *conn) { diff --git a/src/networking.cpp b/src/networking.cpp index e296fbbec..744c6d6cb 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -104,7 +104,7 @@ client *createClient(connection *conn, int iel) { connEnableTcpNoDelay(conn); if (cserver.tcpkeepalive) connKeepAlive(conn,cserver.tcpkeepalive); - connSetReadHandler(conn, readQueryFromClient); + connSetReadHandler(conn, readQueryFromClient, true); connSetPrivateData(conn, c); } @@ -814,14 +814,25 @@ void addReplyBool(client *c, int b) { * RESP2 had it, so API-wise we have this call, that will emit the correct * RESP2 protocol, however for RESP3 the reply will always be just the * Null type "_\r\n". */ -void addReplyNullArray(client *c) { +void addReplyNullArrayCore(client *c, bool fAsync) +{ if (c->resp == 2) { - addReplyProto(c,"*-1\r\n",5); + addReplyProtoCore(c,"*-1\r\n",5,fAsync); } else { - addReplyProto(c,"_\r\n",3); + addReplyProtoCore(c,"_\r\n",3,fAsync); } } +void addReplyNullArray(client *c) +{ + addReplyNullArrayCore(c, false); +} + +void addReplyNullArrayAsync(client *c) +{ + addReplyNullArrayCore(c, true); +} + /* Create the length prefix of a bulk reply, example: $2234 */ void addReplyBulkLenCore(client *c, robj_roptr obj, bool fAsync) { size_t len = stringObjectLen(obj); @@ -1688,7 +1699,7 @@ void ProcessPendingAsyncWrites() std::lock_guardlock)> lock(c->lock); serverAssert(c->casyncOpsPending > 0); c->casyncOpsPending--; - connSetWriteHandler(c->conn, sendReplyToClient); + connSetWriteHandler(c->conn, sendReplyToClient, true); }, false) == AE_ERR ) { @@ -1753,7 +1764,7 @@ int handleClientsWithPendingWrites(int iel) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ if (clientHasPendingReplies(c)) { - if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags) == C_ERR) + if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) freeClientAsync(c); } } @@ -1817,7 +1828,7 @@ void unprotectClient(client *c) { AssertCorrectThread(c); if (c->flags & CLIENT_PROTECTED) { c->flags &= ~CLIENT_PROTECTED; - connSetReadHandler(c->conn,readQueryFromClient); + connSetReadHandler(c->conn,readQueryFromClient, true); if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); } } @@ -3096,6 +3107,7 @@ int processEventsWhileBlocked(int iel) { c->lock.unlock(); } aeReleaseLock(); + serverAssertDebug(!GlobalLocksAcquired()); while (iterations--) { int events = 0; events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); diff --git a/src/replication.cpp b/src/replication.cpp index 19067a007..f71e5837b 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1106,7 +1106,7 @@ void putSlaveOnline(client *replica) { replica->replstate = SLAVE_STATE_ONLINE; replica->repl_put_online_on_ack = 0; replica->repl_ack_time = g_pserver->unixtime; /* Prevent false timeout. */ - if (connSetWriteHandler(replica->conn, sendReplyToClient) == C_ERR) { + if (connSetWriteHandler(replica->conn, sendReplyToClient, true) == C_ERR) { serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno)); freeClient(replica); return; @@ -1565,7 +1565,7 @@ void replicationCreateMasterClient(redisMaster *mi, connection *conn, int dbid) if (conn) { serverAssert(connGetPrivateData(mi->master->conn) == mi->master); - connSetReadHandler(mi->master->conn, readQueryFromClient); + connSetReadHandler(mi->master->conn, readQueryFromClient, true); } mi->master->flags |= CLIENT_MASTER; mi->master->authenticated = 1; @@ -3147,7 +3147,7 @@ void replicationResurrectCachedMaster(redisMaster *mi, connection *conn) { /* Re-add to the list of clients. */ linkClient(mi->master); serverAssert(connGetPrivateData(mi->master->conn) == mi->master); - if (connSetReadHandler(mi->master->conn, readQueryFromClient)) { + if (connSetReadHandler(mi->master->conn, readQueryFromClient, true)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); freeClientAsync(mi->master); /* Close ASAP. */ } @@ -3155,7 +3155,7 @@ void replicationResurrectCachedMaster(redisMaster *mi, connection *conn) { /* We may also need to install the write handler as well if there is * pending data in the write buffers. */ if (clientHasPendingReplies(mi->master)) { - if (connSetWriteHandler(mi->master->conn, sendReplyToClient)) { + if (connSetWriteHandler(mi->master->conn, sendReplyToClient, true)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); freeClientAsync(mi->master); /* Close ASAP. */ } diff --git a/src/server.h b/src/server.h index fcd54313a..aa0cb4cf0 100644 --- a/src/server.h +++ b/src/server.h @@ -594,6 +594,11 @@ public: /* We can print the stacktrace, so our assert is defined this way: */ #define serverAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_serverAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1))) #define serverAssert(_e) ((_e)?(void)0 : (_serverAssert(#_e,__FILE__,__LINE__),_exit(1))) +#ifdef _DEBUG +#define serverAssertDebug(_e) serverAssert(_e) +#else +#define serverAssertDebug(_e) +#endif #define serverPanic(...) _serverPanic(__FILE__,__LINE__,__VA_ARGS__),_exit(1) /*----------------------------------------------------------------------------- @@ -2113,6 +2118,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(connection *conn); void addReplyNull(client *c, robj_roptr objOldProtocol = nullptr); void addReplyNullArray(client *c); +void addReplyNullArrayAsync(client *c); void addReplyBool(client *c, int b); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); void addReplyProto(client *c, const char *s, size_t len);