Merge commit 'a8e2bbe8f6a3f4833f286cc5049e6b52c87de1a9' into redis_6_merge

Former-commit-id: 5589a0a69ca6f5798b750a6a79f7e9b44d20e136
This commit is contained in:
John Sully 2020-04-14 19:22:44 -04:00
commit 6df76a3bfa
10 changed files with 98 additions and 150 deletions

View File

@ -155,23 +155,22 @@ tcp-keepalive 300
# tls-ca-cert-file ca.crt # tls-ca-cert-file ca.crt
# tls-ca-cert-dir /etc/ssl/certs # tls-ca-cert-dir /etc/ssl/certs
# If TLS/SSL clients are required to authenticate using a client side # By default, clients (including replica servers) on a TLS port are required
# certificate, use this directive. # to authenticate using valid client side certificates.
# #
# Note: this applies to all incoming clients, including replicas. # It is possible to disable authentication using this directive.
# #
# tls-auth-clients yes # tls-auth-clients no
# If TLS/SSL should be used when connecting as a replica to a master, enable # By default, a Redis replica does not attempt to establish a TLS connection
# this configuration directive: # with its master.
#
# Use the following directive to enable TLS on replication links.
# #
# tls-replication yes # tls-replication yes
# If TLS/SSL should be used for the Redis Cluster bus, enable this configuration # By default, the Redis Cluster bus uses a plain TCP connection. To enable
# directive. # TLS for the bus protocol, use the following directive:
#
# NOTE: If TLS/SSL is enabled for Cluster Bus, mutual authentication is always
# enforced.
# #
# tls-cluster yes # tls-cluster yes

View File

