From 131a8c9e35c8aa7a0e0a579f85e0ea3e1ae93c57 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 24 Mar 2021 19:58:51 +0000 Subject: [PATCH 1/2] Fix bug where we skip valid dict elements in dictGetRandomKey Former-commit-id: 626b56b00824573660af0c47b210fd1e8d2cfeb2 --- src/dict.cpp | 11 +++++++++-- src/dict.h | 1 + 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index c682e2ec9..831a32f8f 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -373,6 +373,7 @@ int dictRehash(dict *d, int n) { dictAsyncRehashCtl::dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) { queue.reserve(c_targetQueueSize); __atomic_fetch_add(&d->refcount, 1, __ATOMIC_RELEASE); + this->rehashIdxBase = d->rehashidx; } dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { @@ -931,12 +932,18 @@ dictEntry *dictGetRandomKey(dict *d) if (dictSize(d) == 0) return NULL; if (dictIsRehashing(d)) _dictRehashStep(d); if (dictIsRehashing(d)) { + long rehashidx = d->rehashidx; + auto async = d->asyncdata; + while (async != nullptr) { + rehashidx = std::min((long)async->rehashIdxBase, rehashidx); + async = async->next; + } do { /* We are sure there are no elements in indexes from 0 * to rehashidx-1 */ - h = d->rehashidx + (random() % (d->ht[0].size + + h = rehashidx + (random() % (d->ht[0].size + d->ht[1].size - - d->rehashidx)); + rehashidx)); he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[0].size] : d->ht[0].table[h]; } while(he == NULL); diff --git a/src/dict.h b/src/dict.h index ab57a7d7f..f24108d32 100644 --- a/src/dict.h +++ b/src/dict.h @@ -100,6 +100,7 @@ struct dictAsyncRehashCtl { struct dict *dict = nullptr; std::vector queue; size_t hashIdx = 0; + long rehashIdxBase; dictAsyncRehashCtl *next = nullptr; std::atomic done { false }; std::atomic abondon { false }; From dbe8a10fbf8e1dde4bc1e62441f492be636822b3 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 25 Mar 2021 23:14:48 +0000 Subject: [PATCH 2/2] Prevent unnecessary copies in replication scenarios Former-commit-id: 84c671cdafae7e84beef722fcc8a90b92918c89d --- src/networking.cpp | 10 ++++++++++ src/replication.cpp | 3 +++ 2 files changed, 13 insertions(+) diff --git a/src/networking.cpp b/src/networking.cpp index 15aa6f43a..0774694ae 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -312,6 +312,16 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ +void _clientAsyncReplyBufferReserve(client *c, size_t len) { + if (c->replyAsync != nullptr) + return; + size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES); + clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); + replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); + replyNew->used = 0; + c->replyAsync = replyNew; +} + /* Attempts to add the reply to the static buffer in the client struct. * Returns C_ERR if the buffer is full, or the reply list is not empty, * in which case the reply must be added to the reply list. */ diff --git a/src/replication.cpp b/src/replication.cpp index 9f13dcda6..2533bae52 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -4449,6 +4449,7 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long sdsfree(szFromObj(&objTtl)); } +void _clientAsyncReplyBufferReserve(client *c, size_t len); void flushReplBacklogToClients() { serverAssert(GlobalLocksAcquired()); @@ -4486,6 +4487,8 @@ void flushReplBacklogToClients() addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy); } else { auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; + if (fAsyncWrite) + _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx); addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));