diff --git a/keydb.conf b/keydb.conf index eee75213e..f588c634d 100644 --- a/keydb.conf +++ b/keydb.conf @@ -1628,7 +1628,7 @@ hz 10 # offers, and enables by default, the ability to use an adaptive HZ value # which will temporary raise when there are many connected clients. # -# When dynamic HZ is enabled, the actual configured HZ will be used as +# When dynamic HZ is enabled, the actual configured HZ will be used # as a baseline, but multiples of the configured HZ value will be actually # used as needed once more clients are connected. In this way an idle # instance will use very little CPU time while a busy instance will be diff --git a/sentinel.conf b/sentinel.conf index 4b1615c06..7cec3c356 100644 --- a/sentinel.conf +++ b/sentinel.conf @@ -102,6 +102,18 @@ sentinel monitor mymaster 127.0.0.1 6379 2 # # sentinel auth-pass mymaster MySUPER--secret-0123passw0rd +# sentinel auth-user +# +# This is useful in order to authenticate to instances having ACL capabilities, +# that is, running Redis 6.0 or greater. When just auth-pass is provided the +# Sentinel instance will authenticate to Redis using the old "AUTH " +# method. When also an username is provided, it will use "AUTH ". +# In the Redis servers side, the ACL to provide just minimal access to +# Sentinel instances, should be configured along the following lines: +# +# user sentinel-user >somepassword +client +subscribe +publish \ +# +ping +info +multi +slaveof +config +client +exec on + # sentinel down-after-milliseconds # # Number of milliseconds the master (or any attached replica or sentinel) should diff --git a/src/acl.cpp b/src/acl.cpp index 0cfd7ba38..24d1e9f64 100644 --- a/src/acl.cpp +++ b/src/acl.cpp @@ -901,16 +901,6 @@ const char *ACLSetUserStringError(void) { return errmsg; } -/* Return the first password of the default user or NULL. - * This function is needed for backward compatibility with the old - * directive "requirepass" when Redis supported a single global - * password. */ -sds ACLDefaultUserFirstPassword(void) { - if (listLength(DefaultUser->passwords) == 0) return NULL; - listNode *first = listFirst(DefaultUser->passwords); - return (sds)listNodeValue(first); -} - /* Initialize the default user, that will always exist for all the process * lifetime. */ void ACLInitDefaultUser(void) { @@ -927,6 +917,7 @@ void ACLInit(void) { UsersToLoad = listCreate(); ACLLog = listCreate(); ACLInitDefaultUser(); + g_pserver->requirepass = NULL; /* Only used for backward compatibility. */ } /* Check the username and password pair and return C_OK if they are valid, diff --git a/src/aof.cpp b/src/aof.cpp index 1472a832e..c93a488ce 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1238,12 +1238,13 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { /* Use the XADD MAXLEN 0 trick to generate an empty stream if * the key we are serializing is an empty string, which is possible * for the Stream type. */ + id.ms = 0; id.seq = 1; if (rioWriteBulkCount(r,'*',7) == 0) return 0; if (rioWriteBulkString(r,"XADD",4) == 0) return 0; if (rioWriteBulkObject(r,key) == 0) return 0; if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0; if (rioWriteBulkString(r,"0",1) == 0) return 0; - if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0; + if (rioWriteBulkStreamID(r,&id) == 0) return 0; if (rioWriteBulkString(r,"x",1) == 0) return 0; if (rioWriteBulkString(r,"y",1) == 0) return 0; } diff --git a/src/bitops.cpp b/src/bitops.cpp index bcb8840b3..5bc156882 100644 --- a/src/bitops.cpp +++ b/src/bitops.cpp @@ -976,6 +976,9 @@ void bitposCommand(client *c) { * OVERFLOW [WRAP|SAT|FAIL] */ +#define BITFIELD_FLAG_NONE 0 +#define BITFIELD_FLAG_READONLY (1<<0) + struct bitfieldOp { uint64_t offset; /* Bitfield offset. */ int64_t i64; /* Increment amount (INCRBY) or SET value */ @@ -985,7 +988,10 @@ struct bitfieldOp { int sign; /* True if signed, otherwise unsigned op. */ }; -void bitfieldCommand(client *c) { +/* This implements both the BITFIELD command and the BITFIELD_RO command + * when flags is set to BITFIELD_FLAG_READONLY: in this case only the + * GET subcommand is allowed, other subcommands will return an error. */ +void bitfieldGeneric(client *c, int flags) { robj_roptr o; size_t bitoffset; int j, numops = 0, changes = 0; @@ -1073,6 +1079,12 @@ void bitfieldCommand(client *c) { return; } } else { + if (flags & BITFIELD_FLAG_READONLY) { + zfree(ops); + addReplyError(c, "BITFIELD_RO only supports the GET subcommand"); + return; + } + /* Lookup by making room up to the farest bit reached by * this operation. */ if ((o = lookupStringForBitCommand(c, @@ -1203,3 +1215,11 @@ void bitfieldCommand(client *c) { } zfree(ops); } + +void bitfieldCommand(client *c) { + bitfieldGeneric(c, BITFIELD_FLAG_NONE); +} + +void bitfieldroCommand(client *c) { + bitfieldGeneric(c, BITFIELD_FLAG_READONLY); +} diff --git a/src/blocked.cpp b/src/blocked.cpp index a466c9c64..55b8ae3ed 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -113,6 +113,7 @@ void blockClient(client *c, int btype) { c->btype = btype; g_pserver->blocked_clients++; g_pserver->blocked_clients_by_type[btype]++; + addClientToTimeoutTable(c); } /* This function is called in the beforeSleep() function of the event loop @@ -200,6 +201,7 @@ void unblockClient(client *c) { g_pserver->blocked_clients_by_type[c->btype]--; c->flags &= ~CLIENT_BLOCKED; c->btype = BLOCKED_NONE; + removeClientFromTimeoutTable(c); queueClientForReprocessing(c); } diff --git a/src/cluster.cpp b/src/cluster.cpp index c0370e7e5..181516437 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -719,9 +719,10 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * or schedule it for later depending on connection implementation. */ if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) { - serverLog(LL_VERBOSE, - "Error accepting cluster node connection: %s", - connGetLastError(conn)); + if (connGetState(conn) == CONN_STATE_ERROR) + serverLog(LL_VERBOSE, + "Error accepting cluster node connection: %s", + connGetLastError(conn)); connClose(conn); return; } @@ -4328,7 +4329,7 @@ void clusterCommand(client *c) { "FORGET -- Remove a node from the cluster.", "GETKEYSINSLOT -- Return key names stored by current node in a slot.", "FLUSHSLOTS -- Delete current node own slots information.", -"INFO - Return onformation about the cluster.", +"INFO - Return information about the cluster.", "KEYSLOT -- Return the hash slot for .", "MEET [bus-port] -- Connect nodes into a working cluster.", "MYID -- Return the node id.", @@ -4339,6 +4340,7 @@ void clusterCommand(client *c) { "SET-config-epoch - Set config epoch of current node.", "SETSLOT (importing|migrating|stable|node ) -- Set slot state.", "REPLICAS -- Return replicas.", +"SAVECONFIG - Force saving cluster configuration on disk.", "SLOTS -- Return information about slots range mappings. Each range is made of:", " start, end, master and replicas IP addresses, ports and ids", NULL diff --git a/src/config.cpp b/src/config.cpp index 16127c070..983b06220 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -418,11 +418,15 @@ void loadServerConfigFromString(char *config) { goto loaderr; } /* The old "requirepass" directive just translates to setting - * a password to the default user. */ + * a password to the default user. The only thing we do + * additionally is to remember the cleartext password in this + * case, for backward compatibility with Redis <= 5. */ ACLSetUser(DefaultUser,"resetpass",-1); sds aclop = sdscatprintf(sdsempty(),">%s",argv[1]); ACLSetUser(DefaultUser,aclop,sdslen(aclop)); sdsfree(aclop); + sdsfree(g_pserver->requirepass); + g_pserver->requirepass = sdsnew(argv[1]); } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){ /* DEAD OPTION */ } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2) { @@ -672,11 +676,15 @@ void configSetCommand(client *c) { config_set_special_field("requirepass") { if (sdslen(szFromObj(o)) > CONFIG_AUTHPASS_MAX_LEN) goto badfmt; /* The old "requirepass" directive just translates to setting - * a password to the default user. */ + * a password to the default user. The only thing we do + * additionally is to remember the cleartext password in this + * case, for backward compatibility with Redis <= 5. */ ACLSetUser(DefaultUser,"resetpass",-1); sds aclop = sdscatprintf(sdsempty(),">%s",(char*)ptrFromObj(o)); ACLSetUser(DefaultUser,aclop,sdslen(aclop)); sdsfree(aclop); + sdsfree(g_pserver->requirepass); + g_pserver->requirepass = sdsnew(szFromObj(o)); } config_set_special_field("save") { int vlen, j; sds *v = sdssplitlen(szFromObj(o),sdslen(szFromObj(o))," ",1,&vlen); @@ -965,7 +973,7 @@ void configGetCommand(client *c) { } if (stringmatch(pattern,"requirepass",1)) { addReplyBulkCString(c,"requirepass"); - sds password = ACLDefaultUserFirstPassword(); + sds password = g_pserver->requirepass; if (password) { addReplyBulkCBuffer(c,password,sdslen(password)); } else { @@ -1416,7 +1424,7 @@ void rewriteConfigBindOption(struct rewriteConfigState *state) { void rewriteConfigRequirepassOption(struct rewriteConfigState *state, const char *option) { int force = 1; sds line; - sds password = ACLDefaultUserFirstPassword(); + sds password = g_pserver->requirepass; /* If there is no password set, we don't want the requirepass option * to be present in the configuration at all. */ diff --git a/src/connection.cpp b/src/connection.cpp index fe987e3f0..05f49f689 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -152,7 +152,7 @@ static void connSocketClose(connection *conn) { /* If called from within a handler, schedule the close but * keep the connection until the handler returns. */ - if (conn->flags & CONN_FLAG_IN_HANDLER) { + if (connHasRefs(conn)) { conn->flags |= CONN_FLAG_CLOSE_SCHEDULED; return; } @@ -183,10 +183,16 @@ static int connSocketRead(connection *conn, void *buf, size_t buf_len) { } static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) { + int ret = C_OK; + if (conn->state != CONN_STATE_ACCEPTING) return C_ERR; conn->state = CONN_STATE_CONNECTED; - if (!callHandler(conn, accept_handler)) return C_ERR; - return C_OK; + + connIncrRefs(conn); + if (!callHandler(conn, accept_handler)) ret = C_ERR; + connDecrRefs(conn); + + return ret; } /* Register a write handler, to be called when the connection is writable. diff --git a/src/connection.h b/src/connection.h index 651df06bd..ff8010f5c 100644 --- a/src/connection.h +++ b/src/connection.h @@ -45,11 +45,10 @@ typedef enum { CONN_STATE_ERROR } ConnectionState; -#define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */ -#define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */ -#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */ -#define CONN_FLAG_READ_THREADSAFE (1<<3) -#define CONN_FLAG_WRITE_THREADSAFE (1<<4) +#define CONN_FLAG_CLOSE_SCHEDULED (1<<0) /* Closed scheduled by a handler */ +#define CONN_FLAG_WRITE_BARRIER (1<<1) /* Write barrier requested */ +#define CONN_FLAG_READ_THREADSAFE (1<<2) +#define CONN_FLAG_WRITE_THREADSAFE (1<<3) typedef void (*ConnectionCallbackFunc)(struct connection *conn); @@ -72,7 +71,8 @@ typedef struct ConnectionType { struct connection { ConnectionType *type; ConnectionState state; - int flags; + short int flags; + short int refs; int last_errno; void *private_data; ConnectionCallbackFunc conn_handler; @@ -90,6 +90,13 @@ struct connection { * connAccept() may directly call accept_handler(), or return and call it * at a later time. This behavior is a bit awkward but aims to reduce the need * to wait for the next event loop, if no additional handshake is required. + * + * IMPORTANT: accept_handler may decide to close the connection, calling connClose(). + * To make this safe, the connection is only marked with CONN_FLAG_CLOSE_SCHEDULED + * in this case, and connAccept() returns with an error. + * + * connAccept() callers must always check the return value and on error (C_ERR) + * a connClose() must be called. */ static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) { diff --git a/src/connhelpers.h b/src/connhelpers.h index f237c9b1d..86250d09e 100644 --- a/src/connhelpers.h +++ b/src/connhelpers.h @@ -37,46 +37,49 @@ * implementations (currently sockets in connection.c and TLS in tls.c). * * Currently helpers implement the mechanisms for invoking connection - * handlers, tracking in-handler states and dealing with deferred - * destruction (if invoked by a handler). + * handlers and tracking connection references, to allow safe destruction + * of connections from within a handler. */ -/* Called whenever a handler is invoked on a connection and sets the - * CONN_FLAG_IN_HANDLER flag to indicate we're in a handler context. +/* Incremenet connection references. * - * An attempt to close a connection while CONN_FLAG_IN_HANDLER is - * set will result with deferred close, i.e. setting the CONN_FLAG_CLOSE_SCHEDULED - * instead of destructing it. + * Inside a connection handler, we guarantee refs >= 1 so it is always + * safe to connClose(). + * + * In other cases where we don't want to prematurely lose the connection, + * it can go beyond 1 as well; currently it is only done by connAccept(). */ -static inline void enterHandler(connection *conn) { - conn->flags |= CONN_FLAG_IN_HANDLER; +static inline void connIncrRefs(connection *conn) { + conn->refs++; } -/* Called whenever a handler returns. This unsets the CONN_FLAG_IN_HANDLER - * flag and performs actual close/destruction if a deferred close was - * scheduled by the handler. +/* Decrement connection references. + * + * Note that this is not intended to provide any automatic free logic! + * callHandler() takes care of that for the common flows, and anywhere an + * explicit connIncrRefs() is used, the caller is expected to take care of + * that. */ -static inline int exitHandler(connection *conn) { - conn->flags &= ~CONN_FLAG_IN_HANDLER; - if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) { - connClose(conn); - return 0; - } - return 1; + +static inline void connDecrRefs(connection *conn) { + conn->refs--; +} + +static inline int connHasRefs(connection *conn) { + return conn->refs; } /* Helper for connection implementations to call handlers: - * 1. Mark the handler in use. + * 1. Increment refs to protect the connection. * 2. Execute the handler (if set). - * 3. Mark the handler as NOT in use and perform deferred close if was - * requested by the handler at any time. + * 3. Decrement refs and perform deferred close, if refs==0. */ static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) { - conn->flags |= CONN_FLAG_IN_HANDLER; + connIncrRefs(conn); if (handler) handler(conn); - conn->flags &= ~CONN_FLAG_IN_HANDLER; + connDecrRefs(conn); if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) { - connClose(conn); + if (!connHasRefs(conn)) connClose(conn); return 0; } return 1; diff --git a/src/module.cpp b/src/module.cpp index 0fb8ffa85..b511fcb06 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -1864,7 +1864,12 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) { * current request context (whether the client is a Lua script or in a MULTI), * and about the Redis instance in general, i.e replication and persistence. * - * The available flags are: + * It is possible to call this function even with a NULL context, however + * in this case the following flags will not be reported: + * + * * LUA, MULTI, REPLICATED, DIRTY (see below for more info). + * + * Available flags and their meaning: * * * REDISMODULE_CTX_FLAGS_LUA: The command is running in a Lua script * @@ -1917,20 +1922,22 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { int flags = 0; /* Client specific flags */ - if (ctx->client) { - if (ctx->client->flags & CLIENT_LUA) - flags |= REDISMODULE_CTX_FLAGS_LUA; - if (ctx->client->flags & CLIENT_MULTI) - flags |= REDISMODULE_CTX_FLAGS_MULTI; - /* Module command recieved from MASTER, is replicated. */ - if (ctx->client->flags & CLIENT_MASTER) - flags |= REDISMODULE_CTX_FLAGS_REPLICATED; - } + if (ctx) { + if (ctx->client) { + if (ctx->client->flags & CLIENT_LUA) + flags |= REDISMODULE_CTX_FLAGS_LUA; + if (ctx->client->flags & CLIENT_MULTI) + flags |= REDISMODULE_CTX_FLAGS_MULTI; + /* Module command recieved from MASTER, is replicated. */ + if (ctx->client->flags & CLIENT_MASTER) + flags |= REDISMODULE_CTX_FLAGS_REPLICATED; + } - /* For DIRTY flags, we need the blocked client if used */ - client *c = ctx->blocked_client ? ctx->blocked_client->client : ctx->client; - if (c && (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))) { - flags |= REDISMODULE_CTX_FLAGS_MULTI_DIRTY; + /* For DIRTY flags, we need the blocked client if used */ + client *c = ctx->blocked_client ? ctx->blocked_client->client : ctx->client; + if (c && (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))) { + flags |= REDISMODULE_CTX_FLAGS_MULTI_DIRTY; + } } if (g_pserver->cluster_enabled) diff --git a/src/networking.cpp b/src/networking.cpp index 5c1519c1c..e27a7094c 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -137,7 +137,8 @@ client *createClient(connection *conn, int iel) { c->ctime = c->lastinteraction = g_pserver->unixtime; /* If the default user does not require authentication, the user is * directly authenticated. */ - c->authenticated = (c->puser->flags & USER_FLAG_NOPASS) != 0; + c->authenticated = (c->puser->flags & USER_FLAG_NOPASS) && + !(c->puser->flags & USER_FLAG_DISABLED); c->replstate = REPL_STATE_NONE; c->repl_put_online_on_ack = 0; c->reploff = 0; @@ -1100,7 +1101,7 @@ void clientAcceptHandler(connection *conn) { serverLog(LL_WARNING, "Error accepting a client connection: %s", connGetLastError(conn)); - freeClient(c); + freeClientAsync(c); return; } @@ -1146,7 +1147,7 @@ void clientAcceptHandler(connection *conn) { /* Nothing to do, Just to avoid the warning... */ } g_pserver->stat_rejected_conn++; - freeClient(c); + freeClientAsync(c); return; } } @@ -1207,9 +1208,10 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) */ if (connAccept(conn, clientAcceptHandler) == C_ERR) { char conninfo[100]; - serverLog(LL_WARNING, - "Error accepting a client connection: %s (conn: %s)", - connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); + if (connGetState(conn) == CONN_STATE_ERROR) + serverLog(LL_WARNING, + "Error accepting a client connection: %s (conn: %s)", + connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo))); freeClient((client*)connGetPrivateData(conn)); return; } diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 661124c1c..07e0e85f6 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -28,6 +28,7 @@ */ #include "server.h" +#include int clientSubscriptionsCount(client *c); @@ -209,6 +210,8 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ int pubsubSubscribePattern(client *c, robj *pattern) { serverAssert(GlobalLocksAcquired()); + dictEntry *de; + list *clients; int retval = 0; if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { @@ -220,6 +223,16 @@ int pubsubSubscribePattern(client *c, robj *pattern) { pat->pattern = getDecodedObject(pattern); pat->pclient = c; listAddNodeTail(g_pserver->pubsub_patterns,pat); + /* Add the client to the pattern -> list of clients hash table */ + de = dictFind(g_pserver->pubsub_patterns_dict,pattern); + if (de == NULL) { + clients = listCreate(); + dictAdd(g_pserver->pubsub_patterns_dict,pattern,clients); + incrRefCount(pattern); + } else { + clients = (list*)dictGetVal(de); + } + listAddNodeTail(clients,c); } /* Notify the client */ addReplyPubsubPatSubscribed(c,pattern); @@ -229,6 +242,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) { /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or * 0 if the client was not subscribed to the specified channel. */ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { + dictEntry *de; + list *clients; listNode *ln; pubsubPattern pat; int retval = 0; @@ -241,6 +256,18 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { pat.pattern = pattern; ln = listSearchKey(g_pserver->pubsub_patterns,&pat); listDelNode(g_pserver->pubsub_patterns,ln); + /* Remove the client from the pattern -> clients list hash table */ + de = dictFind(g_pserver->pubsub_patterns_dict,pattern); + serverAssertWithInfo(c,NULL,de != NULL); + clients = (list*)dictGetVal(de); + ln = listSearchKey(clients,c); + serverAssertWithInfo(c,NULL,ln != NULL); + listDelNode(clients,ln); + if (listLength(clients) == 0) { + /* Free the list and associated hash entry at all if this was + * the latest client. */ + dictDelete(g_pserver->pubsub_patterns_dict,pattern); + } } /* Notify the client */ if (notify) addReplyPubsubPatUnsubscribed(c,pattern); @@ -290,6 +317,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { serverAssert(GlobalLocksAcquired()); int receivers = 0; dictEntry *de; + dictIterator *di; listNode *ln; listIter li; @@ -314,29 +342,31 @@ int pubsubPublishMessage(robj *channel, robj *message) { } } /* Send to clients listening to matching channels */ - if (listLength(g_pserver->pubsub_patterns)) { - listRewind(g_pserver->pubsub_patterns,&li); + di = dictGetIterator(g_pserver->pubsub_patterns_dict); + if (di) { channel = getDecodedObject(channel); - while ((ln = listNext(&li)) != NULL) { - pubsubPattern *pat = (pubsubPattern*)ln->value; + while((de = dictNext(di)) != NULL) { + robj *pattern = (robj*)dictGetKey(de); + list *clients = (list*)dictGetVal(de); + if (!stringmatchlen(szFromObj(pattern), + sdslen(szFromObj(pattern)), + szFromObj(channel), + sdslen(szFromObj(channel)),0)) continue; - if (stringmatchlen((char*)ptrFromObj(pat->pattern), - sdslen(szFromObj(pat->pattern)), - (char*)ptrFromObj(channel), - sdslen(szFromObj(channel)),0)) - { - if (pat->pclient->flags & CLIENT_CLOSE_ASAP) + listRewind(clients,&li); + while ((ln = listNext(&li)) != NULL) { + client *c = (client*)listNodeValue(ln); + if (c->flags & CLIENT_CLOSE_ASAP) continue; - if (FCorrectThread(pat->pclient)) - fastlock_lock(&pat->pclient->lock); - addReplyPubsubPatMessage(pat->pclient, - pat->pattern,channel,message); - if (FCorrectThread(pat->pclient)) - fastlock_unlock(&pat->pclient->lock); + std::unique_lock l(c->lock, std::defer_lock); + if (FCorrectThread(c)) + l.lock(); + addReplyPubsubPatMessage(c,pattern,channel,message); receivers++; } } decrRefCount(channel); + dictReleaseIterator(di); } return receivers; } diff --git a/src/rdb.cpp b/src/rdb.cpp index 9a3cdf103..3d9221f37 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2384,7 +2384,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the replica. */ - bool fExpiredKey = (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica) && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now; + bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now; if (fStaleMvccKey || fExpiredKey) { if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, key) == nullptr) { // We have a key that we've already deleted and is not back in our database. diff --git a/src/redis-cli.c b/src/redis-cli.c index 7c3ffcf63..0f052dce4 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1103,7 +1103,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) { (argc == 3 && !strcasecmp(command,"latency") && !strcasecmp(argv[1],"graph")) || (argc == 2 && !strcasecmp(command,"latency") && - !strcasecmp(argv[1],"doctor"))) + !strcasecmp(argv[1],"doctor")) || + /* Format PROXY INFO command for Redis Cluster Proxy: + * https://github.com/artix75/redis-cluster-proxy */ + (argc >= 2 && !strcasecmp(command,"proxy") && + !strcasecmp(argv[1],"info"))) { output_raw = 1; } @@ -1800,6 +1804,8 @@ static void repl(void) { if (config.eval) { config.eval_ldb = 1; config.output = OUTPUT_RAW; + sdsfreesplitres(argv,argc); + linenoiseFree(line); return; /* Return to evalMode to restart the session. */ } else { printf("Use 'restart' only in Lua debugging mode."); diff --git a/src/replication.cpp b/src/replication.cpp index c3713b77b..c39b179d6 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3256,12 +3256,14 @@ void replicationCacheMasterUsingMyself(redisMaster *mi) { delta); mi->master_initial_offset = g_pserver->master_repl_meaningful_offset; g_pserver->repl_backlog_histlen -= delta; + g_pserver->repl_backlog_idx = + (g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - delta)) % + g_pserver->repl_backlog_size; if (g_pserver->repl_backlog_histlen < 0) g_pserver->repl_backlog_histlen = 0; } /* The master client we create can be set to any DBID, because * the new master will start its replication stream with SELECT. */ - mi->master_initial_offset = g_pserver->master_repl_offset; replicationCreateMasterClient(mi,NULL,-1); std::lock_guardmaster->lock)> lock(mi->master->lock); diff --git a/src/sentinel.cpp b/src/sentinel.cpp index 264eab6dd..6ee01202f 100644 --- a/src/sentinel.cpp +++ b/src/sentinel.cpp @@ -205,7 +205,8 @@ typedef struct sentinelRedisInstance { dict *slaves; /* Slaves for this master instance. */ unsigned int quorum;/* Number of sentinels that need to agree on failure. */ int parallel_syncs; /* How many slaves to reconfigure at same time. */ - char *auth_pass; /* Password to use for AUTH against master & slaves. */ + char *auth_pass; /* Password to use for AUTH against master & replica. */ + char *auth_user; /* Username for ACLs AUTH against master & replica. */ /* Slave specific. */ mstime_t master_link_down_time; /* Slave replication link down time. */ @@ -1231,6 +1232,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char * SENTINEL_DEFAULT_DOWN_AFTER; ri->master_link_down_time = 0; ri->auth_pass = NULL; + ri->auth_user = NULL; ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY; ri->slave_reconf_sent_time = 0; ri->slave_master_host = NULL; @@ -1289,6 +1291,7 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) { sdsfree(ri->slave_master_host); sdsfree(ri->leader); sdsfree(ri->auth_pass); + sdsfree(ri->auth_user); sdsfree(ri->info); releaseSentinelAddr(ri->addr); dictRelease(ri->renamed_commands); @@ -1654,19 +1657,19 @@ const char *sentinelHandleConfiguration(char **argv, int argc) { ri->failover_timeout = atoi(argv[2]); if (ri->failover_timeout <= 0) return "negative or zero time parameter."; - } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) { + } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) { /* parallel-syncs */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; ri->parallel_syncs = atoi(argv[2]); - } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) { + } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) { /* notification-script */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; if (access(argv[2],X_OK) == -1) return "Notification script seems non existing or non executable."; ri->notification_script = sdsnew(argv[2]); - } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) { + } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) { /* client-reconfig-script */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; @@ -1674,11 +1677,16 @@ const char *sentinelHandleConfiguration(char **argv, int argc) { return "Client reconfiguration script seems non existing or " "non executable."; ri->client_reconfig_script = sdsnew(argv[2]); - } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) { + } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) { /* auth-pass */ ri = sentinelGetMasterByName(argv[1]); if (!ri) return "No such master with specified name."; ri->auth_pass = sdsnew(argv[2]); + } else if (!strcasecmp(argv[0],"auth-user") && argc == 3) { + /* auth-user */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->auth_user = sdsnew(argv[2]); } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) { /* current-epoch */ unsigned long long current_epoch = strtoull(argv[1],NULL,10); @@ -1836,7 +1844,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { rewriteConfigRewriteLine(state,"sentinel",line,1); } - /* sentinel auth-pass */ + /* sentinel auth-pass & auth-user */ if (master->auth_pass) { line = sdscatprintf(sdsempty(), "sentinel auth-pass %s %s", @@ -1844,6 +1852,13 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) { rewriteConfigRewriteLine(state,"sentinel",line,1); } + if (master->auth_user) { + line = sdscatprintf(sdsempty(), + "sentinel auth-user %s %s", + master->name, master->auth_user); + rewriteConfigRewriteLine(state,"sentinel",line,1); + } + /* sentinel config-epoch */ line = sdscatprintf(sdsempty(), "sentinel config-epoch %s %llu", @@ -1968,19 +1983,29 @@ werr: * will disconnect and reconnect the link and so forth. */ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) { char *auth_pass = NULL; + char *auth_user = NULL; if (ri->flags & SRI_MASTER) { auth_pass = ri->auth_pass; + auth_user = ri->auth_user; } else if (ri->flags & SRI_SLAVE) { auth_pass = ri->master->auth_pass; + auth_user = ri->master->auth_user; } else if (ri->flags & SRI_SENTINEL) { - auth_pass = ACLDefaultUserFirstPassword(); + auth_pass = g_pserver->requirepass; + auth_user = NULL; } - if (auth_pass) { + if (auth_pass && auth_user == NULL) { if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s", sentinelInstanceMapCommand(ri,"AUTH"), auth_pass) == C_OK) ri->link->pending_commands++; + } else if (auth_pass && auth_user) { + /* If we also have an username, use the ACL-style AUTH command + * with two arguments, username and password. */ + if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s %s", + sentinelInstanceMapCommand(ri,"AUTH"), + auth_user, auth_pass) == C_OK) ri->link->pending_commands++; } } @@ -3522,6 +3547,12 @@ void sentinelSetCommand(client *c) { sdsfree(ri->auth_pass); ri->auth_pass = strlen(value) ? sdsnew(value) : NULL; changes++; + } else if (!strcasecmp(option,"auth-user") && moreargs > 0) { + /* auth-user */ + char *value = szFromObj(c->argv[++j]); + sdsfree(ri->auth_user); + ri->auth_user = strlen(value) ? sdsnew(value) : NULL; + changes++; } else if (!strcasecmp(option,"quorum") && moreargs > 0) { /* quorum */ robj *o = c->argv[++j]; diff --git a/src/server.cpp b/src/server.cpp index 217b15851..74fac4557 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -259,6 +259,10 @@ struct redisCommand redisCommandTable[] = { "write use-memory @bitmap", 0,NULL,1,1,1,0,0,0}, + {"bitfield_ro",bitfieldroCommand,-2, + "read-only fast @bitmap", + 0,NULL,1,1,1,0,0,0}, + {"setrange",setrangeCommand,4, "write use-memory @string", 0,NULL,1,1,1,0,0,0}, @@ -1497,6 +1501,137 @@ int allPersistenceDisabled(void) { return g_pserver->saveparamslen == 0 && g_pserver->aof_state == AOF_OFF; } +/* ========================== Clients timeouts ============================= */ + +/* Check if this blocked client timedout (does nothing if the client is + * not blocked right now). If so send a reply, unblock it, and return 1. + * Otherwise 0 is returned and no operation is performed. */ +int checkBlockedClientTimeout(client *c, mstime_t now) { + if (c->flags & CLIENT_BLOCKED && + c->bpop.timeout != 0 + && c->bpop.timeout < now) + { + /* Handle blocking operation specific timeout. */ + replyToBlockedClientTimedOut(c); + unblockClient(c); + return 1; + } else { + return 0; + } +} + +/* Check for timeouts. Returns non-zero if the client was terminated. + * The function gets the current time in milliseconds as argument since + * it gets called multiple times in a loop, so calling gettimeofday() for + * each iteration would be costly without any actual gain. */ +int clientsCronHandleTimeout(client *c, mstime_t now_ms) { + time_t now = now_ms/1000; + + if (cserver.maxidletime && + /* This handles the idle clients connection timeout if set. */ + !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */ + !(c->flags & CLIENT_MASTER) && /* No timeout for masters */ + !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */ + !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */ + (now - c->lastinteraction > cserver.maxidletime)) + { + serverLog(LL_VERBOSE,"Closing idle client"); + freeClient(c); + return 1; + } else if (c->flags & CLIENT_BLOCKED) { + /* Cluster: handle unblock & redirect of clients blocked + * into keys no longer served by this server. */ + if (g_pserver->cluster_enabled) { + if (clusterRedirectBlockedClientIfNeeded(c)) + unblockClient(c); + } + } + return 0; +} + +/* For blocked clients timeouts we populate a radix tree of 128 bit keys + * composed as such: + * + * [8 byte big endian expire time]+[8 byte client ID] + * + * We don't do any cleanup in the Radix tree: when we run the clients that + * reached the timeout already, if they are no longer existing or no longer + * blocked with such timeout, we just go forward. + * + * Every time a client blocks with a timeout, we add the client in + * the tree. In beforeSleep() we call clientsHandleTimeout() to run + * the tree and unblock the clients. */ + +#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ + +/* Given client ID and timeout, write the resulting radix tree key in buf. */ +void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { + timeout = htonu64(timeout); + memcpy(buf,&timeout,sizeof(timeout)); + memcpy(buf+8,&id,sizeof(id)); +} + +/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write + * the timeout into *toptr and the client ID into *idptr. */ +void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { + memcpy(toptr,buf,sizeof(*toptr)); + *toptr = ntohu64(*toptr); + memcpy(idptr,buf+8,sizeof(*idptr)); +} + +/* Add the specified client id / timeout as a key in the radix tree we use + * to handle blocked clients timeouts. The client is not added to the list + * if its timeout is zero (block forever). */ +void addClientToTimeoutTable(client *c) { + if (c->bpop.timeout == 0) return; + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + if (raxTryInsert(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL,NULL)) + c->flags |= CLIENT_IN_TO_TABLE; +} + +/* Remove the client from the table when it is unblocked for reasons + * different than timing out. */ +void removeClientFromTimeoutTable(client *c) { + if (!(c->flags & CLIENT_IN_TO_TABLE)) return; + c->flags &= ~CLIENT_IN_TO_TABLE; + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + raxRemove(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL); +} + +/* This function is called in beforeSleep() in order to unblock clients + * that are waiting in blocking operations with a timeout set. */ +void clientsHandleTimeout(void) { + serverAssert(GlobalLocksAcquired()); + + if (raxSize(g_pserver->clients_timeout_table) == 0) return; + uint64_t now = mstime(); + raxIterator ri; + raxStart(&ri,g_pserver->clients_timeout_table); + raxSeek(&ri,"^",NULL,0); + + while(raxNext(&ri)) { + uint64_t id, timeout; + decodeTimeoutKey(ri.key,&timeout,&id); + if (timeout >= now) break; /* All the timeouts are in the future. */ + client *c = lookupClientByID(id); + if (c) { + std::unique_lock l(c->lock, std::defer_lock); + if (FCorrectThread(c)) + l.lock(); + c->flags &= ~CLIENT_IN_TO_TABLE; + checkBlockedClientTimeout(c,now); + } + raxRemove(g_pserver->clients_timeout_table,ri.key,ri.key_len,NULL); + raxSeek(&ri,"^",NULL,0); + } +} + /* ======================= Cron: called every 100 ms ======================== */ /* Add a sample to the operations per second array of samples. */ @@ -1526,42 +1661,6 @@ long long getInstantaneousMetric(int metric) { return sum / STATS_METRIC_SAMPLES; } -/* Check for timeouts. Returns non-zero if the client was terminated. - * The function gets the current time in milliseconds as argument since - * it gets called multiple times in a loop, so calling gettimeofday() for - * each iteration would be costly without any actual gain. */ -int clientsCronHandleTimeout(client *c, mstime_t now_ms) { - time_t now = now_ms/1000; - - if (cserver.maxidletime && - !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves and monitors */ - !(c->flags & CLIENT_MASTER) && /* no timeout for masters */ - !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */ - !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */ - (now - c->lastinteraction > cserver.maxidletime)) - { - serverLog(LL_VERBOSE,"Closing idle client"); - freeClient(c); - return 1; - } else if (c->flags & CLIENT_BLOCKED) { - /* Blocked OPS timeout is handled with milliseconds resolution. - * However note that the actual resolution is limited by - * g_pserver->hz. */ - - if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) { - /* Handle blocking operation specific timeout. */ - replyToBlockedClientTimedOut(c); - unblockClient(c); - } else if (g_pserver->cluster_enabled) { - /* Cluster: handle unblock & redirect of clients blocked - * into keys no longer served by this g_pserver-> */ - if (clusterRedirectBlockedClientIfNeeded(c)) - unblockClient(c); - } - } - return 0; -} - /* The client query buffer is an sds.c string that can end with a lot of * free space not used, this function reclaims space if needed. * @@ -1728,10 +1827,12 @@ void clientsCron(int iel) { void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ - if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) { - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); - } else if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) { - expireSlaveKeys(); + if (g_pserver->active_expire_enabled) { + if (iAmMaster()) { + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); + } else { + expireSlaveKeys(); + } } /* Defrag keys gradually. */ @@ -2163,8 +2264,12 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* Handle precise timeouts of blocked clients. */ + clientsHandleTimeout(); + /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */ tlsProcessPendingData(); + /* If tls still has pending unread data don't sleep at all. */ aeSetDontWait(eventLoop, tlsHasPendingData()); @@ -2424,6 +2529,7 @@ void initServerConfig(void) { g_pserver->clients = listCreate(); g_pserver->slaves = listCreate(); g_pserver->monitors = listCreate(); + g_pserver->clients_timeout_table = raxNew(); g_pserver->timezone = getTimeZone(); /* Initialized by tzset(). */ cserver.configfile = NULL; cserver.executable = NULL; @@ -3003,6 +3109,7 @@ void initServer(void) { evictionPoolAlloc(); /* Initialize the LRU keys pool. */ g_pserver->pubsub_channels = dictCreate(&keylistDictType,NULL); g_pserver->pubsub_patterns = listCreate(); + g_pserver->pubsub_patterns_dict = dictCreate(&keylistDictType,NULL); listSetFreeMethod(g_pserver->pubsub_patterns,freePubsubPattern); listSetMatchMethod(g_pserver->pubsub_patterns,listMatchPubsubPattern); g_pserver->cronloops = 0; @@ -3593,7 +3700,7 @@ int processCommand(client *c, int callFlags) { /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || - DefaultUser->flags & USER_FLAG_DISABLED) && + (DefaultUser->flags & USER_FLAG_DISABLED)) && !c->authenticated; if (auth_required) { /* AUTH and HELLO and no auth modules are valid even in @@ -3727,6 +3834,7 @@ int processCommand(client *c, int callFlags) { !(c->flags & CLIENT_MASTER) && c->cmd->flags & CMD_WRITE) { + flagTransaction(c); addReply(c, shared.roslaveerr); return C_OK; } @@ -3766,11 +3874,19 @@ int processCommand(client *c, int callFlags) { return C_OK; } - /* Lua script too slow? Only allow a limited number of commands. */ + /* Lua script too slow? Only allow a limited number of commands. + * Note that we need to allow the transactions commands, otherwise clients + * sending a transaction with pipelining without error checking, may have + * the MULTI plus a few initial commands refused, then the timeout + * condition resolves, and the bottom-half of the transaction gets + * executed, see Github PR #7022. */ if (g_pserver->lua_timedout && c->cmd->proc != authCommand && c->cmd->proc != helloCommand && c->cmd->proc != replconfCommand && + c->cmd->proc != multiCommand && + c->cmd->proc != execCommand && + c->cmd->proc != discardCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)ptrFromObj(c->argv[1]))[0]) == 'n') && @@ -4225,11 +4341,13 @@ sds genRedisInfoString(const char *section) { "client_recent_max_output_buffer:%zu\r\n" "blocked_clients:%d\r\n" "tracking_clients:%d\r\n" + "clients_in_timeout_table:%" PRIu64 "\r\n" "current_client_thread:%d\r\n", listLength(g_pserver->clients)-listLength(g_pserver->slaves), maxin, maxout, g_pserver->blocked_clients, g_pserver->tracking_clients, + raxSize(g_pserver->clients_timeout_table), static_cast(serverTL - g_pserver->rgthreadvar)); for (int ithread = 0; ithread < cserver.cthreads; ++ithread) { @@ -5231,6 +5349,12 @@ static void validateConfiguration() } } +int iAmMaster(void) { + return ((!g_pserver->cluster_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) || + (g_pserver->cluster_enabled && nodeIsMaster(g_pserver->cluster->myself))); +} + + int main(int argc, char **argv) { struct timeval tv; int j; diff --git a/src/server.h b/src/server.h index c04ae817f..7e6564d9c 100644 --- a/src/server.h +++ b/src/server.h @@ -416,7 +416,8 @@ public: #define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */ #define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given, depending on optin/optout mode. */ -#define CLIENT_FORCE_REPLY (1ULL<<37) /* Should addReply be forced to write the text? */ +#define CLIENT_IN_TO_TABLE (1ULL<<37) /* This client is in the timeout table. */ +#define CLIENT_FORCE_REPLY (1ULL<<38) /* Should addReply be forced to write the text? */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -1656,6 +1657,7 @@ struct redisServer { list *clients; /* List of active clients */ list *clients_to_close; /* Clients to close asynchronously */ list *slaves, *monitors; /* List of slaves and MONITORs */ + rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */ rax *clients_index; /* Active clients dictionary by client ID. */ mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ @@ -1885,6 +1887,7 @@ struct redisServer { /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ + dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. */ /* Cluster */ @@ -1936,6 +1939,9 @@ struct redisServer { /* ACLs */ char *acl_filename; /* ACL Users file. NULL if not configured. */ unsigned long acllog_max_len; /* Maximum length of the ACL LOG list. */ + sds requirepass; /* Remember the cleartext password set with the + old "requirepass" directive for backward + compatibility with Redis <= 5. */ /* Assert & bug reporting */ const char *assert_failed; const char *assert_file; @@ -2718,6 +2724,8 @@ void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); +void addClientToTimeoutTable(client *c); +void removeClientFromTimeoutTable(client *c); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); @@ -2760,6 +2768,7 @@ void existsCommand(client *c); void setbitCommand(client *c); void getbitCommand(client *c); void bitfieldCommand(client *c); +void bitfieldroCommand(client *c); void setrangeCommand(client *c); void getrangeCommand(client *c); void incrCommand(client *c); @@ -3021,4 +3030,6 @@ class ShutdownException #define redisDebugMark() \ printf("-- MARK %s:%d --\n", __FILE__, __LINE__) +int iAmMaster(void); + #endif diff --git a/src/tracking.cpp b/src/tracking.cpp index e82d358cc..8738e7237 100644 --- a/src/tracking.cpp +++ b/src/tracking.cpp @@ -94,7 +94,7 @@ void disableTracking(client *c) { g_pserver->tracking_clients--; c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN| - CLIENT_TRACKING_OPTOUT); + CLIENT_TRACKING_OPTOUT|CLIENT_TRACKING_CACHING); } } @@ -271,7 +271,7 @@ void trackingInvalidateKey(robj *keyobj) { trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey)); rax *ids = (rax*)raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); - if (ids == raxNotFound) return;; + if (ids == raxNotFound) return; raxIterator ri; raxStart(&ri,ids); diff --git a/tests/cluster/tests/14-consistency-check.tcl b/tests/cluster/tests/14-consistency-check.tcl new file mode 100644 index 000000000..a43725ebc --- /dev/null +++ b/tests/cluster/tests/14-consistency-check.tcl @@ -0,0 +1,87 @@ +source "../tests/includes/init-tests.tcl" + +test "Create a 5 nodes cluster" { + create_cluster 5 5 +} + +test "Cluster should start ok" { + assert_cluster_state ok +} + +test "Cluster is writable" { + cluster_write_test 0 +} + +proc find_non_empty_master {} { + set master_id_no {} + foreach_redis_id id { + if {[RI $id role] eq {master} && [R $id dbsize] > 0} { + set master_id_no $id + } + } + return $master_id_no +} + +proc get_one_of_my_replica {id} { + set replica_port [lindex [lindex [lindex [R $id role] 2] 0] 1] + set replica_id_num [get_instance_id_by_port redis $replica_port] + return $replica_id_num +} + +proc cluster_write_keys_with_expire {id ttl} { + set prefix [randstring 20 20 alpha] + set port [get_instance_attrib redis $id port] + set cluster [redis_cluster 127.0.0.1:$port] + for {set j 100} {$j < 200} {incr j} { + $cluster setex key_expire.$j $ttl $prefix.$j + } + $cluster close +} + +proc test_slave_load_expired_keys {aof} { + test "Slave expired keys is loaded when restarted: appendonly=$aof" { + set master_id [find_non_empty_master] + set replica_id [get_one_of_my_replica $master_id] + + set master_dbsize [R $master_id dbsize] + set slave_dbsize [R $replica_id dbsize] + assert_equal $master_dbsize $slave_dbsize + + set data_ttl 5 + cluster_write_keys_with_expire $master_id $data_ttl + after 100 + set replica_dbsize_1 [R $replica_id dbsize] + assert {$replica_dbsize_1 > $slave_dbsize} + + R $replica_id config set appendonly $aof + R $replica_id config rewrite + + set start_time [clock seconds] + set end_time [expr $start_time+$data_ttl+2] + R $replica_id save + set replica_dbsize_2 [R $replica_id dbsize] + assert {$replica_dbsize_2 > $slave_dbsize} + kill_instance redis $replica_id + + set master_port [get_instance_attrib redis $master_id port] + exec ../../../src/redis-cli -h 127.0.0.1 -p $master_port debug sleep [expr $data_ttl+3] > /dev/null & + + while {[clock seconds] <= $end_time} { + #wait for $data_ttl seconds + } + restart_instance redis $replica_id + + wait_for_condition 200 50 { + [R $replica_id ping] eq {PONG} + } else { + fail "replica #$replica_id not started" + } + + set replica_dbsize_3 [R $replica_id dbsize] + assert {$replica_dbsize_3 > $slave_dbsize} + } +} + +test_slave_load_expired_keys no +after 5000 +test_slave_load_expired_keys yes diff --git a/tests/integration/psync2-pingoff.tcl b/tests/integration/psync2-pingoff.tcl new file mode 100644 index 000000000..1cea290e7 --- /dev/null +++ b/tests/integration/psync2-pingoff.tcl @@ -0,0 +1,61 @@ +# Test the meaningful offset implementation to make sure masters +# are able to PSYNC with replicas even if the replication stream +# has pending PINGs at the end. + +start_server {tags {"psync2"}} { +start_server {} { + # Config + set debug_msg 0 ; # Enable additional debug messages + + for {set j 0} {$j < 2} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + $R($j) CONFIG SET repl-ping-replica-period 1 + if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"} + } + + # Setup replication + test "PSYNC2 meaningful offset: setup" { + $R(1) replicaof $R_host(0) $R_port(0) + $R(0) set foo bar + wait_for_condition 50 1000 { + [$R(0) dbsize] == 1 && [$R(1) dbsize] == 1 + } else { + fail "Replicas not replicating from master" + } + } + + test "PSYNC2 meaningful offset: write and wait replication" { + $R(0) INCR counter + $R(0) INCR counter + $R(0) INCR counter + wait_for_condition 50 1000 { + [$R(0) GET counter] eq [$R(1) GET counter] + } else { + fail "Master and replica don't agree about counter" + } + } + + # In this test we'll make sure the replica will get stuck, but with + # an active connection: this way the master will continue to send PINGs + # every second (we modified the PING period earlier) + test "PSYNC2 meaningful offset: pause replica and promote it" { + $R(1) MULTI + $R(1) DEBUG SLEEP 5 + $R(1) SLAVEOF NO ONE + $R(1) EXEC + $R(1) ping ; # Wait for it to return back available + } + + test "Make the old master a replica of the new one and check conditions" { + set sync_partial [status $R(1) sync_partial_ok] + assert {$sync_partial == 0} + $R(0) REPLICAOF $R_host(1) $R_port(1) + wait_for_condition 50 1000 { + [status $R(1) sync_partial_ok] == 1 + } else { + fail "The new master was not able to partial sync" + } + } +}} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 49bb6f55d..24f6b82e0 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -52,6 +52,7 @@ set ::all_tests { integration/logging integration/psync2 integration/psync2-reg + integration/psync2-pingoff unit/pubsub unit/slowlog unit/scripting diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index fc1664a75..85c9b81a9 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -248,4 +248,11 @@ start_server {tags {"acl"}} { r AUTH default "" assert {[llength [r ACL LOG]] == 5} } + + test {When default user is off, new connections are not authenticated} { + r ACL setuser default off + catch {set rd1 [redis_deferring_client]} e + r ACL setuser default on + set e + } {*NOAUTH*} } diff --git a/tests/unit/bitfield.tcl b/tests/unit/bitfield.tcl index d76452b1b..1f2f6395e 100644 --- a/tests/unit/bitfield.tcl +++ b/tests/unit/bitfield.tcl @@ -199,3 +199,34 @@ start_server {tags {"bitops"}} { r del mystring } } + +start_server {tags {"repl"}} { + start_server {} { + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set slave [srv 0 client] + + test {BITFIELD: setup slave} { + $slave slaveof $master_host $master_port + wait_for_condition 50 100 { + [s 0 master_link_status] eq {up} + } else { + fail "Replication not started." + } + } + + test {BITFIELD: write on master, read on slave} { + $master del bits + assert_equal 0 [$master bitfield bits set u8 0 255] + assert_equal 255 [$master bitfield bits set u8 0 100] + wait_for_ofs_sync $master $slave + assert_equal 100 [$slave bitfield_ro bits get u8 0] + } + + test {BITFIELD_RO fails when write option is used} { + catch {$slave bitfield_ro bits set u8 0 100 get u8 0} err + assert_match {*ERR BITFIELD_RO only supports the GET subcommand*} $err + } + } +} diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl index 9fcef71d6..55f18bec8 100644 --- a/tests/unit/multi.tcl +++ b/tests/unit/multi.tcl @@ -320,4 +320,76 @@ start_server {tags {"multi"}} { $rd close r ping } {PONG} + + test {MULTI and script timeout} { + # check that if MULTI arrives during timeout, it is either refused, or + # allowed to pass, and we don't end up executing half of the transaction + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r config set lua-time-limit 10 + r set xx 1 + $rd1 eval {while true do end} 0 + after 200 + catch { $rd2 multi; $rd2 read } e + catch { $rd2 incr xx; $rd2 read } e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + catch { $rd2 incr xx; $rd2 read } e + catch { $rd2 exec; $rd2 read } e + set xx [r get xx] + # make sure that either the whole transcation passed or none of it (we actually expect none) + assert { $xx == 1 || $xx == 3} + # check that the connection is no longer in multi state + $rd2 ping asdf + set pong [$rd2 read] + assert_equal $pong "asdf" + } + + test {EXEC and script timeout} { + # check that if EXEC arrives during timeout, we don't end up executing + # half of the transaction, and also that we exit the multi state + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r config set lua-time-limit 10 + r set xx 1 + catch { $rd2 multi; $rd2 read } e + catch { $rd2 incr xx; $rd2 read } e + $rd1 eval {while true do end} 0 + after 200 + catch { $rd2 incr xx; $rd2 read } e + catch { $rd2 exec; $rd2 read } e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + set xx [r get xx] + # make sure that either the whole transcation passed or none of it (we actually expect none) + assert { $xx == 1 || $xx == 3} + # check that the connection is no longer in multi state + $rd2 ping asdf + set pong [$rd2 read] + assert_equal $pong "asdf" + } + + test {MULTI-EXEC body and script timeout} { + # check that we don't run an imcomplete transaction due to some commands + # arriving during busy script + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r config set lua-time-limit 10 + r set xx 1 + catch { $rd2 multi; $rd2 read } e + catch { $rd2 incr xx; $rd2 read } e + $rd1 eval {while true do end} 0 + after 200 + catch { $rd2 incr xx; $rd2 read } e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + catch { $rd2 exec; $rd2 read } e + set xx [r get xx] + # make sure that either the whole transcation passed or none of it (we actually expect none) + assert { $xx == 1 || $xx == 3} + # check that the connection is no longer in multi state + $rd2 ping asdf + set pong [$rd2 read] + assert_equal $pong "asdf" + } } diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 072ed14d6..6b9a4a9cd 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -311,4 +311,17 @@ start_server { } } } + + start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} { + test {Empty stream with no lastid can be rewrite into AOF correctly} { + r XGROUP CREATE mystream group-name $ MKSTREAM + assert {[dict get [r xinfo stream mystream] length] == 0} + set grpinfo [r xinfo groups mystream] + r bgrewriteaof + waitForBgrewriteaof r + r debug loadaof + assert {[dict get [r xinfo stream mystream] length] == 0} + assert {[r xinfo groups mystream] == $grpinfo} + } + } }