From 3d501dfb11e635f041bb34900ab6d0cdddad480a Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 24 Mar 2021 19:58:51 +0000 Subject: [PATCH 1/9] Fix bug where we skip valid dict elements in dictGetRandomKey Former-commit-id: c25a9a3b84c967428b3598c99a65b14ed2417571 --- src/dict.cpp | 11 +++++++++-- src/dict.h | 1 + 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index c682e2ec9..831a32f8f 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -373,6 +373,7 @@ int dictRehash(dict *d, int n) { dictAsyncRehashCtl::dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) { queue.reserve(c_targetQueueSize); __atomic_fetch_add(&d->refcount, 1, __ATOMIC_RELEASE); + this->rehashIdxBase = d->rehashidx; } dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { @@ -931,12 +932,18 @@ dictEntry *dictGetRandomKey(dict *d) if (dictSize(d) == 0) return NULL; if (dictIsRehashing(d)) _dictRehashStep(d); if (dictIsRehashing(d)) { + long rehashidx = d->rehashidx; + auto async = d->asyncdata; + while (async != nullptr) { + rehashidx = std::min((long)async->rehashIdxBase, rehashidx); + async = async->next; + } do { /* We are sure there are no elements in indexes from 0 * to rehashidx-1 */ - h = d->rehashidx + (random() % (d->ht[0].size + + h = rehashidx + (random() % (d->ht[0].size + d->ht[1].size - - d->rehashidx)); + rehashidx)); he = (h >= d->ht[0].size) ? 
d->ht[1].table[h - d->ht[0].size] : d->ht[0].table[h]; } while(he == NULL); diff --git a/src/dict.h b/src/dict.h index ab57a7d7f..f24108d32 100644 --- a/src/dict.h +++ b/src/dict.h @@ -100,6 +100,7 @@ struct dictAsyncRehashCtl { struct dict *dict = nullptr; std::vector queue; size_t hashIdx = 0; + long rehashIdxBase; dictAsyncRehashCtl *next = nullptr; std::atomic done { false }; std::atomic abondon { false }; From f0f4377822685e324f96d842d825e4050bb0a932 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 28 Mar 2021 18:27:14 +0000 Subject: [PATCH 2/9] Enable LTO Former-commit-id: 3ec75184bae92c0e7af579eda8cbe6cfa2375327 --- src/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index d76255fd4..927782e51 100644 --- a/src/Makefile +++ b/src/Makefile @@ -15,7 +15,7 @@ release_hdr := $(shell sh -c './mkreleasehdr.sh') uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not') -OPTIMIZATION?=-O2 +OPTIMIZATION?=-O2 -flto DEPENDENCY_TARGETS=hiredis linenoise lua rocksdb NODEPS:=clean distclean From 1da03185e60e995a307e99d9d9a0cf751f3c441b Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 26 Mar 2021 23:44:42 +0000 Subject: [PATCH 3/9] Eliminate unnecessary lookup in ensure when there is no snapshot Former-commit-id: 1f363ed7c13c186f0c120ab4f3e321144667f50f --- src/db.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/db.cpp b/src/db.cpp index 20b70fe02..524cdb66c 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2581,6 +2581,8 @@ void redisDbPersistentData::updateValue(dict_iter itr, robj *val) void redisDbPersistentData::ensure(const char *key) { + if (m_pdbSnapshot == nullptr && m_spstorage == nullptr) + return; dictEntry *de = dictFind(m_pdict, key); ensure(key, &de); } From 01fb2a99bd21c22921ca960c012a4d8fc889e920 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 26 Mar 2021 23:48:24 +0000 Subject: [PATCH 4/9] Prefetch keys even in pure RAM 
scenarios Former-commit-id: d7219de186d60a5a437c1828ac97117eaad34819 --- src/db.cpp | 25 ++++++++++++++++++++++++- src/networking.cpp | 4 ++-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 524cdb66c..9978a8ca5 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -35,6 +35,11 @@ #include #include +// Needed for prefetch +#if defined(__x86_64__) || defined(__i386__) +#include <xmmintrin.h> +#endif + /* Database backup. */ struct dbBackup { const redisDbPersistentDataSnapshot **dbarray; @@ -3002,8 +3007,26 @@ int dbnumFromDb(redisDb *db) void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command) { - if (m_spstorage == nullptr) + if (m_spstorage == nullptr) { +#if defined(__x86_64__) || defined(__i386__) + // We do a quick 'n dirty check for set & get. Anything else is too slow. + // Should the user do something weird like remap them then the worst that will + // happen is we don't prefetch or we prefetch wrong data. A mild perf hit, but + // not dangerous + const char *cmd = szFromObj(command.argv[0]); + if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) { + auto h = dictSdsHash(szFromObj(command.argv[1])); + for (int iht = 0; iht < 2; ++iht) { + auto hT = h & c->db->m_pdict->ht[iht].sizemask; + if (c->db->m_pdict->ht[iht].table != nullptr) + _mm_prefetch(c->db->m_pdict->ht[iht].table[hT], _MM_HINT_T1); + if (!dictIsRehashing(c->db->m_pdict)) + break; + } + } +#endif return; + } AeLocker lock; diff --git a/src/networking.cpp b/src/networking.cpp index 15aa6f43a..2919750bc 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2358,8 +2358,8 @@ void parseClientCommandBuffer(client *c) { serverAssert(c->vecqueuedcmd.back().reploff >= 0); } - /* Prefetch if we have a storage provider and we're not in the global lock */ - if (cqueries < 
c->vecqueuedcmd.size() && !GlobalLocksAcquired()) { auto &query = c->vecqueuedcmd.back(); if (query.argc > 0 && query.argc == query.argcMax) { c->db->prefetchKeysAsync(c, query); From 15d9f45b4e4fac01f0e19267a3f0a4e6d9be07a1 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 28 Mar 2021 17:58:43 +0000 Subject: [PATCH 5/9] Fix thread safety issues with the cache prefetch logic Former-commit-id: 4892122fc02109d98684a350bd732a0b08a8c7b4 --- src/db.cpp | 22 +++++++++++++--------- src/dict.cpp | 8 +++++--- src/redis-benchmark.cpp | 4 ++++ src/redis-cli.c | 5 +++++ src/server.cpp | 9 +++++++++ src/server.h | 20 ++++++++++++++++++++ 6 files changed, 56 insertions(+), 12 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 9978a8ca5..59eb08a68 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -3013,15 +3013,19 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command // Should the user do something weird like remap them then the worst that will // happen is we don't prefetch or we prefetch wrong data. 
A mild perf hit, but // not dangerous - const char *cmd = szFromObj(command.argv[0]); - if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) { - auto h = dictSdsHash(szFromObj(command.argv[1])); - for (int iht = 0; iht < 2; ++iht) { - auto hT = h & c->db->m_pdict->ht[iht].sizemask; - if (c->db->m_pdict->ht[iht].table != nullptr) - _mm_prefetch(c->db->m_pdict->ht[iht].table[hT], _MM_HINT_T1); - if (!dictIsRehashing(c->db->m_pdict)) - break; + if (command.argc >= 2) { + const char *cmd = szFromObj(command.argv[0]); + if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) { + auto h = dictSdsHash(szFromObj(command.argv[1])); + for (int iht = 0; iht < 2; ++iht) { + auto hT = h & c->db->m_pdict->ht[iht].sizemask; + dictEntry **table; + __atomic_load(&c->db->m_pdict->ht[iht].table, &table, __ATOMIC_RELAXED); + if (table != nullptr) + _mm_prefetch(table[hT], _MM_HINT_T2); + if (!dictIsRehashing(c->db->m_pdict)) + break; + } } } #endif diff --git a/src/dict.cpp b/src/dict.cpp index 831a32f8f..4b9f0f6d1 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -74,6 +74,8 @@ static int _dictInit(dict *ht, dictType *type, void *privDataPtr); static uint8_t dict_hash_function_seed[16]; +extern "C" void asyncFreeDictTable(dictEntry **de); + void dictSetHashFunctionSeed(uint8_t *seed) { memcpy(dict_hash_function_seed,seed,sizeof(dict_hash_function_seed)); } @@ -359,7 +361,7 @@ int dictRehash(dict *d, int n) { /* Check if we already rehashed the whole table... */ if (d->ht[0].used == 0) { - zfree(d->ht[0].table); + asyncFreeDictTable(d->ht[0].table); d->ht[0] = d->ht[1]; _dictReset(&d->ht[1]); d->rehashidx = -1; @@ -487,7 +489,7 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) { /* Check if we already rehashed the whole table... 
*/ if (d->ht[0].used == 0 && d->asyncdata == nullptr) { - zfree(d->ht[0].table); + asyncFreeDictTable(d->ht[0].table); d->ht[0] = d->ht[1]; _dictReset(&d->ht[1]); d->rehashidx = -1; @@ -762,7 +764,7 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) { } } /* Free the table and the allocated cache structure */ - zfree(ht->table); + asyncFreeDictTable(ht->table); /* Re-initialize the table */ _dictReset(ht); return DICT_OK; /* never fails */ diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 41449099d..3b130679d 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -813,6 +813,10 @@ static int ipow(int base, int exp) { return result; } +extern "C" void asyncFreeDictTable(dictEntry **de) { + zfree(de); +} + static void showLatencyReport(void) { int i, curlat = 0; int usbetweenlat = ipow(10, MAX_LATENCY_PRECISION-config.precision); diff --git a/src/redis-cli.c b/src/redis-cli.c index c555a711e..1bfe4c8a2 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -144,6 +144,11 @@ static void cliRefreshPrompt(void) { sdsfree(prompt); } +struct dictEntry; +void asyncFreeDictTable(struct dictEntry **de) { + zfree(de); +} + /* Return the name of the dotfile for the specified 'dotfilename'. * Normally it just concatenates user $HOME to the file specified * in 'dotfilename'. 
However if the environment variable 'envoverride' diff --git a/src/server.cpp b/src/server.cpp index 96d62e4ea..e4eba3453 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2479,6 +2479,15 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData return 1000/g_pserver->hz; } +extern "C" void asyncFreeDictTable(dictEntry **de) +{ + if (de == nullptr || serverTL == nullptr || serverTL->gcEpoch.isReset()) { + zfree(de); + } else { + g_pserver->garbageCollector.enqueueCPtr(serverTL->gcEpoch, de); + } +} + extern int ProcessingEventsWhileBlocked; void processClients(); diff --git a/src/server.h b/src/server.h index 07f025c46..3612d93f0 100644 --- a/src/server.h +++ b/src/server.h @@ -1786,6 +1786,19 @@ class GarbageCollectorCollection GarbageCollector garbageCollectorSnapshot; GarbageCollector garbageCollectorGeneric; + class CPtrCollectable : public ICollectable + { + void *m_pv; + + public: + CPtrCollectable(void *pv) + : m_pv(pv) + {} + ~CPtrCollectable() { + zfree(m_pv); + } + }; + public: struct Epoch { @@ -1831,6 +1844,13 @@ public: { garbageCollectorGeneric.enqueue(e.epochGeneric, std::move(sp)); } + + template + void enqueueCPtr(Epoch e, T p) + { + auto sp = std::make_unique(reinterpret_cast(p)); + enqueue(e, std::move(sp)); + } }; // Per-thread variabels that may be accessed without a lock From 2e3fd28dcb47b59a5075af060eb814f94450de27 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 28 Mar 2021 17:59:02 +0000 Subject: [PATCH 6/9] Excessive rehashing adds latency Former-commit-id: ee5a4528d61420a18f89a07f4ac63e2181a19738 --- src/dict.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dict.cpp b/src/dict.cpp index 4b9f0f6d1..9acc01dc5 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -546,7 +546,7 @@ int dictRehashMilliseconds(dict *d, int ms) { static void _dictRehashStep(dict *d) { unsigned iterators; __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); - if (iterators == 0) dictRehash(d,2); + if 
(iterators == 0) dictRehash(d,1); } /* Add an element to the target hash table */ From 84e07e5d24a55238e84f79cf205275ea178c38b5 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 28 Mar 2021 18:27:00 +0000 Subject: [PATCH 7/9] Make some asserts debug only for perf Former-commit-id: dc66209f2cf8eadb794dad302bd1ea92890e75b0 --- src/networking.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 2919750bc..ff7f83d06 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1662,7 +1662,7 @@ int writeToClient(client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; clientReplyBlock *o; - AssertCorrectThread(c); + serverAssertDebug(FCorrectThread(c)); std::unique_lock<decltype(c->lock)> lock(c->lock); @@ -1881,7 +1881,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { processed += (int)vec.size(); for (client *c : vec) { - AssertCorrectThread(c); + serverAssertDebug(FCorrectThread(c)); uint64_t flags = c->flags.fetch_and(~CLIENT_PENDING_WRITE, std::memory_order_relaxed); @@ -2428,8 +2428,8 @@ void readQueryFromClient(connection *conn) { int nread, readlen; size_t qblen; - serverAssert(FCorrectThread(c)); - serverAssert(!GlobalLocksAcquired()); + serverAssertDebug(FCorrectThread(c)); + serverAssertDebug(!GlobalLocksAcquired()); AeLocker aelock; AssertCorrectThread(c); From a0ea81d682039a06d157606c52d279a62f602c20 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 25 Mar 2021 23:14:48 +0000 Subject: [PATCH 8/9] Prevent unnecessary copies in replication scenarios Former-commit-id: b152a9bd88c081ce98eebe9a7af49649e60e5523 --- src/networking.cpp | 10 ++++++++++ src/replication.cpp | 3 +++ 2 files changed, 13 insertions(+) diff --git a/src/networking.cpp b/src/networking.cpp index ff7f83d06..e2819ff19 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -312,6 +312,16 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. 
* -------------------------------------------------------------------------- */ +void _clientAsyncReplyBufferReserve(client *c, size_t len) { + if (c->replyAsync != nullptr) + return; + size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES); + clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); + replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); + replyNew->used = 0; + c->replyAsync = replyNew; +} + /* Attempts to add the reply to the static buffer in the client struct. * Returns C_ERR if the buffer is full, or the reply list is not empty, * in which case the reply must be added to the reply list. */ diff --git a/src/replication.cpp b/src/replication.cpp index 0589868cb..e78df9a62 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -4449,6 +4449,7 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long sdsfree(szFromObj(&objTtl)); } +void _clientAsyncReplyBufferReserve(client *c, size_t len); void flushReplBacklogToClients() { serverAssert(GlobalLocksAcquired()); @@ -4486,6 +4487,8 @@ void flushReplBacklogToClients() addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy); } else { auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; + if (fAsyncWrite) + _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx); addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart)); From 98f27709c652a35bd357fd9ed14db57662de893a Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 30 Mar 2021 20:44:22 +0000 Subject: [PATCH 9/9] Fix crash in RDB save Former-commit-id: b032809b3e978fe571b791179d32ecdc9c067045 --- src/gc.h | 6 +++--- src/server.h | 31 
+++++++++++++++++++++++++++---- tests/unit/memefficiency.tcl | 7 ++++--- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/gc.h b/src/gc.h index 4715bc8de..5d92e38cb 100644 --- a/src/gc.h +++ b/src/gc.h @@ -52,7 +52,7 @@ public: void endEpoch(uint64_t epoch, bool fNoFree = false) { std::unique_lock lock(m_lock); - assert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end()); + serverAssert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end()); bool fMinElement = *std::min_element(m_setepochOutstanding.begin(), m_setepochOutstanding.end()); m_setepochOutstanding.erase(epoch); if (fNoFree) @@ -91,8 +91,8 @@ public: void enqueue(uint64_t epoch, std::unique_ptr &&sp) { std::unique_lock lock(m_lock); - assert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end()); - assert(sp->FWillFreeChildDebug() == false); + serverAssert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end()); + serverAssert(sp->FWillFreeChildDebug() == false); auto itr = std::find(m_vecepochs.begin(), m_vecepochs.end(), m_epochNext+1); if (itr == m_vecepochs.end()) diff --git a/src/server.h b/src/server.h index 3612d93f0..1fbb77c60 100644 --- a/src/server.h +++ b/src/server.h @@ -1794,7 +1794,13 @@ class GarbageCollectorCollection CPtrCollectable(void *pv) : m_pv(pv) {} - ~CPtrCollectable() { + + CPtrCollectable(CPtrCollectable &&move) { + m_pv = move.m_pv; + move.m_pv = nullptr; + } + + virtual ~CPtrCollectable() { zfree(m_pv); } }; @@ -1810,6 +1816,20 @@ public: epochGeneric = 0; } + Epoch() = default; + + Epoch (const Epoch &other) { + epochSnapshot = other.epochSnapshot; + epochGeneric = other.epochGeneric; + } + + Epoch &operator=(const Epoch &other) { + serverAssert(isReset()); + epochSnapshot = other.epochSnapshot; + epochGeneric = other.epochGeneric; + return *this; + } + bool isReset() const { return epochSnapshot == 0 && epochGeneric == 0; } @@ -1823,10 +1843,13 @@ public: return e; } - void endEpoch(Epoch e, bool fNoFree = 
false) + void endEpoch(Epoch &e, bool fNoFree = false) { - garbageCollectorSnapshot.endEpoch(e.epochSnapshot, fNoFree); - garbageCollectorGeneric.endEpoch(e.epochGeneric, fNoFree); + auto epochSnapshot = e.epochSnapshot; + auto epochGeneric = e.epochGeneric; + e.reset(); // We must do this early as GC'd dtors can themselves try to enqueue more data + garbageCollectorSnapshot.endEpoch(epochSnapshot, fNoFree); + garbageCollectorGeneric.endEpoch(epochGeneric, fNoFree); } void shutdown() diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index c0a6ec4d7..5bf69787b 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -154,7 +154,7 @@ start_server {tags {"defrag"} overrides {server-threads 1} } { $rd read ; # Discard replies } - set expected_frag 1.7 + set expected_frag 1.5 if {$::accurate} { # scale the hash to 1m fields in order to have a measurable the latency for {set j 10000} {$j < 1000000} {incr j} { @@ -265,7 +265,7 @@ start_server {tags {"defrag"} overrides {server-threads 1} } { # create big keys with 10k items set rd [redis_deferring_client] - set expected_frag 1.7 + set expected_frag 1.5 # add a mass of list nodes to two lists (allocations are interlaced) set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation set elements 500000 @@ -543,4 +543,5 @@ start_server {tags {"defrag"} overrides {server-threads 1 active-replica yes} } } {OK} } } -} ;# run solo \ No newline at end of file +} ;# run solo +