Precise timeouts: working initial implementation.
This commit is contained in:
parent
b636856df0
commit
cedeec01f2
@ -619,6 +619,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
|
|||||||
listAddNodeTail(l,c);
|
listAddNodeTail(l,c);
|
||||||
}
|
}
|
||||||
blockClient(c,btype);
|
blockClient(c,btype);
|
||||||
|
addClientToShortTimeoutTable(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
|
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
|
||||||
|
135
src/server.c
135
src/server.c
@ -1473,34 +1473,7 @@ int allPersistenceDisabled(void) {
|
|||||||
return server.saveparamslen == 0 && server.aof_state == AOF_OFF;
|
return server.saveparamslen == 0 && server.aof_state == AOF_OFF;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ======================= Cron: called every 100 ms ======================== */
|
/* ========================== Clients timeouts ============================= */
|
||||||
|
|
||||||
/* Add a sample to the operations per second array of samples. */
|
|
||||||
void trackInstantaneousMetric(int metric, long long current_reading) {
|
|
||||||
long long t = mstime() - server.inst_metric[metric].last_sample_time;
|
|
||||||
long long ops = current_reading -
|
|
||||||
server.inst_metric[metric].last_sample_count;
|
|
||||||
long long ops_sec;
|
|
||||||
|
|
||||||
ops_sec = t > 0 ? (ops*1000/t) : 0;
|
|
||||||
|
|
||||||
server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
|
|
||||||
ops_sec;
|
|
||||||
server.inst_metric[metric].idx++;
|
|
||||||
server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
|
|
||||||
server.inst_metric[metric].last_sample_time = mstime();
|
|
||||||
server.inst_metric[metric].last_sample_count = current_reading;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Return the mean of all the samples. */
|
|
||||||
long long getInstantaneousMetric(int metric) {
|
|
||||||
int j;
|
|
||||||
long long sum = 0;
|
|
||||||
|
|
||||||
for (j = 0; j < STATS_METRIC_SAMPLES; j++)
|
|
||||||
sum += server.inst_metric[metric].samples[j];
|
|
||||||
return sum / STATS_METRIC_SAMPLES;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Check if this blocked client timedout (does nothing if the client is
|
/* Check if this blocked client timedout (does nothing if the client is
|
||||||
* not blocked right now). If so send a reply, unblock it, and return 1.
|
* not blocked right now). If so send a reply, unblock it, and return 1.
|
||||||
@ -1555,6 +1528,107 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* For shor timeouts, less than < CLIENT_SHORT_TIMEOUT milliseconds, we
|
||||||
|
* populate a radix tree of 128 bit keys composed as such:
|
||||||
|
*
|
||||||
|
* [8 byte big endian expire time]+[8 byte client ID]
|
||||||
|
*
|
||||||
|
* We don't do any cleanup in the Radix tree: when we run the clients that
|
||||||
|
* reached the timeout already, if they are no longer existing or no longer
|
||||||
|
* blocked with such timeout, we just go forward.
|
||||||
|
*
|
||||||
|
* Every time a client blocks with a short timeout, we add the client in
|
||||||
|
* the tree. In beforeSleep() we call clientsHandleShortTimeout() to run
|
||||||
|
* the tree and unblock the clients.
|
||||||
|
*
|
||||||
|
* Design hint: why we block only clients with short timeouts? For frugality:
|
||||||
|
* Clients blocking for 30 seconds usually don't need to be unblocked
|
||||||
|
* precisely, and anyway for the nature of Redis to *guarantee* unblock time
|
||||||
|
* precision is hard, so we can avoid putting a large number of clients in
|
||||||
|
* the radix tree without a good reason. This idea also has a role in memory
|
||||||
|
* usage as well given that we don't do cleanup, the shorter a client timeout,
|
||||||
|
* the less time it will stay in the radix tree. */
|
||||||
|
|
||||||
|
#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */
|
||||||
|
|
||||||
|
/* Given client ID and timeout, write the resulting radix tree key in buf. */
|
||||||
|
void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) {
|
||||||
|
timeout = htonu64(timeout);
|
||||||
|
memcpy(buf,&timeout,sizeof(timeout));
|
||||||
|
memcpy(buf+8,&id,sizeof(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write
|
||||||
|
* the timeout into *toptr and the client ID into *idptr. */
|
||||||
|
void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) {
|
||||||
|
memcpy(toptr,buf,sizeof(*toptr));
|
||||||
|
*toptr = ntohu64(*toptr);
|
||||||
|
memcpy(idptr,buf+8,sizeof(*idptr));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Add the specified client id / timeout as a key in the radix tree we use
|
||||||
|
* to handle short timeouts. The client is not added to the list if its
|
||||||
|
* timeout is longer than CLIENT_SHORT_TIMEOUT milliseconds. */
|
||||||
|
void addClientToShortTimeoutTable(client *c) {
|
||||||
|
if (c->bpop.timeout == 0 ||
|
||||||
|
c->bpop.timeout - mstime() > CLIENT_SHORT_TIMEOUT)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
uint64_t timeout = c->bpop.timeout;
|
||||||
|
uint64_t id = c->id;
|
||||||
|
unsigned char buf[CLIENT_ST_KEYLEN];
|
||||||
|
encodeTimeoutKey(buf,timeout,id);
|
||||||
|
raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This function is called in beforeSleep() in order to unblock ASAP clients
|
||||||
|
* that are waiting in blocking operations with a short timeout set. */
|
||||||
|
void clientsHandleShortTimeout(void) {
|
||||||
|
uint64_t now = mstime();
|
||||||
|
raxIterator ri;
|
||||||
|
raxStart(&ri,server.clients_timeout_table);
|
||||||
|
|
||||||
|
while(raxNext(&ri)) {
|
||||||
|
uint64_t id, timeout;
|
||||||
|
decodeTimeoutKey(ri.key,&timeout,&id);
|
||||||
|
if (timeout >= now) break; /* All the timeouts are in the future. */
|
||||||
|
client *c = lookupClientByID(id);
|
||||||
|
if (c) checkBlockedClientTimeout(c,now);
|
||||||
|
raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL);
|
||||||
|
raxSeek(&ri,"^",NULL,0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ======================= Cron: called every 100 ms ======================== */
|
||||||
|
|
||||||
|
/* Add a sample to the operations per second array of samples. */
|
||||||
|
void trackInstantaneousMetric(int metric, long long current_reading) {
|
||||||
|
long long t = mstime() - server.inst_metric[metric].last_sample_time;
|
||||||
|
long long ops = current_reading -
|
||||||
|
server.inst_metric[metric].last_sample_count;
|
||||||
|
long long ops_sec;
|
||||||
|
|
||||||
|
ops_sec = t > 0 ? (ops*1000/t) : 0;
|
||||||
|
|
||||||
|
server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
|
||||||
|
ops_sec;
|
||||||
|
server.inst_metric[metric].idx++;
|
||||||
|
server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
|
||||||
|
server.inst_metric[metric].last_sample_time = mstime();
|
||||||
|
server.inst_metric[metric].last_sample_count = current_reading;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Return the mean of all the samples. */
|
||||||
|
long long getInstantaneousMetric(int metric) {
|
||||||
|
int j;
|
||||||
|
long long sum = 0;
|
||||||
|
|
||||||
|
for (j = 0; j < STATS_METRIC_SAMPLES; j++)
|
||||||
|
sum += server.inst_metric[metric].samples[j];
|
||||||
|
return sum / STATS_METRIC_SAMPLES;
|
||||||
|
}
|
||||||
|
|
||||||
/* The client query buffer is an sds.c string that can end with a lot of
|
/* The client query buffer is an sds.c string that can end with a lot of
|
||||||
* free space not used, this function reclaims space if needed.
|
* free space not used, this function reclaims space if needed.
|
||||||
*
|
*
|
||||||
@ -2109,11 +2183,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
void beforeSleep(struct aeEventLoop *eventLoop) {
|
void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||||
UNUSED(eventLoop);
|
UNUSED(eventLoop);
|
||||||
|
|
||||||
|
/* Handle precise timeouts of blocked clients. */
|
||||||
|
clientsHandleShortTimeout();
|
||||||
|
|
||||||
/* We should handle pending reads clients ASAP after event loop. */
|
/* We should handle pending reads clients ASAP after event loop. */
|
||||||
handleClientsWithPendingReadsUsingThreads();
|
handleClientsWithPendingReadsUsingThreads();
|
||||||
|
|
||||||
/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
|
/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
|
||||||
tlsProcessPendingData();
|
tlsProcessPendingData();
|
||||||
|
|
||||||
/* If tls still has pending unread data don't sleep at all. */
|
/* If tls still has pending unread data don't sleep at all. */
|
||||||
aeSetDontWait(server.el, tlsHasPendingData());
|
aeSetDontWait(server.el, tlsHasPendingData());
|
||||||
|
|
||||||
@ -2738,6 +2816,7 @@ void initServer(void) {
|
|||||||
server.monitors = listCreate();
|
server.monitors = listCreate();
|
||||||
server.clients_pending_write = listCreate();
|
server.clients_pending_write = listCreate();
|
||||||
server.clients_pending_read = listCreate();
|
server.clients_pending_read = listCreate();
|
||||||
|
server.clients_timeout_table = raxNew();
|
||||||
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
|
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
|
||||||
server.unblocked_clients = listCreate();
|
server.unblocked_clients = listCreate();
|
||||||
server.ready_keys = listCreate();
|
server.ready_keys = listCreate();
|
||||||
|
@ -1070,6 +1070,7 @@ struct redisServer {
|
|||||||
list *clients_pending_read; /* Client has pending read socket buffers. */
|
list *clients_pending_read; /* Client has pending read socket buffers. */
|
||||||
list *slaves, *monitors; /* List of slaves and MONITORs */
|
list *slaves, *monitors; /* List of slaves and MONITORs */
|
||||||
client *current_client; /* Current client executing the command. */
|
client *current_client; /* Current client executing the command. */
|
||||||
|
rax *clients_timeout_table; /* Radix tree for clients with short timeout. */
|
||||||
long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
|
long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
|
||||||
rax *clients_index; /* Active clients dictionary by client ID. */
|
rax *clients_index; /* Active clients dictionary by client ID. */
|
||||||
int clients_paused; /* True if clients are currently paused */
|
int clients_paused; /* True if clients are currently paused */
|
||||||
@ -2140,6 +2141,7 @@ void disconnectAllBlockedClients(void);
|
|||||||
void handleClientsBlockedOnKeys(void);
|
void handleClientsBlockedOnKeys(void);
|
||||||
void signalKeyAsReady(redisDb *db, robj *key);
|
void signalKeyAsReady(redisDb *db, robj *key);
|
||||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids);
|
||||||
|
void addClientToShortTimeoutTable(client *c);
|
||||||
|
|
||||||
/* expire.c -- Handling of expired keys */
|
/* expire.c -- Handling of expired keys */
|
||||||
void activeExpireCycle(int type);
|
void activeExpireCycle(int type);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user