Merge branch 'unstable' into keydbpro

Former-commit-id: b0636273806ce323627ce31c1ad7f86ceb39696d
This commit is contained in:
John Sully 2020-05-26 01:34:19 -04:00
commit d50c0c5ba3
13 changed files with 68 additions and 34 deletions

View File

@ -376,9 +376,9 @@ dir ./
# #
# However this is not enough if you are using KeyDB ACLs (for Redis version # However this is not enough if you are using KeyDB ACLs (for Redis version
# 6 or greater), and the default user is not capable of running the PSYNC # 6 or greater), and the default user is not capable of running the PSYNC
# command and/or other commands needed for replication. In this case it's # command and/or other commands needed for replication (gathered in the
# better to configure a special user to use with replication, and specify the # @replication group). In this case it's better to configure a special user to
# masteruser configuration as such: # use with replication, and specify the masteruser configuration as such:
# #
# masteruser <username> # masteruser <username>
# #

View File

@ -80,6 +80,7 @@ struct ACLCategoryItem {
{"connection", CMD_CATEGORY_CONNECTION}, {"connection", CMD_CATEGORY_CONNECTION},
{"transaction", CMD_CATEGORY_TRANSACTION}, {"transaction", CMD_CATEGORY_TRANSACTION},
{"scripting", CMD_CATEGORY_SCRIPTING}, {"scripting", CMD_CATEGORY_SCRIPTING},
{"replication", CMD_CATEGORY_REPLICATION},
{NULL,0} /* Terminator. */ {NULL,0} /* Terminator. */
}; };

View File

@ -68,6 +68,7 @@ typedef struct ConnectionType {
ssize_t (*sync_write)(struct connection *conn, const char *ptr, ssize_t size, long long timeout); ssize_t (*sync_write)(struct connection *conn, const char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout); ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout); ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
void (*marshal_thread)(struct connection *conn);
} ConnectionType; } ConnectionType;
struct connection { struct connection {
@ -198,6 +199,11 @@ static inline ssize_t connSyncReadLine(connection *conn, char *ptr, ssize_t size
return conn->type->sync_readline(conn, ptr, size, timeout); return conn->type->sync_readline(conn, ptr, size, timeout);
} }
static inline void connMarshalThread(connection *conn) {
if (conn->type->marshal_thread != nullptr)
conn->type->marshal_thread(conn);
}
connection *connCreateSocket(); connection *connCreateSocket();
connection *connCreateAcceptedSocket(int fd); connection *connCreateAcceptedSocket(int fd);

View File

@ -433,8 +433,9 @@ extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex)
extern "C" void fastlock_free(struct fastlock *lock) extern "C" void fastlock_free(struct fastlock *lock)
{ {
// NOP // NOP
serverAssert((lock->m_ticket.m_active == lock->m_ticket.m_avail) // Asser the lock is unlocked serverAssert((lock->m_ticket.m_active == lock->m_ticket.m_avail) // Assert the lock is unlocked
|| (lock->m_pidOwner == gettid() && (lock->m_ticket.m_active == lock->m_ticket.m_avail-1))); // OR we own the lock and nobody else is waiting || (lock->m_pidOwner == gettid()
&& (lock->m_ticket.m_active == static_cast<uint16_t>(lock->m_ticket.m_avail-1U)))); // OR we own the lock and nobody else is waiting
lock->m_pidOwner = -2; // sentinal value indicating free lock->m_pidOwner = -2; // sentinal value indicating free
ANNOTATE_RWLOCK_DESTROY(lock); ANNOTATE_RWLOCK_DESTROY(lock);
} }

View File

@ -18,7 +18,8 @@ static char *commandGroups[] = {
"hyperloglog", "hyperloglog",
"cluster", "cluster",
"geo", "geo",
"stream" "stream",
"replication"
}; };
struct commandHelp { struct commandHelp {

View File

@ -1316,6 +1316,7 @@ void acceptOnThread(connection *conn, int flags, char *cip)
memcpy(szT, cip, NET_IP_STR_LEN); memcpy(szT, cip, NET_IP_STR_LEN);
} }
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] { int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] {
connMarshalThread(conn);
acceptCommonHandler(conn,flags,szT,ielTarget); acceptCommonHandler(conn,flags,szT,ielTarget);
if (!g_fTestMode && !g_pserver->loading) if (!g_fTestMode && !g_pserver->loading)
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);

View File

