CLIENT_MASTER introduced.
This commit is contained in:
parent
c1e94b6b9c
commit
e6f39338e6
@ -90,7 +90,7 @@ configEnum aof_fsync_enum[] = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
/* Output buffer limits presets. */
|
/* Output buffer limits presets. */
|
||||||
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_COUNT] = {
|
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
|
||||||
{0, 0, 0}, /* normal */
|
{0, 0, 0}, /* normal */
|
||||||
{1024*1024*256, 1024*1024*64, 60}, /* slave */
|
{1024*1024*256, 1024*1024*64, 60}, /* slave */
|
||||||
{1024*1024*32, 1024*1024*8, 60} /* pubsub */
|
{1024*1024*32, 1024*1024*8, 60} /* pubsub */
|
||||||
@ -1163,13 +1163,13 @@ void configGetCommand(client *c) {
|
|||||||
sds buf = sdsempty();
|
sds buf = sdsempty();
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
for (j = 0; j < CLIENT_TYPE_COUNT; j++) {
|
for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++) {
|
||||||
buf = sdscatprintf(buf,"%s %llu %llu %ld",
|
buf = sdscatprintf(buf,"%s %llu %llu %ld",
|
||||||
getClientTypeName(j),
|
getClientTypeName(j),
|
||||||
server.client_obuf_limits[j].hard_limit_bytes,
|
server.client_obuf_limits[j].hard_limit_bytes,
|
||||||
server.client_obuf_limits[j].soft_limit_bytes,
|
server.client_obuf_limits[j].soft_limit_bytes,
|
||||||
(long) server.client_obuf_limits[j].soft_limit_seconds);
|
(long) server.client_obuf_limits[j].soft_limit_seconds);
|
||||||
if (j != CLIENT_TYPE_COUNT-1)
|
if (j != CLIENT_TYPE_OBUF_COUNT-1)
|
||||||
buf = sdscatlen(buf," ",1);
|
buf = sdscatlen(buf," ",1);
|
||||||
}
|
}
|
||||||
addReplyBulkCString(c,"client-output-buffer-limit");
|
addReplyBulkCString(c,"client-output-buffer-limit");
|
||||||
@ -1566,7 +1566,7 @@ void rewriteConfigClientoutputbufferlimitOption(struct rewriteConfigState *state
|
|||||||
int j;
|
int j;
|
||||||
char *option = "client-output-buffer-limit";
|
char *option = "client-output-buffer-limit";
|
||||||
|
|
||||||
for (j = 0; j < CLIENT_TYPE_COUNT; j++) {
|
for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++) {
|
||||||
int force = (server.client_obuf_limits[j].hard_limit_bytes !=
|
int force = (server.client_obuf_limits[j].hard_limit_bytes !=
|
||||||
clientBufferLimitsDefaults[j].hard_limit_bytes) ||
|
clientBufferLimitsDefaults[j].hard_limit_bytes) ||
|
||||||
(server.client_obuf_limits[j].soft_limit_bytes !=
|
(server.client_obuf_limits[j].soft_limit_bytes !=
|
||||||
|
@ -1567,12 +1567,13 @@ unsigned long getClientOutputBufferMemoryUsage(client *c) {
|
|||||||
* CLIENT_TYPE_NORMAL -> Normal client
|
* CLIENT_TYPE_NORMAL -> Normal client
|
||||||
* CLIENT_TYPE_SLAVE -> Slave or client executing MONITOR command
|
* CLIENT_TYPE_SLAVE -> Slave or client executing MONITOR command
|
||||||
* CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
|
* CLIENT_TYPE_PUBSUB -> Client subscribed to Pub/Sub channels
|
||||||
|
* CLIENT_TYPE_MASTER -> The client representing our replication master.
|
||||||
*/
|
*/
|
||||||
int getClientType(client *c) {
|
int getClientType(client *c) {
|
||||||
|
if (c->flags & CLIENT_MASTER) return CLIENT_TYPE_MASTER;
|
||||||
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
|
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR))
|
||||||
return CLIENT_TYPE_SLAVE;
|
return CLIENT_TYPE_SLAVE;
|
||||||
if (c->flags & CLIENT_PUBSUB)
|
if (c->flags & CLIENT_PUBSUB) return CLIENT_TYPE_PUBSUB;
|
||||||
return CLIENT_TYPE_PUBSUB;
|
|
||||||
return CLIENT_TYPE_NORMAL;
|
return CLIENT_TYPE_NORMAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1580,6 +1581,7 @@ int getClientTypeByName(char *name) {
|
|||||||
if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
|
if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
|
||||||
else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
|
else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
|
||||||
else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
|
else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
|
||||||
|
else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER;
|
||||||
else return -1;
|
else return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1588,6 +1590,7 @@ char *getClientTypeName(int class) {
|
|||||||
case CLIENT_TYPE_NORMAL: return "normal";
|
case CLIENT_TYPE_NORMAL: return "normal";
|
||||||
case CLIENT_TYPE_SLAVE: return "slave";
|
case CLIENT_TYPE_SLAVE: return "slave";
|
||||||
case CLIENT_TYPE_PUBSUB: return "pubsub";
|
case CLIENT_TYPE_PUBSUB: return "pubsub";
|
||||||
|
case CLIENT_TYPE_MASTER: return "master";
|
||||||
default: return NULL;
|
default: return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1603,6 +1606,10 @@ int checkClientOutputBufferLimits(client *c) {
|
|||||||
unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
|
unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
|
||||||
|
|
||||||
class = getClientType(c);
|
class = getClientType(c);
|
||||||
|
/* For the purpose of output buffer limiting, masters are handled
|
||||||
|
* like normal clients. */
|
||||||
|
if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
|
||||||
|
|
||||||
if (server.client_obuf_limits[class].hard_limit_bytes &&
|
if (server.client_obuf_limits[class].hard_limit_bytes &&
|
||||||
used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
|
used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
|
||||||
hard = 1;
|
hard = 1;
|
||||||
|
@ -1533,7 +1533,7 @@ void initServerConfig(void) {
|
|||||||
server.repl_no_slaves_since = time(NULL);
|
server.repl_no_slaves_since = time(NULL);
|
||||||
|
|
||||||
/* Client output buffer limits */
|
/* Client output buffer limits */
|
||||||
for (j = 0; j < CLIENT_TYPE_COUNT; j++)
|
for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
|
||||||
server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
|
server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
|
||||||
|
|
||||||
/* Double constants initialization */
|
/* Double constants initialization */
|
||||||
|
@ -276,7 +276,10 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define CLIENT_TYPE_NORMAL 0 /* Normal req-reply clients + MONITORs */
|
#define CLIENT_TYPE_NORMAL 0 /* Normal req-reply clients + MONITORs */
|
||||||
#define CLIENT_TYPE_SLAVE 1 /* Slaves. */
|
#define CLIENT_TYPE_SLAVE 1 /* Slaves. */
|
||||||
#define CLIENT_TYPE_PUBSUB 2 /* Clients subscribed to PubSub channels. */
|
#define CLIENT_TYPE_PUBSUB 2 /* Clients subscribed to PubSub channels. */
|
||||||
#define CLIENT_TYPE_COUNT 3
|
#define CLIENT_TYPE_MASTER 3 /* Master. */
|
||||||
|
#define CLIENT_TYPE_OBUF_COUNT 3 /* Number of clients to expose to output
|
||||||
|
buffer configuration. Just the first
|
||||||
|
three: normal, slave, pubsub. */
|
||||||
|
|
||||||
/* Slave replication state. Used in server.repl_state for slaves to remember
|
/* Slave replication state. Used in server.repl_state for slaves to remember
|
||||||
* what to do next. */
|
* what to do next. */
|
||||||
@ -625,7 +628,7 @@ typedef struct clientBufferLimitsConfig {
|
|||||||
time_t soft_limit_seconds;
|
time_t soft_limit_seconds;
|
||||||
} clientBufferLimitsConfig;
|
} clientBufferLimitsConfig;
|
||||||
|
|
||||||
extern clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_COUNT];
|
extern clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT];
|
||||||
|
|
||||||
/* The redisOp structure defines a Redis Operation, that is an instance of
|
/* The redisOp structure defines a Redis Operation, that is an instance of
|
||||||
* a command with an argument vector, database ID, propagation target
|
* a command with an argument vector, database ID, propagation target
|
||||||
@ -751,7 +754,7 @@ struct redisServer {
|
|||||||
int supervised; /* 1 if supervised, 0 otherwise. */
|
int supervised; /* 1 if supervised, 0 otherwise. */
|
||||||
int supervised_mode; /* See SUPERVISED_* */
|
int supervised_mode; /* See SUPERVISED_* */
|
||||||
int daemonize; /* True if running as a daemon */
|
int daemonize; /* True if running as a daemon */
|
||||||
clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_COUNT];
|
clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];
|
||||||
/* AOF persistence */
|
/* AOF persistence */
|
||||||
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
|
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
|
||||||
int aof_fsync; /* Kind of fsync() policy */
|
int aof_fsync; /* Kind of fsync() policy */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user