From c3b268a0bcded2d730790a772176483efb7c31fb Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 27 Mar 2020 16:34:45 +0100 Subject: [PATCH] timeout.c created: move client timeouts code there. --- src/Makefile | 2 +- src/blocked.c | 39 ---------- src/server.c | 128 +-------------------------------- src/server.h | 4 ++ src/timeout.c | 192 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 198 insertions(+), 167 deletions(-) create mode 100644 src/timeout.c diff --git a/src/Makefile b/src/Makefile index bbfb06440..3f982cc8e 100644 --- a/src/Makefile +++ b/src/Makefile @@ -206,7 +206,7 @@ endif REDIS_SERVER_NAME=redis-server REDIS_SENTINEL_NAME=redis-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o REDIS_CLI_NAME=redis-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o crc64.o siphash.o crc16.o REDIS_BENCHMARK_NAME=redis-benchmark diff --git a/src/blocked.c b/src/blocked.c index 795985ea1..e3a803ae3 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -31,9 +31,6 @@ * * API: * - * getTimeoutFromObjectOrReply() is just an utility function to parse a - * timeout argument since blocking operations usually require a timeout. - * * blockClient() set the CLIENT_BLOCKED flag in the client, and set the * specified block type 'btype' filed to one of BLOCKED_* macros. * @@ -67,42 +64,6 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); -/* Get a timeout value from an object and store it into 'timeout'. - * The final timeout is always stored as milliseconds as a time where the - * timeout will expire, however the parsing is performed according to - * the 'unit' that can be seconds or milliseconds. - * - * Note that if the timeout is zero (usually from the point of view of - * commands API this means no timeout) the value stored into 'timeout' - * is zero. */ -int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) { - long long tval; - long double ftval; - - if (unit == UNIT_SECONDS) { - if (getLongDoubleFromObjectOrReply(c,object,&ftval, - "timeout is not an float or out of range") != C_OK) - return C_ERR; - tval = (long long) (ftval * 1000.0); - } else { - if (getLongLongFromObjectOrReply(c,object,&tval, - "timeout is not an integer or out of range") != C_OK) - return C_ERR; - } - - if (tval < 0) { - addReplyError(c,"timeout is negative"); - return C_ERR; - } - - if (tval > 0) { - tval += mstime(); - } - *timeout = tval; - - return C_OK; -} - /* Block a client for the specific operation type. Once the CLIENT_BLOCKED * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ diff --git a/src/server.c b/src/server.c index 9a89290ca..bbc7ba5ff 100644 --- a/src/server.c +++ b/src/server.c @@ -1473,132 +1473,6 @@ int allPersistenceDisabled(void) { return server.saveparamslen == 0 && server.aof_state == AOF_OFF; } -/* ========================== Clients timeouts ============================= */ - -/* Check if this blocked client timedout (does nothing if the client is - * not blocked right now). If so send a reply, unblock it, and return 1. - * Otherwise 0 is returned and no operation is performed. */ -int checkBlockedClientTimeout(client *c, mstime_t now) { - if (c->flags & CLIENT_BLOCKED && - c->bpop.timeout != 0 - && c->bpop.timeout < now) - { - /* Handle blocking operation specific timeout. */ - replyToBlockedClientTimedOut(c); - unblockClient(c); - return 1; - } else { - return 0; - } -} - -/* Check for timeouts. Returns non-zero if the client was terminated. - * The function gets the current time in milliseconds as argument since - * it gets called multiple times in a loop, so calling gettimeofday() for - * each iteration would be costly without any actual gain. */ -int clientsCronHandleTimeout(client *c, mstime_t now_ms) { - time_t now = now_ms/1000; - - if (server.maxidletime && - /* This handles the idle clients connection timeout if set. */ - !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */ - !(c->flags & CLIENT_MASTER) && /* No timeout for masters */ - !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */ - !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */ - (now - c->lastinteraction > server.maxidletime)) - { - serverLog(LL_VERBOSE,"Closing idle client"); - freeClient(c); - return 1; - } else if (c->flags & CLIENT_BLOCKED) { - /* Cluster: handle unblock & redirect of clients blocked - * into keys no longer served by this server. */ - if (server.cluster_enabled) { - if (clusterRedirectBlockedClientIfNeeded(c)) - unblockClient(c); - } - } - return 0; -} - -/* For blocked clients timeouts we populate a radix tree of 128 bit keys - * composed as such: - * - * [8 byte big endian expire time]+[8 byte client ID] - * - * We don't do any cleanup in the Radix tree: when we run the clients that - * reached the timeout already, if they are no longer existing or no longer - * blocked with such timeout, we just go forward. - * - * Every time a client blocks with a timeout, we add the client in - * the tree. In beforeSleep() we call clientsHandleTimeout() to run - * the tree and unblock the clients. */ - -#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ - -/* Given client ID and timeout, write the resulting radix tree key in buf. */ -void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { - timeout = htonu64(timeout); - memcpy(buf,&timeout,sizeof(timeout)); - memcpy(buf+8,&id,sizeof(id)); -} - -/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write - * the timeout into *toptr and the client ID into *idptr. */ -void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { - memcpy(toptr,buf,sizeof(*toptr)); - *toptr = ntohu64(*toptr); - memcpy(idptr,buf+8,sizeof(*idptr)); -} - -/* Add the specified client id / timeout as a key in the radix tree we use - * to handle blocked clients timeouts. The client is not added to the list - * if its timeout is zero (block forever). */ -void addClientToTimeoutTable(client *c) { - if (c->bpop.timeout == 0) return; - uint64_t timeout = c->bpop.timeout; - uint64_t id = c->id; - unsigned char buf[CLIENT_ST_KEYLEN]; - encodeTimeoutKey(buf,timeout,id); - if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)) - c->flags |= CLIENT_IN_TO_TABLE; -} - -/* Remove the client from the table when it is unblocked for reasons - * different than timing out. */ -void removeClientFromTimeoutTable(client *c) { - if (!(c->flags & CLIENT_IN_TO_TABLE)) return; - c->flags &= ~CLIENT_IN_TO_TABLE; - uint64_t timeout = c->bpop.timeout; - uint64_t id = c->id; - unsigned char buf[CLIENT_ST_KEYLEN]; - encodeTimeoutKey(buf,timeout,id); - raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL); -} - -/* This function is called in beforeSleep() in order to unblock clients - * that are waiting in blocking operations with a timeout set. */ -void clientsHandleTimeout(void) { - if (raxSize(server.clients_timeout_table) == 0) return; - uint64_t now = mstime(); - raxIterator ri; - raxStart(&ri,server.clients_timeout_table); - raxSeek(&ri,"^",NULL,0); - - while(raxNext(&ri)) { - uint64_t id, timeout; - decodeTimeoutKey(ri.key,&timeout,&id); - if (timeout >= now) break; /* All the timeouts are in the future. */ - client *c = lookupClientByID(id); - if (c) { - c->flags &= ~CLIENT_IN_TO_TABLE; - checkBlockedClientTimeout(c,now); - } - raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); - raxSeek(&ri,"^",NULL,0); - } -} - /* ======================= Cron: called every 100 ms ======================== */ /* Add a sample to the operations per second array of samples. */ @@ -2183,7 +2057,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); /* Handle precise timeouts of blocked clients. */ - clientsHandleTimeout(); + handleBlockedClientsTimeout(); /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); diff --git a/src/server.h b/src/server.h index 03a153c7b..2c122d259 100644 --- a/src/server.h +++ b/src/server.h @@ -2139,8 +2139,12 @@ void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); + +/* timeout.c -- Blocked clients timeout and connections timeout. */ void addClientToTimeoutTable(client *c); void removeClientFromTimeoutTable(client *c); +void handleBlockedClientsTimeout(void); +int clientsCronHandleTimeout(client *c, mstime_t now_ms); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); diff --git a/src/timeout.c b/src/timeout.c new file mode 100644 index 000000000..ea2032e2a --- /dev/null +++ b/src/timeout.c @@ -0,0 +1,192 @@ +/* Copyright (c) 2009-2020, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" +#include "cluster.h" + +/* ========================== Clients timeouts ============================= */ + +/* Check if this blocked client timedout (does nothing if the client is + * not blocked right now). If so send a reply, unblock it, and return 1. + * Otherwise 0 is returned and no operation is performed. */ +int checkBlockedClientTimeout(client *c, mstime_t now) { + if (c->flags & CLIENT_BLOCKED && + c->bpop.timeout != 0 + && c->bpop.timeout < now) + { + /* Handle blocking operation specific timeout. */ + replyToBlockedClientTimedOut(c); + unblockClient(c); + return 1; + } else { + return 0; + } +} + +/* Check for timeouts. Returns non-zero if the client was terminated. + * The function gets the current time in milliseconds as argument since + * it gets called multiple times in a loop, so calling gettimeofday() for + * each iteration would be costly without any actual gain. */ +int clientsCronHandleTimeout(client *c, mstime_t now_ms) { + time_t now = now_ms/1000; + + if (server.maxidletime && + /* This handles the idle clients connection timeout if set. */ + !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */ + !(c->flags & CLIENT_MASTER) && /* No timeout for masters */ + !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */ + !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */ + (now - c->lastinteraction > server.maxidletime)) + { + serverLog(LL_VERBOSE,"Closing idle client"); + freeClient(c); + return 1; + } else if (c->flags & CLIENT_BLOCKED) { + /* Cluster: handle unblock & redirect of clients blocked + * into keys no longer served by this server. */ + if (server.cluster_enabled) { + if (clusterRedirectBlockedClientIfNeeded(c)) + unblockClient(c); + } + } + return 0; +} + +/* For blocked clients timeouts we populate a radix tree of 128 bit keys + * composed as such: + * + * [8 byte big endian expire time]+[8 byte client ID] + * + * We don't do any cleanup in the Radix tree: when we run the clients that + * reached the timeout already, if they are no longer existing or no longer + * blocked with such timeout, we just go forward. + * + * Every time a client blocks with a timeout, we add the client in + * the tree. In beforeSleep() we call handleBlockedClientsTimeout() to run + * the tree and unblock the clients. */ + +#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ + +/* Given client ID and timeout, write the resulting radix tree key in buf. */ +void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { + timeout = htonu64(timeout); + memcpy(buf,&timeout,sizeof(timeout)); + memcpy(buf+8,&id,sizeof(id)); +} + +/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write + * the timeout into *toptr and the client ID into *idptr. */ +void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { + memcpy(toptr,buf,sizeof(*toptr)); + *toptr = ntohu64(*toptr); + memcpy(idptr,buf+8,sizeof(*idptr)); +} + +/* Add the specified client id / timeout as a key in the radix tree we use + * to handle blocked clients timeouts. The client is not added to the list + * if its timeout is zero (block forever). */ +void addClientToTimeoutTable(client *c) { + if (c->bpop.timeout == 0) return; + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)) + c->flags |= CLIENT_IN_TO_TABLE; +} + +/* Remove the client from the table when it is unblocked for reasons + * different than timing out. */ +void removeClientFromTimeoutTable(client *c) { + if (!(c->flags & CLIENT_IN_TO_TABLE)) return; + c->flags &= ~CLIENT_IN_TO_TABLE; + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL); +} + +/* This function is called in beforeSleep() in order to unblock clients + * that are waiting in blocking operations with a timeout set. */ +void handleBlockedClientsTimeout(void) { + if (raxSize(server.clients_timeout_table) == 0) return; + uint64_t now = mstime(); + raxIterator ri; + raxStart(&ri,server.clients_timeout_table); + raxSeek(&ri,"^",NULL,0); + + while(raxNext(&ri)) { + uint64_t id, timeout; + decodeTimeoutKey(ri.key,&timeout,&id); + if (timeout >= now) break; /* All the timeouts are in the future. */ + client *c = lookupClientByID(id); + if (c) { + c->flags &= ~CLIENT_IN_TO_TABLE; + checkBlockedClientTimeout(c,now); + } + raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); + raxSeek(&ri,"^",NULL,0); + } +} + +/* Get a timeout value from an object and store it into 'timeout'. + * The final timeout is always stored as milliseconds as a time where the + * timeout will expire, however the parsing is performed according to + * the 'unit' that can be seconds or milliseconds. + * + * Note that if the timeout is zero (usually from the point of view of + * commands API this means no timeout) the value stored into 'timeout' + * is zero. */ +int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) { + long long tval; + long double ftval; + + if (unit == UNIT_SECONDS) { + if (getLongDoubleFromObjectOrReply(c,object,&ftval, + "timeout is not an float or out of range") != C_OK) + return C_ERR; + tval = (long long) (ftval * 1000.0); + } else { + if (getLongLongFromObjectOrReply(c,object,&tval, + "timeout is not an integer or out of range") != C_OK) + return C_ERR; + } + + if (tval < 0) { + addReplyError(c,"timeout is negative"); + return C_ERR; + } + + if (tval > 0) { + tval += mstime(); + } + *timeout = tval; + + return C_OK; +}