@ -4197,8 +4197,13 @@ void replicaReplayCommand(client *c)
cFake->lock.unlock(); cFake->lock.unlock();
if (cFake->master_error) if (cFake->master_error)
{ {
selectDb(c, cFake->db->id);
freeClient(cFake);
remoteState.cFake = cFake = nullptr;
addReplyError(c, "Error in rreplay command, please check logs."); addReplyError(c, "Error in rreplay command, please check logs.");
} }
if (cFake != nullptr)
{
if (fExec || cFake->flags & CLIENT_MULTI) if (fExec || cFake->flags & CLIENT_MULTI)
{ {
addReply(c, shared.ok); addReply(c, shared.ok);
@ -4212,6 +4217,7 @@ void replicaReplayCommand(client *c)
addReplyError(c, "command did not execute"); addReplyError(c, "command did not execute");
} }
serverAssert(sdslen(cFake->querybuf) == 0); serverAssert(sdslen(cFake->querybuf) == 0);
}
serverTL->current_client = current_clientSave; serverTL->current_client = current_clientSave;
// call() will not propogate this for us, so we do so here // call() will not propogate this for us, so we do so here

View File

@ -181,7 +181,7 @@ volatile unsigned long lru_clock; /* Server global current LRU time. */
* *
* @keyspace, @read, @write, @set, @sortedset, @list, @hash, @string, @bitmap, * @keyspace, @read, @write, @set, @sortedset, @list, @hash, @string, @bitmap,
* @hyperloglog, @stream, @admin, @fast, @slow, @pubsub, @blocking, @dangerous, * @hyperloglog, @stream, @admin, @fast, @slow, @pubsub, @blocking, @dangerous,
* @connection, @transaction, @scripting, @geo. * @connection, @transaction, @scripting, @geo, @replication.
* *
* Note that: * Note that:
* *
@ -674,7 +674,7 @@ struct redisCommand redisCommandTable[] = {
* failure detection, and a loading server is considered to be * failure detection, and a loading server is considered to be
* not available. */ * not available. */
{"ping",pingCommand,-1, {"ping",pingCommand,-1,
"ok-stale fast @connection", "ok-stale fast @connection @replication",
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
{"echo",echoCommand,2, {"echo",echoCommand,2,
@ -718,15 +718,15 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
{"sync",syncCommand,1, {"sync",syncCommand,1,
"admin no-script", "admin no-script @replication",
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
{"psync",syncCommand,3, {"psync",syncCommand,3,
"admin no-script", "admin no-script @replication",
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
{"replconf",replconfCommand,-1, {"replconf",replconfCommand,-1,
"admin no-script ok-loading ok-stale", "admin no-script ok-loading ok-stale @replication",
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
{"flushdb",flushdbCommand,-1, {"flushdb",flushdbCommand,-1,
@ -3133,7 +3133,7 @@ static void initNetworking(int fReusePort)
} }
/* Abort if there are no listening sockets at all. */ /* Abort if there are no listening sockets at all. */
if (g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd_count == 0 && g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].tlsfd_count && g_pserver->sofd < 0) { if (g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd_count == 0 && g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].tlsfd_count == 0 && g_pserver->sofd < 0) {
serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
exit(1); exit(1);
} }

View File

@ -58,6 +58,7 @@
#include <set> #include <set>
#include <map> #include <map>
#include <string> #include <string>
#include <mutex>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#include <lua.h> #include <lua.h>
@ -436,7 +437,8 @@ public:
#define CMD_CATEGORY_CONNECTION (1ULL<<36) #define CMD_CATEGORY_CONNECTION (1ULL<<36)
#define CMD_CATEGORY_TRANSACTION (1ULL<<37) #define CMD_CATEGORY_TRANSACTION (1ULL<<37)
#define CMD_CATEGORY_SCRIPTING (1ULL<<38) #define CMD_CATEGORY_SCRIPTING (1ULL<<38)
#define CMD_SKIP_PROPOGATE (1ULL<<39) /* "noprop" flag */ #define CMD_CATEGORY_REPLICATION (1ULL<<39)
#define CMD_SKIP_PROPOGATE (1ULL<<40) /* "noprop" flag */
/* AOF states */ /* AOF states */
#define AOF_OFF 0 /* AOF is off */ #define AOF_OFF 0 /* AOF is off */

View File

@ -347,6 +347,11 @@ connection *connCreateTLS(void) {
return (connection *) conn; return (connection *) conn;
} }
void connTLSMarshalThread(connection *c) {
tls_connection *conn = (tls_connection*)c;
conn->el = serverTL->el;
}
connection *connCreateAcceptedTLS(int fd, int require_auth) { connection *connCreateAcceptedTLS(int fd, int require_auth) {
tls_connection *conn = (tls_connection *) connCreateTLS(); tls_connection *conn = (tls_connection *) connCreateTLS();
conn->c.fd = fd; conn->c.fd = fd;
@ -893,6 +898,10 @@ ConnectionType CT_TLS = {
connTLSSyncWrite, connTLSSyncWrite,
connTLSSyncRead, connTLSSyncRead,
connTLSSyncReadLine, connTLSSyncReadLine,
<<<<<<< HEAD
=======
connTLSMarshalThread,
>>>>>>> unstable
}; };
int tlsHasPendingData() { int tlsHasPendingData() {

View File

@ -202,6 +202,9 @@ void trackingRememberKeys(client *c) {
* to the client as value of the invalidation. This is used in BCAST mode * to the client as value of the invalidation. This is used in BCAST mode
* in order to optimized the implementation to use less CPU time. */ * in order to optimized the implementation to use less CPU time. */
void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int proto) { void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int proto) {
std::unique_lock<fastlock> ul(c->lock);
serverAssert(c->lock.fOwnLock());
int using_redirection = 0; int using_redirection = 0;
if (c->client_tracking_redirection) { if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection); client *redir = lookupClientByID(c->client_tracking_redirection);
@ -210,12 +213,14 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot
* are unable to send invalidation messages to the redirected * are unable to send invalidation messages to the redirected
* connection, because the client no longer exist. */ * connection, because the client no longer exist. */
if (c->resp > 2) { if (c->resp > 2) {
addReplyPushLen(c,3); addReplyPushLenAsync(c,3);
addReplyBulkCBuffer(c,"tracking-redir-broken",21); addReplyBulkCBufferAsync(c,"tracking-redir-broken",21);
addReplyLongLong(c,c->client_tracking_redirection); addReplyLongLongAsync(c,c->client_tracking_redirection);
} }
return; return;
} }
ul.unlock();
ul = std::unique_lock<fastlock>(redir->lock);
c = redir; c = redir;
using_redirection = 1; using_redirection = 1;
} }
@ -225,8 +230,8 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot
* in Pub/Sub mode, we can support the feature with RESP 2 as well, * in Pub/Sub mode, we can support the feature with RESP 2 as well,
* by sending Pub/Sub messages in the __redis__:invalidate channel. */ * by sending Pub/Sub messages in the __redis__:invalidate channel. */
if (c->resp > 2) { if (c->resp > 2) {
addReplyPushLen(c,2); addReplyPushLenAsync(c,2);
addReplyBulkCBuffer(c,"invalidate",10); addReplyBulkCBufferAsync(c,"invalidate",10);
} else if (using_redirection && c->flags & CLIENT_PUBSUB) { } else if (using_redirection && c->flags & CLIENT_PUBSUB) {
/* We use a static object to speedup things, however we assume /* We use a static object to speedup things, however we assume
* that addReplyPubsubMessage() will not take a reference. */ * that addReplyPubsubMessage() will not take a reference. */
@ -241,10 +246,10 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot
/* Send the "value" part, which is the array of keys. */ /* Send the "value" part, which is the array of keys. */
if (proto) { if (proto) {
addReplyProto(c,keyname,keylen); addReplyProtoAsync(c,keyname,keylen);
} else { } else {
addReplyArrayLen(c,1); addReplyArrayLenAsync(c,1);
addReplyBulkCBuffer(c,keyname,keylen); addReplyBulkCBufferAsync(c,keyname,keylen);
} }
} }

View File

@ -71,6 +71,7 @@ set ::all_tests {
unit/wait unit/wait
unit/pendingquerybuf unit/pendingquerybuf
unit/tls unit/tls
unit/tracking
} }
# Index to the next test to run in the ::all_tests list. # Index to the next test to run in the ::all_tests list.
set ::next_test 0 set ::next_test 0

View File

@ -15,7 +15,8 @@ GROUPS = [
"hyperloglog", "hyperloglog",
"cluster", "cluster",
"geo", "geo",
"stream" "stream",
"replication"
].freeze ].freeze
GROUPS_BY_NAME = Hash[* GROUPS_BY_NAME = Hash[*