diff --git a/src/Makefile b/src/Makefile index d76255fd4..927782e51 100644 --- a/src/Makefile +++ b/src/Makefile @@ -15,7 +15,7 @@ release_hdr := $(shell sh -c './mkreleasehdr.sh') uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not') -OPTIMIZATION?=-O2 +OPTIMIZATION?=-O2 -flto DEPENDENCY_TARGETS=hiredis linenoise lua rocksdb NODEPS:=clean distclean diff --git a/src/db.cpp b/src/db.cpp index 70fd2d1dc..dfbcf5a25 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -35,6 +35,11 @@ #include #include +// Needed for prefetch +#if defined(__x86_64__) || defined(__i386__) +#include +#endif + /* Database backup. */ struct dbBackup { const redisDbPersistentDataSnapshot **dbarray; @@ -2581,6 +2586,8 @@ void redisDbPersistentData::updateValue(dict_iter itr, robj *val) void redisDbPersistentData::ensure(const char *key) { + if (m_pdbSnapshot == nullptr && m_spstorage == nullptr) + return; dictEntry *de = dictFind(m_pdict, key); ensure(key, &de); } @@ -3000,8 +3007,30 @@ int dbnumFromDb(redisDb *db) bool redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command, bool fExecOK) { - if (m_spstorage == nullptr) - return false; + if (m_spstorage == nullptr) { +#if defined(__x86_64__) || defined(__i386__) + // We do a quick 'n dirty check for set & get. Anything else is too slow. + // Should the user do something weird like remap them then the worst that will + // happen is we don't prefetch or we prefetch wrong data. A mild perf hit, but + // not dangerous + if (command.argc >= 2) { + const char *cmd = szFromObj(command.argv[0]); + if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) { + auto h = dictSdsHash(szFromObj(command.argv[1])); + for (int iht = 0; iht < 2; ++iht) { + auto hT = h & c->db->m_pdict->ht[iht].sizemask; + dictEntry **table; + __atomic_load(&c->db->m_pdict->ht[iht].table, &table, __ATOMIC_RELAXED); + if (table != nullptr) + _mm_prefetch(table[hT], _MM_HINT_T2); + if (!dictIsRehashing(c->db->m_pdict)) + break; + } + } + } +#endif + return; + } AeLocker lock; diff --git a/src/dict.cpp b/src/dict.cpp index 831a32f8f..9acc01dc5 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -74,6 +74,8 @@ static int _dictInit(dict *ht, dictType *type, void *privDataPtr); static uint8_t dict_hash_function_seed[16]; +extern "C" void asyncFreeDictTable(dictEntry **de); + void dictSetHashFunctionSeed(uint8_t *seed) { memcpy(dict_hash_function_seed,seed,sizeof(dict_hash_function_seed)); } @@ -359,7 +361,7 @@ int dictRehash(dict *d, int n) { /* Check if we already rehashed the whole table... */ if (d->ht[0].used == 0) { - zfree(d->ht[0].table); + asyncFreeDictTable(d->ht[0].table); d->ht[0] = d->ht[1]; _dictReset(&d->ht[1]); d->rehashidx = -1; @@ -487,7 +489,7 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) { /* Check if we already rehashed the whole table... */ if (d->ht[0].used == 0 && d->asyncdata == nullptr) { - zfree(d->ht[0].table); + asyncFreeDictTable(d->ht[0].table); d->ht[0] = d->ht[1]; _dictReset(&d->ht[1]); d->rehashidx = -1; @@ -544,7 +546,7 @@ int dictRehashMilliseconds(dict *d, int ms) { static void _dictRehashStep(dict *d) { unsigned iterators; __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); - if (iterators == 0) dictRehash(d,2); + if (iterators == 0) dictRehash(d,1); } /* Add an element to the target hash table */ @@ -762,7 +764,7 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) { } } /* Free the table and the allocated cache structure */ - zfree(ht->table); + asyncFreeDictTable(ht->table); /* Re-initialize the table */ _dictReset(ht); return DICT_OK; /* never fails */ diff --git a/src/gc.h b/src/gc.h index 4715bc8de..5d92e38cb 100644 --- a/src/gc.h +++ b/src/gc.h @@ -52,7 +52,7 @@ public: void endEpoch(uint64_t epoch, bool fNoFree = false) { std::unique_lock lock(m_lock); - assert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end()); + serverAssert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end()); bool fMinElement = *std::min_element(m_setepochOutstanding.begin(), m_setepochOutstanding.end()); m_setepochOutstanding.erase(epoch); if (fNoFree) @@ -91,8 +91,8 @@ public: void enqueue(uint64_t epoch, std::unique_ptr &&sp) { std::unique_lock lock(m_lock); - assert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end()); - assert(sp->FWillFreeChildDebug() == false); + serverAssert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end()); + serverAssert(sp->FWillFreeChildDebug() == false); auto itr = std::find(m_vecepochs.begin(), m_vecepochs.end(), m_epochNext+1); if (itr == m_vecepochs.end()) diff --git a/src/networking.cpp b/src/networking.cpp index e9959c263..fce10c52d 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -312,6 +312,16 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ +void _clientAsyncReplyBufferReserve(client *c, size_t len) { + if (c->replyAsync != nullptr) + return; + size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES); + clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); + replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); + replyNew->used = 0; + c->replyAsync = replyNew; +} + /* Attempts to add the reply to the static buffer in the client struct. * Returns C_ERR if the buffer is full, or the reply list is not empty, * in which case the reply must be added to the reply list. */ @@ -1662,7 +1672,7 @@ int writeToClient(client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; clientReplyBlock *o; - AssertCorrectThread(c); + serverAssertDebug(FCorrectThread(c)); std::unique_locklock)> lock(c->lock); @@ -1881,7 +1891,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { processed += (int)vec.size(); for (client *c : vec) { - AssertCorrectThread(c); + serverAssertDebug(FCorrectThread(c)); uint64_t flags = c->flags.fetch_and(~CLIENT_PENDING_WRITE, std::memory_order_relaxed); @@ -2358,8 +2368,8 @@ void parseClientCommandBuffer(client *c) { serverAssert(c->vecqueuedcmd.back().reploff >= 0); } - /* Prefetch if we have a storage provider and we're not in the global lock */ - if (cqueriesStart < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) { + /* Prefetch outside the lock for better perf */ + if (cqueries < c->vecqueuedcmd.size() && !GlobalLocksAcquired()) { auto &query = c->vecqueuedcmd.back(); if (query.argc > 0 && query.argc == query.argcMax) { if (c->db->prefetchKeysAsync(c, query, c->vecqueuedcmd.size() == 1)) { @@ -2430,8 +2440,8 @@ void readQueryFromClient(connection *conn) { int nread, readlen; size_t qblen; - serverAssert(FCorrectThread(c)); - serverAssert(!GlobalLocksAcquired()); + serverAssertDebug(FCorrectThread(c) sdfsdf); + serverAssertDebug(!GlobalLocksAcquired()); AeLocker aelock; AssertCorrectThread(c); diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 41449099d..3b130679d 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -813,6 +813,10 @@ static int ipow(int base, int exp) { return result; } +extern "C" void asyncFreeDictTable(dictEntry **de) { + zfree(de); +} + static void showLatencyReport(void) { int i, curlat = 0; int usbetweenlat = ipow(10, MAX_LATENCY_PRECISION-config.precision); diff --git a/src/redis-cli.c b/src/redis-cli.c index c555a711e..1bfe4c8a2 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -144,6 +144,11 @@ static void cliRefreshPrompt(void) { sdsfree(prompt); } +struct dictEntry; +void asyncFreeDictTable(struct dictEntry **de) { + zfree(de); +} + /* Return the name of the dotfile for the specified 'dotfilename'. * Normally it just concatenates user $HOME to the file specified * in 'dotfilename'. However if the environment variable 'envoverride' diff --git a/src/replication.cpp b/src/replication.cpp index 0589868cb..e78df9a62 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -4449,6 +4449,7 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long sdsfree(szFromObj(&objTtl)); } +void _clientAsyncReplyBufferReserve(client *c, size_t len); void flushReplBacklogToClients() { serverAssert(GlobalLocksAcquired()); @@ -4486,6 +4487,8 @@ void flushReplBacklogToClients() addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy); } else { auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; + if (fAsyncWrite) + _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx); addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart)); diff --git a/src/server.cpp b/src/server.cpp index 814909bf6..97e050bc1 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2485,6 +2485,15 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData return 1000/g_pserver->hz; } +extern "C" void asyncFreeDictTable(dictEntry **de) +{ + if (de == nullptr || serverTL == nullptr || serverTL->gcEpoch.isReset()) { + zfree(de); + } else { + g_pserver->garbageCollector.enqueueCPtr(serverTL->gcEpoch, de); + } +} + extern int ProcessingEventsWhileBlocked; void processClients(); diff --git a/src/server.h b/src/server.h index 9ffccd6a0..6c3ec9869 100644 --- a/src/server.h +++ b/src/server.h @@ -1786,6 +1786,25 @@ class GarbageCollectorCollection GarbageCollector garbageCollectorSnapshot; GarbageCollector garbageCollectorGeneric; + class CPtrCollectable : public ICollectable + { + void *m_pv; + + public: + CPtrCollectable(void *pv) + : m_pv(pv) + {} + + CPtrCollectable(CPtrCollectable &&move) { + m_pv = move.m_pv; + move.m_pv = nullptr; + } + + virtual ~CPtrCollectable() { + zfree(m_pv); + } + }; + public: struct Epoch { @@ -1797,6 +1816,20 @@ public: epochGeneric = 0; } + Epoch() = default; + + Epoch (const Epoch &other) { + epochSnapshot = other.epochSnapshot; + epochGeneric = other.epochGeneric; + } + + Epoch &operator=(const Epoch &other) { + serverAssert(isReset()); + epochSnapshot = other.epochSnapshot; + epochGeneric = other.epochGeneric; + return *this; + } + bool isReset() const { return epochSnapshot == 0 && epochGeneric == 0; } @@ -1810,10 +1843,13 @@ public: return e; } - void endEpoch(Epoch e, bool fNoFree = false) + void endEpoch(Epoch &e, bool fNoFree = false) { - garbageCollectorSnapshot.endEpoch(e.epochSnapshot, fNoFree); - garbageCollectorGeneric.endEpoch(e.epochGeneric, fNoFree); + auto epochSnapshot = e.epochSnapshot; + auto epochGeneric = e.epochGeneric; + e.reset(); // We must do this early as GC'd dtors can themselves try to enqueue more data + garbageCollectorSnapshot.endEpoch(epochSnapshot, fNoFree); + garbageCollectorGeneric.endEpoch(epochGeneric, fNoFree); } void shutdown() @@ -1831,6 +1867,13 @@ public: { garbageCollectorGeneric.enqueue(e.epochGeneric, std::move(sp)); } + + template + void enqueueCPtr(Epoch e, T p) + { + auto sp = std::make_unique(reinterpret_cast(p)); + enqueue(e, std::move(sp)); + } }; // Per-thread variabels that may be accessed without a lock diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index c0a6ec4d7..5bf69787b 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -154,7 +154,7 @@ start_server {tags {"defrag"} overrides {server-threads 1} } { $rd read ; # Discard replies } - set expected_frag 1.7 + set expected_frag 1.5 if {$::accurate} { # scale the hash to 1m fields in order to have a measurable the latency for {set j 10000} {$j < 1000000} {incr j} { @@ -265,7 +265,7 @@ start_server {tags {"defrag"} overrides {server-threads 1} } { # create big keys with 10k items set rd [redis_deferring_client] - set expected_frag 1.7 + set expected_frag 1.5 # add a mass of list nodes to two lists (allocations are interlaced) set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation set elements 500000 @@ -543,4 +543,5 @@ start_server {tags {"defrag"} overrides {server-threads 1 active-replica yes} } } {OK} } } -} ;# run solo \ No newline at end of file +} ;# run solo +