reenable multithreading after merge

Former-commit-id: 9fbb9a551e83ddfc66894fba688dae7c9c3c7ae1
This commit is contained in:
John Sully 2020-01-27 19:59:04 -05:00
parent 8a86276a86
commit e3b2ef962b
7 changed files with 65 additions and 31 deletions

View File

@ -11,6 +11,11 @@ public:
void arm(client *c) // if a client is passed, then the client is already locked
{
if (m_fArmed)
return;
serverAssertDebug(!GlobalLocksAcquired());
if (c != nullptr)
{
serverAssert(!m_fArmed);

View File

@ -92,7 +92,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
}
if (tval < 0) {
addReplyError(c,"timeout is negative");
addReplyErrorAsync(c,"timeout is negative");
return C_ERR;
}
@ -210,9 +210,9 @@ void replyToBlockedClientTimedOut(client *c) {
if (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM) {
addReplyNullArray(c);
addReplyNullArrayAsync(c);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
addReplyLongLongAsync(c,replicationCountAcksByOffset(c->bpop.reploffset));
} else if (c->btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c);
} else {
@ -397,7 +397,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
/* If the group was not found, send an error
* to the consumer. */
if (!group) {
addReplyError(receiver,
addReplyErrorAsync(receiver,
"-NOGROUP the consumer group this client "
"was blocked on no longer exists");
unblockClient(receiver);
@ -427,12 +427,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
* extracted from it. Wrapped in a single-item
* array, since we have just one key. */
if (receiver->resp == 2) {
addReplyArrayLen(receiver,1);
addReplyArrayLen(receiver,2);
addReplyArrayLenAsync(receiver,1);
addReplyArrayLenAsync(receiver,2);
} else {
addReplyMapLen(receiver,1);
addReplyMapLenAsync(receiver,1);
}
addReplyBulk(receiver,rl->key);
addReplyBulkAsync(receiver,rl->key);
streamPropInfo pi = {
rl->key,

View File

@ -197,7 +197,7 @@ static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_hand
* always called before and not after the read handler in a single event
* loop.
*/
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier, bool fThreadSafe) {
if (func == conn->write_handler) return C_OK;
conn->write_handler = func;
@ -205,10 +205,15 @@ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc fu
conn->flags |= CONN_FLAG_WRITE_BARRIER;
else
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
int flags = AE_WRITABLE;
if (fThreadSafe)
flags |= AE_WRITE_THREADSAFE;
if (!conn->write_handler)
aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE);
else
if (aeCreateFileEvent(serverTL->el,conn->fd,AE_WRITABLE,
if (aeCreateFileEvent(serverTL->el,conn->fd,flags,
conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
@ -216,15 +221,19 @@ static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc fu
/* Register a read handler, to be called when the connection is readable.
* If NULL, the existing handler is removed.
*/
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func, bool fThreadSafe) {
if (func == conn->read_handler) return C_OK;
int flags = AE_READABLE;
if (fThreadSafe)
flags |= AE_READ_THREADSAFE;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(serverTL->el,conn->fd,AE_READABLE);
else
if (aeCreateFileEvent(serverTL->el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
flags,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}

View File

@ -48,6 +48,8 @@ typedef enum {
#define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */
#define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */
#define CONN_FLAG_READ_THREADSAFE (1<<3)
#define CONN_FLAG_WRITE_THREADSAFE (1<<4)
typedef void (*ConnectionCallbackFunc)(struct connection *conn);
@ -58,8 +60,8 @@ typedef struct ConnectionType {
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier, bool fThreadSafe);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler, bool fThreadSafe);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
ssize_t (*sync_write)(struct connection *conn, const char *ptr, ssize_t size, long long timeout);
@ -144,15 +146,15 @@ static inline int connRead(connection *conn, void *buf, size_t buf_len) {
/* Register a write handler, to be called when the connection is writable.
* If NULL, the existing handler is removed.
*/
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_write_handler(conn, func, 0);
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func, bool fThreadSafe = false) {
return conn->type->set_write_handler(conn, func, 0, fThreadSafe);
}
/* Register a read handler, to be called when the connection is readable.
* If NULL, the existing handler is removed.
*/
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_read_handler(conn, func);
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func, bool fThreadSafe = false) {
return conn->type->set_read_handler(conn, func, fThreadSafe);
}
/* Set a write handler, and possibly enable a write barrier, this flag is
@ -160,8 +162,8 @@ static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc fu
* With barroer enabled, we never fire the event if the read handler already
* fired in the same event loop iteration. Useful when you want to persist
* things to disk before sending replies, and want to do that in a group fashion. */
static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCallbackFunc func, int barrier) {
return conn->type->set_write_handler(conn, func, barrier);
static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCallbackFunc func, int barrier, bool fThreadSafe = false) {
return conn->type->set_write_handler(conn, func, barrier, fThreadSafe);
}
static inline void connClose(connection *conn) {

View File

@ -104,7 +104,7 @@ client *createClient(connection *conn, int iel) {
connEnableTcpNoDelay(conn);
if (cserver.tcpkeepalive)
connKeepAlive(conn,cserver.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetReadHandler(conn, readQueryFromClient, true);
connSetPrivateData(conn, c);
}
@ -814,14 +814,25 @@ void addReplyBool(client *c, int b) {
* RESP2 had it, so API-wise we have this call, that will emit the correct
* RESP2 protocol, however for RESP3 the reply will always be just the
* Null type "_\r\n". */
void addReplyNullArray(client *c) {
void addReplyNullArrayCore(client *c, bool fAsync)
{
if (c->resp == 2) {
addReplyProto(c,"*-1\r\n",5);
addReplyProtoCore(c,"*-1\r\n",5,fAsync);
} else {
addReplyProto(c,"_\r\n",3);
addReplyProtoCore(c,"_\r\n",3,fAsync);
}
}
void addReplyNullArray(client *c)
{
addReplyNullArrayCore(c, false);
}
void addReplyNullArrayAsync(client *c)
{
addReplyNullArrayCore(c, true);
}
/* Create the length prefix of a bulk reply, example: $2234 */
void addReplyBulkLenCore(client *c, robj_roptr obj, bool fAsync) {
size_t len = stringObjectLen(obj);
@ -1688,7 +1699,7 @@ void ProcessPendingAsyncWrites()
std::lock_guard<decltype(c->lock)> lock(c->lock);
serverAssert(c->casyncOpsPending > 0);
c->casyncOpsPending--;
connSetWriteHandler(c->conn, sendReplyToClient);
connSetWriteHandler(c->conn, sendReplyToClient, true);
}, false) == AE_ERR
)
{
@ -1753,7 +1764,7 @@ int handleClientsWithPendingWrites(int iel) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags) == C_ERR)
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR)
freeClientAsync(c);
}
}
@ -1817,7 +1828,7 @@ void unprotectClient(client *c) {
AssertCorrectThread(c);
if (c->flags & CLIENT_PROTECTED) {
c->flags &= ~CLIENT_PROTECTED;
connSetReadHandler(c->conn,readQueryFromClient);
connSetReadHandler(c->conn,readQueryFromClient, true);
if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
}
}
@ -3096,6 +3107,7 @@ int processEventsWhileBlocked(int iel) {
c->lock.unlock();
}
aeReleaseLock();
serverAssertDebug(!GlobalLocksAcquired());
while (iterations--) {
int events = 0;
events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT);

View File

@ -1106,7 +1106,7 @@ void putSlaveOnline(client *replica) {
replica->replstate = SLAVE_STATE_ONLINE;
replica->repl_put_online_on_ack = 0;
replica->repl_ack_time = g_pserver->unixtime; /* Prevent false timeout. */
if (connSetWriteHandler(replica->conn, sendReplyToClient) == C_ERR) {
if (connSetWriteHandler(replica->conn, sendReplyToClient, true) == C_ERR) {
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
freeClient(replica);
return;
@ -1565,7 +1565,7 @@ void replicationCreateMasterClient(redisMaster *mi, connection *conn, int dbid)
if (conn)
{
serverAssert(connGetPrivateData(mi->master->conn) == mi->master);
connSetReadHandler(mi->master->conn, readQueryFromClient);
connSetReadHandler(mi->master->conn, readQueryFromClient, true);
}
mi->master->flags |= CLIENT_MASTER;
mi->master->authenticated = 1;
@ -3147,7 +3147,7 @@ void replicationResurrectCachedMaster(redisMaster *mi, connection *conn) {
/* Re-add to the list of clients. */
linkClient(mi->master);
serverAssert(connGetPrivateData(mi->master->conn) == mi->master);
if (connSetReadHandler(mi->master->conn, readQueryFromClient)) {
if (connSetReadHandler(mi->master->conn, readQueryFromClient, true)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
freeClientAsync(mi->master); /* Close ASAP. */
}
@ -3155,7 +3155,7 @@ void replicationResurrectCachedMaster(redisMaster *mi, connection *conn) {
/* We may also need to install the write handler as well if there is
* pending data in the write buffers. */
if (clientHasPendingReplies(mi->master)) {
if (connSetWriteHandler(mi->master->conn, sendReplyToClient)) {
if (connSetWriteHandler(mi->master->conn, sendReplyToClient, true)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
freeClientAsync(mi->master); /* Close ASAP. */
}

View File

@ -594,6 +594,11 @@ public:
/* We can print the stacktrace, so our assert is defined this way: */
#define serverAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_serverAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
#define serverAssert(_e) ((_e)?(void)0 : (_serverAssert(#_e,__FILE__,__LINE__),_exit(1)))
#ifdef _DEBUG
#define serverAssertDebug(_e) serverAssert(_e)
#else
#define serverAssertDebug(_e)
#endif
#define serverPanic(...) _serverPanic(__FILE__,__LINE__,__VA_ARGS__),_exit(1)
/*-----------------------------------------------------------------------------
@ -2113,6 +2118,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void readQueryFromClient(connection *conn);
void addReplyNull(client *c, robj_roptr objOldProtocol = nullptr);
void addReplyNullArray(client *c);
void addReplyNullArrayAsync(client *c);
void addReplyBool(client *c, int b);
void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext);
void addReplyProto(client *c, const char *s, size_t len);