diff --git a/keydb.conf b/keydb.conf index 03db7b6d0..39bde2789 100644 --- a/keydb.conf +++ b/keydb.conf @@ -376,9 +376,9 @@ dir ./ # # However this is not enough if you are using KeyDB ACLs (for Redis version # 6 or greater), and the default user is not capable of running the PSYNC -# command and/or other commands needed for replication. In this case it's -# better to configure a special user to use with replication, and specify the -# masteruser configuration as such: +# command and/or other commands needed for replication (gathered in the +# @replication group). In this case it's better to configure a special user to +# use with replication, and specify the masteruser configuration as such: # # masteruser # diff --git a/src/acl.cpp b/src/acl.cpp index 1453d1fa2..b3a4ea522 100644 --- a/src/acl.cpp +++ b/src/acl.cpp @@ -80,6 +80,7 @@ struct ACLCategoryItem { {"connection", CMD_CATEGORY_CONNECTION}, {"transaction", CMD_CATEGORY_TRANSACTION}, {"scripting", CMD_CATEGORY_SCRIPTING}, + {"replication", CMD_CATEGORY_REPLICATION}, {NULL,0} /* Terminator. */ }; diff --git a/src/connection.h b/src/connection.h index f5624b596..515229d6a 100644 --- a/src/connection.h +++ b/src/connection.h @@ -68,6 +68,7 @@ typedef struct ConnectionType { ssize_t (*sync_write)(struct connection *conn, const char *ptr, ssize_t size, long long timeout); ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout); ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout); + void (*marshal_thread)(struct connection *conn); } ConnectionType; struct connection { @@ -198,6 +199,11 @@ static inline ssize_t connSyncReadLine(connection *conn, char *ptr, ssize_t size return conn->type->sync_readline(conn, ptr, size, timeout); } +static inline void connMarshalThread(connection *conn) { + if (conn->type->marshal_thread != nullptr) + conn->type->marshal_thread(conn); +} + connection *connCreateSocket(); connection *connCreateAcceptedSocket(int fd); diff --git a/src/fastlock.cpp b/src/fastlock.cpp index cc6eea99a..28092828f 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -433,8 +433,9 @@ extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex) extern "C" void fastlock_free(struct fastlock *lock) { // NOP - serverAssert((lock->m_ticket.m_active == lock->m_ticket.m_avail) // Asser the lock is unlocked - || (lock->m_pidOwner == gettid() && (lock->m_ticket.m_active == lock->m_ticket.m_avail-1))); // OR we own the lock and nobody else is waiting + serverAssert((lock->m_ticket.m_active == lock->m_ticket.m_avail) // Assert the lock is unlocked + || (lock->m_pidOwner == gettid() + && (lock->m_ticket.m_active == static_cast(lock->m_ticket.m_avail-1U)))); // OR we own the lock and nobody else is waiting lock->m_pidOwner = -2; // sentinal value indicating free ANNOTATE_RWLOCK_DESTROY(lock); } diff --git a/src/help.h b/src/help.h index eae4e579b..2f692dfc2 100644 --- a/src/help.h +++ b/src/help.h @@ -18,7 +18,8 @@ static char *commandGroups[] = { "hyperloglog", "cluster", "geo", - "stream" + "stream", + "replication" }; struct commandHelp { diff --git a/src/networking.cpp b/src/networking.cpp index 7a7eaa379..286b0e94e 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1315,7 +1315,8 @@ void acceptOnThread(connection *conn, int flags, char *cip) szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); } - int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT]{ + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] { + connMarshalThread(conn); acceptCommonHandler(conn,flags,szT,ielTarget); if (!g_fTestMode && !g_pserver->loading) rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); diff --git a/src/replication.cpp b/src/replication.cpp index 21768a89a..bd33cb810 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -4197,21 +4197,27 @@ void replicaReplayCommand(client *c) cFake->lock.unlock(); if (cFake->master_error) { + selectDb(c, cFake->db->id); + freeClient(cFake); + remoteState.cFake = cFake = nullptr; addReplyError(c, "Error in rreplay command, please check logs."); } - if (fExec || cFake->flags & CLIENT_MULTI) + if (cFake != nullptr) { - addReply(c, shared.ok); - selectDb(c, cFake->db->id); - if (mvcc > remoteState.mvcc) - remoteState.mvcc = mvcc; + if (fExec || cFake->flags & CLIENT_MULTI) + { + addReply(c, shared.ok); + selectDb(c, cFake->db->id); + if (mvcc > remoteState.mvcc) + remoteState.mvcc = mvcc; + } + else + { + serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf); + addReplyError(c, "command did not execute"); + } + serverAssert(sdslen(cFake->querybuf) == 0); } - else - { - serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf); - addReplyError(c, "command did not execute"); - } - serverAssert(sdslen(cFake->querybuf) == 0); serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here diff --git a/src/server.cpp b/src/server.cpp index 15f5729d1..157655805 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -181,7 +181,7 @@ volatile unsigned long lru_clock; /* Server global current LRU time. */ * * @keyspace, @read, @write, @set, @sortedset, @list, @hash, @string, @bitmap, * @hyperloglog, @stream, @admin, @fast, @slow, @pubsub, @blocking, @dangerous, - * @connection, @transaction, @scripting, @geo. + * @connection, @transaction, @scripting, @geo, @replication. * * Note that: * @@ -674,7 +674,7 @@ struct redisCommand redisCommandTable[] = { * failure detection, and a loading server is considered to be * not available. */ {"ping",pingCommand,-1, - "ok-stale fast @connection", + "ok-stale fast @connection @replication", 0,NULL,0,0,0,0,0,0}, {"echo",echoCommand,2, @@ -718,15 +718,15 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"sync",syncCommand,1, - "admin no-script", + "admin no-script @replication", 0,NULL,0,0,0,0,0,0}, {"psync",syncCommand,3, - "admin no-script", + "admin no-script @replication", 0,NULL,0,0,0,0,0,0}, {"replconf",replconfCommand,-1, - "admin no-script ok-loading ok-stale", + "admin no-script ok-loading ok-stale @replication", 0,NULL,0,0,0,0,0,0}, {"flushdb",flushdbCommand,-1, @@ -3133,7 +3133,7 @@ static void initNetworking(int fReusePort) } /* Abort if there are no listening sockets at all. */ - if (g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd_count == 0 && g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].tlsfd_count && g_pserver->sofd < 0) { + if (g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd_count == 0 && g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].tlsfd_count == 0 && g_pserver->sofd < 0) { serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } diff --git a/src/server.h b/src/server.h index 99167d040..a04e3301c 100644 --- a/src/server.h +++ b/src/server.h @@ -58,6 +58,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -436,7 +437,8 @@ public: #define CMD_CATEGORY_CONNECTION (1ULL<<36) #define CMD_CATEGORY_TRANSACTION (1ULL<<37) #define CMD_CATEGORY_SCRIPTING (1ULL<<38) -#define CMD_SKIP_PROPOGATE (1ULL<<39) /* "noprop" flag */ +#define CMD_CATEGORY_REPLICATION (1ULL<<39) +#define CMD_SKIP_PROPOGATE (1ULL<<40) /* "noprop" flag */ /* AOF states */ #define AOF_OFF 0 /* AOF is off */ diff --git a/src/tls.cpp b/src/tls.cpp index 08135e585..c125eabc2 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -347,6 +347,11 @@ connection *connCreateTLS(void) { return (connection *) conn; } +void connTLSMarshalThread(connection *c) { + tls_connection *conn = (tls_connection*)c; + conn->el = serverTL->el; +} + connection *connCreateAcceptedTLS(int fd, int require_auth) { tls_connection *conn = (tls_connection *) connCreateTLS(); conn->c.fd = fd; @@ -893,6 +898,10 @@ ConnectionType CT_TLS = { connTLSSyncWrite, connTLSSyncRead, connTLSSyncReadLine, +<<<<<<< HEAD +======= + connTLSMarshalThread, +>>>>>>> unstable }; int tlsHasPendingData() { diff --git a/src/tracking.cpp b/src/tracking.cpp index d0eee35ff..2c0f4812b 100644 --- a/src/tracking.cpp +++ b/src/tracking.cpp @@ -202,6 +202,9 @@ void trackingRememberKeys(client *c) { * to the client as value of the invalidation. This is used in BCAST mode * in order to optimized the implementation to use less CPU time. */ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int proto) { + std::unique_lock ul(c->lock); + serverAssert(c->lock.fOwnLock()); + int using_redirection = 0; if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); @@ -210,12 +213,14 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot * are unable to send invalidation messages to the redirected * connection, because the client no longer exist. */ if (c->resp > 2) { - addReplyPushLen(c,3); - addReplyBulkCBuffer(c,"tracking-redir-broken",21); - addReplyLongLong(c,c->client_tracking_redirection); + addReplyPushLenAsync(c,3); + addReplyBulkCBufferAsync(c,"tracking-redir-broken",21); + addReplyLongLongAsync(c,c->client_tracking_redirection); } return; } + ul.unlock(); + ul = std::unique_lock(redir->lock); c = redir; using_redirection = 1; } @@ -225,8 +230,8 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot * in Pub/Sub mode, we can support the feature with RESP 2 as well, * by sending Pub/Sub messages in the __redis__:invalidate channel. */ if (c->resp > 2) { - addReplyPushLen(c,2); - addReplyBulkCBuffer(c,"invalidate",10); + addReplyPushLenAsync(c,2); + addReplyBulkCBufferAsync(c,"invalidate",10); } else if (using_redirection && c->flags & CLIENT_PUBSUB) { /* We use a static object to speedup things, however we assume * that addReplyPubsubMessage() will not take a reference. */ @@ -241,10 +246,10 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot /* Send the "value" part, which is the array of keys. */ if (proto) { - addReplyProto(c,keyname,keylen); + addReplyProtoAsync(c,keyname,keylen); } else { - addReplyArrayLen(c,1); - addReplyBulkCBuffer(c,keyname,keylen); + addReplyArrayLenAsync(c,1); + addReplyBulkCBufferAsync(c,keyname,keylen); } } diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index d66f1f75d..7ed06b464 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -71,6 +71,7 @@ set ::all_tests { unit/wait unit/pendingquerybuf unit/tls + unit/tracking } # Index to the next test to run in the ::all_tests list. set ::next_test 0 diff --git a/utils/generate-command-help.rb b/utils/generate-command-help.rb index 2e46d1ed8..3acd81244 100755 --- a/utils/generate-command-help.rb +++ b/utils/generate-command-help.rb @@ -15,7 +15,8 @@ GROUPS = [ "hyperloglog", "cluster", "geo", - "stream" + "stream", + "replication" ].freeze GROUPS_BY_NAME = Hash[*