diff --git a/src/blocked.c b/src/blocked.c index 06aa5850e..c470cba00 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -619,6 +619,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo listAddNodeTail(l,c); } blockClient(c,btype); + addClientToShortTimeoutTable(c); } /* Unblock a client that's waiting in a blocking operation such as BLPOP. diff --git a/src/server.c b/src/server.c index e657f1196..26581d979 100644 --- a/src/server.c +++ b/src/server.c @@ -1473,34 +1473,7 @@ int allPersistenceDisabled(void) { return server.saveparamslen == 0 && server.aof_state == AOF_OFF; } -/* ======================= Cron: called every 100 ms ======================== */ - -/* Add a sample to the operations per second array of samples. */ -void trackInstantaneousMetric(int metric, long long current_reading) { - long long t = mstime() - server.inst_metric[metric].last_sample_time; - long long ops = current_reading - - server.inst_metric[metric].last_sample_count; - long long ops_sec; - - ops_sec = t > 0 ? (ops*1000/t) : 0; - - server.inst_metric[metric].samples[server.inst_metric[metric].idx] = - ops_sec; - server.inst_metric[metric].idx++; - server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES; - server.inst_metric[metric].last_sample_time = mstime(); - server.inst_metric[metric].last_sample_count = current_reading; -} - -/* Return the mean of all the samples. */ -long long getInstantaneousMetric(int metric) { - int j; - long long sum = 0; - - for (j = 0; j < STATS_METRIC_SAMPLES; j++) - sum += server.inst_metric[metric].samples[j]; - return sum / STATS_METRIC_SAMPLES; -} +/* ========================== Clients timeouts ============================= */ /* Check if this blocked client timedout (does nothing if the client is * not blocked right now). If so send a reply, unblock it, and return 1. @@ -1555,6 +1528,107 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) { return 0; } +/* For shor timeouts, less than < CLIENT_SHORT_TIMEOUT milliseconds, we + * populate a radix tree of 128 bit keys composed as such: + * + * [8 byte big endian expire time]+[8 byte client ID] + * + * We don't do any cleanup in the Radix tree: when we run the clients that + * reached the timeout already, if they are no longer existing or no longer + * blocked with such timeout, we just go forward. + * + * Every time a client blocks with a short timeout, we add the client in + * the tree. In beforeSleep() we call clientsHandleShortTimeout() to run + * the tree and unblock the clients. + * + * Design hint: why we block only clients with short timeouts? For frugality: + * Clients blocking for 30 seconds usually don't need to be unblocked + * precisely, and anyway for the nature of Redis to *guarantee* unblock time + * precision is hard, so we can avoid putting a large number of clients in + * the radix tree without a good reason. This idea also has a role in memory + * usage as well given that we don't do cleanup, the shorter a client timeout, + * the less time it will stay in the radix tree. */ + +#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ + +/* Given client ID and timeout, write the resulting radix tree key in buf. */ +void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { + timeout = htonu64(timeout); + memcpy(buf,&timeout,sizeof(timeout)); + memcpy(buf+8,&id,sizeof(id)); +} + +/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write + * the timeout into *toptr and the client ID into *idptr. */ +void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { + memcpy(toptr,buf,sizeof(*toptr)); + *toptr = ntohu64(*toptr); + memcpy(idptr,buf+8,sizeof(*idptr)); +} + +/* Add the specified client id / timeout as a key in the radix tree we use + * to handle short timeouts. The client is not added to the list if its + * timeout is longer than CLIENT_SHORT_TIMEOUT milliseconds. */ +void addClientToShortTimeoutTable(client *c) { + if (c->bpop.timeout == 0 || + c->bpop.timeout - mstime() > CLIENT_SHORT_TIMEOUT) + { + return; + } + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL); +} + +/* This function is called in beforeSleep() in order to unblock ASAP clients + * that are waiting in blocking operations with a short timeout set. */ +void clientsHandleShortTimeout(void) { + uint64_t now = mstime(); + raxIterator ri; + raxStart(&ri,server.clients_timeout_table); + + while(raxNext(&ri)) { + uint64_t id, timeout; + decodeTimeoutKey(ri.key,&timeout,&id); + if (timeout >= now) break; /* All the timeouts are in the future. */ + client *c = lookupClientByID(id); + if (c) checkBlockedClientTimeout(c,now); + raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); + raxSeek(&ri,"^",NULL,0); + } +} + +/* ======================= Cron: called every 100 ms ======================== */ + +/* Add a sample to the operations per second array of samples. */ +void trackInstantaneousMetric(int metric, long long current_reading) { + long long t = mstime() - server.inst_metric[metric].last_sample_time; + long long ops = current_reading - + server.inst_metric[metric].last_sample_count; + long long ops_sec; + + ops_sec = t > 0 ? (ops*1000/t) : 0; + + server.inst_metric[metric].samples[server.inst_metric[metric].idx] = + ops_sec; + server.inst_metric[metric].idx++; + server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES; + server.inst_metric[metric].last_sample_time = mstime(); + server.inst_metric[metric].last_sample_count = current_reading; +} + +/* Return the mean of all the samples. */ +long long getInstantaneousMetric(int metric) { + int j; + long long sum = 0; + + for (j = 0; j < STATS_METRIC_SAMPLES; j++) + sum += server.inst_metric[metric].samples[j]; + return sum / STATS_METRIC_SAMPLES; +} + /* The client query buffer is an sds.c string that can end with a lot of * free space not used, this function reclaims space if needed. * @@ -2109,11 +2183,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); + /* Handle precise timeouts of blocked clients. */ + clientsHandleShortTimeout(); + /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */ tlsProcessPendingData(); + /* If tls still has pending unread data don't sleep at all. */ aeSetDontWait(server.el, tlsHasPendingData()); @@ -2738,6 +2816,7 @@ void initServer(void) { server.monitors = listCreate(); server.clients_pending_write = listCreate(); server.clients_pending_read = listCreate(); + server.clients_timeout_table = raxNew(); server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); diff --git a/src/server.h b/src/server.h index d697e79a2..c20f70136 100644 --- a/src/server.h +++ b/src/server.h @@ -1070,6 +1070,7 @@ struct redisServer { list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client executing the command. */ + rax *clients_timeout_table; /* Radix tree for clients with short timeout. */ long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ int clients_paused; /* True if clients are currently paused */ @@ -2140,6 +2141,7 @@ void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); +void addClientToShortTimeoutTable(client *c); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type);