Merge commit '454e12cb8961f21c9dd8502dc82ae6ffd7e22fe0' into redis_6_merge
Former-commit-id: cc3ebbe5194e9744fb84ce490e90ac5fbe7f8716
This commit is contained in:
commit
ce54857237
@ -1628,7 +1628,7 @@ hz 10
|
||||
# offers, and enables by default, the ability to use an adaptive HZ value
|
||||
# which will temporary raise when there are many connected clients.
|
||||
#
|
||||
# When dynamic HZ is enabled, the actual configured HZ will be used as
|
||||
# When dynamic HZ is enabled, the actual configured HZ will be used
|
||||
# as a baseline, but multiples of the configured HZ value will be actually
|
||||
# used as needed once more clients are connected. In this way an idle
|
||||
# instance will use very little CPU time while a busy instance will be
|
||||
|
@ -102,6 +102,18 @@ sentinel monitor mymaster 127.0.0.1 6379 2
|
||||
#
|
||||
# sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
|
||||
|
||||
# sentinel auth-user <master-name> <username>
|
||||
#
|
||||
# This is useful in order to authenticate to instances having ACL capabilities,
|
||||
# that is, running Redis 6.0 or greater. When just auth-pass is provided the
|
||||
# Sentinel instance will authenticate to Redis using the old "AUTH <pass>"
|
||||
# method. When also an username is provided, it will use "AUTH <user> <pass>".
|
||||
# In the Redis servers side, the ACL to provide just minimal access to
|
||||
# Sentinel instances, should be configured along the following lines:
|
||||
#
|
||||
# user sentinel-user >somepassword +client +subscribe +publish \
|
||||
# +ping +info +multi +slaveof +config +client +exec on
|
||||
|
||||
# sentinel down-after-milliseconds <master-name> <milliseconds>
|
||||
#
|
||||
# Number of milliseconds the master (or any attached replica or sentinel) should
|
||||
|
11
src/acl.cpp
11
src/acl.cpp
@ -901,16 +901,6 @@ const char *ACLSetUserStringError(void) {
|
||||
return errmsg;
|
||||
}
|
||||
|
||||
/* Return the first password of the default user or NULL.
|
||||
* This function is needed for backward compatibility with the old
|
||||
* directive "requirepass" when Redis supported a single global
|
||||
* password. */
|
||||
sds ACLDefaultUserFirstPassword(void) {
|
||||
if (listLength(DefaultUser->passwords) == 0) return NULL;
|
||||
listNode *first = listFirst(DefaultUser->passwords);
|
||||
return (sds)listNodeValue(first);
|
||||
}
|
||||
|
||||
/* Initialize the default user, that will always exist for all the process
|
||||
* lifetime. */
|
||||
void ACLInitDefaultUser(void) {
|
||||
@ -927,6 +917,7 @@ void ACLInit(void) {
|
||||
UsersToLoad = listCreate();
|
||||
ACLLog = listCreate();
|
||||
ACLInitDefaultUser();
|
||||
g_pserver->requirepass = NULL; /* Only used for backward compatibility. */
|
||||
}
|
||||
|
||||
/* Check the username and password pair and return C_OK if they are valid,
|
||||
|
@ -1238,12 +1238,13 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
|
||||
/* Use the XADD MAXLEN 0 trick to generate an empty stream if
|
||||
* the key we are serializing is an empty string, which is possible
|
||||
* for the Stream type. */
|
||||
id.ms = 0; id.seq = 1;
|
||||
if (rioWriteBulkCount(r,'*',7) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
|
||||
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"0",1) == 0) return 0;
|
||||
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
|
||||
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"x",1) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"y",1) == 0) return 0;
|
||||
}
|
||||
|
@ -976,6 +976,9 @@ void bitposCommand(client *c) {
|
||||
* OVERFLOW [WRAP|SAT|FAIL]
|
||||
*/
|
||||
|
||||
#define BITFIELD_FLAG_NONE 0
|
||||
#define BITFIELD_FLAG_READONLY (1<<0)
|
||||
|
||||
struct bitfieldOp {
|
||||
uint64_t offset; /* Bitfield offset. */
|
||||
int64_t i64; /* Increment amount (INCRBY) or SET value */
|
||||
@ -985,7 +988,10 @@ struct bitfieldOp {
|
||||
int sign; /* True if signed, otherwise unsigned op. */
|
||||
};
|
||||
|
||||
void bitfieldCommand(client *c) {
|
||||
/* This implements both the BITFIELD command and the BITFIELD_RO command
|
||||
* when flags is set to BITFIELD_FLAG_READONLY: in this case only the
|
||||
* GET subcommand is allowed, other subcommands will return an error. */
|
||||
void bitfieldGeneric(client *c, int flags) {
|
||||
robj_roptr o;
|
||||
size_t bitoffset;
|
||||
int j, numops = 0, changes = 0;
|
||||
@ -1073,6 +1079,12 @@ void bitfieldCommand(client *c) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
if (flags & BITFIELD_FLAG_READONLY) {
|
||||
zfree(ops);
|
||||
addReplyError(c, "BITFIELD_RO only supports the GET subcommand");
|
||||
return;
|
||||
}
|
||||
|
||||
/* Lookup by making room up to the farest bit reached by
|
||||
* this operation. */
|
||||
if ((o = lookupStringForBitCommand(c,
|
||||
@ -1203,3 +1215,11 @@ void bitfieldCommand(client *c) {
|
||||
}
|
||||
zfree(ops);
|
||||
}
|
||||
|
||||
void bitfieldCommand(client *c) {
|
||||
bitfieldGeneric(c, BITFIELD_FLAG_NONE);
|
||||
}
|
||||
|
||||
void bitfieldroCommand(client *c) {
|
||||
bitfieldGeneric(c, BITFIELD_FLAG_READONLY);
|
||||
}
|
||||
|
@ -113,6 +113,7 @@ void blockClient(client *c, int btype) {
|
||||
c->btype = btype;
|
||||
g_pserver->blocked_clients++;
|
||||
g_pserver->blocked_clients_by_type[btype]++;
|
||||
addClientToTimeoutTable(c);
|
||||
}
|
||||
|
||||
/* This function is called in the beforeSleep() function of the event loop
|
||||
@ -200,6 +201,7 @@ void unblockClient(client *c) {
|
||||
g_pserver->blocked_clients_by_type[c->btype]--;
|
||||
c->flags &= ~CLIENT_BLOCKED;
|
||||
c->btype = BLOCKED_NONE;
|
||||
removeClientFromTimeoutTable(c);
|
||||
queueClientForReprocessing(c);
|
||||
}
|
||||
|
||||
|
@ -719,6 +719,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
* or schedule it for later depending on connection implementation.
|
||||
*/
|
||||
if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) {
|
||||
if (connGetState(conn) == CONN_STATE_ERROR)
|
||||
serverLog(LL_VERBOSE,
|
||||
"Error accepting cluster node connection: %s",
|
||||
connGetLastError(conn));
|
||||
@ -4328,7 +4329,7 @@ void clusterCommand(client *c) {
|
||||
"FORGET <node-id> -- Remove a node from the cluster.",
|
||||
"GETKEYSINSLOT <slot> <count> -- Return key names stored by current node in a slot.",
|
||||
"FLUSHSLOTS -- Delete current node own slots information.",
|
||||
"INFO - Return onformation about the cluster.",
|
||||
"INFO - Return information about the cluster.",
|
||||
"KEYSLOT <key> -- Return the hash slot for <key>.",
|
||||
"MEET <ip> <port> [bus-port] -- Connect nodes into a working cluster.",
|
||||
"MYID -- Return the node id.",
|
||||
@ -4339,6 +4340,7 @@ void clusterCommand(client *c) {
|
||||
"SET-config-epoch <epoch> - Set config epoch of current node.",
|
||||
"SETSLOT <slot> (importing|migrating|stable|node <node-id>) -- Set slot state.",
|
||||
"REPLICAS <node-id> -- Return <node-id> replicas.",
|
||||
"SAVECONFIG - Force saving cluster configuration on disk.",
|
||||
"SLOTS -- Return information about slots range mappings. Each range is made of:",
|
||||
" start, end, master and replicas IP addresses, ports and ids",
|
||||
NULL
|
||||
|
@ -418,11 +418,15 @@ void loadServerConfigFromString(char *config) {
|
||||
goto loaderr;
|
||||
}
|
||||
/* The old "requirepass" directive just translates to setting
|
||||
* a password to the default user. */
|
||||
* a password to the default user. The only thing we do
|
||||
* additionally is to remember the cleartext password in this
|
||||
* case, for backward compatibility with Redis <= 5. */
|
||||
ACLSetUser(DefaultUser,"resetpass",-1);
|
||||
sds aclop = sdscatprintf(sdsempty(),">%s",argv[1]);
|
||||
ACLSetUser(DefaultUser,aclop,sdslen(aclop));
|
||||
sdsfree(aclop);
|
||||
sdsfree(g_pserver->requirepass);
|
||||
g_pserver->requirepass = sdsnew(argv[1]);
|
||||
} else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){
|
||||
/* DEAD OPTION */
|
||||
} else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2) {
|
||||
@ -672,11 +676,15 @@ void configSetCommand(client *c) {
|
||||
config_set_special_field("requirepass") {
|
||||
if (sdslen(szFromObj(o)) > CONFIG_AUTHPASS_MAX_LEN) goto badfmt;
|
||||
/* The old "requirepass" directive just translates to setting
|
||||
* a password to the default user. */
|
||||
* a password to the default user. The only thing we do
|
||||
* additionally is to remember the cleartext password in this
|
||||
* case, for backward compatibility with Redis <= 5. */
|
||||
ACLSetUser(DefaultUser,"resetpass",-1);
|
||||
sds aclop = sdscatprintf(sdsempty(),">%s",(char*)ptrFromObj(o));
|
||||
ACLSetUser(DefaultUser,aclop,sdslen(aclop));
|
||||
sdsfree(aclop);
|
||||
sdsfree(g_pserver->requirepass);
|
||||
g_pserver->requirepass = sdsnew(szFromObj(o));
|
||||
} config_set_special_field("save") {
|
||||
int vlen, j;
|
||||
sds *v = sdssplitlen(szFromObj(o),sdslen(szFromObj(o))," ",1,&vlen);
|
||||
@ -965,7 +973,7 @@ void configGetCommand(client *c) {
|
||||
}
|
||||
if (stringmatch(pattern,"requirepass",1)) {
|
||||
addReplyBulkCString(c,"requirepass");
|
||||
sds password = ACLDefaultUserFirstPassword();
|
||||
sds password = g_pserver->requirepass;
|
||||
if (password) {
|
||||
addReplyBulkCBuffer(c,password,sdslen(password));
|
||||
} else {
|
||||
@ -1416,7 +1424,7 @@ void rewriteConfigBindOption(struct rewriteConfigState *state) {
|
||||
void rewriteConfigRequirepassOption(struct rewriteConfigState *state, const char *option) {
|
||||
int force = 1;
|
||||
sds line;
|
||||
sds password = ACLDefaultUserFirstPassword();
|
||||
sds password = g_pserver->requirepass;
|
||||
|
||||
/* If there is no password set, we don't want the requirepass option
|
||||
* to be present in the configuration at all. */
|
||||
|
@ -152,7 +152,7 @@ static void connSocketClose(connection *conn) {
|
||||
/* If called from within a handler, schedule the close but
|
||||
* keep the connection until the handler returns.
|
||||
*/
|
||||
if (conn->flags & CONN_FLAG_IN_HANDLER) {
|
||||
if (connHasRefs(conn)) {
|
||||
conn->flags |= CONN_FLAG_CLOSE_SCHEDULED;
|
||||
return;
|
||||
}
|
||||
@ -183,10 +183,16 @@ static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
|
||||
}
|
||||
|
||||
static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
|
||||
int ret = C_OK;
|
||||
|
||||
if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
|
||||
conn->state = CONN_STATE_CONNECTED;
|
||||
if (!callHandler(conn, accept_handler)) return C_ERR;
|
||||
return C_OK;
|
||||
|
||||
connIncrRefs(conn);
|
||||
if (!callHandler(conn, accept_handler)) ret = C_ERR;
|
||||
connDecrRefs(conn);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Register a write handler, to be called when the connection is writable.
|
||||
|
@ -45,11 +45,10 @@ typedef enum {
|
||||
CONN_STATE_ERROR
|
||||
} ConnectionState;
|
||||
|
||||
#define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */
|
||||
#define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */
|
||||
#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */
|
||||
#define CONN_FLAG_READ_THREADSAFE (1<<3)
|
||||
#define CONN_FLAG_WRITE_THREADSAFE (1<<4)
|
||||
#define CONN_FLAG_CLOSE_SCHEDULED (1<<0) /* Closed scheduled by a handler */
|
||||
#define CONN_FLAG_WRITE_BARRIER (1<<1) /* Write barrier requested */
|
||||
#define CONN_FLAG_READ_THREADSAFE (1<<2)
|
||||
#define CONN_FLAG_WRITE_THREADSAFE (1<<3)
|
||||
|
||||
typedef void (*ConnectionCallbackFunc)(struct connection *conn);
|
||||
|
||||
@ -72,7 +71,8 @@ typedef struct ConnectionType {
|
||||
struct connection {
|
||||
ConnectionType *type;
|
||||
ConnectionState state;
|
||||
int flags;
|
||||
short int flags;
|
||||
short int refs;
|
||||
int last_errno;
|
||||
void *private_data;
|
||||
ConnectionCallbackFunc conn_handler;
|
||||
@ -90,6 +90,13 @@ struct connection {
|
||||
* connAccept() may directly call accept_handler(), or return and call it
|
||||
* at a later time. This behavior is a bit awkward but aims to reduce the need
|
||||
* to wait for the next event loop, if no additional handshake is required.
|
||||
*
|
||||
* IMPORTANT: accept_handler may decide to close the connection, calling connClose().
|
||||
* To make this safe, the connection is only marked with CONN_FLAG_CLOSE_SCHEDULED
|
||||
* in this case, and connAccept() returns with an error.
|
||||
*
|
||||
* connAccept() callers must always check the return value and on error (C_ERR)
|
||||
* a connClose() must be called.
|
||||
*/
|
||||
|
||||
static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
|
||||
|
@ -37,46 +37,49 @@
|
||||
* implementations (currently sockets in connection.c and TLS in tls.c).
|
||||
*
|
||||
* Currently helpers implement the mechanisms for invoking connection
|
||||
* handlers, tracking in-handler states and dealing with deferred
|
||||
* destruction (if invoked by a handler).
|
||||
* handlers and tracking connection references, to allow safe destruction
|
||||
* of connections from within a handler.
|
||||
*/
|
||||
|
||||
/* Called whenever a handler is invoked on a connection and sets the
|
||||
* CONN_FLAG_IN_HANDLER flag to indicate we're in a handler context.
|
||||
/* Incremenet connection references.
|
||||
*
|
||||
* An attempt to close a connection while CONN_FLAG_IN_HANDLER is
|
||||
* set will result with deferred close, i.e. setting the CONN_FLAG_CLOSE_SCHEDULED
|
||||
* instead of destructing it.
|
||||
* Inside a connection handler, we guarantee refs >= 1 so it is always
|
||||
* safe to connClose().
|
||||
*
|
||||
* In other cases where we don't want to prematurely lose the connection,
|
||||
* it can go beyond 1 as well; currently it is only done by connAccept().
|
||||
*/
|
||||
static inline void enterHandler(connection *conn) {
|
||||
conn->flags |= CONN_FLAG_IN_HANDLER;
|
||||
static inline void connIncrRefs(connection *conn) {
|
||||
conn->refs++;
|
||||
}
|
||||
|
||||
/* Called whenever a handler returns. This unsets the CONN_FLAG_IN_HANDLER
|
||||
* flag and performs actual close/destruction if a deferred close was
|
||||
* scheduled by the handler.
|
||||
/* Decrement connection references.
|
||||
*
|
||||
* Note that this is not intended to provide any automatic free logic!
|
||||
* callHandler() takes care of that for the common flows, and anywhere an
|
||||
* explicit connIncrRefs() is used, the caller is expected to take care of
|
||||
* that.
|
||||
*/
|
||||
static inline int exitHandler(connection *conn) {
|
||||
conn->flags &= ~CONN_FLAG_IN_HANDLER;
|
||||
if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) {
|
||||
connClose(conn);
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
|
||||
static inline void connDecrRefs(connection *conn) {
|
||||
conn->refs--;
|
||||
}
|
||||
|
||||
static inline int connHasRefs(connection *conn) {
|
||||
return conn->refs;
|
||||
}
|
||||
|
||||
/* Helper for connection implementations to call handlers:
|
||||
* 1. Mark the handler in use.
|
||||
* 1. Increment refs to protect the connection.
|
||||
* 2. Execute the handler (if set).
|
||||
* 3. Mark the handler as NOT in use and perform deferred close if was
|
||||
* requested by the handler at any time.
|
||||
* 3. Decrement refs and perform deferred close, if refs==0.
|
||||
*/
|
||||
static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) {
|
||||
conn->flags |= CONN_FLAG_IN_HANDLER;
|
||||
connIncrRefs(conn);
|
||||
if (handler) handler(conn);
|
||||
conn->flags &= ~CONN_FLAG_IN_HANDLER;
|
||||
connDecrRefs(conn);
|
||||
if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) {
|
||||
connClose(conn);
|
||||
if (!connHasRefs(conn)) connClose(conn);
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
|
@ -1864,7 +1864,12 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) {
|
||||
* current request context (whether the client is a Lua script or in a MULTI),
|
||||
* and about the Redis instance in general, i.e replication and persistence.
|
||||
*
|
||||
* The available flags are:
|
||||
* It is possible to call this function even with a NULL context, however
|
||||
* in this case the following flags will not be reported:
|
||||
*
|
||||
* * LUA, MULTI, REPLICATED, DIRTY (see below for more info).
|
||||
*
|
||||
* Available flags and their meaning:
|
||||
*
|
||||
* * REDISMODULE_CTX_FLAGS_LUA: The command is running in a Lua script
|
||||
*
|
||||
@ -1917,6 +1922,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
|
||||
|
||||
int flags = 0;
|
||||
/* Client specific flags */
|
||||
if (ctx) {
|
||||
if (ctx->client) {
|
||||
if (ctx->client->flags & CLIENT_LUA)
|
||||
flags |= REDISMODULE_CTX_FLAGS_LUA;
|
||||
@ -1932,6 +1938,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
|
||||
if (c && (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))) {
|
||||
flags |= REDISMODULE_CTX_FLAGS_MULTI_DIRTY;
|
||||
}
|
||||
}
|
||||
|
||||
if (g_pserver->cluster_enabled)
|
||||
flags |= REDISMODULE_CTX_FLAGS_CLUSTER;
|
||||
|
@ -137,7 +137,8 @@ client *createClient(connection *conn, int iel) {
|
||||
c->ctime = c->lastinteraction = g_pserver->unixtime;
|
||||
/* If the default user does not require authentication, the user is
|
||||
* directly authenticated. */
|
||||
c->authenticated = (c->puser->flags & USER_FLAG_NOPASS) != 0;
|
||||
c->authenticated = (c->puser->flags & USER_FLAG_NOPASS) &&
|
||||
!(c->puser->flags & USER_FLAG_DISABLED);
|
||||
c->replstate = REPL_STATE_NONE;
|
||||
c->repl_put_online_on_ack = 0;
|
||||
c->reploff = 0;
|
||||
@ -1100,7 +1101,7 @@ void clientAcceptHandler(connection *conn) {
|
||||
serverLog(LL_WARNING,
|
||||
"Error accepting a client connection: %s",
|
||||
connGetLastError(conn));
|
||||
freeClient(c);
|
||||
freeClientAsync(c);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -1146,7 +1147,7 @@ void clientAcceptHandler(connection *conn) {
|
||||
/* Nothing to do, Just to avoid the warning... */
|
||||
}
|
||||
g_pserver->stat_rejected_conn++;
|
||||
freeClient(c);
|
||||
freeClientAsync(c);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -1207,6 +1208,7 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel)
|
||||
*/
|
||||
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
|
||||
char conninfo[100];
|
||||
if (connGetState(conn) == CONN_STATE_ERROR)
|
||||
serverLog(LL_WARNING,
|
||||
"Error accepting a client connection: %s (conn: %s)",
|
||||
connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
|
||||
|
@ -28,6 +28,7 @@
|
||||
*/
|
||||
|
||||
#include "server.h"
|
||||
#include <mutex>
|
||||
|
||||
int clientSubscriptionsCount(client *c);
|
||||
|
||||
@ -209,6 +210,8 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
|
||||
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
|
||||
int pubsubSubscribePattern(client *c, robj *pattern) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
dictEntry *de;
|
||||
list *clients;
|
||||
int retval = 0;
|
||||
|
||||
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
|
||||
@ -220,6 +223,16 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
|
||||
pat->pattern = getDecodedObject(pattern);
|
||||
pat->pclient = c;
|
||||
listAddNodeTail(g_pserver->pubsub_patterns,pat);
|
||||
/* Add the client to the pattern -> list of clients hash table */
|
||||
de = dictFind(g_pserver->pubsub_patterns_dict,pattern);
|
||||
if (de == NULL) {
|
||||
clients = listCreate();
|
||||
dictAdd(g_pserver->pubsub_patterns_dict,pattern,clients);
|
||||
incrRefCount(pattern);
|
||||
} else {
|
||||
clients = (list*)dictGetVal(de);
|
||||
}
|
||||
listAddNodeTail(clients,c);
|
||||
}
|
||||
/* Notify the client */
|
||||
addReplyPubsubPatSubscribed(c,pattern);
|
||||
@ -229,6 +242,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
|
||||
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
|
||||
* 0 if the client was not subscribed to the specified channel. */
|
||||
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
|
||||
dictEntry *de;
|
||||
list *clients;
|
||||
listNode *ln;
|
||||
pubsubPattern pat;
|
||||
int retval = 0;
|
||||
@ -241,6 +256,18 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
|
||||
pat.pattern = pattern;
|
||||
ln = listSearchKey(g_pserver->pubsub_patterns,&pat);
|
||||
listDelNode(g_pserver->pubsub_patterns,ln);
|
||||
/* Remove the client from the pattern -> clients list hash table */
|
||||
de = dictFind(g_pserver->pubsub_patterns_dict,pattern);
|
||||
serverAssertWithInfo(c,NULL,de != NULL);
|
||||
clients = (list*)dictGetVal(de);
|
||||
ln = listSearchKey(clients,c);
|
||||
serverAssertWithInfo(c,NULL,ln != NULL);
|
||||
listDelNode(clients,ln);
|
||||
if (listLength(clients) == 0) {
|
||||
/* Free the list and associated hash entry at all if this was
|
||||
* the latest client. */
|
||||
dictDelete(g_pserver->pubsub_patterns_dict,pattern);
|
||||
}
|
||||
}
|
||||
/* Notify the client */
|
||||
if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
|
||||
@ -290,6 +317,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
int receivers = 0;
|
||||
dictEntry *de;
|
||||
dictIterator *di;
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
|
||||
@ -314,29 +342,31 @@ int pubsubPublishMessage(robj *channel, robj *message) {
|
||||
}
|
||||
}
|
||||
/* Send to clients listening to matching channels */
|
||||
if (listLength(g_pserver->pubsub_patterns)) {
|
||||
listRewind(g_pserver->pubsub_patterns,&li);
|
||||
di = dictGetIterator(g_pserver->pubsub_patterns_dict);
|
||||
if (di) {
|
||||
channel = getDecodedObject(channel);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
pubsubPattern *pat = (pubsubPattern*)ln->value;
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
robj *pattern = (robj*)dictGetKey(de);
|
||||
list *clients = (list*)dictGetVal(de);
|
||||
if (!stringmatchlen(szFromObj(pattern),
|
||||
sdslen(szFromObj(pattern)),
|
||||
szFromObj(channel),
|
||||
sdslen(szFromObj(channel)),0)) continue;
|
||||
|
||||
if (stringmatchlen((char*)ptrFromObj(pat->pattern),
|
||||
sdslen(szFromObj(pat->pattern)),
|
||||
(char*)ptrFromObj(channel),
|
||||
sdslen(szFromObj(channel)),0))
|
||||
{
|
||||
if (pat->pclient->flags & CLIENT_CLOSE_ASAP)
|
||||
listRewind(clients,&li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
client *c = (client*)listNodeValue(ln);
|
||||
if (c->flags & CLIENT_CLOSE_ASAP)
|
||||
continue;
|
||||
if (FCorrectThread(pat->pclient))
|
||||
fastlock_lock(&pat->pclient->lock);
|
||||
addReplyPubsubPatMessage(pat->pclient,
|
||||
pat->pattern,channel,message);
|
||||
if (FCorrectThread(pat->pclient))
|
||||
fastlock_unlock(&pat->pclient->lock);
|
||||
std::unique_lock<fastlock> l(c->lock, std::defer_lock);
|
||||
if (FCorrectThread(c))
|
||||
l.lock();
|
||||
addReplyPubsubPatMessage(c,pattern,channel,message);
|
||||
receivers++;
|
||||
}
|
||||
}
|
||||
decrRefCount(channel);
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
return receivers;
|
||||
}
|
||||
|
@ -2384,7 +2384,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
* received from the master. In the latter case, the master is
|
||||
* responsible for key expiry. If we would expire keys here, the
|
||||
* snapshot taken by the master may not be reflected on the replica. */
|
||||
bool fExpiredKey = (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica) && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
|
||||
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
|
||||
if (fStaleMvccKey || fExpiredKey) {
|
||||
if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, key) == nullptr) {
|
||||
// We have a key that we've already deleted and is not back in our database.
|
||||
|
@ -1103,7 +1103,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
|
||||
(argc == 3 && !strcasecmp(command,"latency") &&
|
||||
!strcasecmp(argv[1],"graph")) ||
|
||||
(argc == 2 && !strcasecmp(command,"latency") &&
|
||||
!strcasecmp(argv[1],"doctor")))
|
||||
!strcasecmp(argv[1],"doctor")) ||
|
||||
/* Format PROXY INFO command for Redis Cluster Proxy:
|
||||
* https://github.com/artix75/redis-cluster-proxy */
|
||||
(argc >= 2 && !strcasecmp(command,"proxy") &&
|
||||
!strcasecmp(argv[1],"info")))
|
||||
{
|
||||
output_raw = 1;
|
||||
}
|
||||
@ -1800,6 +1804,8 @@ static void repl(void) {
|
||||
if (config.eval) {
|
||||
config.eval_ldb = 1;
|
||||
config.output = OUTPUT_RAW;
|
||||
sdsfreesplitres(argv,argc);
|
||||
linenoiseFree(line);
|
||||
return; /* Return to evalMode to restart the session. */
|
||||
} else {
|
||||
printf("Use 'restart' only in Lua debugging mode.");
|
||||
|
@ -3256,12 +3256,14 @@ void replicationCacheMasterUsingMyself(redisMaster *mi) {
|
||||
delta);
|
||||
mi->master_initial_offset = g_pserver->master_repl_meaningful_offset;
|
||||
g_pserver->repl_backlog_histlen -= delta;
|
||||
g_pserver->repl_backlog_idx =
|
||||
(g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - delta)) %
|
||||
g_pserver->repl_backlog_size;
|
||||
if (g_pserver->repl_backlog_histlen < 0) g_pserver->repl_backlog_histlen = 0;
|
||||
}
|
||||
|
||||
/* The master client we create can be set to any DBID, because
|
||||
* the new master will start its replication stream with SELECT. */
|
||||
mi->master_initial_offset = g_pserver->master_repl_offset;
|
||||
replicationCreateMasterClient(mi,NULL,-1);
|
||||
std::lock_guard<decltype(mi->master->lock)> lock(mi->master->lock);
|
||||
|
||||
|
@ -205,7 +205,8 @@ typedef struct sentinelRedisInstance {
|
||||
dict *slaves; /* Slaves for this master instance. */
|
||||
unsigned int quorum;/* Number of sentinels that need to agree on failure. */
|
||||
int parallel_syncs; /* How many slaves to reconfigure at same time. */
|
||||
char *auth_pass; /* Password to use for AUTH against master & slaves. */
|
||||
char *auth_pass; /* Password to use for AUTH against master & replica. */
|
||||
char *auth_user; /* Username for ACLs AUTH against master & replica. */
|
||||
|
||||
/* Slave specific. */
|
||||
mstime_t master_link_down_time; /* Slave replication link down time. */
|
||||
@ -1231,6 +1232,7 @@ sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *
|
||||
SENTINEL_DEFAULT_DOWN_AFTER;
|
||||
ri->master_link_down_time = 0;
|
||||
ri->auth_pass = NULL;
|
||||
ri->auth_user = NULL;
|
||||
ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
|
||||
ri->slave_reconf_sent_time = 0;
|
||||
ri->slave_master_host = NULL;
|
||||
@ -1289,6 +1291,7 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
|
||||
sdsfree(ri->slave_master_host);
|
||||
sdsfree(ri->leader);
|
||||
sdsfree(ri->auth_pass);
|
||||
sdsfree(ri->auth_user);
|
||||
sdsfree(ri->info);
|
||||
releaseSentinelAddr(ri->addr);
|
||||
dictRelease(ri->renamed_commands);
|
||||
@ -1679,6 +1682,11 @@ const char *sentinelHandleConfiguration(char **argv, int argc) {
|
||||
ri = sentinelGetMasterByName(argv[1]);
|
||||
if (!ri) return "No such master with specified name.";
|
||||
ri->auth_pass = sdsnew(argv[2]);
|
||||
} else if (!strcasecmp(argv[0],"auth-user") && argc == 3) {
|
||||
/* auth-user <name> <username> */
|
||||
ri = sentinelGetMasterByName(argv[1]);
|
||||
if (!ri) return "No such master with specified name.";
|
||||
ri->auth_user = sdsnew(argv[2]);
|
||||
} else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
|
||||
/* current-epoch <epoch> */
|
||||
unsigned long long current_epoch = strtoull(argv[1],NULL,10);
|
||||
@ -1836,7 +1844,7 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
|
||||
rewriteConfigRewriteLine(state,"sentinel",line,1);
|
||||
}
|
||||
|
||||
/* sentinel auth-pass */
|
||||
/* sentinel auth-pass & auth-user */
|
||||
if (master->auth_pass) {
|
||||
line = sdscatprintf(sdsempty(),
|
||||
"sentinel auth-pass %s %s",
|
||||
@ -1844,6 +1852,13 @@ void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
|
||||
rewriteConfigRewriteLine(state,"sentinel",line,1);
|
||||
}
|
||||
|
||||
if (master->auth_user) {
|
||||
line = sdscatprintf(sdsempty(),
|
||||
"sentinel auth-user %s %s",
|
||||
master->name, master->auth_user);
|
||||
rewriteConfigRewriteLine(state,"sentinel",line,1);
|
||||
}
|
||||
|
||||
/* sentinel config-epoch */
|
||||
line = sdscatprintf(sdsempty(),
|
||||
"sentinel config-epoch %s %llu",
|
||||
@ -1968,19 +1983,29 @@ werr:
|
||||
* will disconnect and reconnect the link and so forth. */
|
||||
void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
|
||||
char *auth_pass = NULL;
|
||||
char *auth_user = NULL;
|
||||
|
||||
if (ri->flags & SRI_MASTER) {
|
||||
auth_pass = ri->auth_pass;
|
||||
auth_user = ri->auth_user;
|
||||
} else if (ri->flags & SRI_SLAVE) {
|
||||
auth_pass = ri->master->auth_pass;
|
||||
auth_user = ri->master->auth_user;
|
||||
} else if (ri->flags & SRI_SENTINEL) {
|
||||
auth_pass = ACLDefaultUserFirstPassword();
|
||||
auth_pass = g_pserver->requirepass;
|
||||
auth_user = NULL;
|
||||
}
|
||||
|
||||
if (auth_pass) {
|
||||
if (auth_pass && auth_user == NULL) {
|
||||
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",
|
||||
sentinelInstanceMapCommand(ri,"AUTH"),
|
||||
auth_pass) == C_OK) ri->link->pending_commands++;
|
||||
} else if (auth_pass && auth_user) {
|
||||
/* If we also have an username, use the ACL-style AUTH command
|
||||
* with two arguments, username and password. */
|
||||
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s %s",
|
||||
sentinelInstanceMapCommand(ri,"AUTH"),
|
||||
auth_user, auth_pass) == C_OK) ri->link->pending_commands++;
|
||||
}
|
||||
}
|
||||
|
||||
@ -3522,6 +3547,12 @@ void sentinelSetCommand(client *c) {
|
||||
sdsfree(ri->auth_pass);
|
||||
ri->auth_pass = strlen(value) ? sdsnew(value) : NULL;
|
||||
changes++;
|
||||
} else if (!strcasecmp(option,"auth-user") && moreargs > 0) {
|
||||
/* auth-user <username> */
|
||||
char *value = szFromObj(c->argv[++j]);
|
||||
sdsfree(ri->auth_user);
|
||||
ri->auth_user = strlen(value) ? sdsnew(value) : NULL;
|
||||
changes++;
|
||||
} else if (!strcasecmp(option,"quorum") && moreargs > 0) {
|
||||
/* quorum <count> */
|
||||
robj *o = c->argv[++j];
|
||||
|
204
src/server.cpp
204
src/server.cpp
@ -259,6 +259,10 @@ struct redisCommand redisCommandTable[] = {
|
||||
"write use-memory @bitmap",
|
||||
0,NULL,1,1,1,0,0,0},
|
||||
|
||||
{"bitfield_ro",bitfieldroCommand,-2,
|
||||
"read-only fast @bitmap",
|
||||
0,NULL,1,1,1,0,0,0},
|
||||
|
||||
{"setrange",setrangeCommand,4,
|
||||
"write use-memory @string",
|
||||
0,NULL,1,1,1,0,0,0},
|
||||
@ -1497,6 +1501,137 @@ int allPersistenceDisabled(void) {
|
||||
return g_pserver->saveparamslen == 0 && g_pserver->aof_state == AOF_OFF;
|
||||
}
|
||||
|
||||
/* ========================== Clients timeouts ============================= */
|
||||
|
||||
/* Check if this blocked client timedout (does nothing if the client is
|
||||
* not blocked right now). If so send a reply, unblock it, and return 1.
|
||||
* Otherwise 0 is returned and no operation is performed. */
|
||||
int checkBlockedClientTimeout(client *c, mstime_t now) {
|
||||
if (c->flags & CLIENT_BLOCKED &&
|
||||
c->bpop.timeout != 0
|
||||
&& c->bpop.timeout < now)
|
||||
{
|
||||
/* Handle blocking operation specific timeout. */
|
||||
replyToBlockedClientTimedOut(c);
|
||||
unblockClient(c);
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Check for timeouts. Returns non-zero if the client was terminated.
|
||||
* The function gets the current time in milliseconds as argument since
|
||||
* it gets called multiple times in a loop, so calling gettimeofday() for
|
||||
* each iteration would be costly without any actual gain. */
|
||||
int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
|
||||
time_t now = now_ms/1000;
|
||||
|
||||
if (cserver.maxidletime &&
|
||||
/* This handles the idle clients connection timeout if set. */
|
||||
!(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */
|
||||
!(c->flags & CLIENT_MASTER) && /* No timeout for masters */
|
||||
!(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */
|
||||
!(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */
|
||||
(now - c->lastinteraction > cserver.maxidletime))
|
||||
{
|
||||
serverLog(LL_VERBOSE,"Closing idle client");
|
||||
freeClient(c);
|
||||
return 1;
|
||||
} else if (c->flags & CLIENT_BLOCKED) {
|
||||
/* Cluster: handle unblock & redirect of clients blocked
|
||||
* into keys no longer served by this server. */
|
||||
if (g_pserver->cluster_enabled) {
|
||||
if (clusterRedirectBlockedClientIfNeeded(c))
|
||||
unblockClient(c);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* For blocked clients timeouts we populate a radix tree of 128 bit keys
|
||||
* composed as such:
|
||||
*
|
||||
* [8 byte big endian expire time]+[8 byte client ID]
|
||||
*
|
||||
* We don't do any cleanup in the Radix tree: when we run the clients that
|
||||
* reached the timeout already, if they are no longer existing or no longer
|
||||
* blocked with such timeout, we just go forward.
|
||||
*
|
||||
* Every time a client blocks with a timeout, we add the client in
|
||||
* the tree. In beforeSleep() we call clientsHandleTimeout() to run
|
||||
* the tree and unblock the clients. */
|
||||
|
||||
#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
|
||||
|
||||
/* Given client ID and timeout, write the resulting radix tree key in buf. */
|
||||
void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) {
|
||||
timeout = htonu64(timeout);
|
||||
memcpy(buf,&timeout,sizeof(timeout));
|
||||
memcpy(buf+8,&id,sizeof(id));
|
||||
}
|
||||
|
||||
/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write
|
||||
* the timeout into *toptr and the client ID into *idptr. */
|
||||
void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) {
|
||||
memcpy(toptr,buf,sizeof(*toptr));
|
||||
*toptr = ntohu64(*toptr);
|
||||
memcpy(idptr,buf+8,sizeof(*idptr));
|
||||
}
|
||||
|
||||
/* Add the specified client id / timeout as a key in the radix tree we use
|
||||
* to handle blocked clients timeouts. The client is not added to the list
|
||||
* if its timeout is zero (block forever). */
|
||||
void addClientToTimeoutTable(client *c) {
|
||||
if (c->bpop.timeout == 0) return;
|
||||
uint64_t timeout = c->bpop.timeout;
|
||||
uint64_t id = c->id;
|
||||
unsigned char buf[CLIENT_ST_KEYLEN];
|
||||
encodeTimeoutKey(buf,timeout,id);
|
||||
if (raxTryInsert(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL,NULL))
|
||||
c->flags |= CLIENT_IN_TO_TABLE;
|
||||
}
|
||||
|
||||
/* Remove the client from the table when it is unblocked for reasons
|
||||
* different than timing out. */
|
||||
void removeClientFromTimeoutTable(client *c) {
|
||||
if (!(c->flags & CLIENT_IN_TO_TABLE)) return;
|
||||
c->flags &= ~CLIENT_IN_TO_TABLE;
|
||||
uint64_t timeout = c->bpop.timeout;
|
||||
uint64_t id = c->id;
|
||||
unsigned char buf[CLIENT_ST_KEYLEN];
|
||||
encodeTimeoutKey(buf,timeout,id);
|
||||
raxRemove(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL);
|
||||
}
|
||||
|
||||
/* This function is called in beforeSleep() in order to unblock clients
|
||||
* that are waiting in blocking operations with a timeout set. */
|
||||
void clientsHandleTimeout(void) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
|
||||
if (raxSize(g_pserver->clients_timeout_table) == 0) return;
|
||||
uint64_t now = mstime();
|
||||
raxIterator ri;
|
||||
raxStart(&ri,g_pserver->clients_timeout_table);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
|
||||
while(raxNext(&ri)) {
|
||||
uint64_t id, timeout;
|
||||
decodeTimeoutKey(ri.key,&timeout,&id);
|
||||
if (timeout >= now) break; /* All the timeouts are in the future. */
|
||||
client *c = lookupClientByID(id);
|
||||
if (c) {
|
||||
std::unique_lock<fastlock> l(c->lock, std::defer_lock);
|
||||
if (FCorrectThread(c))
|
||||
l.lock();
|
||||
c->flags &= ~CLIENT_IN_TO_TABLE;
|
||||
checkBlockedClientTimeout(c,now);
|
||||
}
|
||||
raxRemove(g_pserver->clients_timeout_table,ri.key,ri.key_len,NULL);
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
}
|
||||
}
|
||||
|
||||
/* ======================= Cron: called every 100 ms ======================== */
|
||||
|
||||
/* Add a sample to the operations per second array of samples. */
|
||||
@ -1526,42 +1661,6 @@ long long getInstantaneousMetric(int metric) {
|
||||
return sum / STATS_METRIC_SAMPLES;
|
||||
}
|
||||
|
||||
/* Check for timeouts. Returns non-zero if the client was terminated.
|
||||
* The function gets the current time in milliseconds as argument since
|
||||
* it gets called multiple times in a loop, so calling gettimeofday() for
|
||||
* each iteration would be costly without any actual gain. */
|
||||
int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
|
||||
time_t now = now_ms/1000;
|
||||
|
||||
if (cserver.maxidletime &&
|
||||
!(c->flags & CLIENT_SLAVE) && /* no timeout for slaves and monitors */
|
||||
!(c->flags & CLIENT_MASTER) && /* no timeout for masters */
|
||||
!(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
|
||||
!(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */
|
||||
(now - c->lastinteraction > cserver.maxidletime))
|
||||
{
|
||||
serverLog(LL_VERBOSE,"Closing idle client");
|
||||
freeClient(c);
|
||||
return 1;
|
||||
} else if (c->flags & CLIENT_BLOCKED) {
|
||||
/* Blocked OPS timeout is handled with milliseconds resolution.
|
||||
* However note that the actual resolution is limited by
|
||||
* g_pserver->hz. */
|
||||
|
||||
if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
|
||||
/* Handle blocking operation specific timeout. */
|
||||
replyToBlockedClientTimedOut(c);
|
||||
unblockClient(c);
|
||||
} else if (g_pserver->cluster_enabled) {
|
||||
/* Cluster: handle unblock & redirect of clients blocked
|
||||
* into keys no longer served by this g_pserver-> */
|
||||
if (clusterRedirectBlockedClientIfNeeded(c))
|
||||
unblockClient(c);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* The client query buffer is an sds.c string that can end with a lot of
|
||||
* free space not used, this function reclaims space if needed.
|
||||
*
|
||||
@ -1728,11 +1827,13 @@ void clientsCron(int iel) {
|
||||
void databasesCron(void) {
|
||||
/* Expire keys by random sampling. Not required for slaves
|
||||
* as master will synthesize DELs for us. */
|
||||
if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) {
|
||||
if (g_pserver->active_expire_enabled) {
|
||||
if (iAmMaster()) {
|
||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
|
||||
} else if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) {
|
||||
} else {
|
||||
expireSlaveKeys();
|
||||
}
|
||||
}
|
||||
|
||||
/* Defrag keys gradually. */
|
||||
activeDefragCycle();
|
||||
@ -2163,8 +2264,12 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
|
||||
void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
UNUSED(eventLoop);
|
||||
|
||||
/* Handle precise timeouts of blocked clients. */
|
||||
clientsHandleTimeout();
|
||||
|
||||
/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
|
||||
tlsProcessPendingData();
|
||||
|
||||
/* If tls still has pending unread data don't sleep at all. */
|
||||
aeSetDontWait(eventLoop, tlsHasPendingData());
|
||||
|
||||
@ -2424,6 +2529,7 @@ void initServerConfig(void) {
|
||||
g_pserver->clients = listCreate();
|
||||
g_pserver->slaves = listCreate();
|
||||
g_pserver->monitors = listCreate();
|
||||
g_pserver->clients_timeout_table = raxNew();
|
||||
g_pserver->timezone = getTimeZone(); /* Initialized by tzset(). */
|
||||
cserver.configfile = NULL;
|
||||
cserver.executable = NULL;
|
||||
@ -3003,6 +3109,7 @@ void initServer(void) {
|
||||
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
|
||||
g_pserver->pubsub_channels = dictCreate(&keylistDictType,NULL);
|
||||
g_pserver->pubsub_patterns = listCreate();
|
||||
g_pserver->pubsub_patterns_dict = dictCreate(&keylistDictType,NULL);
|
||||
listSetFreeMethod(g_pserver->pubsub_patterns,freePubsubPattern);
|
||||
listSetMatchMethod(g_pserver->pubsub_patterns,listMatchPubsubPattern);
|
||||
g_pserver->cronloops = 0;
|
||||
@ -3593,7 +3700,7 @@ int processCommand(client *c, int callFlags) {
|
||||
/* Check if the user is authenticated. This check is skipped in case
|
||||
* the default user is flagged as "nopass" and is active. */
|
||||
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
|
||||
DefaultUser->flags & USER_FLAG_DISABLED) &&
|
||||
(DefaultUser->flags & USER_FLAG_DISABLED)) &&
|
||||
!c->authenticated;
|
||||
if (auth_required) {
|
||||
/* AUTH and HELLO and no auth modules are valid even in
|
||||
@ -3727,6 +3834,7 @@ int processCommand(client *c, int callFlags) {
|
||||
!(c->flags & CLIENT_MASTER) &&
|
||||
c->cmd->flags & CMD_WRITE)
|
||||
{
|
||||
flagTransaction(c);
|
||||
addReply(c, shared.roslaveerr);
|
||||
return C_OK;
|
||||
}
|
||||
@ -3766,11 +3874,19 @@ int processCommand(client *c, int callFlags) {
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Lua script too slow? Only allow a limited number of commands. */
|
||||
/* Lua script too slow? Only allow a limited number of commands.
|
||||
* Note that we need to allow the transactions commands, otherwise clients
|
||||
* sending a transaction with pipelining without error checking, may have
|
||||
* the MULTI plus a few initial commands refused, then the timeout
|
||||
* condition resolves, and the bottom-half of the transaction gets
|
||||
* executed, see Github PR #7022. */
|
||||
if (g_pserver->lua_timedout &&
|
||||
c->cmd->proc != authCommand &&
|
||||
c->cmd->proc != helloCommand &&
|
||||
c->cmd->proc != replconfCommand &&
|
||||
c->cmd->proc != multiCommand &&
|
||||
c->cmd->proc != execCommand &&
|
||||
c->cmd->proc != discardCommand &&
|
||||
!(c->cmd->proc == shutdownCommand &&
|
||||
c->argc == 2 &&
|
||||
tolower(((char*)ptrFromObj(c->argv[1]))[0]) == 'n') &&
|
||||
@ -4225,11 +4341,13 @@ sds genRedisInfoString(const char *section) {
|
||||
"client_recent_max_output_buffer:%zu\r\n"
|
||||
"blocked_clients:%d\r\n"
|
||||
"tracking_clients:%d\r\n"
|
||||
"clients_in_timeout_table:%" PRIu64 "\r\n"
|
||||
"current_client_thread:%d\r\n",
|
||||
listLength(g_pserver->clients)-listLength(g_pserver->slaves),
|
||||
maxin, maxout,
|
||||
g_pserver->blocked_clients,
|
||||
g_pserver->tracking_clients,
|
||||
raxSize(g_pserver->clients_timeout_table),
|
||||
static_cast<int>(serverTL - g_pserver->rgthreadvar));
|
||||
for (int ithread = 0; ithread < cserver.cthreads; ++ithread)
|
||||
{
|
||||
@ -5231,6 +5349,12 @@ static void validateConfiguration()
|
||||
}
|
||||
}
|
||||
|
||||
int iAmMaster(void) {
|
||||
return ((!g_pserver->cluster_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) ||
|
||||
(g_pserver->cluster_enabled && nodeIsMaster(g_pserver->cluster->myself)));
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
struct timeval tv;
|
||||
int j;
|
||||
|
13
src/server.h
13
src/server.h
@ -416,7 +416,8 @@ public:
|
||||
#define CLIENT_TRACKING_OPTOUT (1ULL<<35) /* Tracking in opt-out mode. */
|
||||
#define CLIENT_TRACKING_CACHING (1ULL<<36) /* CACHING yes/no was given,
|
||||
depending on optin/optout mode. */
|
||||
#define CLIENT_FORCE_REPLY (1ULL<<37) /* Should addReply be forced to write the text? */
|
||||
#define CLIENT_IN_TO_TABLE (1ULL<<37) /* This client is in the timeout table. */
|
||||
#define CLIENT_FORCE_REPLY (1ULL<<38) /* Should addReply be forced to write the text? */
|
||||
|
||||
/* Client block type (btype field in client structure)
|
||||
* if CLIENT_BLOCKED flag is set. */
|
||||
@ -1656,6 +1657,7 @@ struct redisServer {
|
||||
list *clients; /* List of active clients */
|
||||
list *clients_to_close; /* Clients to close asynchronously */
|
||||
list *slaves, *monitors; /* List of slaves and MONITORs */
|
||||
rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
|
||||
rax *clients_index; /* Active clients dictionary by client ID. */
|
||||
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
|
||||
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
|
||||
@ -1885,6 +1887,7 @@ struct redisServer {
|
||||
/* Pubsub */
|
||||
dict *pubsub_channels; /* Map channels to list of subscribed clients */
|
||||
list *pubsub_patterns; /* A list of pubsub_patterns */
|
||||
dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */
|
||||
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
|
||||
xor of NOTIFY_... flags. */
|
||||
/* Cluster */
|
||||
@ -1936,6 +1939,9 @@ struct redisServer {
|
||||
/* ACLs */
|
||||
char *acl_filename; /* ACL Users file. NULL if not configured. */
|
||||
unsigned long acllog_max_len; /* Maximum length of the ACL LOG list. */
|
||||
sds requirepass; /* Remember the cleartext password set with the
|
||||
old "requirepass" directive for backward
|
||||
compatibility with Redis <= 5. */
|
||||
/* Assert & bug reporting */
|
||||
const char *assert_failed;
|
||||
const char *assert_file;
|
||||
@ -2718,6 +2724,8 @@ void disconnectAllBlockedClients(void);
|
||||
void handleClientsBlockedOnKeys(void);
|
||||
void signalKeyAsReady(redisDb *db, robj *key);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
||||
void addClientToTimeoutTable(client *c);
|
||||
void removeClientFromTimeoutTable(client *c);
|
||||
|
||||
/* expire.c -- Handling of expired keys */
|
||||
void activeExpireCycle(int type);
|
||||
@ -2760,6 +2768,7 @@ void existsCommand(client *c);
|
||||
void setbitCommand(client *c);
|
||||
void getbitCommand(client *c);
|
||||
void bitfieldCommand(client *c);
|
||||
void bitfieldroCommand(client *c);
|
||||
void setrangeCommand(client *c);
|
||||
void getrangeCommand(client *c);
|
||||
void incrCommand(client *c);
|
||||
@ -3021,4 +3030,6 @@ class ShutdownException
|
||||
#define redisDebugMark() \
|
||||
printf("-- MARK %s:%d --\n", __FILE__, __LINE__)
|
||||
|
||||
int iAmMaster(void);
|
||||
|
||||
#endif
|
||||
|
@ -94,7 +94,7 @@ void disableTracking(client *c) {
|
||||
g_pserver->tracking_clients--;
|
||||
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
|
||||
CLIENT_TRACKING_BCAST|CLIENT_TRACKING_OPTIN|
|
||||
CLIENT_TRACKING_OPTOUT);
|
||||
CLIENT_TRACKING_OPTOUT|CLIENT_TRACKING_CACHING);
|
||||
}
|
||||
}
|
||||
|
||||
@ -271,7 +271,7 @@ void trackingInvalidateKey(robj *keyobj) {
|
||||
trackingRememberKeyToBroadcast(sdskey,sdslen(sdskey));
|
||||
|
||||
rax *ids = (rax*)raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
|
||||
if (ids == raxNotFound) return;;
|
||||
if (ids == raxNotFound) return;
|
||||
|
||||
raxIterator ri;
|
||||
raxStart(&ri,ids);
|
||||
|
87
tests/cluster/tests/14-consistency-check.tcl
Normal file
87
tests/cluster/tests/14-consistency-check.tcl
Normal file
@ -0,0 +1,87 @@
|
||||
source "../tests/includes/init-tests.tcl"
|
||||
|
||||
test "Create a 5 nodes cluster" {
|
||||
create_cluster 5 5
|
||||
}
|
||||
|
||||
test "Cluster should start ok" {
|
||||
assert_cluster_state ok
|
||||
}
|
||||
|
||||
test "Cluster is writable" {
|
||||
cluster_write_test 0
|
||||
}
|
||||
|
||||
proc find_non_empty_master {} {
|
||||
set master_id_no {}
|
||||
foreach_redis_id id {
|
||||
if {[RI $id role] eq {master} && [R $id dbsize] > 0} {
|
||||
set master_id_no $id
|
||||
}
|
||||
}
|
||||
return $master_id_no
|
||||
}
|
||||
|
||||
proc get_one_of_my_replica {id} {
|
||||
set replica_port [lindex [lindex [lindex [R $id role] 2] 0] 1]
|
||||
set replica_id_num [get_instance_id_by_port redis $replica_port]
|
||||
return $replica_id_num
|
||||
}
|
||||
|
||||
proc cluster_write_keys_with_expire {id ttl} {
|
||||
set prefix [randstring 20 20 alpha]
|
||||
set port [get_instance_attrib redis $id port]
|
||||
set cluster [redis_cluster 127.0.0.1:$port]
|
||||
for {set j 100} {$j < 200} {incr j} {
|
||||
$cluster setex key_expire.$j $ttl $prefix.$j
|
||||
}
|
||||
$cluster close
|
||||
}
|
||||
|
||||
proc test_slave_load_expired_keys {aof} {
|
||||
test "Slave expired keys is loaded when restarted: appendonly=$aof" {
|
||||
set master_id [find_non_empty_master]
|
||||
set replica_id [get_one_of_my_replica $master_id]
|
||||
|
||||
set master_dbsize [R $master_id dbsize]
|
||||
set slave_dbsize [R $replica_id dbsize]
|
||||
assert_equal $master_dbsize $slave_dbsize
|
||||
|
||||
set data_ttl 5
|
||||
cluster_write_keys_with_expire $master_id $data_ttl
|
||||
after 100
|
||||
set replica_dbsize_1 [R $replica_id dbsize]
|
||||
assert {$replica_dbsize_1 > $slave_dbsize}
|
||||
|
||||
R $replica_id config set appendonly $aof
|
||||
R $replica_id config rewrite
|
||||
|
||||
set start_time [clock seconds]
|
||||
set end_time [expr $start_time+$data_ttl+2]
|
||||
R $replica_id save
|
||||
set replica_dbsize_2 [R $replica_id dbsize]
|
||||
assert {$replica_dbsize_2 > $slave_dbsize}
|
||||
kill_instance redis $replica_id
|
||||
|
||||
set master_port [get_instance_attrib redis $master_id port]
|
||||
exec ../../../src/redis-cli -h 127.0.0.1 -p $master_port debug sleep [expr $data_ttl+3] > /dev/null &
|
||||
|
||||
while {[clock seconds] <= $end_time} {
|
||||
#wait for $data_ttl seconds
|
||||
}
|
||||
restart_instance redis $replica_id
|
||||
|
||||
wait_for_condition 200 50 {
|
||||
[R $replica_id ping] eq {PONG}
|
||||
} else {
|
||||
fail "replica #$replica_id not started"
|
||||
}
|
||||
|
||||
set replica_dbsize_3 [R $replica_id dbsize]
|
||||
assert {$replica_dbsize_3 > $slave_dbsize}
|
||||
}
|
||||
}
|
||||
|
||||
test_slave_load_expired_keys no
|
||||
after 5000
|
||||
test_slave_load_expired_keys yes
|
61
tests/integration/psync2-pingoff.tcl
Normal file
61
tests/integration/psync2-pingoff.tcl
Normal file
@ -0,0 +1,61 @@
|
||||
# Test the meaningful offset implementation to make sure masters
|
||||
# are able to PSYNC with replicas even if the replication stream
|
||||
# has pending PINGs at the end.
|
||||
|
||||
start_server {tags {"psync2"}} {
|
||||
start_server {} {
|
||||
# Config
|
||||
set debug_msg 0 ; # Enable additional debug messages
|
||||
|
||||
for {set j 0} {$j < 2} {incr j} {
|
||||
set R($j) [srv [expr 0-$j] client]
|
||||
set R_host($j) [srv [expr 0-$j] host]
|
||||
set R_port($j) [srv [expr 0-$j] port]
|
||||
$R($j) CONFIG SET repl-ping-replica-period 1
|
||||
if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"}
|
||||
}
|
||||
|
||||
# Setup replication
|
||||
test "PSYNC2 meaningful offset: setup" {
|
||||
$R(1) replicaof $R_host(0) $R_port(0)
|
||||
$R(0) set foo bar
|
||||
wait_for_condition 50 1000 {
|
||||
[$R(0) dbsize] == 1 && [$R(1) dbsize] == 1
|
||||
} else {
|
||||
fail "Replicas not replicating from master"
|
||||
}
|
||||
}
|
||||
|
||||
test "PSYNC2 meaningful offset: write and wait replication" {
|
||||
$R(0) INCR counter
|
||||
$R(0) INCR counter
|
||||
$R(0) INCR counter
|
||||
wait_for_condition 50 1000 {
|
||||
[$R(0) GET counter] eq [$R(1) GET counter]
|
||||
} else {
|
||||
fail "Master and replica don't agree about counter"
|
||||
}
|
||||
}
|
||||
|
||||
# In this test we'll make sure the replica will get stuck, but with
|
||||
# an active connection: this way the master will continue to send PINGs
|
||||
# every second (we modified the PING period earlier)
|
||||
test "PSYNC2 meaningful offset: pause replica and promote it" {
|
||||
$R(1) MULTI
|
||||
$R(1) DEBUG SLEEP 5
|
||||
$R(1) SLAVEOF NO ONE
|
||||
$R(1) EXEC
|
||||
$R(1) ping ; # Wait for it to return back available
|
||||
}
|
||||
|
||||
test "Make the old master a replica of the new one and check conditions" {
|
||||
set sync_partial [status $R(1) sync_partial_ok]
|
||||
assert {$sync_partial == 0}
|
||||
$R(0) REPLICAOF $R_host(1) $R_port(1)
|
||||
wait_for_condition 50 1000 {
|
||||
[status $R(1) sync_partial_ok] == 1
|
||||
} else {
|
||||
fail "The new master was not able to partial sync"
|
||||
}
|
||||
}
|
||||
}}
|
@ -52,6 +52,7 @@ set ::all_tests {
|
||||
integration/logging
|
||||
integration/psync2
|
||||
integration/psync2-reg
|
||||
integration/psync2-pingoff
|
||||
unit/pubsub
|
||||
unit/slowlog
|
||||
unit/scripting
|
||||
|
@ -248,4 +248,11 @@ start_server {tags {"acl"}} {
|
||||
r AUTH default ""
|
||||
assert {[llength [r ACL LOG]] == 5}
|
||||
}
|
||||
|
||||
test {When default user is off, new connections are not authenticated} {
|
||||
r ACL setuser default off
|
||||
catch {set rd1 [redis_deferring_client]} e
|
||||
r ACL setuser default on
|
||||
set e
|
||||
} {*NOAUTH*}
|
||||
}
|
||||
|
@ -199,3 +199,34 @@ start_server {tags {"bitops"}} {
|
||||
r del mystring
|
||||
}
|
||||
}
|
||||
|
||||
start_server {tags {"repl"}} {
|
||||
start_server {} {
|
||||
set master [srv -1 client]
|
||||
set master_host [srv -1 host]
|
||||
set master_port [srv -1 port]
|
||||
set slave [srv 0 client]
|
||||
|
||||
test {BITFIELD: setup slave} {
|
||||
$slave slaveof $master_host $master_port
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replication not started."
|
||||
}
|
||||
}
|
||||
|
||||
test {BITFIELD: write on master, read on slave} {
|
||||
$master del bits
|
||||
assert_equal 0 [$master bitfield bits set u8 0 255]
|
||||
assert_equal 255 [$master bitfield bits set u8 0 100]
|
||||
wait_for_ofs_sync $master $slave
|
||||
assert_equal 100 [$slave bitfield_ro bits get u8 0]
|
||||
}
|
||||
|
||||
test {BITFIELD_RO fails when write option is used} {
|
||||
catch {$slave bitfield_ro bits set u8 0 100 get u8 0} err
|
||||
assert_match {*ERR BITFIELD_RO only supports the GET subcommand*} $err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -320,4 +320,76 @@ start_server {tags {"multi"}} {
|
||||
$rd close
|
||||
r ping
|
||||
} {PONG}
|
||||
|
||||
test {MULTI and script timeout} {
|
||||
# check that if MULTI arrives during timeout, it is either refused, or
|
||||
# allowed to pass, and we don't end up executing half of the transaction
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
r config set lua-time-limit 10
|
||||
r set xx 1
|
||||
$rd1 eval {while true do end} 0
|
||||
after 200
|
||||
catch { $rd2 multi; $rd2 read } e
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
r script kill
|
||||
after 200 ; # Give some time to Lua to call the hook again...
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
catch { $rd2 exec; $rd2 read } e
|
||||
set xx [r get xx]
|
||||
# make sure that either the whole transcation passed or none of it (we actually expect none)
|
||||
assert { $xx == 1 || $xx == 3}
|
||||
# check that the connection is no longer in multi state
|
||||
$rd2 ping asdf
|
||||
set pong [$rd2 read]
|
||||
assert_equal $pong "asdf"
|
||||
}
|
||||
|
||||
test {EXEC and script timeout} {
|
||||
# check that if EXEC arrives during timeout, we don't end up executing
|
||||
# half of the transaction, and also that we exit the multi state
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
r config set lua-time-limit 10
|
||||
r set xx 1
|
||||
catch { $rd2 multi; $rd2 read } e
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
$rd1 eval {while true do end} 0
|
||||
after 200
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
catch { $rd2 exec; $rd2 read } e
|
||||
r script kill
|
||||
after 200 ; # Give some time to Lua to call the hook again...
|
||||
set xx [r get xx]
|
||||
# make sure that either the whole transcation passed or none of it (we actually expect none)
|
||||
assert { $xx == 1 || $xx == 3}
|
||||
# check that the connection is no longer in multi state
|
||||
$rd2 ping asdf
|
||||
set pong [$rd2 read]
|
||||
assert_equal $pong "asdf"
|
||||
}
|
||||
|
||||
test {MULTI-EXEC body and script timeout} {
|
||||
# check that we don't run an imcomplete transaction due to some commands
|
||||
# arriving during busy script
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
r config set lua-time-limit 10
|
||||
r set xx 1
|
||||
catch { $rd2 multi; $rd2 read } e
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
$rd1 eval {while true do end} 0
|
||||
after 200
|
||||
catch { $rd2 incr xx; $rd2 read } e
|
||||
r script kill
|
||||
after 200 ; # Give some time to Lua to call the hook again...
|
||||
catch { $rd2 exec; $rd2 read } e
|
||||
set xx [r get xx]
|
||||
# make sure that either the whole transcation passed or none of it (we actually expect none)
|
||||
assert { $xx == 1 || $xx == 3}
|
||||
# check that the connection is no longer in multi state
|
||||
$rd2 ping asdf
|
||||
set pong [$rd2 read]
|
||||
assert_equal $pong "asdf"
|
||||
}
|
||||
}
|
||||
|
@ -311,4 +311,17 @@ start_server {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
|
||||
test {Empty stream with no lastid can be rewrite into AOF correctly} {
|
||||
r XGROUP CREATE mystream group-name $ MKSTREAM
|
||||
assert {[dict get [r xinfo stream mystream] length] == 0}
|
||||
set grpinfo [r xinfo groups mystream]
|
||||
r bgrewriteaof
|
||||
waitForBgrewriteaof r
|
||||
r debug loadaof
|
||||
assert {[dict get [r xinfo stream mystream] length] == 0}
|
||||
assert {[r xinfo groups mystream] == $grpinfo}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user