diff --git a/src/dict.cpp b/src/dict.cpp index c682e2ec9..831a32f8f 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -373,6 +373,7 @@ int dictRehash(dict *d, int n) { dictAsyncRehashCtl::dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) { queue.reserve(c_targetQueueSize); __atomic_fetch_add(&d->refcount, 1, __ATOMIC_RELEASE); + this->rehashIdxBase = d->rehashidx; } dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { @@ -931,12 +932,18 @@ dictEntry *dictGetRandomKey(dict *d) if (dictSize(d) == 0) return NULL; if (dictIsRehashing(d)) _dictRehashStep(d); if (dictIsRehashing(d)) { + long rehashidx = d->rehashidx; + auto async = d->asyncdata; + while (async != nullptr) { + rehashidx = std::min((long)async->rehashIdxBase, rehashidx); + async = async->next; + } do { /* We are sure there are no elements in indexes from 0 * to rehashidx-1 */ - h = d->rehashidx + (random() % (d->ht[0].size + + h = rehashidx + (random() % (d->ht[0].size + d->ht[1].size - - d->rehashidx)); + rehashidx)); he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[0].size] : d->ht[0].table[h]; } while(he == NULL); diff --git a/src/dict.h b/src/dict.h index ab57a7d7f..f24108d32 100644 --- a/src/dict.h +++ b/src/dict.h @@ -100,6 +100,7 @@ struct dictAsyncRehashCtl { struct dict *dict = nullptr; std::vector queue; size_t hashIdx = 0; + long rehashIdxBase; dictAsyncRehashCtl *next = nullptr; std::atomic done { false }; std::atomic abondon { false }; diff --git a/src/networking.cpp b/src/networking.cpp index 15aa6f43a..0774694ae 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -312,6 +312,16 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ +void _clientAsyncReplyBufferReserve(client *c, size_t len) { + if (c->replyAsync != nullptr) + return; + size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES); + clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); + replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); + replyNew->used = 0; + c->replyAsync = replyNew; +} + /* Attempts to add the reply to the static buffer in the client struct. * Returns C_ERR if the buffer is full, or the reply list is not empty, * in which case the reply must be added to the reply list. */ diff --git a/src/replication.cpp b/src/replication.cpp index 9f13dcda6..2533bae52 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -4449,6 +4449,7 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long sdsfree(szFromObj(&objTtl)); } +void _clientAsyncReplyBufferReserve(client *c, size_t len); void flushReplBacklogToClients() { serverAssert(GlobalLocksAcquired()); @@ -4486,6 +4487,8 @@ void flushReplBacklogToClients() addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy); } else { auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; + if (fAsyncWrite) + _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx); addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));