@ -1594,6 +1594,7 @@ void addACLLogEntry(client *c, int reason, int keypos, sds username) {
/* ACL -- show and modify the configuration of ACL users. /* ACL -- show and modify the configuration of ACL users.
* ACL HELP * ACL HELP
* ACL LOAD * ACL LOAD
* ACL SAVE
* ACL LIST * ACL LIST
* ACL USERS * ACL USERS
* ACL CAT [<category>] * ACL CAT [<category>]
@ -1857,6 +1858,7 @@ void aclCommand(client *c) {
} else if (!strcasecmp(sub,"help")) { } else if (!strcasecmp(sub,"help")) {
const char *help[] = { const char *help[] = {
"LOAD -- Reload users from the ACL file.", "LOAD -- Reload users from the ACL file.",
"SAVE -- Save the current config to the ACL file."
"LIST -- Show user details in config file format.", "LIST -- Show user details in config file format.",
"USERS -- List all the registered usernames.", "USERS -- List all the registered usernames.",
"SETUSER <username> [attribs ...] -- Create or modify a user.", "SETUSER <username> [attribs ...] -- Create or modify a user.",

View File

@ -257,6 +257,7 @@ void stopAppendOnly(void) {
g_pserver->aof_fd = -1; g_pserver->aof_fd = -1;
g_pserver->aof_selected_db = -1; g_pserver->aof_selected_db = -1;
g_pserver->aof_state = AOF_OFF; g_pserver->aof_state = AOF_OFF;
g_pserver->aof_rewrite_scheduled = 0;
killAppendOnlyChild(); killAppendOnlyChild();
} }

View File

@ -192,8 +192,9 @@ typedef struct typeInterface {
void (*init)(typeData data); void (*init)(typeData data);
/* Called on server start, should return 1 on success, 0 on error and should set err */ /* Called on server start, should return 1 on success, 0 on error and should set err */
int (*load)(typeData data, sds *argc, int argv, const char **err); int (*load)(typeData data, sds *argc, int argv, const char **err);
/* Called on CONFIG SET, returns 1 on success, 0 on error */ /* Called on server startup and CONFIG SET, returns 1 on success, 0 on error
int (*set)(typeData data, sds value, const char **err); * and can set a verbose err string, update is true when called from CONFIG SET */
int (*set)(typeData data, sds value, int update, const char **err);
/* Called on CONFIG GET, required to add output to the client */ /* Called on CONFIG GET, required to add output to the client */
void (*get)(client *c, typeData data); void (*get)(client *c, typeData data);
/* Called on CONFIG REWRITE, required to rewrite the config state */ /* Called on CONFIG REWRITE, required to rewrite the config state */
@ -332,7 +333,11 @@ void loadServerConfigFromString(char *config) {
if ((!strcasecmp(argv[0],config->name) || if ((!strcasecmp(argv[0],config->name) ||
(config->alias && !strcasecmp(argv[0],config->alias)))) (config->alias && !strcasecmp(argv[0],config->alias))))
{ {
if (!config->interface.load(config->data, argv, argc, &err)) { if (argc != 2) {
err = "wrong number of arguments";
goto loaderr;
}
if (!config->interface.set(config->data, argv[1], 0, &err)) {
goto loaderr; goto loaderr;
} }
@ -648,7 +653,7 @@ void configSetCommand(client *c) {
if(config->modifiable && (!strcasecmp(szFromObj(c->argv[2]),config->name) || if(config->modifiable && (!strcasecmp(szFromObj(c->argv[2]),config->name) ||
(config->alias && !strcasecmp(szFromObj(c->argv[2]),config->alias)))) (config->alias && !strcasecmp(szFromObj(c->argv[2]),config->alias))))
{ {
if (!config->interface.set(config->data,szFromObj(o), &errstr)) { if (!config->interface.set(config->data,szFromObj(o),1,&errstr)) {
goto badfmt; goto badfmt;
} }
addReply(c,shared.ok); addReply(c,shared.ok);
@ -1611,8 +1616,8 @@ static char loadbuf[LOADBUF_SIZE];
#define embedCommonConfig(config_name, config_alias, is_modifiable) \ #define embedCommonConfig(config_name, config_alias, is_modifiable) \
config_name, config_alias, is_modifiable, config_name, config_alias, is_modifiable,
#define embedConfigInterface(initfn, loadfn, setfn, getfn, rewritefn) { \ #define embedConfigInterface(initfn, setfn, getfn, rewritefn) { \
initfn, loadfn, setfn, getfn, rewritefn, \ initfn, nullptr, setfn, getfn, rewritefn, \
}, },
/* What follows is the generic config types that are supported. To add a new /* What follows is the generic config types that are supported. To add a new
@ -1632,31 +1637,19 @@ static void boolConfigInit(typeData data) {
*data.yesno.config = data.yesno.default_value; *data.yesno.config = data.yesno.default_value;
} }
static int boolConfigLoad(typeData data, sds *argv, int argc, const char **err) { static int boolConfigSet(typeData data, sds value, int update, const char **err) {
int yn;
if (argc != 2) {
*err = "wrong number of arguments";
return 0;
}
if ((yn = yesnotoi(argv[1])) == -1) {
if ((yn = truefalsetoi(argv[1])) == -1)
*err = "argument must be 'yes' or 'no'";
return 0;
}
if (data.yesno.is_valid_fn && !data.yesno.is_valid_fn(yn, err))
return 0;
*data.yesno.config = yn;
return 1;
}
static int boolConfigSet(typeData data, sds value, const char **err) {
int yn = yesnotoi(value); int yn = yesnotoi(value);
if (yn == -1) return 0; if (yn == -1) {
if ((yn = truefalsetoi(value)) == -1) {
*err = "argument must be 'yes' or 'no'";
return 0;
}
}
if (data.yesno.is_valid_fn && !data.yesno.is_valid_fn(yn, err)) if (data.yesno.is_valid_fn && !data.yesno.is_valid_fn(yn, err))
return 0; return 0;
int prev = *(data.yesno.config); int prev = *(data.yesno.config);
*(data.yesno.config) = yn; *(data.yesno.config) = yn;
if (data.yesno.update_fn && !data.yesno.update_fn(yn, prev, err)) { if (update && data.yesno.update_fn && !data.yesno.update_fn(yn, prev, err)) {
*(data.yesno.config) = prev; *(data.yesno.config) = prev;
return 0; return 0;
} }
@ -1675,7 +1668,7 @@ constexpr standardConfig createBoolConfig(const char *name, const char *alias, i
{ {
standardConfig conf = { standardConfig conf = {
embedCommonConfig(name, alias, modifiable) embedCommonConfig(name, alias, modifiable)
{ boolConfigInit, boolConfigLoad, boolConfigSet, boolConfigGet, boolConfigRewrite } { boolConfigInit, nullptr, boolConfigSet, boolConfigGet, boolConfigRewrite }
}; };
conf.data.yesno.config = &config_addr; conf.data.yesno.config = &config_addr;
conf.data.yesno.default_value = defaultValue; conf.data.yesno.default_value = defaultValue;
@ -1693,23 +1686,7 @@ static void stringConfigInit(typeData data) {
} }
} }
static int stringConfigLoad(typeData data, sds *argv, int argc, const char **err) { static int stringConfigSet(typeData data, sds value, int update, const char **err) {
if (argc != 2) {
*err = "wrong number of arguments";
return 0;
}
if (data.string.is_valid_fn && !data.string.is_valid_fn(argv[1], err))
return 0;
zfree(*data.string.config);
if (data.string.convert_empty_to_null) {
*data.string.config = argv[1][0] ? zstrdup(argv[1]) : NULL;
} else {
*data.string.config = zstrdup(argv[1]);
}
return 1;
}
static int stringConfigSet(typeData data, sds value, const char **err) {
if (data.string.is_valid_fn && !data.string.is_valid_fn(value, err)) if (data.string.is_valid_fn && !data.string.is_valid_fn(value, err))
return 0; return 0;
char *prev = *data.string.config; char *prev = *data.string.config;
@ -1718,7 +1695,7 @@ static int stringConfigSet(typeData data, sds value, const char **err) {
} else { } else {
*data.string.config = zstrdup(value); *data.string.config = zstrdup(value);
} }
if (data.string.update_fn && !data.string.update_fn(*data.string.config, prev, err)) { if (update && data.string.update_fn && !data.string.update_fn(*data.string.config, prev, err)) {
zfree(*data.string.config); zfree(*data.string.config);
*data.string.config = prev; *data.string.config = prev;
return 0; return 0;
@ -1741,7 +1718,7 @@ static void stringConfigRewrite(typeData data, const char *name, struct rewriteC
constexpr standardConfig createStringConfig(const char *name, const char *alias, int modifiable, int empty_to_null, char *&config_addr, const char *defaultValue, int (*is_valid)(char*,const char**), int (*update)(char*,char*,const char**)) { constexpr standardConfig createStringConfig(const char *name, const char *alias, int modifiable, int empty_to_null, char *&config_addr, const char *defaultValue, int (*is_valid)(char*,const char**), int (*update)(char*,char*,const char**)) {
standardConfig conf = { standardConfig conf = {
embedCommonConfig(name, alias, modifiable) embedCommonConfig(name, alias, modifiable)
embedConfigInterface(stringConfigInit, stringConfigLoad, stringConfigSet, stringConfigGet, stringConfigRewrite) embedConfigInterface(stringConfigInit, stringConfigSet, stringConfigGet, stringConfigRewrite)
}; };
conf.data.string = { conf.data.string = {
&(config_addr), &(config_addr),
@ -1754,17 +1731,12 @@ constexpr standardConfig createStringConfig(const char *name, const char *alias,
} }
/* Enum configs */ /* Enum configs */
static void configEnumInit(typeData data) { static void enumConfigInit(typeData data) {
*data.enumd.config = data.enumd.default_value; *data.enumd.config = data.enumd.default_value;
} }
static int configEnumLoad(typeData data, sds *argv, int argc, const char **err) { static int enumConfigSet(typeData data, sds value, int update, const char **err) {
if (argc != 2) { int enumval = configEnumGetValue(data.enumd.enum_value, value);
*err = "wrong number of arguments";
return 0;
}
int enumval = configEnumGetValue(data.enumd.enum_value, argv[1]);
if (enumval == INT_MIN) { if (enumval == INT_MIN) {
sds enumerr = sdsnew("argument must be one of the following: "); sds enumerr = sdsnew("argument must be one of the following: ");
configEnum *enumNode = data.enumd.enum_value; configEnum *enumNode = data.enumd.enum_value;
@ -1784,38 +1756,29 @@ static int configEnumLoad(typeData data, sds *argv, int argc, const char **err)
*err = loadbuf; *err = loadbuf;
return 0; return 0;
} }
if (data.enumd.is_valid_fn && !data.enumd.is_valid_fn(enumval, err))
return 0;
*(data.enumd.config) = enumval;
return 1;
}
static int configEnumSet(typeData data, sds value, const char **err) {
int enumval = configEnumGetValue(data.enumd.enum_value, value);
if (enumval == INT_MIN) return 0;
if (data.enumd.is_valid_fn && !data.enumd.is_valid_fn(enumval, err)) if (data.enumd.is_valid_fn && !data.enumd.is_valid_fn(enumval, err))
return 0; return 0;
int prev = *(data.enumd.config); int prev = *(data.enumd.config);
*(data.enumd.config) = enumval; *(data.enumd.config) = enumval;
if (data.enumd.update_fn && !data.enumd.update_fn(enumval, prev, err)) { if (update && data.enumd.update_fn && !data.enumd.update_fn(enumval, prev, err)) {
*(data.enumd.config) = prev; *(data.enumd.config) = prev;
return 0; return 0;
} }
return 1; return 1;
} }
static void configEnumGet(client *c, typeData data) { static void enumConfigGet(client *c, typeData data) {
addReplyBulkCString(c, configEnumGetNameOrUnknown(data.enumd.enum_value,*data.enumd.config)); addReplyBulkCString(c, configEnumGetNameOrUnknown(data.enumd.enum_value,*data.enumd.config));
} }
static void configEnumRewrite(typeData data, const char *name, struct rewriteConfigState *state) { static void enumConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) {
rewriteConfigEnumOption(state, name,*(data.enumd.config), data.enumd.enum_value, data.enumd.default_value); rewriteConfigEnumOption(state, name,*(data.enumd.config), data.enumd.enum_value, data.enumd.default_value);
} }
constexpr standardConfig createEnumConfig(const char *name, const char *alias, int modifiable, configEnum *enumVal, int &config_addr, int defaultValue, int (*is_valid)(int,const char**), int (*update)(int,int,const char**)) { constexpr standardConfig createEnumConfig(const char *name, const char *alias, int modifiable, configEnum *enumVal, int &config_addr, int defaultValue, int (*is_valid)(int,const char**), int (*update)(int,int,const char**)) {
standardConfig c = { standardConfig c = {
embedCommonConfig(name, alias, modifiable) embedCommonConfig(name, alias, modifiable)
embedConfigInterface(configEnumInit, configEnumLoad, configEnumSet, configEnumGet, configEnumRewrite) embedConfigInterface(enumConfigInit, enumConfigSet, enumConfigGet, enumConfigRewrite)
}; };
c.data.enumd = { c.data.enumd = {
&(config_addr), &(config_addr),
@ -1913,49 +1876,22 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) {
return 1; return 1;
} }
static int numericConfigLoad(typeData data, sds *argv, int argc, const char **err) { static int numericConfigSet(typeData data, sds value, int update, const char **err) {
long long ll; long long ll, prev = 0;
if (argc != 2) {
*err = "wrong number of arguments";
return 0;
}
if (data.numeric.is_memory) { if (data.numeric.is_memory) {
int memerr; int memerr;
ll = memtoll(argv[1], &memerr); ll = memtoll(value, &memerr);
if (memerr || ll < 0) { if (memerr || ll < 0) {
*err = "argument must be a memory value"; *err = "argument must be a memory value";
return 0; return 0;
} }
} else { } else {
if (!string2ll(argv[1], sdslen(argv[1]),&ll)) { if (!string2ll(value, sdslen(value),&ll)) {
*err = "argument couldn't be parsed into an integer" ; *err = "argument couldn't be parsed into an integer" ;
return 0; return 0;
} }
} }
if (!numericBoundaryCheck(data, ll, err))
return 0;
if (data.numeric.is_valid_fn && !data.numeric.is_valid_fn(ll, err))
return 0;
SET_NUMERIC_TYPE(ll)
return 1;
}
static int numericConfigSet(typeData data, sds value, const char **err) {
long long ll, prev = 0;
if (data.numeric.is_memory) {
int memerr;
ll = memtoll(value, &memerr);
if (memerr || ll < 0) return 0;
} else {
if (!string2ll(value, sdslen(value),&ll)) return 0;
}
if (!numericBoundaryCheck(data, ll, err)) if (!numericBoundaryCheck(data, ll, err))
return 0; return 0;
@ -1965,7 +1901,7 @@ static int numericConfigSet(typeData data, sds value, const char **err) {
GET_NUMERIC_TYPE(prev) GET_NUMERIC_TYPE(prev)
SET_NUMERIC_TYPE(ll) SET_NUMERIC_TYPE(ll)
if (data.numeric.update_fn && !data.numeric.update_fn(ll, prev, err)) { if (update && data.numeric.update_fn && !data.numeric.update_fn(ll, prev, err)) {
SET_NUMERIC_TYPE(prev) SET_NUMERIC_TYPE(prev)
return 0; return 0;
} }
@ -2000,7 +1936,7 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite
constexpr standardConfig embedCommonNumericalConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) { constexpr standardConfig embedCommonNumericalConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) {
standardConfig conf = { standardConfig conf = {
embedCommonConfig(name, alias, modifiable) embedCommonConfig(name, alias, modifiable)
embedConfigInterface(numericConfigInit, numericConfigLoad, numericConfigSet, numericConfigGet, numericConfigRewrite) embedConfigInterface(numericConfigInit, numericConfigSet, numericConfigGet, numericConfigRewrite)
}; };
conf.data.numeric.is_memory = (memory); conf.data.numeric.is_memory = (memory);
conf.data.numeric.lower_bound = (lower); conf.data.numeric.lower_bound = (lower);

View File

@ -1787,6 +1787,22 @@ int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numk
return keys; return keys;
} }
/* Helper function to extract keys from memory command.
* MEMORY USAGE <key> */
int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
int *keys;
UNUSED(cmd);
if (argc >= 3 && !strcasecmp(szFromObj(argv[1]),"usage")) {
keys = (int*)zmalloc(sizeof(int) * 1);
keys[0] = 2;
*numkeys = 1;
return keys;
}
*numkeys = 0;
return NULL;
}
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>] /* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
* STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */ * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N */
int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) { int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {

View File

@ -364,6 +364,7 @@ void debugCommand(client *c) {
"CRASH-AND-RECOVER <milliseconds> -- Hard crash and restart after <milliseconds> delay.", "CRASH-AND-RECOVER <milliseconds> -- Hard crash and restart after <milliseconds> delay.",
"DIGEST -- Output a hex signature representing the current DB content.", "DIGEST -- Output a hex signature representing the current DB content.",
"DIGEST-VALUE <key-1> ... <key-N>-- Output a hex signature of the values of all the specified keys.", "DIGEST-VALUE <key-1> ... <key-N>-- Output a hex signature of the values of all the specified keys.",
"DEBUG PROTOCOL [string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false]",
"ERROR <string> -- Return a Redis protocol error with <string> as message. Useful for clients unit tests to simulate Redis errors.", "ERROR <string> -- Return a Redis protocol error with <string> as message. Useful for clients unit tests to simulate Redis errors.",
"LOG <message> -- write message to the server log.", "LOG <message> -- write message to the server log.",
"HTSTATS <dbid> -- Return hash table statistics of the specified Redis database.", "HTSTATS <dbid> -- Return hash table statistics of the specified Redis database.",
@ -595,8 +596,8 @@ NULL
} }
} else if (!strcasecmp(szFromObj(c->argv[1]),"protocol") && c->argc == 3) { } else if (!strcasecmp(szFromObj(c->argv[1]),"protocol") && c->argc == 3) {
/* DEBUG PROTOCOL [string|integer|double|bignum|null|array|set|map| /* DEBUG PROTOCOL [string|integer|double|bignum|null|array|set|map|
* attrib|push|verbatim|true|false|state|err|bloberr] */ * attrib|push|verbatim|true|false] */
char *name = szFromObj(c->argv[2]); const char *name = szFromObj(c->argv[2]);
if (!strcasecmp(name,"string")) { if (!strcasecmp(name,"string")) {
addReplyBulkCString(c,"Hello World"); addReplyBulkCString(c,"Hello World");
} else if (!strcasecmp(name,"integer")) { } else if (!strcasecmp(name,"integer")) {
@ -643,7 +644,7 @@ NULL
} else if (!strcasecmp(name,"verbatim")) { } else if (!strcasecmp(name,"verbatim")) {
addReplyVerbatim(c,"This is a verbatim\nstring",25,"txt"); addReplyVerbatim(c,"This is a verbatim\nstring",25,"txt");
} else { } else {
addReplyError(c,"Wrong protocol type name. Please use one of the following: string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false|state|err|bloberr"); addReplyError(c,"Wrong protocol type name. Please use one of the following: string|integer|double|bignum|null|array|set|map|attrib|push|verbatim|true|false");
} }
} else if (!strcasecmp(szFromObj(c->argv[1]),"sleep") && c->argc == 3) { } else if (!strcasecmp(szFromObj(c->argv[1]),"sleep") && c->argc == 3) {
double dtime = strtod(szFromObj(c->argv[2]),NULL); double dtime = strtod(szFromObj(c->argv[2]),NULL);

View File

@ -485,10 +485,11 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync)
* Where the master must propagate the first change even if the second * Where the master must propagate the first change even if the second
* will produce an error. However it is useful to log such events since * will produce an error. However it is useful to log such events since
* they are rare and may hint at errors in a script or a bug in Redis. */ * they are rare and may hint at errors in a script or a bug in Redis. */
if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { int ctype = getClientType(c);
const char* to = reinterpret_cast<const char*>(c->flags & CLIENT_MASTER? "master": "replica"); if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE) {
const char* from = reinterpret_cast<const char*>(c->flags & CLIENT_MASTER? "replica": "master"); const char* to = ctype == CLIENT_TYPE_MASTER? "master": "replica";
const char *cmdname = reinterpret_cast<const char*>(c->lastcmd ? c->lastcmd->name : "<unknown>"); const char* from = ctype == CLIENT_TYPE_MASTER? "replica": "master";
const char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
"to its %s: '%s' after processing the command " "to its %s: '%s' after processing the command "
"'%s'", from, to, s, cmdname); "'%s'", from, to, s, cmdname);
@ -1492,7 +1493,7 @@ bool freeClient(client *c) {
} }
/* Log link disconnection with replica */ /* Log link disconnection with replica */
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { if (getClientType(c) == CLIENT_TYPE_SLAVE) {
serverLog(LL_WARNING,"Connection with replica %s lost.", serverLog(LL_WARNING,"Connection with replica %s lost.",
replicationGetSlaveName(c)); replicationGetSlaveName(c));
} }
@ -1539,7 +1540,7 @@ bool freeClient(client *c) {
/* We need to remember the time when we started to have zero /* We need to remember the time when we started to have zero
* attached slaves, as after some time we'll free the replication * attached slaves, as after some time we'll free the replication
* backlog. */ * backlog. */
if (c->flags & CLIENT_SLAVE && listLength(g_pserver->slaves) == 0) if (getClientType(c) == CLIENT_TYPE_SLAVE && listLength(g_pserver->slaves) == 0)
g_pserver->repl_no_slaves_since = g_pserver->unixtime; g_pserver->repl_no_slaves_since = g_pserver->unixtime;
refreshGoodSlavesCount(); refreshGoodSlavesCount();
/* Fire the replica change modules event. */ /* Fire the replica change modules event. */
@ -1689,8 +1690,8 @@ int writeToClient(client *c, int handler_installed) {
* just deliver as much data as it is possible to deliver. * just deliver as much data as it is possible to deliver.
* *
* Moreover, we also send as much as possible if the client is * Moreover, we also send as much as possible if the client is
* a replica (otherwise, on high-speed traffic, the replication * a replica or a monitor (otherwise, on high-speed traffic, the
* buffer will grow indefinitely) */ * replication/output buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT && if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(g_pserver->maxmemory == 0 || (g_pserver->maxmemory == 0 ||
zmalloc_used_memory() < g_pserver->maxmemory) && zmalloc_used_memory() < g_pserver->maxmemory) &&
@ -2000,7 +2001,7 @@ int processInlineBuffer(client *c) {
/* Newline from slaves can be used to refresh the last ACK time. /* Newline from slaves can be used to refresh the last ACK time.
* This is useful for a replica to ping back while loading a big * This is useful for a replica to ping back while loading a big
* RDB file. */ * RDB file. */
if (querylen == 0 && c->flags & CLIENT_SLAVE) if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
c->repl_ack_time = g_pserver->unixtime; c->repl_ack_time = g_pserver->unixtime;
/* Move querybuffer position to the next query in the buffer. */ /* Move querybuffer position to the next query in the buffer. */
@ -3008,12 +3009,14 @@ unsigned long getClientOutputBufferMemoryUsage(client *c) {
* *
* The function will return one of the following: * The function will return one of the following:
* CLIENT_TYPE_NORMAL -> Normal client * CLIENT_TYPE_NORMAL -> Normal client
* CLIENT_TYPE_SLAVE -> Slave or client executing MONITOR command * CLIENT_TYPE_SLAVE -> Slave
* CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels * CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
* CLIENT_TYPE_MASTER -> The client representing our replication master. * CLIENT_TYPE_MASTER -> The client representing our replication master.
*/ */
int getClientType(client *c) { int getClientType(client *c) {
if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER; if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
/* Even though MONITOR clients are marked as replicas, we
* want the expose them as normal clients. */
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
return CLIENT_TYPE_SLAVE; return CLIENT_TYPE_SLAVE;
if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB; if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;

View File

@ -1023,40 +1023,29 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mh->repl_backlog = mem; mh->repl_backlog = mem;
mem_total += mem; mem_total += mem;
mem = 0;
if (listLength(g_pserver->slaves)) {
listIter li;
listNode *ln;
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *c = (client*)listNodeValue(ln);
if (c->flags & CLIENT_CLOSE_ASAP)
continue;
mem += getClientOutputBufferMemoryUsage(c);
mem += sdsAllocSize(c->querybuf);
mem += sizeof(client);
}
}
mh->clients_slaves = mem;
mem_total+=mem;
mem = 0; mem = 0;
if (listLength(g_pserver->clients)) { if (listLength(g_pserver->clients)) {
listIter li; listIter li;
listNode *ln; listNode *ln;
size_t mem_normal = 0, mem_slaves = 0;
listRewind(g_pserver->clients,&li); listRewind(g_pserver->clients,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
size_t mem_curr = 0;
client *c = (client*)listNodeValue(ln); client *c = (client*)listNodeValue(ln);
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) int type = getClientType(c);
continue; mem_curr += getClientOutputBufferMemoryUsage(c);
mem += getClientOutputBufferMemoryUsage(c); mem_curr += sdsAllocSize(c->querybuf);
mem += sdsAllocSize(c->querybuf); mem_curr += sizeof(client);
mem += sizeof(client); if (type == CLIENT_TYPE_SLAVE)
mem_slaves += mem_curr;
else
mem_normal += mem_curr;
} }
mh->clients_slaves = mem_slaves;
mh->clients_normal = mem_normal;
mem = mem_slaves + mem_normal;
} }
mh->clients_normal = mem;
mem_total+=mem; mem_total+=mem;
mem = 0; mem = 0;

View File

@ -846,7 +846,7 @@ struct redisCommand redisCommandTable[] = {
{"memory",memoryCommand,-2, {"memory",memoryCommand,-2,
"random read-only", "random read-only",
0,NULL,0,0,0,0,0,0}, 0,memoryGetKeys,0,0,0,0,0,0},
{"client",clientCommand,-2, {"client",clientCommand,-2,
"admin no-script random @connection", "admin no-script random @connection",
@ -1534,7 +1534,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
time_t now = now_ms/1000; time_t now = now_ms/1000;
if (cserver.maxidletime && if (cserver.maxidletime &&
!(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */ !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves and monitors */
!(c->flags & CLIENT_MASTER) && /* no timeout for masters */ !(c->flags & CLIENT_MASTER) && /* no timeout for masters */
!(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */ !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
!(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */ !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */

View File

@ -2665,6 +2665,7 @@ int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys); int *xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
int *memoryGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
/* Cluster */ /* Cluster */
void clusterInit(void); void clusterInit(void);