Merge branch 'keydbpro' into PRO_RELEASE_6

Former-commit-id: 20d2c4fd62bb1867b8970b2bb282bd2ed64bcf6e
This commit is contained in:
John Sully 2021-03-26 01:50:51 +00:00
commit 8ec77ffbf2
4 changed files with 23 additions and 2 deletions

View File

@ -373,6 +373,7 @@ int dictRehash(dict *d, int n) {
dictAsyncRehashCtl::dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
queue.reserve(c_targetQueueSize);
__atomic_fetch_add(&d->refcount, 1, __ATOMIC_RELEASE);
this->rehashIdxBase = d->rehashidx;
}
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
@ -931,12 +932,18 @@ dictEntry *dictGetRandomKey(dict *d)
if (dictSize(d) == 0) return NULL;
if (dictIsRehashing(d)) _dictRehashStep(d);
if (dictIsRehashing(d)) {
long rehashidx = d->rehashidx;
auto async = d->asyncdata;
while (async != nullptr) {
rehashidx = std::min((long)async->rehashIdxBase, rehashidx);
async = async->next;
}
do {
/* We are sure there are no elements in indexes from 0
* to rehashidx-1 */
h = d->rehashidx + (random() % (d->ht[0].size +
h = rehashidx + (random() % (d->ht[0].size +
d->ht[1].size -
d->rehashidx));
rehashidx));
he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[0].size] :
d->ht[0].table[h];
} while(he == NULL);

View File

@ -100,6 +100,7 @@ struct dictAsyncRehashCtl {
struct dict *dict = nullptr;
std::vector<workItem> queue;
size_t hashIdx = 0;
long rehashIdxBase;
dictAsyncRehashCtl *next = nullptr;
std::atomic<bool> done { false };
std::atomic<bool> abondon { false };

View File

@ -312,6 +312,16 @@ int prepareClientToWrite(client *c) {
* Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */
void _clientAsyncReplyBufferReserve(client *c, size_t len) {
if (c->replyAsync != nullptr)
return;
size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES);
clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize);
replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock);
replyNew->used = 0;
c->replyAsync = replyNew;
}
/* Attempts to add the reply to the static buffer in the client struct.
* Returns C_ERR if the buffer is full, or the reply list is not empty,
* in which case the reply must be added to the reply list. */

View File

@ -4449,6 +4449,7 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long
sdsfree(szFromObj(&objTtl));
}
void _clientAsyncReplyBufferReserve(client *c, size_t len);
void flushReplBacklogToClients()
{
serverAssert(GlobalLocksAcquired());
@ -4486,6 +4487,8 @@ void flushReplBacklogToClients()
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy);
} else {
auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
if (fAsyncWrite)
_clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx);
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));