Precise timeouts: use only radix tree for timeouts.
This commit is contained in:
parent
077f965426
commit
aa9d92d94a
@ -111,7 +111,7 @@ void blockClient(client *c, int btype) {
|
|||||||
c->btype = btype;
|
c->btype = btype;
|
||||||
server.blocked_clients++;
|
server.blocked_clients++;
|
||||||
server.blocked_clients_by_type[btype]++;
|
server.blocked_clients_by_type[btype]++;
|
||||||
addClientToShortTimeoutTable(c);
|
addClientToTimeoutTable(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called in the beforeSleep() function of the event loop
|
/* This function is called in the beforeSleep() function of the event loop
|
||||||
|
46
src/server.c
46
src/server.c
@ -1511,13 +1511,6 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
|
|||||||
freeClient(c);
|
freeClient(c);
|
||||||
return 1;
|
return 1;
|
||||||
} else if (c->flags & CLIENT_BLOCKED) {
|
} else if (c->flags & CLIENT_BLOCKED) {
|
||||||
/* Blocked OPS timeout is handled with milliseconds resolution.
|
|
||||||
* However note that the actual resolution is limited by
|
|
||||||
* server.hz. So for short timeouts (less than SERVER_SHORT_TIMEOUT
|
|
||||||
* milliseconds) we populate a Radix tree and handle such timeouts
|
|
||||||
* in clientsHandleShortTimeout(). */
|
|
||||||
if (checkBlockedClientTimeout(c,now_ms)) return 0;
|
|
||||||
|
|
||||||
/* Cluster: handle unblock & redirect of clients blocked
|
/* Cluster: handle unblock & redirect of clients blocked
|
||||||
* into keys no longer served by this server. */
|
* into keys no longer served by this server. */
|
||||||
if (server.cluster_enabled) {
|
if (server.cluster_enabled) {
|
||||||
@ -1528,8 +1521,8 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* For shor timeouts, less than < CLIENT_SHORT_TIMEOUT milliseconds, we
|
/* For blocked clients timeouts we populate a radix tree of 128 bit keys
|
||||||
* populate a radix tree of 128 bit keys composed as such:
|
* composed as such:
|
||||||
*
|
*
|
||||||
* [8 byte big endian expire time]+[8 byte client ID]
|
* [8 byte big endian expire time]+[8 byte client ID]
|
||||||
*
|
*
|
||||||
@ -1538,16 +1531,8 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
|
|||||||
* blocked with such timeout, we just go forward.
|
* blocked with such timeout, we just go forward.
|
||||||
*
|
*
|
||||||
* Every time a client blocks with a short timeout, we add the client in
|
* Every time a client blocks with a short timeout, we add the client in
|
||||||
* the tree. In beforeSleep() we call clientsHandleShortTimeout() to run
|
* the tree. In beforeSleep() we call clientsHandleTimeout() to run
|
||||||
* the tree and unblock the clients.
|
* the tree and unblock the clients. */
|
||||||
*
|
|
||||||
* Design hint: why we block only clients with short timeouts? For frugality:
|
|
||||||
* Clients blocking for 30 seconds usually don't need to be unblocked
|
|
||||||
* precisely, and anyway for the nature of Redis to *guarantee* unblock time
|
|
||||||
* precision is hard, so we can avoid putting a large number of clients in
|
|
||||||
* the radix tree without a good reason. This idea also has a role in memory
|
|
||||||
* usage as well given that we don't do cleanup, the shorter a client timeout,
|
|
||||||
* the less time it will stay in the radix tree. */
|
|
||||||
|
|
||||||
#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
|
#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
|
||||||
|
|
||||||
@ -1568,13 +1553,9 @@ void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) {
|
|||||||
|
|
||||||
/* Add the specified client id / timeout as a key in the radix tree we use
|
/* Add the specified client id / timeout as a key in the radix tree we use
|
||||||
* to handle short timeouts. The client is not added to the list if its
|
* to handle short timeouts. The client is not added to the list if its
|
||||||
* timeout is longer than CLIENT_SHORT_TIMEOUT milliseconds. */
|
* timeout is zero (block forever). */
|
||||||
void addClientToShortTimeoutTable(client *c) {
|
void addClientToTimeoutTable(client *c) {
|
||||||
if (c->bpop.timeout == 0 ||
|
if (c->bpop.timeout == 0) return;
|
||||||
c->bpop.timeout - mstime() > CLIENT_SHORT_TIMEOUT)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
uint64_t timeout = c->bpop.timeout;
|
uint64_t timeout = c->bpop.timeout;
|
||||||
uint64_t id = c->id;
|
uint64_t id = c->id;
|
||||||
unsigned char buf[CLIENT_ST_KEYLEN];
|
unsigned char buf[CLIENT_ST_KEYLEN];
|
||||||
@ -1584,7 +1565,7 @@ void addClientToShortTimeoutTable(client *c) {
|
|||||||
|
|
||||||
/* This function is called in beforeSleep() in order to unblock ASAP clients
|
/* This function is called in beforeSleep() in order to unblock ASAP clients
|
||||||
* that are waiting in blocking operations with a short timeout set. */
|
* that are waiting in blocking operations with a short timeout set. */
|
||||||
void clientsHandleShortTimeout(void) {
|
void clientsHandleTimeout(void) {
|
||||||
if (raxSize(server.clients_timeout_table) == 0) return;
|
if (raxSize(server.clients_timeout_table) == 0) return;
|
||||||
uint64_t now = mstime();
|
uint64_t now = mstime();
|
||||||
raxIterator ri;
|
raxIterator ri;
|
||||||
@ -1747,9 +1728,6 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
|
|||||||
*/
|
*/
|
||||||
#define CLIENTS_CRON_MIN_ITERATIONS 5
|
#define CLIENTS_CRON_MIN_ITERATIONS 5
|
||||||
void clientsCron(void) {
|
void clientsCron(void) {
|
||||||
/* Unblock short timeout clients ASAP. */
|
|
||||||
clientsHandleShortTimeout();
|
|
||||||
|
|
||||||
/* Try to process at least numclients/server.hz of clients
|
/* Try to process at least numclients/server.hz of clients
|
||||||
* per call. Since normally (if there are no big latency events) this
|
* per call. Since normally (if there are no big latency events) this
|
||||||
* function is called server.hz times per second, in the average case we
|
* function is called server.hz times per second, in the average case we
|
||||||
@ -2189,7 +2167,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
UNUSED(eventLoop);
|
UNUSED(eventLoop);
|
||||||
|
|
||||||
/* Handle precise timeouts of blocked clients. */
|
/* Handle precise timeouts of blocked clients. */
|
||||||
clientsHandleShortTimeout();
|
clientsHandleTimeout();
|
||||||
|
|
||||||
/* We should handle pending reads clients ASAP after event loop. */
|
/* We should handle pending reads clients ASAP after event loop. */
|
||||||
handleClientsWithPendingReadsUsingThreads();
|
handleClientsWithPendingReadsUsingThreads();
|
||||||
@ -4101,11 +4079,13 @@ sds genRedisInfoString(const char *section) {
|
|||||||
"client_recent_max_input_buffer:%zu\r\n"
|
"client_recent_max_input_buffer:%zu\r\n"
|
||||||
"client_recent_max_output_buffer:%zu\r\n"
|
"client_recent_max_output_buffer:%zu\r\n"
|
||||||
"blocked_clients:%d\r\n"
|
"blocked_clients:%d\r\n"
|
||||||
"tracking_clients:%d\r\n",
|
"tracking_clients:%d\r\n"
|
||||||
|
"clients_in_timeout_table:%lld\r\n",
|
||||||
listLength(server.clients)-listLength(server.slaves),
|
listLength(server.clients)-listLength(server.slaves),
|
||||||
maxin, maxout,
|
maxin, maxout,
|
||||||
server.blocked_clients,
|
server.blocked_clients,
|
||||||
server.tracking_clients);
|
server.tracking_clients,
|
||||||
|
raxSize(server.clients_timeout_table));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Memory */
|
/* Memory */
|
||||||
|
@ -277,9 +277,6 @@ typedef long long ustime_t; /* microsecond time type. */
|
|||||||
buffer configuration. Just the first
|
buffer configuration. Just the first
|
||||||
three: normal, slave, pubsub. */
|
three: normal, slave, pubsub. */
|
||||||
|
|
||||||
/* Other client related defines. */
|
|
||||||
#define CLIENT_SHORT_TIMEOUT 2000 /* See clientsHandleShortTimeout(). */
|
|
||||||
|
|
||||||
/* Slave replication state. Used in server.repl_state for slaves to remember
|
/* Slave replication state. Used in server.repl_state for slaves to remember
|
||||||
* what to do next. */
|
* what to do next. */
|
||||||
#define REPL_STATE_NONE 0 /* No active replication */
|
#define REPL_STATE_NONE 0 /* No active replication */
|
||||||
@ -2140,7 +2137,7 @@ void disconnectAllBlockedClients(void);
|
|||||||
void handleClientsBlockedOnKeys(void);
|
void handleClientsBlockedOnKeys(void);
|
||||||
void signalKeyAsReady(redisDb *db, robj *key);
|
void signalKeyAsReady(redisDb *db, robj *key);
|
||||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
||||||
void addClientToShortTimeoutTable(client *c);
|
void addClientToTimeoutTable(client *c);
|
||||||
|
|
||||||
/* expire.c -- Handling of expired keys */
|
/* expire.c -- Handling of expired keys */
|
||||||
void activeExpireCycle(int type);
|
void activeExpireCycle(int type);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user