diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 000000000..cc67d754a --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,147 @@ +build: + rules: + - if: '$COVERAGE' + when: never + - if: '$ENDURANCE' + when: never + - when: always + tags: + - docker + stage: build + script: + - git submodule init && git submodule update + - make distclean + - make -j + +make-test: + rules: + - if: '$COVERAGE' + when: never + - if: '$ENDURANCE' + when: never + - when: always + tags: + - docker + stage: test + script: + - git submodule init && git submodule update + - make distclean + - make -j + - make test -j + +node-redis-test: + rules: + - if: '$COVERAGE' + when: never + - if: '$ENDURANCE' + when: never + - when: always + tags: + - docker + - ipv6 + stage: test + script: + - git submodule init && git submodule update + - make distclean + - make -j + - make install + - git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/node-redis.git + - cd node-redis + - npm install + - npm run test + +jedis-test: + rules: + - if: '$COVERAGE' + when: never + - if: '$ENDURANCE' + when: never + - when: always + tags: + - docker + - ipv4 + stage: test + script: + - git submodule init && git submodule update + - make distclean + - make -j + - make install + - git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/jedis.git + - cd jedis + - make test + +redis-rs-test: + rules: + - if: '$COVERAGE' + when: never + - if: '$ENDURANCE' + when: never + - when: always + tags: + - docker + stage: test + script: + - git submodule init && git submodule update + - make distclean + - make -j + - make install + - git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/redis-rs.git + - cd redis-rs + - make test + +endurance-test: + rules: + - if: '$ENDURANCE' + tags: + - docker + stage: test + script: + - git submodule init && git submodule update + - make distclean + - make -j + - ./runtest --loop --stop + +coverage-test: + rules: + - if: '$COVERAGE' + tags: + - docker + stage: test + script: + - git submodule init && git submodule update + - make distclean + - make gcov -j + - make install + - ./runtest || true + - pkill keydb-server || true + - pkill stunnel || true + - ./runtest-cluster || true + - pkill keydb-server || true + - pkill stunnel || true + - ./runtest-sentinel || true + - pkill keydb-server || true + - pkill stunnel || true + - ./runtest-moduleapi || true + - pkill keydb-server || true + - pkill stunnel || true + - git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/redis-rs.git + - cd redis-rs + - make test || true + - pkill keydb-server || true + - pkill stunnel || true + - cd .. + - git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/jedis.git + - cd jedis + - make test || true + - pkill keydb-server || true + - pkill stunnel || true + - cd .. + - git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/node-redis.git + - cd node-redis + - npm install + - npm run test || true + - pkill keydb-server || true + - pkill stunnel || true + - cd .. + - geninfo -o KeyDB.info --no-external . + - genhtml --legend -o lcov-html KeyDB.info \ No newline at end of file diff --git a/00-RELEASENOTES b/00-RELEASENOTES index 8a1405e41..4f6cb9978 100644 --- a/00-RELEASENOTES +++ b/00-RELEASENOTES @@ -11,6 +11,40 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP. SECURITY: There are security fixes in the release. -------------------------------------------------------------------------------- +================================================================================ +Redis 6.2.3 Released Mon May 3 19:00:00 IST 2021 +================================================================================ + +Upgrade urgency: SECURITY, Contains fixes to security issues that affect +authenticated client connections. LOW otherwise. + +Integer overflow in STRALGO LCS command (CVE-2021-29477): +An integer overflow bug in Redis version 6.0 or newer could be exploited using +the STRALGO LCS command to corrupt the heap and potentially result in remote +code execution. The integer overflow bug exists in all versions of Redis +starting with 6.0. + +Integer overflow in COPY command for large intsets (CVE-2021-29478): +An integer overflow bug in Redis 6.2 could be exploited to corrupt the heap and +potentially result with remote code execution. The vulnerability involves +changing the default set-max-intset-entries configuration value, creating a +large set key that consists of integer values and using the COPY command to +duplicate it. The integer overflow bug exists in all versions of Redis starting +with 2.6, where it could result with a corrupted RDB or DUMP payload, but not +exploited through COPY (which did not exist before 6.2). + +Bug fixes that are only applicable to previous releases of Redis 6.2: +* Fix memory leak in moduleDefragGlobals (#8853) +* Fix memory leak when doing lazy freeing client tracking table (#8822) +* Block abusive replicas from sending command that could assert and crash redis (#8868) + +Other bug fixes: +* Use a monotonic clock to check for Lua script timeout (#8812) +* redis-cli: Do not use unix socket when we got redirected in cluster mode (#8870) + +Modules: +* Fix RM_GetClusterNodeInfo() to correctly populate master id (#8846) + ================================================================================ Redis 6.2.2 Released Mon April 19 19:00:00 IST 2021 ================================================================================ diff --git a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h index b6e2f8c6d..343198244 100644 --- a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h +++ b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h @@ -241,7 +241,7 @@ iget_defrag_hint(tsdn_t *tsdn, void* ptr) { int free_in_slab = extent_nfree_get(slab); if (free_in_slab) { const bin_info_t *bin_info = &bin_infos[binind]; - int curslabs = binshard->stats.curslabs; + ssize_t curslabs = binshard->stats.curslabs; size_t curregs = binshard->stats.curregs; if (binshard->slabcur) { /* remove slabcur from the overall utilization */ diff --git a/monkey/monkey.py b/monkey/monkey.py index 653970728..37e5e4440 100644 --- a/monkey/monkey.py +++ b/monkey/monkey.py @@ -1,21 +1,17 @@ -import keydb import random -import sched, time +import time import socket import asyncore import threading -import sys +import argparse from pprint import pprint -# Parameters -numclients = 50 -#numkeys = 1000000 -numkeys = 100000 - # Globals -ops=0 -s = sched.scheduler(time.time, time.sleep) -g_exit = False +ops = {} +numclients = 0 +numkeys = 0 +runtime = 0 +clients = [] def _buildResp(*args): result = "*" + str(len(args)) + "\r\n" @@ -32,6 +28,8 @@ class Client(asyncore.dispatcher): self.buf = b'' self.inbuf = b'' self.callbacks = list() + self.client_id = 0 + self.get_client_id() def handle_connect(self): pass @@ -55,6 +53,9 @@ class Client(asyncore.dispatcher): endrange = self.inbuf[startpos+1:].find(ord('\r')) + 1 + startpos assert(endrange > 0) numargs = int(self.inbuf[startpos+1:endrange]) + if numargs == -1: # Nil array, used in some returns + startpos = endrange + 2 + return startpos, [] assert(numargs > 0) args = list() startpos = endrange + 2 # plus 1 gets us to the '\n' and the next gets us to the start char @@ -127,10 +128,18 @@ class Client(asyncore.dispatcher): self.buf += _buildResp("lpush", key, val) self.callbacks.append(callback) + def blpop(self, *keys, timeout=0, callback=default_result_handler): + self.buf += _buildResp("blpop", *keys, str(timeout)) + self.callbacks.append(callback) + def delete(self, key, callback = default_result_handler): self.buf += _buildResp("del", key) self.callbacks.append(callback) + def unblock(self, client_id, callback=default_result_handler): + self.buf += _buildResp("client", "unblock", str(client_id)) + self.callbacks.append(callback) + def scan(self, iter, match=None, count=None, callback = default_result_handler): args = ["scan", str(iter)] if match != None: @@ -142,68 +151,133 @@ class Client(asyncore.dispatcher): self.buf += _buildResp(*args) self.callbacks.append(callback) + def get_client_id(self): + self.buf += _buildResp("client", "id") + self.callbacks.append(self.store_client_id) + + def store_client_id(self, c, resp): + assert(resp[0] == ord(':')) + self.client_id = int(resp[1:]) + assert(self.client_id == c.client_id) + def get(self, key, callback = None): return - def getrandomkey(): return str(random.randrange(0, numkeys)) -def handle_lpush_response(c, resp): +def handle_lpush_response(c, resp=None): global ops if resp != None: - ops = ops + 1 + ops['lpush'] += 1 assert(resp[0] == ord(':')) c.lpush("list_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_lpush_response) -def handle_set_response(c, resp): +def handle_blpop_response(c, resp=None): global ops if resp != None: - ops = ops + 1 + ops['blpop'] += 1 + c.blpop("list_" + getrandomkey(), callback=handle_blpop_response) + +def handle_set_response(c, resp=None): + global ops + if resp != None: + ops['set'] += 1 assert(resp[0] == ord('+')) c.set("str_" + getrandomkey(), 'bardsklfjkldsjfdlsjflksdfjklsdjflksd kldsjflksd jlkdsjf lksdjklds jrfklsdjfklsdjfkl', handle_set_response) -def handle_del_response(c, resp): +def handle_del_response(c, resp=None): global ops if resp != None: - ops = ops + 1 + ops['del'] += 1 c.delete("list_" + getrandomkey(), handle_del_response) -def scan_callback(c, resp): +def scan_callback(c, resp=None): global ops nextstart = int(resp[0]) c.scan(nextstart, count=500, callback=scan_callback) - ops = ops+1 + ops['scan'] += 1 + +def clear_ops(): + global ops + ops = {'lpush': 0, 'blpop': 0, 'del': 0, 'scan': 0, 'set': 0, 'get': 0} def stats_thread(): global ops - global g_exit - while not g_exit: + global runtime + i = 0 + while i < runtime or not runtime: time.sleep(1) - print("Ops per second: " + str(ops)) - ops = 0 + print("Ops per second: " + str({k:v for (k,v) in ops.items() if v})) + clear_ops() + i += 1 + asyncore.close_all() -def main(): - global g_exit - clients = [] +def flush_db_sync(): + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.connect(('127.0.0.1', 6379)) + server.send(_buildResp("flushdb")) + resp = server.recv(8192) + assert(resp[:3] == "+OK".encode('utf-8')) +def init_blocking(): + global clients + if numkeys > 100 * numclients: + print("WARNING: High ratio of keys to clients. Most lpushes will not be popped and unblocking will take a long time!") + for i in range(numclients): + clients.append(Client('127.0.0.1', 6379)) + if i % 2: + handle_blpop_response(clients[-1]) + else: + handle_lpush_response(clients[-1]) + +def init_lpush(): + global clients for i in range(numclients): clients.append(Client('127.0.0.1', 6379)) for i in range (10): - handle_lpush_response(clients[-1], None) + handle_lpush_response(clients[-1]) #handle_set_response(clients[-1], None) scan_client = Client('127.0.0.1', 6379) scan_client.scan(0, count=500, callback=scan_callback) del_client = Client('127.0.0.1', 6379) - handle_del_response(del_client, None) + handle_del_response(del_client) + +def main(test, flush): + clear_ops() + + if flush: + flush_db_sync() + + try: + globals()[f"init_{test}"]() + except KeyError: + print(f"Test \"{test}\" not found. Exiting...") + exit() + except ConnectionRefusedError: + print("Could not connect to server. Is it running?") + print("Exiting...") + exit() threading.Thread(target=stats_thread).start() asyncore.loop() - g_exit = True - sys.exit(0) - print("DONE") + print("Done.") + +parser = argparse.ArgumentParser(description="Test use cases for KeyDB.") +parser.add_argument('test', choices=[x[5:] for x in filter(lambda name: name.startswith("init_"), globals().keys())], help="which test to run") +parser.add_argument('-c', '--clients', type=int, default=50, help="number of running clients to use") +parser.add_argument('-k', '--keys', type=int, default=100000, help="number of keys to choose from for random tests") +parser.add_argument('-t', '--runtime', type=int, default=0, help="how long to run the test for (default: 0 for infinite)") +parser.add_argument('-f', '--flush', action="store_true", help="flush the db before running the test") if __name__ == "__main__": - main() + try: + args = parser.parse_args() + except: + exit() + numclients = args.clients + numkeys = args.keys + runtime = args.runtime + main(args.test, args.flush) diff --git a/src/Makefile b/src/Makefile index 92bb346f4..f6b8ae93d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -15,7 +15,7 @@ release_hdr := $(shell sh -c './mkreleasehdr.sh') uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not') -OPTIMIZATION?=-O2 +OPTIMIZATION?=-O2 -flto DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram rocksdb NODEPS:=clean distclean @@ -140,6 +140,10 @@ DEBUG=-g -ggdb ifneq ($(uname_S),Darwin) FINAL_LIBS+=-latomic endif +# Linux ARM32 needs -latomic at linking time +ifneq (,$(findstring armv,$(uname_M))) + FINAL_LIBS+=-latomic +endif ifeq ($(uname_S),SunOS) @@ -349,9 +353,9 @@ endif REDIS_SERVER_NAME=keydb-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=keydb-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd_server.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli$(PROG_SUFFIX) -REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) +REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd_client.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX) REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o release.o crcspeed.o crc64.o siphash.o redis-benchmark.o storage-lite.o fastlock.o new.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) REDIS_CHECK_RDB_NAME=keydb-check-rdb$(PROG_SUFFIX) @@ -435,6 +439,12 @@ DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ # Because the jemalloc.h header is generated as a part of the jemalloc build, # building it should complete before building any other object. Instead of # depending on a single artifact, build all dependencies first. +motd_client.o: motd.cpp .make-prerequisites + $(REDIS_CXX) -MMD -o motd_client.o -c $< -DCLIENT -fno-lto + +motd_server.o: motd.cpp .make-prerequisites + $(REDIS_CXX) -MMD -o motd_server.o -c $< -DSERVER + %.o: %.c .make-prerequisites $(REDIS_CC) -MMD -o $@ -c $< diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index e33c97ff7..ad56a253a 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -25,6 +25,12 @@ StorageCache::StorageCache(IStorage *storage, bool fCache) m_pdict = dictCreate(&dbStorageCacheType, nullptr); } +StorageCache::~StorageCache() +{ + if (m_pdict != nullptr) + dictRelease(m_pdict); +} + void StorageCache::clear() { std::unique_lock ul(m_lock); @@ -130,9 +136,10 @@ void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const size_t StorageCache::count() const { - std::unique_lock ul(m_lock); + std::unique_lock ul(m_lock, std::defer_lock); + bool fLocked = ul.try_lock(); size_t count = m_spstorage->count(); - if (m_pdict != nullptr) { + if (m_pdict != nullptr && fLocked) { serverAssert(bulkInsertsInProgress.load(std::memory_order_seq_cst) || count == (dictSize(m_pdict) + m_collisionCount)); } return count; @@ -140,6 +147,5 @@ size_t StorageCache::count() const void StorageCache::beginWriteBatch() { serverAssert(GlobalLocksAcquired()); // Otherwise we deadlock - m_lock.lock(); m_spstorage->beginWriteBatch(); } \ No newline at end of file diff --git a/src/StorageCache.h b/src/StorageCache.h index 2536cf2ec..9f92f75c0 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -29,6 +29,8 @@ class StorageCache } public: + ~StorageCache(); + static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) { StorageCache *cache = new StorageCache(nullptr, pfactory->FSlow() /*fCache*/); load_iter_data data = {cache, fn, privdata}; @@ -45,11 +47,11 @@ public: bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); } void beginWriteBatch(); - void endWriteBatch() { m_spstorage->endWriteBatch(); m_lock.unlock(); } + void endWriteBatch() { m_spstorage->endWriteBatch(); } void batch_lock() { return m_spstorage->batch_lock(); } void batch_unlock() { return m_spstorage->batch_unlock(); } size_t count() const; const StorageCache *clone(); -}; \ No newline at end of file +}; diff --git a/src/cluster.cpp b/src/cluster.cpp index c9087cc7f..79ea3e84d 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -5588,9 +5588,10 @@ try_again: if (ttl < 1) ttl = 1; } - /* Relocate valid (non expired) keys into the array in successive + /* Relocate valid (non expired) keys and values into the array in successive * positions to remove holes created by the keys that were present * in the first lookup but are now expired after the second lookup. */ + ov[non_expired] = ov[j]; kv[non_expired++] = kv[j]; serverAssertWithInfo(c,NULL, diff --git a/src/config.cpp b/src/config.cpp index d1228d837..701565e1e 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2470,6 +2470,7 @@ static int updateReplBacklogSize(long long val, long long prev, const char **err * being able to tell when the size changes, so restore prev before calling it. */ UNUSED(err); g_pserver->repl_backlog_size = prev; + g_pserver->repl_backlog_config_size = val; resizeReplicationBacklog(val); return 1; } diff --git a/src/db.cpp b/src/db.cpp index 1315bc2c2..0079598f2 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2018,7 +2018,7 @@ int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) { * script execution, making propagation to slaves / AOF consistent. * See issue #1525 on Github for more information. */ if (g_pserver->lua_caller) { - now = g_pserver->lua_time_start; + now = g_pserver->lua_time_snapshot; } /* If we are in the middle of a command execution, we still want to use * a reference time that does not change: in that case we just use the @@ -2079,14 +2079,17 @@ int expireIfNeeded(redisDb *db, robj *key) { if (checkClientPauseTimeoutAndReturnIfPaused()) return 1; /* Delete the key */ + if (g_pserver->lazyfree_lazy_expire) { + dbAsyncDelete(db,key); + } else { + dbSyncDelete(db,key); + } g_pserver->stat_expiredkeys++; propagateExpire(db,key,g_pserver->lazyfree_lazy_expire); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); - int retval = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(db,key) : - dbSyncDelete(db,key); - if (retval) signalModifiedKey(NULL,db,key); - return retval; + signalModifiedKey(NULL,db,key); + return 1; } /* ----------------------------------------------------------------------------- @@ -2726,6 +2729,8 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) serverAssert(sdsKey != nullptr); serverAssert(FImplies(*pde != nullptr, dictGetVal(*pde) != nullptr)); // early versions set a NULL object, this is no longer valid serverAssert(m_refCount == 0); + if (m_pdbSnapshot == nullptr && g_pserver->m_pstorageFactory == nullptr) + return; std::unique_lock ul(g_expireLock); // First see if the key can be obtained from a snapshot @@ -2986,13 +2991,13 @@ dict_iter redisDbPersistentData::random() return dict_iter(m_pdict, de); } -size_t redisDbPersistentData::size() const +size_t redisDbPersistentData::size(bool fCachedOnly) const { - if (m_spstorage != nullptr && !m_fAllChanged) + if (m_spstorage != nullptr && !m_fAllChanged && !fCachedOnly) return m_spstorage->count() + m_cnewKeysPending; return dictSize(m_pdict) - + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0); + + (m_pdbSnapshot ? (m_pdbSnapshot->size(fCachedOnly) - dictSize(m_pdictTombstone)) : 0); } bool redisDbPersistentData::removeCachedValue(const char *key, dictEntry **ppde) @@ -3047,13 +3052,13 @@ void redisDbPersistentData::removeAllCachedValues() trackChanges(false); } - if (m_pdict->pauserehash == 0) { + if (m_pdict->pauserehash == 0 && m_pdict->refcount == 1) { dict *dT = m_pdict; m_pdict = dictCreate(&dbDictType, this); dictExpand(m_pdict, dictSize(dT)/2, false); // Make room for about half so we don't excessively rehash g_pserver->asyncworkqueue->AddWorkFunction([dT]{ dictRelease(dT); - }, true); + }, false); } else { dictEmpty(m_pdict, nullptr); } diff --git a/src/dict.cpp b/src/dict.cpp index de0445ea5..1ed414b69 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -535,7 +535,8 @@ dictAsyncRehashCtl::~dictAsyncRehashCtl() { while (deGCList != nullptr) { auto next = deGCList->next; dictFreeKey(dict, deGCList); - dictFreeVal(dict, deGCList); + if (deGCList->v.val != nullptr) + dictFreeVal(dict, deGCList); zfree(deGCList); deGCList = next; } @@ -694,6 +695,8 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) { d->ht[table].table[idx] = he->next; if (!nofree) { if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) { + dictFreeVal(d, he); + he->v.val = nullptr; he->next = d->asyncdata->deGCList; d->asyncdata->deGCList = he; } else { @@ -752,6 +755,8 @@ void dictFreeUnlinkedEntry(dict *d, dictEntry *he) { if (he == NULL) return; if (d->asyncdata) { + dictFreeVal(d, he); + he->v.val = nullptr; he->next = d->asyncdata->deGCList; d->asyncdata->deGCList = he; } else { @@ -775,6 +780,8 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) { while(he) { nextHe = he->next; if (d->asyncdata && (ssize_t)i < d->rehashidx) { + dictFreeVal(d, he); + he->v.val = nullptr; he->next = d->asyncdata->deGCList; d->asyncdata->deGCList = he; } else { diff --git a/src/evict.cpp b/src/evict.cpp index b673e165d..84bf21c36 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -355,6 +355,8 @@ unsigned long LFUDecrAndReturn(robj_roptr o) { return counter; } +unsigned long getClientReplicationBacklogSharedUsage(client *c); + /* We don't want to count AOF buffers and slaves output buffers as * used memory: the eviction should use mostly data size. This function * returns the sum of AOF and slaves buffer. */ @@ -371,9 +373,15 @@ size_t freeMemoryGetNotCountedMemory(void) { while((ln = listNext(&li))) { client *replica = (client*)listNodeValue(ln); std::unique_lock(replica->lock); - overhead += getClientOutputBufferMemoryUsage(replica); + /* we don't wish to multiple count the replication backlog shared usage */ + overhead += (getClientOutputBufferMemoryUsage(replica) - getClientReplicationBacklogSharedUsage(replica)); } } + + /* also don't count the replication backlog memory + * that's where the replication clients get their memory from */ + overhead += g_pserver->repl_backlog_size; + if (g_pserver->aof_state != AOF_OFF) { overhead += sdsalloc(g_pserver->aof_buf)+aofRewriteBufferSize(); } @@ -470,11 +478,13 @@ public: FreeMemoryLazyFree(FreeMemoryLazyFree&&) = default; ~FreeMemoryLazyFree() { + aeAcquireLock(); for (auto &pair : vecdictvecde) { for (auto de : pair.second) { dictFreeUnlinkedEntry(pair.first, de); } } + aeReleaseLock(); --s_clazyFreesInProgress; } @@ -822,9 +832,9 @@ int performEvictions(bool fPreSnapshot) { /* After some time, exit the loop early - even if memory limit * hasn't been reached. If we suddenly need to free a lot of * memory, don't want to spend too much time here. */ - if (elapsedUs(evictionTimer) > eviction_time_limit_us) { + if (g_pserver->m_pstorageFactory == nullptr && elapsedUs(evictionTimer) > eviction_time_limit_us) { // We still need to free memory - start eviction timer proc - if (!isEvictionProcRunning) { + if (!isEvictionProcRunning && serverTL->el != nullptr) { isEvictionProcRunning = 1; aeCreateTimeEvent(serverTL->el, 0, evictionTimeProc, NULL, NULL); diff --git a/src/intset.c b/src/intset.c index 93963209e..fd634ed9d 100644 --- a/src/intset.c +++ b/src/intset.c @@ -281,7 +281,7 @@ uint32_t intsetLen(const intset *is) { /* Return intset blob size in bytes. */ size_t intsetBlobLen(intset *is) { - return sizeof(intset)+intrev32ifbe(is->length)*intrev32ifbe(is->encoding); + return sizeof(intset)+(size_t)intrev32ifbe(is->length)*intrev32ifbe(is->encoding); } /* Validate the integrity of the data structure. diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index cb0ba8a8d..e4b5fb466 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -39,12 +39,11 @@ void lazyfreeFreeSlotsMap(void *args[]) { atomicIncr(lazyfreed_objects,len); } -/* Release the rax mapping Redis Cluster keys to slots in the - * lazyfree thread. */ +/* Release the key tracking table. */ void lazyFreeTrackingTable(void *args[]) { rax *rt = (rax*)args[0]; size_t len = rt->numele; - raxFree(rt); + freeTrackingRadixTree(rt); atomicDecr(lazyfree_objects,len); atomicIncr(lazyfreed_objects,len); } diff --git a/src/lolwut.c b/src/lolwut.c index eebd5da6a..931f311cd 100644 --- a/src/lolwut.c +++ b/src/lolwut.c @@ -94,8 +94,8 @@ lwCanvas *lwCreateCanvas(int width, int height, int bgcolor) { lwCanvas *canvas = zmalloc(sizeof(*canvas)); canvas->width = width; canvas->height = height; - canvas->pixels = zmalloc(width*height); - memset(canvas->pixels,bgcolor,width*height); + canvas->pixels = zmalloc((size_t)width*height); + memset(canvas->pixels,bgcolor,(size_t)width*height); return canvas; } diff --git a/src/memtest.c b/src/memtest.c index cb4d35e83..bc0ac3a66 100644 --- a/src/memtest.c +++ b/src/memtest.c @@ -71,7 +71,7 @@ void memtest_progress_start(char *title, int pass) { printf("\x1b[H\x1b[2K"); /* Cursor home, clear current line. */ printf("%s [%d]\n", title, pass); /* Print title. */ progress_printed = 0; - progress_full = ws.ws_col*(ws.ws_row-3); + progress_full = (size_t)ws.ws_col*(ws.ws_row-3); fflush(stdout); } diff --git a/src/module.cpp b/src/module.cpp index c096c1ea4..7b227d279 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -6367,7 +6367,7 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m /* If the information is not available, the function will set the * field to zero bytes, so that when the field can't be populated the * function kinda remains predictable. */ - if (node->flags & CLUSTER_NODE_MASTER && node->slaveof) + if (node->flags & CLUSTER_NODE_SLAVE && node->slaveof) memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN); else memset(master_id,0,REDISMODULE_NODE_ID_LEN); @@ -9430,6 +9430,7 @@ long moduleDefragGlobals(void) { module->defrag_cb(&defrag_ctx); defragged += defrag_ctx.defragged; } + dictReleaseIterator(di); return defragged; } diff --git a/src/motd.cpp b/src/motd.cpp index 370a11e68..795281734 100644 --- a/src/motd.cpp +++ b/src/motd.cpp @@ -1,7 +1,11 @@ +#ifdef CLIENT extern "C" { #include #include } +#else +#include "sds.h" +#endif #include #include #include @@ -15,6 +19,7 @@ extern "C" { #ifdef MOTD #include +#ifdef CLIENT extern "C" { __attribute__ ((weak)) hisds hi_sdscatlen(hisds s, const void *t, size_t len) { return sdscatlen(s, t, len); @@ -23,6 +28,7 @@ __attribute__ ((weak)) hisds hi_sdscat(hisds s, const char *t) { return sdscat(s, t); } } +#endif static const char *szMotdCachePath() { diff --git a/src/networking.cpp b/src/networking.cpp index f798caf3c..e2e033be3 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -136,6 +136,7 @@ client *createClient(connection *conn, int iel) { client_id = g_pserver->next_client_id.fetch_add(1); c->iel = iel; c->id = client_id; + sprintf(c->lock.szName, "client %lu", client_id); c->resp = 2; c->conn = conn; c->name = NULL; @@ -234,6 +235,7 @@ void clientInstallWriteHandler(client *c) { /* Schedule the client to write the output buffers to the socket only * if not already done and, for slaves, if the replica can actually receive * writes at this stage. */ + if (!(c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) @@ -315,7 +317,7 @@ int prepareClientToWrite(client *c) { /* Schedule the client to write the output buffers to the socket, unless * it should already be setup to do so (it has already pending data). */ - if (!fAsync && !clientHasPendingReplies(c)) clientInstallWriteHandler(c); + if (!fAsync && (c->flags & CLIENT_SLAVE || !clientHasPendingReplies(c))) clientInstallWriteHandler(c); if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ @@ -1129,7 +1131,7 @@ void copyClientOutputBuffer(client *dst, client *src) { /* Return true if the specified client has pending reply buffers to write to * the socket. */ int clientHasPendingReplies(client *c) { - return (c->bufpos || listLength(c->reply)); + return (c->bufpos || listLength(c->reply) || c->FPendingReplicaWrite()); } static std::atomic rgacceptsInFlight[MAX_EVENT_LOOPS]; @@ -1764,6 +1766,8 @@ client *lookupClientByID(uint64_t id) { return (c == raxNotFound) ? NULL : c; } +long long getReplIndexFromOffset(long long offset); + /* Write data in output buffers to client. Return C_OK if the client * is still valid after the call, C_ERR if it was freed because of some * error. If handler_installed is set, it will attempt to clear the @@ -1781,62 +1785,99 @@ int writeToClient(client *c, int handler_installed) { serverAssertDebug(FCorrectThread(c)); std::unique_locklock)> lock(c->lock); - - while(clientHasPendingReplies(c)) { - if (c->bufpos > 0) { - nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); - if (nwritten <= 0) break; - c->sentlen += nwritten; - totwritten += nwritten; - /* If the buffer was sent, set bufpos to zero to continue with - * the remainder of the reply. */ - if ((int)c->sentlen == c->bufpos) { - c->bufpos = 0; - c->sentlen = 0; - } - } else { - o = (clientReplyBlock*)listNodeValue(listFirst(c->reply)); - if (o->used == 0) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - continue; + /* We can only directly read from the replication backlog if the client + is a replica, so only attempt to do so if that's the case. */ + if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { + std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); + long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); + serverAssert(c->repl_curr_off != -1); + + if (c->repl_curr_off != c->repl_end_off){ + long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off); + long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog + * in the event of a wrap around write */ + /* normal case with no wrap around */ + if (repl_end_idx >= repl_curr_idx){ + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx); + /* wrap around case */ + } else { + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx); + /* only attempt wrapping if we write the correct number of bytes */ + if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){ + nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx); + if (nwritten2ndStage != -1) + nwritten += nwritten2ndStage; + } } - nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen); - if (nwritten <= 0) break; - c->sentlen += nwritten; - totwritten += nwritten; - - /* If we fully sent the object on head go to the next one */ - if (c->sentlen == o->used) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - c->sentlen = 0; - /* If there are no longer objects in the list, we expect - * the count of reply bytes to be exactly zero. */ - if (listLength(c->reply) == 0) - serverAssert(c->reply_bytes == 0); + /* only increment bytes if an error didn't occur */ + if (nwritten > 0){ + totwritten += nwritten; + c->repl_curr_off += nwritten; + serverAssert(c->repl_curr_off <= c->repl_end_off); } + + /* If the second part of a write didn't go through, we still need to register that */ + if (nwritten2ndStage == -1) nwritten = -1; + } + } else { + while(clientHasPendingReplies(c)) { + serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR); + if (c->bufpos > 0) { + nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); + if (nwritten <= 0) break; + c->sentlen += nwritten; + totwritten += nwritten; + + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. */ + if ((int)c->sentlen == c->bufpos) { + c->bufpos = 0; + c->sentlen = 0; + } + } else { + o = (clientReplyBlock*)listNodeValue(listFirst(c->reply)); + if (o->used == 0) { + c->reply_bytes -= o->size; + listDelNode(c->reply,listFirst(c->reply)); + continue; + } + + nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen); + if (nwritten <= 0) break; + c->sentlen += nwritten; + totwritten += nwritten; + + /* If we fully sent the object on head go to the next one */ + if (c->sentlen == o->used) { + c->reply_bytes -= o->size; + listDelNode(c->reply,listFirst(c->reply)); + c->sentlen = 0; + /* If there are no longer objects in the list, we expect + * the count of reply bytes to be exactly zero. */ + if (listLength(c->reply) == 0) + serverAssert(c->reply_bytes == 0); + } + } + /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT + * bytes, in a single threaded server it's a good idea to serve + * other clients as well, even if a very large request comes from + * super fast link that is always able to accept data (in real world + * scenario think about 'KEYS *' against the loopback interface). + * + * However if we are over the maxmemory limit we ignore that and + * just deliver as much data as it is possible to deliver. + * + * Moreover, we also send as much as possible if the client is + * a replica or a monitor (otherwise, on high-speed traffic, the + * replication/output buffer will grow indefinitely) */ + if (totwritten > NET_MAX_WRITES_PER_EVENT && + (g_pserver->maxmemory == 0 || + zmalloc_used_memory() < g_pserver->maxmemory) && + !(c->flags & CLIENT_SLAVE)) break; } - /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT - * bytes, in a single threaded server it's a good idea to serve - * other clients as well, even if a very large request comes from - * super fast link that is always able to accept data (in real world - * scenario think about 'KEYS *' against the loopback interface). - * - * However if we are over the maxmemory limit we ignore that and - * just deliver as much data as it is possible to deliver. - * - * Moreover, we also send as much as possible if the client is - * a replica or a monitor (otherwise, on high-speed traffic, the - * replication/output buffer will grow indefinitely) */ - if (totwritten > NET_MAX_WRITES_PER_EVENT && - (g_pserver->maxmemory == 0 || - zmalloc_used_memory() < g_pserver->maxmemory) && - !(c->flags & CLIENT_SLAVE)) break; } - g_pserver->stat_net_output_bytes += totwritten; if (nwritten == -1) { if (connGetState(c->conn) != CONN_STATE_CONNECTED) { @@ -1898,27 +1939,37 @@ void ProcessPendingAsyncWrites() serverAssert(c->fPendingAsyncWrite); if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY)) { - zfree(c->replyAsync); - c->replyAsync = nullptr; + if (c->replyAsync != nullptr){ + zfree(c->replyAsync); + c->replyAsync = nullptr; + } c->fPendingAsyncWrite = FALSE; continue; } - int size = c->replyAsync->used; + /* since writes from master to replica can come directly from the replication backlog, + * writes may have been signalled without having been copied to the replyAsync buffer, + * thus causing the buffer to be NULL */ + if (c->replyAsync != nullptr){ + int size = c->replyAsync->used; - if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) { - memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size); - c->bufpos += size; - } else { - c->reply_bytes += c->replyAsync->size; - listAddNodeTail(c->reply, c->replyAsync); + if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) { + memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size); + c->bufpos += size; + } else { + c->reply_bytes += c->replyAsync->size; + listAddNodeTail(c->reply, c->replyAsync); + c->replyAsync = nullptr; + } + + zfree(c->replyAsync); c->replyAsync = nullptr; + } else { + /* Only replicas should have empty async reply buffers */ + serverAssert(c->flags & CLIENT_SLAVE); } - zfree(c->replyAsync); - c->replyAsync = nullptr; c->fPendingAsyncWrite = FALSE; - // Now install the write event handler int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; /* For the fsync=always policy, we want that a given FD is never @@ -1931,17 +1982,17 @@ void ProcessPendingAsyncWrites() { ae_flags |= AE_BARRIER; } - + if (!((c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) continue; - + asyncCloseClientOnOutputBufferLimitReached(c); if (c->flags & CLIENT_CLOSE_ASAP) continue; // we will never write this so don't post an op - + std::atomic_thread_fence(std::memory_order_seq_cst); - + if (FCorrectThread(c)) { prepareClientToWrite(c); // queue an event @@ -1968,7 +2019,6 @@ void ProcessPendingAsyncWrites() * need to use a syscall in order to install the writable event handler, * get it called, and so forth. */ int handleClientsWithPendingWrites(int iel, int aof_state) { - std::unique_lock lockf(g_pserver->rgthreadvar[iel].lockPendingWrite); int processed = 0; serverAssert(iel == (serverTL - g_pserver->rgthreadvar)); @@ -1991,7 +2041,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { ae_flags |= AE_BARRIER; } + std::unique_lock lockf(g_pserver->rgthreadvar[iel].lockPendingWrite); auto vec = std::move(g_pserver->rgthreadvar[iel].clients_pending_write); + lockf.unlock(); processed += (int)vec.size(); for (client *c : vec) { @@ -2025,8 +2077,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ if (clientHasPendingReplies(c)) { - if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) + if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) { freeClientAsync(c); + } } } @@ -3647,6 +3700,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } } +/* In the case of a replica client, writes to said replica are using data from the replication backlog + * as opposed to it's own internal buffer, this number should keep track of that */ +unsigned long getClientReplicationBacklogSharedUsage(client *c) { + return (!(c->flags & CLIENT_SLAVE) || !c->FPendingReplicaWrite() ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off; +} + /* This function returns the number of bytes that Redis is * using to store the reply still not read by the client. * @@ -3655,9 +3714,11 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * enforcing the client output length limits. */ unsigned long getClientOutputBufferMemoryUsage(client *c) { unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); - return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0); + return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0) + getClientReplicationBacklogSharedUsage(c); } + + /* Get the class of a client, used in order to enforce limits to different * classes of clients. * diff --git a/src/object.cpp b/src/object.cpp index b86066c52..6ecaf3ba5 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -902,7 +902,7 @@ size_t objectComputeSize(robj_roptr o, size_t sample_size) { if (samples) asize += (double)elesize/samples*dictSize(d); } else if (o->encoding == OBJ_ENCODING_INTSET) { intset *is = (intset*)ptrFromObj(o); - asize = sizeof(*o)+sizeof(*is)+is->encoding*is->length; + asize = sizeof(*o)+sizeof(*is)+(size_t)is->encoding*is->length; } else { serverPanic("Unknown set encoding"); } diff --git a/src/rdb.cpp b/src/rdb.cpp index 500291794..cef3d15dd 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2788,6 +2788,8 @@ public: if (fHighMemory) performEvictions(false /* fPreSnapshot*/); } + g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); + serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); } } @@ -2916,6 +2918,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { /* Key-specific attributes, set by opcodes before the key type. */ long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now; long long lru_clock = 0; + unsigned long long ckeysLoaded = 0; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; now = mstime(); rdbAsyncWorkThread wqueue(rsi, rdbflags, now); @@ -3189,6 +3192,13 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now; fLastKeyExpired = fStaleMvccKey || fExpiredKey; + ckeysLoaded++; + if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0) + { + g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); + serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); + } + if (g_pserver->key_load_delay) debugDelay(g_pserver->key_load_delay); diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 07403a041..2f217822c 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -103,7 +103,6 @@ static struct config { int randomkeys_keyspacelen; int keepalive; int pipeline; - int showerrors; long long start; long long totlatency; const char *title; @@ -313,7 +312,9 @@ static redisContext *getRedisContext(const char *ip, int port, fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str); else fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str); - goto cleanup; + freeReplyObject(reply); + redisFree(ctx); + exit(1); } freeReplyObject(reply); return ctx; @@ -370,9 +371,15 @@ fail: fprintf(stderr, "ERROR: failed to fetch CONFIG from "); if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port); else fprintf(stderr, "%s\n", hostsocket); + int abort_test = 0; + if (!strncmp(reply->str,"NOAUTH",5) || + !strncmp(reply->str,"WRONGPASS",9) || + !strncmp(reply->str,"NOPERM",5)) + abort_test = 1; freeReplyObject(reply); redisFree(c); freeRedisConfig(cfg); + if (abort_test) exit(1); return NULL; } static void freeRedisConfig(redisConfig *cfg) { @@ -517,44 +524,39 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { exit(1); } redisReply *r = (redisReply*)reply; - int is_err = (r->type == REDIS_REPLY_ERROR); - - if (is_err && config.showerrors) { - /* TODO: static lasterr_time not thread-safe */ - static time_t lasterr_time = 0; - time_t now = time(NULL); - if (lasterr_time != now) { - lasterr_time = now; - if (c->cluster_node) { - printf("Error from server %s:%d: %s\n", + if (r->type == REDIS_REPLY_ERROR) { + /* Try to update slots configuration if reply error is + * MOVED/ASK/CLUSTERDOWN and the key(s) used by the command + * contain(s) the slot hash tag. + * If the error is not topology-update related then we + * immediately exit to avoid false results. */ + if (c->cluster_node && c->staglen) { + int fetch_slots = 0, do_wait = 0; + if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3)) + fetch_slots = 1; + else if (!strncmp(r->str,"CLUSTERDOWN",11)) { + /* Usually the cluster is able to recover itself after + * a CLUSTERDOWN error, so try to sleep one second + * before requesting the new configuration. */ + fetch_slots = 1; + do_wait = 1; + printf("Error from server %s:%d: %s.\n", c->cluster_node->ip, c->cluster_node->port, r->str); + } + if (do_wait) sleep(1); + if (fetch_slots && !fetchClusterSlotsConfiguration(c)) + exit(1); + } else { + if (c->cluster_node) { + printf("Error from server %s:%d: %s\n", + c->cluster_node->ip, + c->cluster_node->port, + r->str); } else printf("Error from server: %s\n", r->str); - } - } - - /* Try to update slots configuration if reply error is - * MOVED/ASK/CLUSTERDOWN and the key(s) used by the command - * contain(s) the slot hash tag. */ - if (is_err && c->cluster_node && c->staglen) { - int fetch_slots = 0, do_wait = 0; - if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3)) - fetch_slots = 1; - else if (!strncmp(r->str,"CLUSTERDOWN",11)) { - /* Usually the cluster is able to recover itself after - * a CLUSTERDOWN error, so try to sleep one second - * before requesting the new configuration. */ - fetch_slots = 1; - do_wait = 1; - printf("Error from server %s:%d: %s\n", - c->cluster_node->ip, - c->cluster_node->port, - r->str); - } - if (do_wait) sleep(1); - if (fetch_slots && !fetchClusterSlotsConfiguration(c)) exit(1); + } } freeReplyObject(reply); @@ -1302,8 +1304,7 @@ static int fetchClusterSlotsConfiguration(client c) { atomicGetIncr(config.is_fetching_slots, is_fetching_slots, 1); if (is_fetching_slots) return -1; //TODO: use other codes || errno ? atomicSet(config.is_fetching_slots, 1); - if (config.showerrors) - printf("Cluster slots configuration changed, fetching new one...\n"); + printf("WARNING: Cluster slots configuration changed, fetching new one...\n"); const char *errmsg = "Failed to update cluster slots configuration"; static dictType dtype = { dictSdsHash, /* hash function */ @@ -1479,7 +1480,8 @@ int parseOptions(int argc, const char **argv) { } else if (!strcmp(argv[i],"-I")) { config.idlemode = 1; } else if (!strcmp(argv[i],"-e")) { - config.showerrors = 1; + printf("WARNING: -e option has been deprecated. " + "We now immediatly exit on error to avoid false results.\n"); } else if (!strcmp(argv[i],"-t")) { if (lastarg) goto invalid; /* We get the list of tests to run as a string in the form @@ -1582,8 +1584,6 @@ usage: " is executed. Default tests use this to hit random keys in the\n" " specified range.\n" " -P Pipeline requests. Default 1 (no pipeline).\n" -" -e If server replies with errors, show them on stdout.\n" -" (no more than 1 error per second is displayed)\n" " -q Quiet. Just show query/sec values\n" " --precision Number of decimal places to display in latency output (default 0)\n" " --csv Output in CSV format\n" @@ -1711,7 +1711,6 @@ int main(int argc, const char **argv) { config.keepalive = 1; config.datasize = 3; config.pipeline = 1; - config.showerrors = 0; config.randomkeys = 0; config.randomkeys_keyspacelen = 0; config.quiet = 0; @@ -1794,8 +1793,9 @@ int main(int argc, const char **argv) { } else { config.redis_config = getRedisConfig(config.hostip, config.hostport, config.hostsocket); - if (config.redis_config == NULL) + if (config.redis_config == NULL) { fprintf(stderr, "WARN: could not fetch server CONFIG\n"); + } } if (config.num_threads > 0) { pthread_mutex_init(&(config.liveclients_mutex), NULL); @@ -1958,8 +1958,8 @@ int main(int argc, const char **argv) { } if (test_is_selected("lrange") || test_is_selected("lrange_500")) { - len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 449",tag); - benchmark("LRANGE_500 (first 450 elements)",cmd,len); + len = redisFormatCommand(&cmd,"LRANGE mylist%s 0 499",tag); + benchmark("LRANGE_500 (first 500 elements)",cmd,len); free(cmd); } @@ -1987,6 +1987,7 @@ int main(int argc, const char **argv) { } while(config.loop); zfree(data); + zfree(data); if (config.redis_config != NULL) freeRedisConfig(config.redis_config); return 0; diff --git a/src/redis-check-aof.cpp b/src/redis-check-aof.cpp index de9ab1f77..09bc66023 100644 --- a/src/redis-check-aof.cpp +++ b/src/redis-check-aof.cpp @@ -39,12 +39,14 @@ static char error[1044]; static off_t epos; +static long long line = 1; int consumeNewline(char *buf) { if (strncmp(buf,"\r\n",2) != 0) { ERROR("Expected \\r\\n, got: %02x%02x",buf[0],buf[1]); return 0; } + line += 1; return 1; } @@ -201,8 +203,8 @@ int redis_check_aof_main(int argc, char **argv) { off_t pos = process(fp); off_t diff = size-pos; - printf("AOF analyzed: size=%lld, ok_up_to=%lld, diff=%lld\n", - (long long) size, (long long) pos, (long long) diff); + printf("AOF analyzed: size=%lld, ok_up_to=%lld, ok_up_to_line=%lld, diff=%lld\n", + (long long) size, (long long) pos, line, (long long) diff); if (diff > 0) { if (fix) { char buf[2]; diff --git a/src/redis-check-rdb.cpp b/src/redis-check-rdb.cpp index 1dbf47fbd..954bb34c8 100644 --- a/src/redis-check-rdb.cpp +++ b/src/redis-check-rdb.cpp @@ -250,7 +250,7 @@ int redis_check_rdb(const char *rdbfilename, FILE *fp) { rdbstate.doing = RDB_CHECK_DOING_READ_LEN; if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR) goto eoferr; - rdbCheckInfo("Selecting DB ID %d", dbid); + rdbCheckInfo("Selecting DB ID %llu", (unsigned long long)dbid); continue; /* Read type again. */ } else if (type == RDB_OPCODE_RESIZEDB) { /* RESIZEDB: Hint about the size of the keys in the currently diff --git a/src/redis-cli.c b/src/redis-cli.c index 3ccbc4105..7e1e7e08e 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -472,7 +472,7 @@ static void cliOutputHelp(int argc, char **argv) { help = entry->org; if (group == -1) { /* Compare all arguments */ - if (argc == entry->argc) { + if (argc <= entry->argc) { for (j = 0; j < argc; j++) { if (strcasecmp(argv[j],entry->argv[j]) != 0) break; } @@ -653,7 +653,9 @@ static int cliConnect(int flags) { cliRefreshPrompt(); } - if (config.hostsocket == NULL) { + /* Do not use hostsocket when we got redirected in cluster mode */ + if (config.hostsocket == NULL || + (config.cluster_mode && config.cluster_reissue_command)) { context = redisConnect(config.hostip,config.hostport); } else { context = redisConnectUnix(config.hostsocket); @@ -4559,7 +4561,7 @@ static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array) { static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array, clusterManagerNode **nodeptr) { - assert(array->nodes < (array->nodes + array->len)); + assert(array->len > 0); /* If the first node to be shifted is not NULL, decrement count. */ if (*array->nodes != NULL) array->count--; /* Store the first node to be shifted into 'nodeptr'. */ @@ -4572,7 +4574,7 @@ static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array, static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array, clusterManagerNode *node) { - assert(array->nodes < (array->nodes + array->len)); + assert(array->len > 0); assert(node != NULL); assert(array->count < array->len); array->nodes[array->count++] = node; @@ -5949,7 +5951,7 @@ void showLatencyDistSamples(struct distsamples *samples, long long tot) { printf("\033[38;5;0m"); /* Set foreground color to black. */ for (j = 0; ; j++) { int coloridx = - ceil((float) samples[j].count / tot * (spectrum_palette_size-1)); + ceil((double) samples[j].count / tot * (spectrum_palette_size-1)); int color = spectrum_palette[coloridx]; printf("\033[48;5;%dm%c", (int)color, samples[j].character); samples[j].count = 0; diff --git a/src/replication.cpp b/src/replication.cpp index 1cc444a54..2725a9ce4 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -189,6 +189,7 @@ void createReplicationBacklog(void) { g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL); g_pserver->repl_backlog_histlen = 0; g_pserver->repl_backlog_idx = 0; + g_pserver->repl_backlog_start = g_pserver->master_repl_offset; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the @@ -200,6 +201,15 @@ void createReplicationBacklog(void) { g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; } +/* Compute the corresponding index from a replication backlog offset + * Since this computation needs the size of the replication backlog, + * you need to have the repl_backlog_lock in order to call it */ +long long getReplIndexFromOffset(long long offset){ + serverAssert(g_pserver->repl_backlog_lock.fOwnLock()); + long long index = (offset - g_pserver->repl_backlog_start) % g_pserver->repl_backlog_size; + return index; +} + /* This function is called when the user modifies the replication backlog * size at runtime. It is up to the function to both update the * g_pserver->repl_backlog_size and to resize the buffer and setup it so that @@ -211,6 +221,8 @@ void resizeReplicationBacklog(long long newsize) { newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (g_pserver->repl_backlog_size == newsize) return; + std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); + if (g_pserver->repl_backlog != NULL) { /* What we actually do is to flush the old buffer and realloc a new * empty one. It will refill with new data incrementally. @@ -218,19 +230,23 @@ void resizeReplicationBacklog(long long newsize) { * worse often we need to alloc additional space before freeing the * old buffer. */ - if (g_pserver->repl_batch_idxStart >= 0) { - // We need to keep critical data so we can't shrink less than the hot data in the buffer - newsize = std::max(newsize, g_pserver->master_repl_offset - g_pserver->repl_batch_offStart); - char *backlog = (char*)zmalloc(newsize); - g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - g_pserver->repl_batch_offStart; + /* get the critical client size, i.e. the size of the data unflushed to clients */ + long long earliest_off = g_pserver->repl_lowest_off.load(); - if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) { - auto cbActiveBacklog = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart; - memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbActiveBacklog); + if (earliest_off != -1) { + // We need to keep critical data so we can't shrink less than the hot data in the buffer + newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off); + char *backlog = (char*)zmalloc(newsize); + g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off; + long long earliest_idx = getReplIndexFromOffset(earliest_off); + + if (g_pserver->repl_backlog_idx >= earliest_idx) { + auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx; + memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog); serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); } else { - auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; - memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); + auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx; + memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbPhase1); memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx; serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); @@ -238,7 +254,12 @@ void resizeReplicationBacklog(long long newsize) { zfree(g_pserver->repl_backlog); g_pserver->repl_backlog = backlog; g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen; - g_pserver->repl_batch_idxStart = 0; + if (g_pserver->repl_batch_idxStart >= 0) { + g_pserver->repl_batch_idxStart -= earliest_idx; + if (g_pserver->repl_batch_idxStart < 0) + g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size; + } + g_pserver->repl_backlog_start = earliest_off; } else { zfree(g_pserver->repl_backlog); g_pserver->repl_backlog = (char*)zmalloc(newsize); @@ -246,11 +267,13 @@ void resizeReplicationBacklog(long long newsize) { g_pserver->repl_backlog_idx = 0; /* Next byte we have is... the next since the buffer is empty. */ g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; + g_pserver->repl_backlog_start = g_pserver->master_repl_offset; } } g_pserver->repl_backlog_size = newsize; } + void freeReplicationBacklog(void) { serverAssert(GlobalLocksAcquired()); listIter li; @@ -273,18 +296,63 @@ void feedReplicationBacklog(const void *ptr, size_t len) { serverAssert(GlobalLocksAcquired()); const unsigned char *p = (const unsigned char*)ptr; - if (g_pserver->repl_batch_idxStart >= 0) { - long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; - if (minimumsize > g_pserver->repl_backlog_size) { - flushReplBacklogToClients(); - serverAssert(g_pserver->master_repl_offset == g_pserver->repl_batch_offStart); - minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; - if (minimumsize > g_pserver->repl_backlog_size && minimumsize < (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes) { + if (g_pserver->repl_batch_idxStart >= 0) { + /* We are lower bounded by the lowest replica offset, or the batch offset start if not applicable */ + long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst); + if (lower_bound == -1) + lower_bound = g_pserver->repl_batch_offStart; + long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; + + if (minimumsize > g_pserver->repl_backlog_size) { + listIter li; + listNode *ln; + listRewind(g_pserver->slaves, &li); + long long maxClientBuffer = (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes; + if (maxClientBuffer <= 0) + maxClientBuffer = LLONG_MAX; // infinite essentially + long long min_offset = LLONG_MAX; + int listening_replicas = 0; + while ((ln = listNext(&li))) { + client *replica = (client*)listNodeValue(ln); + if (!canFeedReplicaReplBuffer(replica)) continue; + if (replica->flags & CLIENT_CLOSE_ASAP) continue; + + std::unique_lock ul(replica->lock); + + // Would this client overflow? If so close it + long long neededBuffer = g_pserver->master_repl_offset + len - replica->repl_curr_off + 1; + if (neededBuffer > maxClientBuffer) { + + sds clientInfo = catClientInfoString(sdsempty(),replica); + freeClientAsync(replica); + serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP due to exceeding output buffer hard limit.", clientInfo); + sdsfree(clientInfo); + continue; + } + min_offset = std::min(min_offset, replica->repl_curr_off); + ++listening_replicas; + } + + if (min_offset == LLONG_MAX) { + min_offset = g_pserver->repl_batch_offStart; + g_pserver->repl_lowest_off = -1; + } else { + g_pserver->repl_lowest_off = min_offset; + } + + minimumsize = g_pserver->master_repl_offset + len - min_offset + 1; + serverAssert(listening_replicas == 0 || minimumsize <= maxClientBuffer); + + if (minimumsize > g_pserver->repl_backlog_size && listening_replicas) { // This is an emergency overflow, we better resize to fit long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize); - serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize); + serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld bytes", newsize); resizeReplicationBacklog(newsize); + } else if (!listening_replicas) { + // We need to update a few variables or later asserts will notice we dropped data + g_pserver->repl_batch_offStart = g_pserver->master_repl_offset + len; + g_pserver->repl_lowest_off = -1; } } } @@ -293,6 +361,7 @@ void feedReplicationBacklog(const void *ptr, size_t len) { /* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ + while(len) { size_t thislen = g_pserver->repl_backlog_size - g_pserver->repl_backlog_idx; if (thislen > len) thislen = len; @@ -479,7 +548,6 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) if (fSendRaw) { char aux[LONG_STR_SIZE+3]; - /* Add the multi bulk reply length. */ aux[0] = '*'; int multilen = ll2string(aux+1,sizeof(aux)-1,argc); @@ -653,15 +721,19 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, decrRefCount(cmdobj); } +int prepareClientToWrite(client *c); + /* Feed the replica 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ long long addReplyReplicationBacklog(client *c, long long offset) { - long long j, skip, len; + long long skip, len; serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); if (g_pserver->repl_backlog_histlen == 0) { serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); + c->repl_curr_off = g_pserver->master_repl_offset; + c->repl_end_off = g_pserver->master_repl_offset; return 0; } @@ -678,31 +750,19 @@ long long addReplyReplicationBacklog(client *c, long long offset) { skip = offset - g_pserver->repl_backlog_off; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); - /* Point j to the oldest byte, that is actually our - * g_pserver->repl_backlog_off byte. */ - j = (g_pserver->repl_backlog_idx + - (g_pserver->repl_backlog_size-g_pserver->repl_backlog_histlen)) % - g_pserver->repl_backlog_size; - serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); - - /* Discard the amount of data to seek to the specified 'offset'. */ - j = (j + skip) % g_pserver->repl_backlog_size; - - /* Feed replica with data. Since it is a circular buffer we have to - * split the reply in two parts if we are cross-boundary. */ len = g_pserver->repl_backlog_histlen - skip; serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); - while(len) { - long long thislen = - ((g_pserver->repl_backlog_size - j) < len) ? - (g_pserver->repl_backlog_size - j) : len; - serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); - addReplySds(c,sdsnewlen(g_pserver->repl_backlog + j, thislen)); - len -= thislen; - j = 0; - } - return g_pserver->repl_backlog_histlen - skip; + /* Set the start and end offsets for the replica so that a future + * writeToClient will send the backlog from the given offset to + * the current end of the backlog to said replica */ + c->repl_curr_off = offset - 1; + c->repl_end_off = g_pserver->master_repl_offset; + + /* Force the partial sync to be queued */ + prepareClientToWrite(c); + + return len; } /* Return the offset to provide as reply to the PSYNC command received @@ -735,6 +795,10 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) { replica->psync_initial_offset = offset; replica->replstate = SLAVE_STATE_WAIT_BGSAVE_END; + + replica->repl_curr_off = offset; + replica->repl_end_off = g_pserver->master_repl_offset; + /* We are going to accumulate the incremental changes for this * replica as well. Set replicaseldb to -1 in order to force to re-emit * a SELECT statement in the replication stream. */ @@ -1357,6 +1421,7 @@ void replconfCommand(client *c) { * 4) Update the count of "good replicas". */ void putSlaveOnline(client *replica) { replica->replstate = SLAVE_STATE_ONLINE; + replica->repl_put_online_on_ack = 0; replica->repl_ack_time = g_pserver->unixtime; /* Prevent false timeout. */ @@ -3059,6 +3124,11 @@ void syncWithMaster(connection *conn) { if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); + /* Reset the bulklen information in case it is lingering from the last connection + * The partial sync will start from the beginning of a command so these should be reset */ + mi->master->reqtype = 0; + mi->master->multibulklen = 0; + mi->master->bulklen = -1; if (cserver.supervised_mode == SUPERVISED_SYSTEMD) { redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n"); } @@ -4287,6 +4357,8 @@ void replicationCron(void) { replicationStartPendingFork(); + trimReplicationBacklog(); + /* Remove the RDB file used for replication if Redis is not running * with any persistence. */ removeRDBUsedToSyncReplicas(); @@ -4898,15 +4970,19 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long } void _clientAsyncReplyBufferReserve(client *c, size_t len); + void flushReplBacklogToClients() { serverAssert(GlobalLocksAcquired()); + /* If we have the repl backlog lock, we will deadlock */ + serverAssert(!g_pserver->repl_backlog_lock.fOwnLock()); if (g_pserver->repl_batch_offStart < 0) return; if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) { bool fAsyncWrite = false; - + long long min_offset = LONG_LONG_MAX; + // Ensure no overflow serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset); if (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > g_pserver->repl_backlog_size) { // We overflowed @@ -4930,33 +5006,32 @@ void flushReplBacklogToClients() listIter li; listNode *ln; listRewind(g_pserver->slaves, &li); + /* We don't actually write any data in this function since we send data + * directly from the replication backlog to replicas in writeToClient. + * + * What we do however, is set the end offset of each replica here. This way, + * future calls to writeToClient will know up to where in the replication + * backlog is valid for writing. */ while ((ln = listNext(&li))) { client *replica = (client*)listNodeValue(ln); if (!canFeedReplicaReplBuffer(replica)) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue; - std::unique_lock ul(replica->lock, std::defer_lock); - if (FCorrectThread(replica)) - ul.lock(); - else + std::unique_lock ul(replica->lock); + if (!FCorrectThread(replica)) fAsyncWrite = true; - - if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) { - long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart; - serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy); - serverAssert((g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart) >= (cbCopy)); - serverAssert((g_pserver->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size); - - addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy); - } else { - auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; - if (fAsyncWrite) - _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx); - addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); - addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); - serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart)); - } + + /* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */ + serverAssert(replica->repl_curr_off != -1); + + min_offset = std::min(min_offset, replica->repl_curr_off); + + replica->repl_end_off = g_pserver->master_repl_offset; + + /* Only if the there isn't already a pending write do we prepare the client to write */ + serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset); + prepareClientToWrite(replica); } if (fAsyncWrite) ProcessPendingAsyncWrites(); @@ -4965,7 +5040,8 @@ LDone: // This may be called multiple times per "frame" so update with our progress flushing to clients g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - } + g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst); + } } @@ -5041,3 +5117,17 @@ void updateFailoverStatus(void) { g_pserver->target_replica_port); } } + +// If we automatically grew the backlog we need to trim it back to +// the config setting when possible +void trimReplicationBacklog() { + serverAssert(GlobalLocksAcquired()); + serverAssert(g_pserver->repl_batch_offStart < 0); // we shouldn't be in a batch + if (g_pserver->repl_backlog_size <= g_pserver->repl_backlog_config_size) + return; // We're already a good size + if (g_pserver->repl_lowest_off > 0 && (g_pserver->master_repl_offset - g_pserver->repl_lowest_off + 1) > g_pserver->repl_backlog_config_size) + return; // There is untransmitted data we can't truncate + + serverLog(LL_NOTICE, "Reclaiming %lld replication backlog bytes", g_pserver->repl_backlog_size - g_pserver->repl_backlog_config_size); + resizeReplicationBacklog(g_pserver->repl_backlog_config_size); +} \ No newline at end of file diff --git a/src/scripting.cpp b/src/scripting.cpp index e7f0a082a..d59080660 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -31,6 +31,7 @@ #include "sha1.h" #include "rand.h" #include "cluster.h" +#include "monotonic.h" extern "C" { #include @@ -1437,7 +1438,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body) { /* This is the Lua script "count" hook that we use to detect scripts timeout. */ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) { - long long elapsed = mstime() - g_pserver->lua_time_start; + long long elapsed = elapsedMs(g_pserver->lua_time_start); UNUSED(ar); UNUSED(lua); @@ -1591,7 +1592,8 @@ void evalGenericCommand(client *c, int evalsha) { serverTL->in_eval = 1; g_pserver->lua_caller = c; g_pserver->lua_cur_script = funcname + 2; - g_pserver->lua_time_start = mstime(); + g_pserver->lua_time_start = getMonotonicUs(); + g_pserver->lua_time_snapshot = mstime(); g_pserver->lua_kill = 0; if (g_pserver->lua_time_limit > 0 && ldb.active == 0) { lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000); @@ -2750,7 +2752,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) { /* Check if a timeout occurred. */ if (ar->event == LUA_HOOKCOUNT && ldb.step == 0 && bp == 0) { - mstime_t elapsed = mstime() - g_pserver->lua_time_start; + mstime_t elapsed = elapsedMs(g_pserver->lua_time_start); mstime_t timelimit = g_pserver->lua_time_limit ? g_pserver->lua_time_limit : 5000; if (elapsed >= timelimit) { @@ -2780,6 +2782,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) { lua_pushstring(lua, "timeout during Lua debugging with client closing connection"); lua_error(lua); } - g_pserver->lua_time_start = mstime(); + g_pserver->lua_time_start = getMonotonicUs(); + g_pserver->lua_time_snapshot = mstime(); } } diff --git a/src/sentinel.cpp b/src/sentinel.cpp index c879f235d..ffa0ff60c 100644 --- a/src/sentinel.cpp +++ b/src/sentinel.cpp @@ -4131,16 +4131,16 @@ void sentinelSetCommand(client *c) { int numargs = j-old_j+1; switch(numargs) { case 2: - sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",ptrFromObj(c->argv[old_j]), - ptrFromObj(c->argv[old_j+1])); + sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",szFromObj(c->argv[old_j]), + szFromObj(c->argv[old_j+1])); break; case 3: - sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",ptrFromObj(c->argv[old_j]), - ptrFromObj(c->argv[old_j+1]), - ptrFromObj(c->argv[old_j+2])); + sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",szFromObj(c->argv[old_j]), + szFromObj(c->argv[old_j+1]), + szFromObj(c->argv[old_j+2])); break; default: - sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",ptrFromObj(c->argv[old_j])); + sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",szFromObj(c->argv[old_j])); break; } } diff --git a/src/server.cpp b/src/server.cpp index 8bcc67d68..394d1b811 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2021,7 +2021,6 @@ void clientsCron(int iel) { while(listLength(g_pserver->clients) && iterations--) { client *c; listNode *head; - /* Rotate the list, take the current head, process. * This way if the client must be removed from the list it's the * first element and we don't incur into O(N) computation. */ @@ -2125,7 +2124,7 @@ void databasesCron(bool fMainThread) { ::dict *dict = g_pserver->db[rehash_db]->dictUnsafeKeyOnly(); /* Are we async rehashing? And if so is it time to re-calibrate? */ /* The recalibration limit is a prime number to ensure balancing across threads */ - if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1) { + if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1 && dictSize(dict) > 2048 && dictIsRehashing(dict) && !g_pserver->loading) { serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms); ++async_rehashes; } @@ -3261,6 +3260,7 @@ void initServerConfig(void) { g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER; g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; g_pserver->master_repl_offset = 0; + g_pserver->repl_lowest_off.store(-1, std::memory_order_seq_cst); /* Replication partial resync backlog */ g_pserver->repl_backlog = NULL; @@ -4677,6 +4677,8 @@ int processCommand(client *c, int callFlags) { return C_OK; } + int is_read_command = (c->cmd->flags & CMD_READONLY) || + (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_READONLY)); int is_write_command = (c->cmd->flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) || @@ -4895,7 +4897,7 @@ int processCommand(client *c, int callFlags) { c->cmd->proc != discardCommand && c->cmd->proc != watchCommand && c->cmd->proc != unwatchCommand && - c->cmd->proc != resetCommand && + c->cmd->proc != resetCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)ptrFromObj(c->argv[1]))[0]) == 'n') && @@ -4907,6 +4909,14 @@ int processCommand(client *c, int callFlags) { return C_OK; } + /* Prevent a replica from sending commands that access the keyspace. + * The main objective here is to prevent abuse of client pause check + * from which replicas are exempt. */ + if ((c->flags & CLIENT_SLAVE) && (is_may_replicate_command || is_write_command || is_read_command)) { + rejectCommandFormat(c, "Replica can't interract with the keyspace"); + return C_OK; + } + /* If the server is paused, block the client until * the pause has ended. Replicas are never paused. */ if (!(c->flags & CLIENT_SLAVE) && @@ -6102,10 +6112,11 @@ sds genRedisInfoString(const char *section) { if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Keyspace\r\n"); for (j = 0; j < cserver.dbnum; j++) { - long long keys, vkeys; + long long keys, vkeys, cachedKeys; keys = g_pserver->db[j]->size(); vkeys = g_pserver->db[j]->expireSize(); + cachedKeys = g_pserver->db[j]->size(true /* fCachedOnly */); // Adjust TTL by the current time mstime_t mstime; @@ -6117,8 +6128,8 @@ sds genRedisInfoString(const char *section) { if (keys || vkeys) { info = sdscatprintf(info, - "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", - j, keys, vkeys, static_cast(g_pserver->db[j]->avg_ttl)); + "db%d:keys=%lld,expires=%lld,avg_ttl=%lld,cached_keys=%lld\r\n", + j, keys, vkeys, static_cast(g_pserver->db[j]->avg_ttl), cachedKeys); } } } @@ -6136,7 +6147,11 @@ sds genRedisInfoString(const char *section) { "variant:enterprise\r\n" "license_status:%s\r\n" "mvcc_depth:%d\r\n", +#ifdef NO_LICENSE_CHECK + "OK", +#else cserver.license_key ? "OK" : "Trial", +#endif mvcc_depth ); } @@ -7012,9 +7027,10 @@ void OnTerminate() void wakeTimeThread() { updateCachedTime(); std::lock_guard lock(time_thread_mutex); + if (sleeping_threads >= cserver.cthreads) + time_thread_cv.notify_one(); sleeping_threads--; serverAssert(sleeping_threads >= 0); - time_thread_cv.notify_one(); } void *timeThreadMain(void*) { @@ -7085,6 +7101,8 @@ static void validateConfiguration() serverLog(LL_WARNING, "\tKeyDB will now exit. Please update your configuration file."); exit(EXIT_FAILURE); } + + g_pserver->repl_backlog_config_size = g_pserver->repl_backlog_size; // this is normally set in the update logic, but not on initial config } int iAmMaster(void) { diff --git a/src/server.h b/src/server.h index 41fec843b..d35f12c4a 100644 --- a/src/server.h +++ b/src/server.h @@ -1095,7 +1095,7 @@ public: redisDbPersistentData(redisDbPersistentData &&) = default; size_t slots() const { return dictSlots(m_pdict); } - size_t size() const; + size_t size(bool fCachedOnly = false) const; void expand(uint64_t slots) { dictExpand(m_pdict, slots); } void trackkey(robj_roptr o, bool fUpdate) @@ -1591,6 +1591,12 @@ struct client { long long psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this replica output buffer should use. */ + + long long repl_curr_off = -1;/* Replication offset of the replica, also where in the backlog we need to start from + * when sending data to this replica. */ + long long repl_end_off = -1; /* Replication offset to write to, stored in the replica, as opposed to using the global offset + * to prevent needing the global lock */ + char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */ int slave_listening_port; /* As configured with: REPLCONF listening-port */ char *slave_addr; /* Optionally given by REPLCONF ip-address */ @@ -1652,6 +1658,10 @@ struct client { robj **argv; size_t argv_len_sumActive = 0; + bool FPendingReplicaWrite() const { + return repl_curr_off != repl_end_off; + } + // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const; @@ -1991,7 +2001,7 @@ public: // Per-thread variabels that may be accessed without a lock struct redisServerThreadVars { - aeEventLoop *el; + aeEventLoop *el = nullptr; socketFds ipfd; /* TCP socket file descriptors */ socketFds tlsfd; /* TLS socket file descriptors */ int in_eval; /* Are we inside EVAL? */ @@ -2355,11 +2365,15 @@ struct redisServer { int repl_ping_slave_period; /* Master pings the replica every N seconds */ char *repl_backlog; /* Replication backlog for partial syncs */ long long repl_backlog_size; /* Backlog circular buffer size */ + long long repl_backlog_config_size; /* The repl backlog may grow but we want to know what the user set it to */ long long repl_backlog_histlen; /* Backlog actual data length */ long long repl_backlog_idx; /* Backlog circular buffer current offset, that is the next byte will'll write to.*/ long long repl_backlog_off; /* Replication "master offset" of first byte in the replication backlog buffer.*/ + long long repl_backlog_start; /* Used to compute indicies from offsets + basically, index = (offset - start) % size */ + fastlock repl_backlog_lock {"replication backlog"}; time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */ time_t repl_no_slaves_since; /* We have no slaves since that time. @@ -2371,6 +2385,8 @@ struct redisServer { int repl_diskless_load; /* Slave parse RDB directly from the socket. * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ + std::atomic repl_lowest_off; /* The lowest offset amongst all replicas + -1 if there are no replicas */ /* Replication (replica) */ list *masters; int enable_multimaster; @@ -2475,7 +2491,8 @@ struct redisServer { ::dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */ mstime_t lua_time_limit; /* Script timeout in milliseconds */ - mstime_t lua_time_start; /* Start time of script, milliseconds time */ + monotime lua_time_start; /* monotonic timer to detect timed-out script */ + mstime_t lua_time_snapshot; /* Snapshot of mstime when script is started */ int lua_write_dirty; /* True if a write command was called during the execution of the current script. */ int lua_random_dirty; /* True if a random command was called during the @@ -2865,6 +2882,7 @@ void disableTracking(client *c); void trackingRememberKeys(client *c); void trackingInvalidateKey(client *c, robj *keyobj); void trackingInvalidateKeysOnFlush(int async); +void freeTrackingRadixTree(rax *rt); void freeTrackingRadixTreeAsync(rax *rt); void trackingLimitUsedSlots(void); uint64_t trackingGetTotalItems(void); @@ -3014,6 +3032,8 @@ void clearFailoverState(void); void updateFailoverStatus(void); void abortFailover(redisMaster *mi, const char *err); const char *getFailoverStateString(); +int canFeedReplicaReplBuffer(client *replica); +void trimReplicationBacklog(); /* Generic persistence functions */ void startLoadingFile(FILE* fp, const char * filename, int rdbflags); @@ -3716,6 +3736,8 @@ void mixDigest(unsigned char *digest, const void *ptr, size_t len); void xorDigest(unsigned char *digest, const void *ptr, size_t len); int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags); + + int moduleGILAcquiredByModule(void); extern int g_fInCrash; static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate @@ -3783,6 +3805,7 @@ void tlsCleanup(void); int tlsConfigure(redisTLSContextConfig *ctx_config); + class ShutdownException {}; @@ -3794,3 +3817,5 @@ class ShutdownException int iAmMaster(void); #endif + + diff --git a/src/t_string.cpp b/src/t_string.cpp index 6cbc64443..c1f2e134a 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -810,7 +810,7 @@ void stralgoLCS(client *c) { /* Setup an uint32_t array to store at LCS[i,j] the length of the * LCS A0..i-1, B0..j-1. Note that we have a linear array here, so * we index it as LCS[j+(blen+1)*j] */ - uint32_t *lcs = (uint32_t*)zmalloc((alen+1)*(blen+1)*sizeof(uint32_t)); + uint32_t *lcs = (uint32_t*)zmalloc((size_t)(alen+1)*(blen+1)*sizeof(uint32_t)); #define LCS(A,B) lcs[(B)+((A)*(blen+1))] /* Start building the LCS table. */ diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index 98769e277..6ce12b874 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -158,6 +158,18 @@ tags {"aof"} { assert_match "*not valid*" $result } + test "Short read: Utility should show the abnormal line num in AOF" { + create_aof { + append_to_aof [formatCommand set foo hello] + append_to_aof "!!!" + } + + catch { + exec src/keydb-check-aof $aof_path + } result + assert_match "*ok_up_to_line=8*" $result + } + test "Short read: Utility should be able to fix the AOF" { set result [exec src/keydb-check-aof --fix $aof_path << "y\n"] assert_match "*Successfully truncated AOF*" $result diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl index 8459d2378..eccf6df2d 100644 --- a/tests/integration/psync2.tcl +++ b/tests/integration/psync2.tcl @@ -39,6 +39,7 @@ proc show_cluster_status {} { # all the lists are empty. # # regexp {^[0-9]+:[A-Z] [0-9]+ [A-z]+ [0-9]+ ([0-9:.]+) .*} $l - logdate + catch { while 1 { # Find the log with smallest time. set empty 0 @@ -67,6 +68,7 @@ proc show_cluster_status {} { puts "\[$best port $R_port($best)\] [lindex $log($best) 0]" set log($best) [lrange $log($best) 1 end] } + } } } diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index e57c7e1e5..d1db6cc57 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -33,7 +33,8 @@ start_server {tags {"maxmemory"}} { # Get the current memory limit and calculate a new limit. # We just add 100k to the current memory size so that it is # fast for us to reach that limit. - set used [s used_memory] + set overhead [s mem_not_counted_for_evict] + set used [expr [s used_memory] - $overhead] set limit [expr {$used+100*1024}] r config set maxmemory $limit r config set maxmemory-policy $policy @@ -42,7 +43,7 @@ start_server {tags {"maxmemory"}} { while 1 { r setex [randomKey] 10000 x incr numkeys - if {[s used_memory]+4096 > $limit} { + if {[expr {[s used_memory] - $overhead + 4096}] > $limit} { assert {$numkeys > 10} break } @@ -52,7 +53,8 @@ start_server {tags {"maxmemory"}} { for {set j 0} {$j < $numkeys} {incr j} { r setex [randomKey] 10000 x } - assert {[s used_memory] < ($limit+4096)} + set used_amt [expr [s used_memory] - $overhead] + assert {$used_amt < ($limit+4096)} } } @@ -65,7 +67,8 @@ start_server {tags {"maxmemory"}} { # Get the current memory limit and calculate a new limit. # We just add 100k to the current memory size so that it is # fast for us to reach that limit. - set used [s used_memory] + set overhead [s mem_not_counted_for_evict] + set used [expr [s used_memory] - $overhead] set limit [expr {$used+100*1024}] r config set maxmemory $limit r config set maxmemory-policy $policy @@ -74,7 +77,7 @@ start_server {tags {"maxmemory"}} { while 1 { r set [randomKey] x incr numkeys - if {[s used_memory]+4096 > $limit} { + if {[expr [s used_memory] - $overhead]+4096 > $limit} { assert {$numkeys > 10} break } @@ -91,7 +94,7 @@ start_server {tags {"maxmemory"}} { } } if {[string match allkeys-* $policy]} { - assert {[s used_memory] < ($limit+4096)} + assert {[expr [s used_memory] - $overhead] < ($limit+4096)} } else { assert {$err == 1} } @@ -107,7 +110,8 @@ start_server {tags {"maxmemory"}} { # Get the current memory limit and calculate a new limit. # We just add 100k to the current memory size so that it is # fast for us to reach that limit. - set used [s used_memory] + set overhead [s mem_not_counted_for_evict] + set used [expr [s used_memory] - $overhead] set limit [expr {$used+100*1024}] r config set maxmemory $limit r config set maxmemory-policy $policy @@ -121,7 +125,7 @@ start_server {tags {"maxmemory"}} { } else { r set "key:$numkeys" x } - if {[s used_memory]+4096 > $limit} { + if {[expr [s used_memory] - $overhead]+4096 > $limit} { assert {$numkeys > 10} break } @@ -135,7 +139,7 @@ start_server {tags {"maxmemory"}} { catch {r setex "foo:$j" 10000 x} } # We should still be under the limit. - assert {[s used_memory] < ($limit+4096)} + assert {[expr [s used_memory] - $overhead] < ($limit+4096)} # However all our non volatile keys should be here. for {set j 0} {$j < $numkeys} {incr j 2} { assert {[r exists "key:$j"]} @@ -305,7 +309,8 @@ start_server {tags {"maxmemory"} overrides {server-threads 1}} { # we need to make sure to evict keynames of a total size of more than # 16kb since the (PROTO_REPLY_CHUNK_BYTES), only after that the # invalidation messages have a chance to trigger further eviction. - set used [s used_memory] + set overhead [s mem_not_counted_for_evict] + set used [expr [s used_memory] - $overhead] set limit [expr {$used - 40000}] r config set maxmemory $limit diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 4ee6fdbdb..db18e7128 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -395,7 +395,7 @@ start_server {tags {"defrag"} overrides {appendonly yes auto-aof-rewrite-percent # if the current slab is lower in utilization the defragger would have ended up in stagnation, # keept running and not move any allocation. # this test is more consistent on a fresh server with no history - start_server {tags {"defrag"} overrides {save ""}} { + start_server {tags {"defrag"} overrides {save "" server-threads 1}} { r flushdb r config resetstat r config set hz 100 diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 19feee8c3..38b49d45e 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -25,7 +25,7 @@ test {CONFIG SET port number} { test {CONFIG SET bind address} { start_server {} { # non-valid address - catch {r CONFIG SET bind "some.wrong.bind.address"} e + catch {r CONFIG SET bind "999.999.999.999"} e assert_match {*Failed to bind to specified addresses*} $e # make sure server still bound to the previous address @@ -33,4 +33,4 @@ test {CONFIG SET bind address} { $rd PING $rd close } -} \ No newline at end of file +} diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index 40f1a2a66..4c75b6f48 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -395,6 +395,17 @@ start_server {tags {"tracking network"}} { assert {[lindex msg 2] eq {} } } + test {Test ASYNC flushall} { + clean_all + r CLIENT TRACKING on REDIRECT $redir_id + r GET key1 + r GET key2 + assert_equal [s 0 tracking_total_keys] 2 + $rd_sg FLUSHALL ASYNC + assert_equal [s 0 tracking_total_keys] 0 + assert_equal [lindex [$rd_redirection read] 2] {} + } + # Keys are defined to be evicted 100 at a time by default. # If after eviction the number of keys still surpasses the limit # defined in tracking-table-max-keys, we increases eviction