diff --git a/src/Makefile b/src/Makefile index 1b12c3c52..78568b736 100644 --- a/src/Makefile +++ b/src/Makefile @@ -259,7 +259,7 @@ endif REDIS_SERVER_NAME=keydb-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o motd.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark diff --git a/src/blocked.cpp b/src/blocked.cpp index 55b8ae3ed..6d2d99687 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -31,9 +31,6 @@ * * API: * - * getTimeoutFromObjectOrReply() is just an utility function to parse a - * timeout argument since blocking operations usually require a timeout. - * * blockClient() set the CLIENT_BLOCKED flag in the client, and set the * specified block type 'btype' filed to one of BLOCKED_* macros. * @@ -68,42 +65,6 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); -/* Get a timeout value from an object and store it into 'timeout'. - * The final timeout is always stored as milliseconds as a time where the - * timeout will expire, however the parsing is performed according to - * the 'unit' that can be seconds or milliseconds. - * - * Note that if the timeout is zero (usually from the point of view of - * commands API this means no timeout) the value stored into 'timeout' - * is zero. */ -int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) { - long long tval; - long double ftval; - - if (unit == UNIT_SECONDS) { - if (getLongDoubleFromObjectOrReply(c,object,&ftval, - "timeout is not an float or out of range") != C_OK) - return C_ERR; - tval = (long long) (ftval * 1000.0); - } else { - if (getLongLongFromObjectOrReply(c,object,&tval, - "timeout is not an integer or out of range") != C_OK) - return C_ERR; - } - - if (tval < 0) { - addReplyErrorAsync(c,"timeout is negative"); - return C_ERR; - } - - if (tval > 0) { - tval += mstime(); - } - *timeout = tval; - - return C_OK; -} - /* Block a client for the specific operation type. Once the CLIENT_BLOCKED * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ diff --git a/src/server.cpp b/src/server.cpp index 74fac4557..5a0082325 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1501,137 +1501,6 @@ int allPersistenceDisabled(void) { return g_pserver->saveparamslen == 0 && g_pserver->aof_state == AOF_OFF; } -/* ========================== Clients timeouts ============================= */ - -/* Check if this blocked client timedout (does nothing if the client is - * not blocked right now). If so send a reply, unblock it, and return 1. - * Otherwise 0 is returned and no operation is performed. */ -int checkBlockedClientTimeout(client *c, mstime_t now) { - if (c->flags & CLIENT_BLOCKED && - c->bpop.timeout != 0 - && c->bpop.timeout < now) - { - /* Handle blocking operation specific timeout. */ - replyToBlockedClientTimedOut(c); - unblockClient(c); - return 1; - } else { - return 0; - } -} - -/* Check for timeouts. Returns non-zero if the client was terminated. - * The function gets the current time in milliseconds as argument since - * it gets called multiple times in a loop, so calling gettimeofday() for - * each iteration would be costly without any actual gain. */ -int clientsCronHandleTimeout(client *c, mstime_t now_ms) { - time_t now = now_ms/1000; - - if (cserver.maxidletime && - /* This handles the idle clients connection timeout if set. */ - !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */ - !(c->flags & CLIENT_MASTER) && /* No timeout for masters */ - !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */ - !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */ - (now - c->lastinteraction > cserver.maxidletime)) - { - serverLog(LL_VERBOSE,"Closing idle client"); - freeClient(c); - return 1; - } else if (c->flags & CLIENT_BLOCKED) { - /* Cluster: handle unblock & redirect of clients blocked - * into keys no longer served by this server. */ - if (g_pserver->cluster_enabled) { - if (clusterRedirectBlockedClientIfNeeded(c)) - unblockClient(c); - } - } - return 0; -} - -/* For blocked clients timeouts we populate a radix tree of 128 bit keys - * composed as such: - * - * [8 byte big endian expire time]+[8 byte client ID] - * - * We don't do any cleanup in the Radix tree: when we run the clients that - * reached the timeout already, if they are no longer existing or no longer - * blocked with such timeout, we just go forward. - * - * Every time a client blocks with a timeout, we add the client in - * the tree. In beforeSleep() we call clientsHandleTimeout() to run - * the tree and unblock the clients. */ - -#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ - -/* Given client ID and timeout, write the resulting radix tree key in buf. */ -void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { - timeout = htonu64(timeout); - memcpy(buf,&timeout,sizeof(timeout)); - memcpy(buf+8,&id,sizeof(id)); -} - -/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write - * the timeout into *toptr and the client ID into *idptr. */ -void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { - memcpy(toptr,buf,sizeof(*toptr)); - *toptr = ntohu64(*toptr); - memcpy(idptr,buf+8,sizeof(*idptr)); -} - -/* Add the specified client id / timeout as a key in the radix tree we use - * to handle blocked clients timeouts. The client is not added to the list - * if its timeout is zero (block forever). */ -void addClientToTimeoutTable(client *c) { - if (c->bpop.timeout == 0) return; - uint64_t timeout = c->bpop.timeout; - uint64_t id = c->id; - unsigned char buf[CLIENT_ST_KEYLEN]; - encodeTimeoutKey(buf,timeout,id); - if (raxTryInsert(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL,NULL)) - c->flags |= CLIENT_IN_TO_TABLE; -} - -/* Remove the client from the table when it is unblocked for reasons - * different than timing out. */ -void removeClientFromTimeoutTable(client *c) { - if (!(c->flags & CLIENT_IN_TO_TABLE)) return; - c->flags &= ~CLIENT_IN_TO_TABLE; - uint64_t timeout = c->bpop.timeout; - uint64_t id = c->id; - unsigned char buf[CLIENT_ST_KEYLEN]; - encodeTimeoutKey(buf,timeout,id); - raxRemove(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL); -} - -/* This function is called in beforeSleep() in order to unblock clients - * that are waiting in blocking operations with a timeout set. */ -void clientsHandleTimeout(void) { - serverAssert(GlobalLocksAcquired()); - - if (raxSize(g_pserver->clients_timeout_table) == 0) return; - uint64_t now = mstime(); - raxIterator ri; - raxStart(&ri,g_pserver->clients_timeout_table); - raxSeek(&ri,"^",NULL,0); - - while(raxNext(&ri)) { - uint64_t id, timeout; - decodeTimeoutKey(ri.key,&timeout,&id); - if (timeout >= now) break; /* All the timeouts are in the future. */ - client *c = lookupClientByID(id); - if (c) { - std::unique_lock l(c->lock, std::defer_lock); - if (FCorrectThread(c)) - l.lock(); - c->flags &= ~CLIENT_IN_TO_TABLE; - checkBlockedClientTimeout(c,now); - } - raxRemove(g_pserver->clients_timeout_table,ri.key,ri.key_len,NULL); - raxSeek(&ri,"^",NULL,0); - } -} - /* ======================= Cron: called every 100 ms ======================== */ /* Add a sample to the operations per second array of samples. */ @@ -2265,7 +2134,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); /* Handle precise timeouts of blocked clients. */ - clientsHandleTimeout(); + handleBlockedClientsTimeout(); /* Handle TLS pending data. (must be done before flushAppendOnlyFile) */ tlsProcessPendingData(); diff --git a/src/server.h b/src/server.h index 7e6564d9c..a3c0f8dec 100644 --- a/src/server.h +++ b/src/server.h @@ -2724,8 +2724,12 @@ void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids); + +/* timeout.c -- Blocked clients timeout and connections timeout. */ void addClientToTimeoutTable(client *c); void removeClientFromTimeoutTable(client *c); +void handleBlockedClientsTimeout(void); +int clientsCronHandleTimeout(client *c, mstime_t now_ms); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); diff --git a/src/timeout.c b/src/timeout.c new file mode 100644 index 000000000..6328bcd5d --- /dev/null +++ b/src/timeout.c @@ -0,0 +1,221 @@ +/* Copyright (c) 2009-2020, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "server.h" +#include "cluster.h" + +/* ========================== Clients timeouts ============================= */ + +/* Check if this blocked client timedout (does nothing if the client is + * not blocked right now). If so send a reply, unblock it, and return 1. + * Otherwise 0 is returned and no operation is performed. */ +int checkBlockedClientTimeout(client *c, mstime_t now) { + if (c->flags & CLIENT_BLOCKED && + c->bpop.timeout != 0 + && c->bpop.timeout < now) + { + /* Handle blocking operation specific timeout. */ + replyToBlockedClientTimedOut(c); + unblockClient(c); + return 1; + } else { + return 0; + } +} + +/* Check for timeouts. Returns non-zero if the client was terminated. + * The function gets the current time in milliseconds as argument since + * it gets called multiple times in a loop, so calling gettimeofday() for + * each iteration would be costly without any actual gain. */ +int clientsCronHandleTimeout(client *c, mstime_t now_ms) { + time_t now = now_ms/1000; + + if (cserver.maxidletime && + /* This handles the idle clients connection timeout if set. */ + !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */ + !(c->flags & CLIENT_MASTER) && /* No timeout for masters */ + !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */ + !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */ + (now - c->lastinteraction > cserver.maxidletime)) + { + serverLog(LL_VERBOSE,"Closing idle client"); + freeClient(c); + return 1; + } else if (c->flags & CLIENT_BLOCKED) { + /* Cluster: handle unblock & redirect of clients blocked + * into keys no longer served by this server. */ + if (g_pserver->cluster_enabled) { + if (clusterRedirectBlockedClientIfNeeded(c)) + unblockClient(c); + } + } + return 0; +} + +/* For blocked clients timeouts we populate a radix tree of 128 bit keys + * composed as such: + * + * [8 byte big endian expire time]+[8 byte client ID] + * + * We don't do any cleanup in the Radix tree: when we run the clients that + * reached the timeout already, if they are no longer existing or no longer + * blocked with such timeout, we just go forward. + * + * Every time a client blocks with a timeout, we add the client in + * the tree. In beforeSleep() we call clientsHandleTimeout() to run + * the tree and unblock the clients. */ + +#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ + +/* Given client ID and timeout, write the resulting radix tree key in buf. */ +void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { + timeout = htonu64(timeout); + memcpy(buf,&timeout,sizeof(timeout)); + memcpy(buf+8,&id,sizeof(id)); +} + +/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write + * the timeout into *toptr and the client ID into *idptr. */ +void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { + memcpy(toptr,buf,sizeof(*toptr)); + *toptr = ntohu64(*toptr); + memcpy(idptr,buf+8,sizeof(*idptr)); +} + +/* Add the specified client id / timeout as a key in the radix tree we use + * to handle blocked clients timeouts. The client is not added to the list + * if its timeout is zero (block forever). */ +void addClientToTimeoutTable(client *c) { + if (c->bpop.timeout == 0) return; + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + if (raxTryInsert(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL,NULL)) + c->flags |= CLIENT_IN_TO_TABLE; +} + +/* Remove the client from the table when it is unblocked for reasons + * different than timing out. */ +void removeClientFromTimeoutTable(client *c) { + if (!(c->flags & CLIENT_IN_TO_TABLE)) return; + c->flags &= ~CLIENT_IN_TO_TABLE; + uint64_t timeout = c->bpop.timeout; + uint64_t id = c->id; + unsigned char buf[CLIENT_ST_KEYLEN]; + encodeTimeoutKey(buf,timeout,id); + raxRemove(g_pserver->clients_timeout_table,buf,sizeof(buf),NULL); +} + +/* This function is called in beforeSleep() in order to unblock clients + * that are waiting in blocking operations with a timeout set. */ +void clientsHandleTimeout(void) { + serverAssert(GlobalLocksAcquired()); + + if (raxSize(g_pserver->clients_timeout_table) == 0) return; + uint64_t now = mstime(); + raxIterator ri; + raxStart(&ri,g_pserver->clients_timeout_table); + raxSeek(&ri,"^",NULL,0); + + while(raxNext(&ri)) { + uint64_t id, timeout; + decodeTimeoutKey(ri.key,&timeout,&id); + if (timeout >= now) break; /* All the timeouts are in the future. */ + client *c = lookupClientByID(id); + if (c) { + std::unique_lock l(c->lock, std::defer_lock); + if (FCorrectThread(c)) + l.lock(); + c->flags &= ~CLIENT_IN_TO_TABLE; + checkBlockedClientTimeout(c,now); + } + raxRemove(g_pserver->clients_timeout_table,ri.key,ri.key_len,NULL); + raxSeek(&ri,"^",NULL,0); + } +} + +/* This function is called in beforeSleep() in order to unblock clients + * that are waiting in blocking operations with a timeout set. */ +void handleBlockedClientsTimeout(void) { + if (raxSize(server.clients_timeout_table) == 0) return; + uint64_t now = mstime(); + raxIterator ri; + raxStart(&ri,server.clients_timeout_table); + raxSeek(&ri,"^",NULL,0); + + while(raxNext(&ri)) { + uint64_t id, timeout; + decodeTimeoutKey(ri.key,&timeout,&id); + if (timeout >= now) break; /* All the timeouts are in the future. */ + client *c = lookupClientByID(id); + if (c) { + c->flags &= ~CLIENT_IN_TO_TABLE; + checkBlockedClientTimeout(c,now); + } + raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); + raxSeek(&ri,"^",NULL,0); + } +} + +/* Get a timeout value from an object and store it into 'timeout'. + * The final timeout is always stored as milliseconds as a time where the + * timeout will expire, however the parsing is performed according to + * the 'unit' that can be seconds or milliseconds. + * + * Note that if the timeout is zero (usually from the point of view of + * commands API this means no timeout) the value stored into 'timeout' + * is zero. */ +int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) { + long long tval; + long double ftval; + + if (unit == UNIT_SECONDS) { + if (getLongDoubleFromObjectOrReply(c,object,&ftval, + "timeout is not an float or out of range") != C_OK) + return C_ERR; + tval = (long long) (ftval * 1000.0); + } else { + if (getLongLongFromObjectOrReply(c,object,&tval, + "timeout is not an integer or out of range") != C_OK) + return C_ERR; + } + + if (tval < 0) { + addReplyErrorAsync(c,"timeout is negative"); + return C_ERR; + } + + if (tval > 0) { + tval += mstime(); + } + *timeout = tval; + + return C_OK; +} +