From fcfff165da5caf157ebbb84dcbe72810cce3bdaa Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 24 May 2020 19:57:16 -0400 Subject: [PATCH 1/5] Enable TLS connections Former-commit-id: d05da0fabdfeb7eadce6546c7c1d85739b2794d7 --- src/Makefile | 3 +- src/connection.h | 6 ++ src/networking.cpp | 3 +- src/sentinel.cpp | 2 + src/server.cpp | 3 +- src/server.h | 1 + src/tls.cpp | 188 ++++++++++++++++++++++++++++++++++----------- 7 files changed, 157 insertions(+), 49 deletions(-) diff --git a/src/Makefile b/src/Makefile index da848d474..0eecd5500 100644 --- a/src/Makefile +++ b/src/Makefile @@ -236,7 +236,8 @@ ifeq ($(MALLOC),memkind) endif ifeq ($(BUILD_TLS),yes) - FINAL_CFLAGS+=-DUSE_OPENSSL $(OPENSSL_CFLAGS) + FINAL_CFLAGS+=-DUSE_OPENSSL $(OPENSSL_CXXFLAGS) + FINAL_CXXFLAGS+=-DUSE_OPENSSL $(OPENSSL_CXXFLAGS) FINAL_LDFLAGS+=$(OPENSSL_LDFLAGS) FINAL_LIBS += ../deps/hiredis/libhiredis_ssl.a -lssl -lcrypto endif diff --git a/src/connection.h b/src/connection.h index f5624b596..515229d6a 100644 --- a/src/connection.h +++ b/src/connection.h @@ -68,6 +68,7 @@ typedef struct ConnectionType { ssize_t (*sync_write)(struct connection *conn, const char *ptr, ssize_t size, long long timeout); ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout); ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout); + void (*marshal_thread)(struct connection *conn); } ConnectionType; struct connection { @@ -198,6 +199,11 @@ static inline ssize_t connSyncReadLine(connection *conn, char *ptr, ssize_t size return conn->type->sync_readline(conn, ptr, size, timeout); } +static inline void connMarshalThread(connection *conn) { + if (conn->type->marshal_thread != nullptr) + conn->type->marshal_thread(conn); +} + connection *connCreateSocket(); connection *connCreateAcceptedSocket(int fd); diff --git a/src/networking.cpp b/src/networking.cpp index 58ca4f86b..8b1e889f6 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1313,7 +1313,8 @@ void acceptOnThread(connection *conn, int flags, char *cip) szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); } - int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT]{ + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] { + connMarshalThread(conn); acceptCommonHandler(conn,flags,szT,ielTarget); if (!g_fTestMode && !g_pserver->loading) rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); diff --git a/src/sentinel.cpp b/src/sentinel.cpp index 16d408ad4..2d116982e 100644 --- a/src/sentinel.cpp +++ b/src/sentinel.cpp @@ -32,7 +32,9 @@ #include "hiredis.h" #ifdef USE_OPENSSL #include "openssl/ssl.h" +extern "C" { #include "hiredis_ssl.h" +} #endif #include "async.h" diff --git a/src/server.cpp b/src/server.cpp index 8566dd7f0..654044699 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2940,7 +2940,7 @@ static void initNetworking(int fReusePort) } /* Abort if there are no listening sockets at all. */ - if (g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd_count == 0 && g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].tlsfd_count && g_pserver->sofd < 0) { + if (g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd_count == 0 && g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].tlsfd_count == 0 && g_pserver->sofd < 0) { serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } @@ -5305,6 +5305,7 @@ void *workerThreadMain(void *parg) int iel = (int)((int64_t)parg); serverLog(LOG_INFO, "Thread %d alive.", iel); serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global + tlsInitThread(); if (iel != IDX_EVENT_LOOP_MAIN) { diff --git a/src/server.h b/src/server.h index 628e1d3b9..f42968994 100644 --- a/src/server.h +++ b/src/server.h @@ -3061,6 +3061,7 @@ inline int FCorrectThread(client *c) /* TLS stuff */ void tlsInit(void); +void tlsInitThread(); int tlsConfigure(redisTLSContextConfig *ctx_config); diff --git a/src/tls.cpp b/src/tls.cpp index 28a74df9a..d297695cc 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -31,6 +31,8 @@ #include "server.h" #include "connhelpers.h" #include "adlist.h" +#include "aelocker.h" +#include #ifdef USE_OPENSSL @@ -53,6 +55,7 @@ extern ConnectionType CT_Socket; SSL_CTX *redis_tls_ctx; +fastlock g_ctxtlock("SSL CTX"); static int parseProtocolsConfig(const char *str) { int i, count = 0; @@ -91,7 +94,7 @@ static int parseProtocolsConfig(const char *str) { /* list of connections with pending data already read from the socket, but not * served to the reader yet. */ -static list *pending_list = NULL; +static thread_local list *pending_list = NULL; /** * OpenSSL global initialization and locking handling callbacks. @@ -147,10 +150,15 @@ void tlsInit(void) { serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator."); } - pending_list = listCreate(); - /* Server configuration */ - server.tls_auth_clients = 1; /* Secure by default */ + g_pserver->tls_auth_clients = 1; /* Secure by default */ + tlsInitThread(); +} + +void tlsInitThread(void) +{ + serverAssert(pending_list == nullptr); + pending_list = listCreate(); } /* Attempt to configure/reconfigure TLS. This operation is atomic and will @@ -159,6 +167,7 @@ void tlsInit(void) { int tlsConfigure(redisTLSContextConfig *ctx_config) { char errbuf[256]; SSL_CTX *ctx = NULL; + int protocols; if (!ctx_config->cert_file) { serverLog(LL_WARNING, "No tls-cert-file configured!"); @@ -184,7 +193,7 @@ int tlsConfigure(redisTLSContextConfig *ctx_config) { SSL_CTX_set_options(ctx, SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS); #endif - int protocols = parseProtocolsConfig(ctx_config->protocols); + protocols = parseProtocolsConfig(ctx_config->protocols); if (protocols == -1) goto error; if (!(protocols & REDIS_TLS_PROTO_TLSv1)) @@ -272,8 +281,11 @@ int tlsConfigure(redisTLSContextConfig *ctx_config) { } #endif + { + std::unique_lock ul(g_ctxtlock); SSL_CTX_free(redis_tls_ctx); redis_tls_ctx = ctx; + } return C_OK; @@ -289,7 +301,7 @@ error: #define TLSCONN_DEBUG(fmt, ...) #endif -ConnectionType CT_TLS; +extern ConnectionType CT_TLS; /* Normal socket connections have a simple events/handler correlation. * @@ -307,6 +319,7 @@ ConnectionType CT_TLS; */ typedef enum { + WANT_INVALID = 0, WANT_READ = 1, WANT_WRITE } WantIOType; @@ -321,16 +334,24 @@ typedef struct tls_connection { SSL *ssl; char *ssl_error; listNode *pending_list_node; + aeEventLoop *el; } tls_connection; connection *connCreateTLS(void) { - tls_connection *conn = zcalloc(sizeof(tls_connection)); + tls_connection *conn = (tls_connection*)zcalloc(sizeof(tls_connection), MALLOC_LOCAL); conn->c.type = &CT_TLS; conn->c.fd = -1; + std::unique_lock ul(g_ctxtlock); conn->ssl = SSL_new(redis_tls_ctx); + conn->el = serverTL->el; return (connection *) conn; } +void connTLSMarshalThread(connection *c) { + tls_connection *conn = (tls_connection*)c; + conn->el = serverTL->el; +} + connection *connCreateAcceptedTLS(int fd, int require_auth) { tls_connection *conn = (tls_connection *) connCreateTLS(); conn->c.fd = fd; @@ -374,7 +395,7 @@ static int handleSSLReturnCode(tls_connection *conn, int ret_value, WantIOType * /* Error! */ conn->c.last_errno = 0; if (conn->ssl_error) zfree(conn->ssl_error); - conn->ssl_error = zmalloc(512); + conn->ssl_error = (char*)zmalloc(512); ERR_error_string_n(ERR_get_error(), conn->ssl_error, 512); break; } @@ -386,17 +407,19 @@ static int handleSSLReturnCode(tls_connection *conn, int ret_value, WantIOType * } void registerSSLEvent(tls_connection *conn, WantIOType want) { - int mask = aeGetFileEvents(server.el, conn->c.fd); + int mask = aeGetFileEvents(serverTL->el, conn->c.fd); + + serverAssert(conn->el == serverTL->el); switch (want) { case WANT_READ: - if (mask & AE_WRITABLE) aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE); - if (!(mask & AE_READABLE)) aeCreateFileEvent(server.el, conn->c.fd, AE_READABLE, + if (mask & AE_WRITABLE) aeDeleteFileEvent(conn->el, conn->c.fd, AE_WRITABLE); + if (!(mask & AE_READABLE)) aeCreateFileEvent(conn->el, conn->c.fd, AE_READABLE|AE_READ_THREADSAFE, tlsEventHandler, conn); break; case WANT_WRITE: - if (mask & AE_READABLE) aeDeleteFileEvent(server.el, conn->c.fd, AE_READABLE); - if (!(mask & AE_WRITABLE)) aeCreateFileEvent(server.el, conn->c.fd, AE_WRITABLE, + if (mask & AE_READABLE) aeDeleteFileEvent(conn->el, conn->c.fd, AE_READABLE); + if (!(mask & AE_WRITABLE)) aeCreateFileEvent(conn->el, conn->c.fd, AE_WRITABLE|AE_WRITE_THREADSAFE, tlsEventHandler, conn); break; default: @@ -405,24 +428,38 @@ void registerSSLEvent(tls_connection *conn, WantIOType want) { } } -void updateSSLEvent(tls_connection *conn) { - int mask = aeGetFileEvents(server.el, conn->c.fd); +void updateSSLEventCore(tls_connection *conn) { + int mask = aeGetFileEvents(serverTL->el, conn->c.fd); int need_read = conn->c.read_handler || (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ); int need_write = conn->c.write_handler || (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE); + serverAssert(conn->el == serverTL->el); + if (need_read && !(mask & AE_READABLE)) - aeCreateFileEvent(server.el, conn->c.fd, AE_READABLE, tlsEventHandler, conn); + aeCreateFileEvent(serverTL->el, conn->c.fd, AE_READABLE|AE_READ_THREADSAFE, tlsEventHandler, conn); if (!need_read && (mask & AE_READABLE)) - aeDeleteFileEvent(server.el, conn->c.fd, AE_READABLE); + aeDeleteFileEvent(serverTL->el, conn->c.fd, AE_READABLE); if (need_write && !(mask & AE_WRITABLE)) - aeCreateFileEvent(server.el, conn->c.fd, AE_WRITABLE, tlsEventHandler, conn); + aeCreateFileEvent(serverTL->el, conn->c.fd, AE_WRITABLE|AE_WRITE_THREADSAFE, tlsEventHandler, conn); if (!need_write && (mask & AE_WRITABLE)) - aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE); + aeDeleteFileEvent(serverTL->el, conn->c.fd, AE_WRITABLE); } -static void tlsHandleEvent(tls_connection *conn, int mask) { +void updateSSLEvent(tls_connection *conn) { + if (conn->el != serverTL->el) { + aePostFunction(conn->el, [conn]{ + updateSSLEventCore(conn); + }); + } else { + updateSSLEventCore(conn); + } +} + +void tlsHandleEvent(tls_connection *conn, int mask) { int ret; + serverAssert(!GlobalLocksAcquired()); + serverAssert(conn->el == serverTL->el); TLSCONN_DEBUG("tlsEventHandler(): fd=%d, state=%d, mask=%d, r=%d, w=%d, flags=%d", fd, conn->c.state, mask, conn->c.read_handler != NULL, conn->c.write_handler != NULL, @@ -442,7 +479,7 @@ static void tlsHandleEvent(tls_connection *conn, int mask) { } ret = SSL_connect(conn->ssl); if (ret <= 0) { - WantIOType want = 0; + WantIOType want = WANT_INVALID; if (!handleSSLReturnCode(conn, ret, &want)) { registerSSLEvent(conn, want); @@ -460,13 +497,17 @@ static void tlsHandleEvent(tls_connection *conn, int mask) { } } + { + AeLocker locker; + locker.arm(nullptr); if (!callHandler((connection *) conn, conn->c.conn_handler)) return; + } conn->c.conn_handler = NULL; break; case CONN_STATE_ACCEPTING: ret = SSL_accept(conn->ssl); if (ret <= 0) { - WantIOType want = 0; + WantIOType want = WANT_INVALID; if (!handleSSLReturnCode(conn, ret, &want)) { /* Avoid hitting UpdateSSLEvent, which knows nothing * of what SSL_connect() wants and instead looks at our @@ -482,7 +523,11 @@ static void tlsHandleEvent(tls_connection *conn, int mask) { conn->c.state = CONN_STATE_CONNECTED; } + { + AeLocker locker; + locker.arm(nullptr); if (!callHandler((connection *) conn, conn->c.conn_handler)) return; + } conn->c.conn_handler = NULL; break; case CONN_STATE_CONNECTED: @@ -506,12 +551,18 @@ static void tlsHandleEvent(tls_connection *conn, int mask) { int invert = conn->c.flags & CONN_FLAG_WRITE_BARRIER; if (!invert && call_read) { + AeLocker lock; + if (!(conn->c.flags & CONN_FLAG_READ_THREADSAFE)) + lock.arm(nullptr); conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE; if (!callHandler((connection *) conn, conn->c.read_handler)) return; } /* Fire the writable event. */ if (call_write) { + AeLocker lock; + if (!(conn->c.flags & CONN_FLAG_WRITE_THREADSAFE)) + lock.arm(nullptr); conn->flags &= ~TLS_CONN_FLAG_WRITE_WANT_READ; if (!callHandler((connection *) conn, conn->c.write_handler)) return; } @@ -519,6 +570,9 @@ static void tlsHandleEvent(tls_connection *conn, int mask) { /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert && call_read) { + AeLocker lock; + if (!(conn->c.flags & CONN_FLAG_READ_THREADSAFE)) + lock.arm(nullptr); conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE; if (!callHandler((connection *) conn, conn->c.read_handler)) return; } @@ -550,12 +604,12 @@ static void tlsHandleEvent(tls_connection *conn, int mask) { static void tlsEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) { UNUSED(el); UNUSED(fd); - tls_connection *conn = clientData; + tls_connection *conn = (tls_connection*)clientData; tlsHandleEvent(conn, mask); } -static void connTLSClose(connection *conn_) { - tls_connection *conn = (tls_connection *) conn_; +static void connTLSCloseCore(tls_connection *conn) { + serverAssert(conn->el == serverTL->el); if (conn->ssl) { SSL_free(conn->ssl); @@ -572,13 +626,26 @@ static void connTLSClose(connection *conn_) { conn->pending_list_node = NULL; } - CT_Socket.close(conn_); + CT_Socket.close(&conn->c); +} + +static void connTLSClose(connection *conn_) { + tls_connection *conn = (tls_connection *) conn_; + if (conn->el != serverTL->el) { + aePostFunction(conn->el, [conn]{ + connTLSCloseCore(conn); + }); + } else { + connTLSCloseCore(conn); + } } static int connTLSAccept(connection *_conn, ConnectionCallbackFunc accept_handler) { tls_connection *conn = (tls_connection *) _conn; int ret; + serverAssert(conn->el == serverTL->el); + if (conn->c.state != CONN_STATE_ACCEPTING) return C_ERR; ERR_clear_error(); @@ -587,7 +654,7 @@ static int connTLSAccept(connection *_conn, ConnectionCallbackFunc accept_handle ret = SSL_accept(conn->ssl); if (ret <= 0) { - WantIOType want = 0; + WantIOType want = WANT_INVALID; if (!handleSSLReturnCode(conn, ret, &want)) { registerSSLEvent(conn, want); /* We'll fire back */ return C_OK; @@ -607,6 +674,8 @@ static int connTLSAccept(connection *_conn, ConnectionCallbackFunc accept_handle static int connTLSConnect(connection *conn_, const char *addr, int port, const char *src_addr, ConnectionCallbackFunc connect_handler) { tls_connection *conn = (tls_connection *) conn_; + serverAssert(conn->el == serverTL->el); + if (conn->c.state != CONN_STATE_NONE) return C_ERR; ERR_clear_error(); @@ -628,7 +697,7 @@ static int connTLSWrite(connection *conn_, const void *data, size_t data_len) { ret = SSL_write(conn->ssl, data, data_len); if (ret <= 0) { - WantIOType want = 0; + WantIOType want = WANT_INVALID; if (!(ssl_err = handleSSLReturnCode(conn, ret, &want))) { if (want == WANT_READ) conn->flags |= TLS_CONN_FLAG_WRITE_WANT_READ; updateSSLEvent(conn); @@ -654,11 +723,13 @@ static int connTLSRead(connection *conn_, void *buf, size_t buf_len) { int ret; int ssl_err; + serverAssert(conn->el == serverTL->el); + if (conn->c.state != CONN_STATE_CONNECTED) return -1; ERR_clear_error(); ret = SSL_read(conn->ssl, buf, buf_len); if (ret <= 0) { - WantIOType want = 0; + WantIOType want = WANT_INVALID; if (!(ssl_err = handleSSLReturnCode(conn, ret, &want))) { if (want == WANT_WRITE) conn->flags |= TLS_CONN_FLAG_READ_WANT_WRITE; updateSSLEvent(conn); @@ -687,18 +758,32 @@ static const char *connTLSGetLastError(connection *conn_) { return NULL; } -int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) { +int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier, bool fThreadSafe) { + serverAssert(((tls_connection*)conn)->el == serverTL->el); conn->write_handler = func; if (barrier) conn->flags |= CONN_FLAG_WRITE_BARRIER; else conn->flags &= ~CONN_FLAG_WRITE_BARRIER; + + if (fThreadSafe) + conn->flags |= CONN_FLAG_WRITE_THREADSAFE; + else + conn->flags &= ~CONN_FLAG_WRITE_THREADSAFE; + updateSSLEvent((tls_connection *) conn); return C_OK; } -int connTLSSetReadHandler(connection *conn, ConnectionCallbackFunc func) { +int connTLSSetReadHandler(connection *conn, ConnectionCallbackFunc func, bool fThreadSafe) { + serverAssert(((tls_connection*)conn)->el == serverTL->el); conn->read_handler = func; + + if (fThreadSafe) + conn->flags |= CONN_FLAG_READ_THREADSAFE; + else + conn->flags &= ~CONN_FLAG_READ_THREADSAFE; + updateSSLEvent((tls_connection *) conn); return C_OK; } @@ -719,6 +804,8 @@ static int connTLSBlockingConnect(connection *conn_, const char *addr, int port, tls_connection *conn = (tls_connection *) conn_; int ret; + serverAssert(conn->el == serverTL->el); + if (conn->c.state != CONN_STATE_NONE) return C_ERR; /* Initiate socket blocking connect first */ @@ -739,9 +826,11 @@ static int connTLSBlockingConnect(connection *conn_, const char *addr, int port, return C_OK; } -static ssize_t connTLSSyncWrite(connection *conn_, char *ptr, ssize_t size, long long timeout) { +static ssize_t connTLSSyncWrite(connection *conn_, const char *ptr, ssize_t size, long long timeout) { tls_connection *conn = (tls_connection *) conn_; + serverAssert(conn->el == serverTL->el); + setBlockingTimeout(conn, timeout); SSL_clear_mode(conn->ssl, SSL_MODE_ENABLE_PARTIAL_WRITE); int ret = SSL_write(conn->ssl, ptr, size); @@ -754,6 +843,8 @@ static ssize_t connTLSSyncWrite(connection *conn_, char *ptr, ssize_t size, long static ssize_t connTLSSyncRead(connection *conn_, char *ptr, ssize_t size, long long timeout) { tls_connection *conn = (tls_connection *) conn_; + serverAssert(conn->el == serverTL->el); + setBlockingTimeout(conn, timeout); int ret = SSL_read(conn->ssl, ptr, size); unsetBlockingTimeout(conn); @@ -765,6 +856,8 @@ static ssize_t connTLSSyncReadLine(connection *conn_, char *ptr, ssize_t size, l tls_connection *conn = (tls_connection *) conn_; ssize_t nread = 0; + serverAssert(conn->el == serverTL->el); + setBlockingTimeout(conn, timeout); size--; @@ -792,19 +885,20 @@ exit: } ConnectionType CT_TLS = { - .ae_handler = tlsEventHandler, - .accept = connTLSAccept, - .connect = connTLSConnect, - .blocking_connect = connTLSBlockingConnect, - .read = connTLSRead, - .write = connTLSWrite, - .close = connTLSClose, - .set_write_handler = connTLSSetWriteHandler, - .set_read_handler = connTLSSetReadHandler, - .get_last_error = connTLSGetLastError, - .sync_write = connTLSSyncWrite, - .sync_read = connTLSSyncRead, - .sync_readline = connTLSSyncReadLine, + tlsEventHandler, + connTLSConnect, + connTLSWrite, + connTLSRead, + connTLSClose, + connTLSAccept, + connTLSSetWriteHandler, + connTLSSetReadHandler, + connTLSGetLastError, + connTLSBlockingConnect, + connTLSSyncWrite, + connTLSSyncRead, + connTLSSyncReadLine, + connTLSMarshalThread, }; int tlsHasPendingData() { @@ -820,7 +914,7 @@ int tlsProcessPendingData() { int processed = listLength(pending_list); listRewind(pending_list,&li); while((ln = listNext(&li))) { - tls_connection *conn = listNodeValue(ln); + tls_connection *conn = (tls_connection*)listNodeValue(ln); tlsHandleEvent(conn, AE_READABLE); } return processed; @@ -855,4 +949,6 @@ int tlsProcessPendingData() { return 0; } +void tlsInitThread() {} + #endif From fa671d51931d170a6cbeafa5e6566e97feb7ac9e Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 24 May 2020 20:05:22 -0400 Subject: [PATCH 2/5] Fix infinite logs of Error in rreplay Former-commit-id: 00cef78858cd05fc067fa5002f2995653452c63f --- src/replication.cpp | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 599c82030..0d9004c5f 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -4150,21 +4150,27 @@ void replicaReplayCommand(client *c) cFake->lock.unlock(); if (cFake->master_error) { + selectDb(c, cFake->db->id); + freeClient(cFake); + remoteState.cFake = cFake = nullptr; addReplyError(c, "Error in rreplay command, please check logs."); } - if (fExec || cFake->flags & CLIENT_MULTI) + if (cFake != nullptr) { - addReply(c, shared.ok); - selectDb(c, cFake->db->id); - if (mvcc > remoteState.mvcc) - remoteState.mvcc = mvcc; + if (fExec || cFake->flags & CLIENT_MULTI) + { + addReply(c, shared.ok); + selectDb(c, cFake->db->id); + if (mvcc > remoteState.mvcc) + remoteState.mvcc = mvcc; + } + else + { + serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf); + addReplyError(c, "command did not execute"); + } + serverAssert(sdslen(cFake->querybuf) == 0); } - else - { - serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf); - addReplyError(c, "command did not execute"); - } - serverAssert(sdslen(cFake->querybuf) == 0); serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here From c15802af8b0fe433ab349e7647bd56ced3f71855 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 25 May 2020 02:13:57 -0400 Subject: [PATCH 3/5] Fix issue #170. Intermittent crash destroying fastlock due to implicit type conversion in assert Former-commit-id: ecbe168b5421687899a621c995e44f43faec2b71 --- src/fastlock.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index cc6eea99a..28092828f 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -433,8 +433,9 @@ extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex) extern "C" void fastlock_free(struct fastlock *lock) { // NOP - serverAssert((lock->m_ticket.m_active == lock->m_ticket.m_avail) // Asser the lock is unlocked - || (lock->m_pidOwner == gettid() && (lock->m_ticket.m_active == lock->m_ticket.m_avail-1))); // OR we own the lock and nobody else is waiting + serverAssert((lock->m_ticket.m_active == lock->m_ticket.m_avail) // Assert the lock is unlocked + || (lock->m_pidOwner == gettid() + && (lock->m_ticket.m_active == static_cast(lock->m_ticket.m_avail-1U)))); // OR we own the lock and nobody else is waiting lock->m_pidOwner = -2; // sentinal value indicating free ANNOTATE_RWLOCK_DESTROY(lock); } From b9b4b4cfde9bb64c1daf31313b93f04d6490a9d6 Mon Sep 17 00:00:00 2001 From: Maxime de Roucy Date: Sat, 2 May 2020 16:23:50 +0200 Subject: [PATCH 4/5] acl: @replication command group containing mandatory commands for replication (used by masteruser user) Former-commit-id: 123bc125cfe7b30fdeef833759a33ccc854b5c51 --- keydb.conf | 6 +++--- src/acl.cpp | 1 + src/help.h | 3 ++- src/server.cpp | 10 +++++----- src/server.h | 3 ++- utils/generate-command-help.rb | 3 ++- 6 files changed, 15 insertions(+), 11 deletions(-) diff --git a/keydb.conf b/keydb.conf index 30e12a5e0..68648df35 100644 --- a/keydb.conf +++ b/keydb.conf @@ -376,9 +376,9 @@ dir ./ # # However this is not enough if you are using KeyDB ACLs (for Redis version # 6 or greater), and the default user is not capable of running the PSYNC -# command and/or other commands needed for replication. In this case it's -# better to configure a special user to use with replication, and specify the -# masteruser configuration as such: +# command and/or other commands needed for replication (gathered in the +# @replication group). In this case it's better to configure a special user to +# use with replication, and specify the masteruser configuration as such: # # masteruser # diff --git a/src/acl.cpp b/src/acl.cpp index 1453d1fa2..b3a4ea522 100644 --- a/src/acl.cpp +++ b/src/acl.cpp @@ -80,6 +80,7 @@ struct ACLCategoryItem { {"connection", CMD_CATEGORY_CONNECTION}, {"transaction", CMD_CATEGORY_TRANSACTION}, {"scripting", CMD_CATEGORY_SCRIPTING}, + {"replication", CMD_CATEGORY_REPLICATION}, {NULL,0} /* Terminator. */ }; diff --git a/src/help.h b/src/help.h index eae4e579b..2f692dfc2 100644 --- a/src/help.h +++ b/src/help.h @@ -18,7 +18,8 @@ static char *commandGroups[] = { "hyperloglog", "cluster", "geo", - "stream" + "stream", + "replication" }; struct commandHelp { diff --git a/src/server.cpp b/src/server.cpp index 654044699..cfd739d24 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -180,7 +180,7 @@ volatile unsigned long lru_clock; /* Server global current LRU time. */ * * @keyspace, @read, @write, @set, @sortedset, @list, @hash, @string, @bitmap, * @hyperloglog, @stream, @admin, @fast, @slow, @pubsub, @blocking, @dangerous, - * @connection, @transaction, @scripting, @geo. + * @connection, @transaction, @scripting, @geo, @replication. * * Note that: * @@ -673,7 +673,7 @@ struct redisCommand redisCommandTable[] = { * failure detection, and a loading server is considered to be * not available. */ {"ping",pingCommand,-1, - "ok-stale fast @connection", + "ok-stale fast @connection @replication", 0,NULL,0,0,0,0,0,0}, {"echo",echoCommand,2, @@ -717,15 +717,15 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"sync",syncCommand,1, - "admin no-script", + "admin no-script @replication", 0,NULL,0,0,0,0,0,0}, {"psync",syncCommand,3, - "admin no-script", + "admin no-script @replication", 0,NULL,0,0,0,0,0,0}, {"replconf",replconfCommand,-1, - "admin no-script ok-loading ok-stale", + "admin no-script ok-loading ok-stale @replication", 0,NULL,0,0,0,0,0,0}, {"flushdb",flushdbCommand,-1, diff --git a/src/server.h b/src/server.h index f42968994..7a826a7cd 100644 --- a/src/server.h +++ b/src/server.h @@ -372,7 +372,8 @@ public: #define CMD_CATEGORY_CONNECTION (1ULL<<36) #define CMD_CATEGORY_TRANSACTION (1ULL<<37) #define CMD_CATEGORY_SCRIPTING (1ULL<<38) -#define CMD_SKIP_PROPOGATE (1ULL<<39) /* "noprop" flag */ +#define CMD_CATEGORY_REPLICATION (1ULL<<39) +#define CMD_SKIP_PROPOGATE (1ULL<<40) /* "noprop" flag */ /* AOF states */ #define AOF_OFF 0 /* AOF is off */ diff --git a/utils/generate-command-help.rb b/utils/generate-command-help.rb index 2e46d1ed8..3acd81244 100755 --- a/utils/generate-command-help.rb +++ b/utils/generate-command-help.rb @@ -15,7 +15,8 @@ GROUPS = [ "hyperloglog", "cluster", "geo", - "stream" + "stream", + "replication" ].freeze GROUPS_BY_NAME = Hash[* From d7fa406a0ebdcef493c61879706fb32b447ebd7d Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 26 May 2020 01:28:52 -0400 Subject: [PATCH 5/5] Fix client tracking, also ensure tracking tests are enabled Former-commit-id: 1938af27f50f9686dc98b4839fb439cc03b4a250 --- src/server.h | 1 + src/tracking.cpp | 21 +++++++++++++-------- tests/test_helper.tcl | 1 + 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/server.h b/src/server.h index f42968994..9fd0e55d1 100644 --- a/src/server.h +++ b/src/server.h @@ -56,6 +56,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include diff --git a/src/tracking.cpp b/src/tracking.cpp index d0eee35ff..2c0f4812b 100644 --- a/src/tracking.cpp +++ b/src/tracking.cpp @@ -202,6 +202,9 @@ void trackingRememberKeys(client *c) { * to the client as value of the invalidation. This is used in BCAST mode * in order to optimized the implementation to use less CPU time. */ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int proto) { + std::unique_lock ul(c->lock); + serverAssert(c->lock.fOwnLock()); + int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); @@ -210,12 +213,14 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot * are unable to send invalidation messages to the redirected * connection, because the client no longer exist. */ if (c->resp > 2) { - addReplyPushLen(c,3); - addReplyBulkCBuffer(c,"tracking-redir-broken",21); - addReplyLongLong(c,c->client_tracking_redirection); + addReplyPushLenAsync(c,3); + addReplyBulkCBufferAsync(c,"tracking-redir-broken",21); + addReplyLongLongAsync(c,c->client_tracking_redirection); } return; } + ul.unlock(); + ul = std::unique_lock(redir->lock); c = redir; using_redirection = 1; } @@ -225,8 +230,8 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot * in Pub/Sub mode, we can support the feature with RESP 2 as well, * by sending Pub/Sub messages in the __redis__:invalidate channel. */ if (c->resp > 2) { - addReplyPushLen(c,2); - addReplyBulkCBuffer(c,"invalidate",10); + addReplyPushLenAsync(c,2); + addReplyBulkCBufferAsync(c,"invalidate",10); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ @@ -241,10 +246,10 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot /* Send the "value" part, which is the array of keys. */ if (proto) { - addReplyProto(c,keyname,keylen); + addReplyProtoAsync(c,keyname,keylen); } else { - addReplyArrayLen(c,1); - addReplyBulkCBuffer(c,keyname,keylen); + addReplyArrayLenAsync(c,1); + addReplyBulkCBufferAsync(c,keyname,keylen); } } diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index f5e690f3d..b2b3b4333 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -70,6 +70,7 @@ set ::all_tests { unit/wait unit/pendingquerybuf unit/tls + unit/tracking } # Index to the next test to run in the ::all_tests list. set ::next_test 0