From e68f9cfea1e533f64ace208ab96db3e47319f50a Mon Sep 17 00:00:00 2001 From: vivek Date: Fri, 25 Jun 2021 03:10:56 +0000 Subject: [PATCH 1/4] Primitive implementation of bypassing client buffer, stats are all messed up and print statements everywhere Former-commit-id: 59b2ae8ff451f8a5ac2f3baf3c7b509f6872895e --- src/evict.cpp | 10 ++- src/networking.cpp | 115 +++++++++++++++++++++++------ src/replication.cpp | 155 +++++++++++++++++++++++++-------------- src/server.cpp | 2 +- src/server.h | 17 +++++ tests/unit/maxmemory.tcl | 25 ++++--- 6 files changed, 231 insertions(+), 93 deletions(-) diff --git a/src/evict.cpp b/src/evict.cpp index 009713d73..d336bc8b8 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -354,6 +354,8 @@ unsigned long LFUDecrAndReturn(robj_roptr o) { return counter; } +unsigned long getClientReplicationBacklogSharedUsage(client *c); + /* We don't want to count AOF buffers and slaves output buffers as * used memory: the eviction should use mostly data size. This function * returns the sum of AOF and slaves buffer. 
*/ @@ -370,9 +372,15 @@ size_t freeMemoryGetNotCountedMemory(void) { while((ln = listNext(&li))) { client *replica = (client*)listNodeValue(ln); std::unique_lock(replica->lock); - overhead += getClientOutputBufferMemoryUsage(replica); + /* we don't wish to multiple count the replication backlog shared usage */ + overhead += (getClientOutputBufferMemoryUsage(replica) - getClientReplicationBacklogSharedUsage(replica)); } } + + /* also don't count the replication backlog memory + * that's where the replication clients get their memory from */ + overhead += g_pserver->repl_backlog_size; + if (g_pserver->aof_state != AOF_OFF) { overhead += sdsalloc(g_pserver->aof_buf)+aofRewriteBufferSize(); } diff --git a/src/networking.cpp b/src/networking.cpp index 1c0644ec0..767fe9c2b 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -136,6 +136,7 @@ client *createClient(connection *conn, int iel) { client_id = g_pserver->next_client_id.fetch_add(1); c->iel = iel; c->id = client_id; + sprintf(c->lock.szName, "client %lu", client_id); c->resp = 2; c->conn = conn; c->name = NULL; @@ -157,6 +158,7 @@ client *createClient(connection *conn, int iel) { c->flags = 0; c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWriteHandler = FALSE; + c->fPendingReplicaWrite = FALSE; c->ctime = c->lastinteraction = g_pserver->unixtime; /* If the default user does not require authentication, the user is * directly authenticated. */ @@ -234,6 +236,7 @@ void clientInstallWriteHandler(client *c) { /* Schedule the client to write the output buffers to the socket only * if not already done and, for slaves, if the replica can actually receive * writes at this stage. 
*/ + if (!(c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) @@ -315,7 +318,7 @@ int prepareClientToWrite(client *c) { /* Schedule the client to write the output buffers to the socket, unless * it should already be setup to do so (it has already pending data). */ - if (!fAsync && !clientHasPendingReplies(c)) clientInstallWriteHandler(c); + if (!fAsync && !clientHasPendingReplies(c) && !c->fPendingReplicaWrite) clientInstallWriteHandler(c); if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ @@ -1762,6 +1765,8 @@ client *lookupClientByID(uint64_t id) { return (c == raxNotFound) ? NULL : c; } +long long getReplIndexFromOffset(long long offset); + /* Write data in output buffers to client. Return C_OK if the client * is still valid after the call, C_ERR if it was freed because of some * error. If handler_installed is set, it will attempt to clear the @@ -1779,8 +1784,9 @@ int writeToClient(client *c, int handler_installed) { serverAssertDebug(FCorrectThread(c)); std::unique_locklock)> lock(c->lock); - + while(clientHasPendingReplies(c)) { + serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR); if (c->bufpos > 0) { nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; @@ -1788,7 +1794,7 @@ int writeToClient(client *c, int handler_installed) { totwritten += nwritten; /* If the buffer was sent, set bufpos to zero to continue with - * the remainder of the reply. */ + * the remainder of the reply. 
*/ if ((int)c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; @@ -1834,7 +1840,49 @@ int writeToClient(client *c, int handler_installed) { zmalloc_used_memory() < g_pserver->maxmemory) && !(c->flags & CLIENT_SLAVE)) break; } - + + /* We can only directly read from the replication backlog if the client + is a replica, so only attempt to do so if that's the case. */ + if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { + + std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); + long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); + serverAssert(c->repl_curr_off != -1); + + if (c->repl_curr_off != c->repl_end_off){ + long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off); + long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog + * in the event of a wrap around write */ + /* normal case with no wrap around */ + if (repl_end_idx >= repl_curr_idx){ + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx); + /* wrap around case */ + } else { + nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx); + /* only attempt wrapping if we write the correct number of bytes */ + if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){ + nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx); + if (nwritten2ndStage != -1) + nwritten += nwritten2ndStage; + } + } + + /* only increment bytes if an error didn't occur */ + if (nwritten > 0){ + totwritten += nwritten; + c->repl_curr_off += nwritten; + serverAssert(c->repl_curr_off <= c->repl_end_off); + /* If the client's current offset matches the last offset it can read from, there is no pending write */ + if (c->repl_curr_off == c->repl_end_off){ + c->fPendingReplicaWrite = false; + } + } + + /* If the second part of a write didn't go through, we still need to register that */ + if (nwritten2ndStage == 
-1) nwritten = -1; + } + } + g_pserver->stat_net_output_bytes += totwritten; if (nwritten == -1) { if (connGetState(c->conn) != CONN_STATE_CONNECTED) { @@ -1852,7 +1900,7 @@ int writeToClient(client *c, int handler_installed) { * We just rely on data / pings received for timeout detection. */ if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime; } - if (!clientHasPendingReplies(c)) { + if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) { c->sentlen = 0; if (handler_installed) connSetWriteHandler(c->conn, NULL); @@ -1896,27 +1944,37 @@ void ProcessPendingAsyncWrites() serverAssert(c->fPendingAsyncWrite); if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY)) { - zfree(c->replyAsync); - c->replyAsync = nullptr; + if (c->replyAsync != nullptr){ + zfree(c->replyAsync); + c->replyAsync = nullptr; + } c->fPendingAsyncWrite = FALSE; continue; } - int size = c->replyAsync->used; + /* since writes from master to replica can come directly from the replication backlog, + * writes may have been signalled without having been copied to the replyAsync buffer, + * thus causing the buffer to be NULL */ + if (c->replyAsync != nullptr){ + int size = c->replyAsync->used; - if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) { - memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size); - c->bufpos += size; - } else { - c->reply_bytes += c->replyAsync->size; - listAddNodeTail(c->reply, c->replyAsync); + if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) { + memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size); + c->bufpos += size; + } else { + c->reply_bytes += c->replyAsync->size; + listAddNodeTail(c->reply, c->replyAsync); + c->replyAsync = nullptr; + } + + zfree(c->replyAsync); c->replyAsync = nullptr; + } else { + /* Only replicas should have empty async reply buffers */ + serverAssert(c->flags & CLIENT_SLAVE); } - zfree(c->replyAsync); - c->replyAsync = nullptr; c->fPendingAsyncWrite = 
FALSE; - // Now install the write event handler int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; /* For the fsync=always policy, we want that a given FD is never @@ -1929,17 +1987,17 @@ void ProcessPendingAsyncWrites() { ae_flags |= AE_BARRIER; } - + if (!((c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) continue; - + asyncCloseClientOnOutputBufferLimitReached(c); if (c->flags & CLIENT_CLOSE_ASAP) continue; // we will never write this so don't post an op - + std::atomic_thread_fence(std::memory_order_seq_cst); - + if (FCorrectThread(c)) { prepareClientToWrite(c); // queue an event @@ -2022,9 +2080,10 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ - if (clientHasPendingReplies(c)) { - if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) + if (clientHasPendingReplies(c) || c->fPendingReplicaWrite) { + if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) { freeClientAsync(c); + } } } @@ -3643,6 +3702,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } } +/* In the case of a replica client, writes to said replica are using data from the replication backlog + * as opposed to its own internal buffer, this number should keep track of that */ +unsigned long getClientReplicationBacklogSharedUsage(client *c) { + return (!(c->flags & CLIENT_SLAVE) || !c->fPendingReplicaWrite ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off; +} + /* This function returns the number of bytes that Redis is * using to store the reply still not read by the client. * @@ -3651,9 +3716,11 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * enforcing the client output length limits. 
*/ unsigned long getClientOutputBufferMemoryUsage(client *c) { unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); - return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0); + return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0) + getClientReplicationBacklogSharedUsage(c); } + + /* Get the class of a client, used in order to enforce limits to different * classes of clients. * diff --git a/src/replication.cpp b/src/replication.cpp index 748c50c7d..b9465680e 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -189,6 +189,7 @@ void createReplicationBacklog(void) { g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL); g_pserver->repl_backlog_histlen = 0; g_pserver->repl_backlog_idx = 0; + g_pserver->repl_backlog_start = g_pserver->master_repl_offset; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the @@ -200,6 +201,15 @@ void createReplicationBacklog(void) { g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; } +/* Compute the corresponding index from a replication backlog offset + * Since this computation needs the size of the replication backlog, + * you need to have the repl_backlog_lock in order to call it */ +long long getReplIndexFromOffset(long long offset){ + serverAssert(g_pserver->repl_backlog_lock.fOwnLock()); + long long index = (offset - g_pserver->repl_backlog_start) % g_pserver->repl_backlog_size; + return index; +} + /* This function is called when the user modifies the replication backlog * size at runtime. 
It is up to the function to both update the * g_pserver->repl_backlog_size and to resize the buffer and setup it so that @@ -211,6 +221,8 @@ void resizeReplicationBacklog(long long newsize) { newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (g_pserver->repl_backlog_size == newsize) return; + std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); + if (g_pserver->repl_backlog != NULL) { /* What we actually do is to flush the old buffer and realloc a new * empty one. It will refill with new data incrementally. @@ -218,19 +230,23 @@ void resizeReplicationBacklog(long long newsize) { * worse often we need to alloc additional space before freeing the * old buffer. */ - if (g_pserver->repl_batch_idxStart >= 0) { - // We need to keep critical data so we can't shrink less than the hot data in the buffer - newsize = std::max(newsize, g_pserver->master_repl_offset - g_pserver->repl_batch_offStart); - char *backlog = (char*)zmalloc(newsize); - g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - g_pserver->repl_batch_offStart; + /* get the critical client size, i.e. 
the size of the data unflushed to clients */ + long long earliest_off = g_pserver->repl_lowest_off.load(); - if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) { - auto cbActiveBacklog = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart; - memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbActiveBacklog); + if (earliest_off != -1) { + // We need to keep critical data so we can't shrink less than the hot data in the buffer + newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off); + char *backlog = (char*)zmalloc(newsize); + g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off; + long long earliest_idx = getReplIndexFromOffset(earliest_off); + + if (g_pserver->repl_backlog_idx >= earliest_idx) { + auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx; + memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog); serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); } else { - auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; - memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); + auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx; + memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbPhase1); memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx; serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); @@ -238,7 +254,10 @@ void resizeReplicationBacklog(long long newsize) { zfree(g_pserver->repl_backlog); g_pserver->repl_backlog = backlog; g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen; - g_pserver->repl_batch_idxStart = 0; + g_pserver->repl_batch_idxStart -= earliest_idx; + if (g_pserver->repl_batch_idxStart < 0) + g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size; + g_pserver->repl_backlog_start = earliest_off; } else { 
zfree(g_pserver->repl_backlog); g_pserver->repl_backlog = (char*)zmalloc(newsize); @@ -246,11 +265,13 @@ void resizeReplicationBacklog(long long newsize) { g_pserver->repl_backlog_idx = 0; /* Next byte we have is... the next since the buffer is empty. */ g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; + g_pserver->repl_backlog_start = g_pserver->master_repl_offset; } } g_pserver->repl_backlog_size = newsize; } + void freeReplicationBacklog(void) { serverAssert(GlobalLocksAcquired()); listIter li; @@ -273,11 +294,20 @@ void feedReplicationBacklog(const void *ptr, size_t len) { serverAssert(GlobalLocksAcquired()); const unsigned char *p = (const unsigned char*)ptr; + if (g_pserver->repl_batch_idxStart >= 0) { - long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; + /* We are lower bounded by the lowest replica offset, or the batch offset start if not applicable */ + long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst); + if (lower_bound == -1) + lower_bound = g_pserver->repl_batch_offStart; + long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; if (minimumsize > g_pserver->repl_backlog_size) { flushReplBacklogToClients(); - minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; + lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst); + if (lower_bound == -1) + lower_bound = g_pserver->repl_batch_offStart; + + minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; if (minimumsize > g_pserver->repl_backlog_size) { // This is an emergency overflow, we better resize to fit @@ -292,6 +322,7 @@ void feedReplicationBacklog(const void *ptr, size_t len) { /* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. 
*/ + while(len) { size_t thislen = g_pserver->repl_backlog_size - g_pserver->repl_backlog_idx; if (thislen > len) thislen = len; @@ -478,7 +509,6 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) if (fSendRaw) { char aux[LONG_STR_SIZE+3]; - /* Add the multi bulk reply length. */ aux[0] = '*'; int multilen = ll2string(aux+1,sizeof(aux)-1,argc); @@ -652,15 +682,19 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, decrRefCount(cmdobj); } +int prepareClientToWrite(client *c); + /* Feed the replica 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ long long addReplyReplicationBacklog(client *c, long long offset) { - long long j, skip, len; + long long skip, len; serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); if (g_pserver->repl_backlog_histlen == 0) { serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); + c->repl_curr_off = g_pserver->master_repl_offset; + c->repl_end_off = g_pserver->master_repl_offset; return 0; } @@ -677,31 +711,20 @@ long long addReplyReplicationBacklog(client *c, long long offset) { skip = offset - g_pserver->repl_backlog_off; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); - /* Point j to the oldest byte, that is actually our - * g_pserver->repl_backlog_off byte. */ - j = (g_pserver->repl_backlog_idx + - (g_pserver->repl_backlog_size-g_pserver->repl_backlog_histlen)) % - g_pserver->repl_backlog_size; - serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); - - /* Discard the amount of data to seek to the specified 'offset'. */ - j = (j + skip) % g_pserver->repl_backlog_size; - - /* Feed replica with data. Since it is a circular buffer we have to - * split the reply in two parts if we are cross-boundary. 
*/ len = g_pserver->repl_backlog_histlen - skip; serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); - while(len) { - long long thislen = - ((g_pserver->repl_backlog_size - j) < len) ? - (g_pserver->repl_backlog_size - j) : len; - serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen); - addReplySds(c,sdsnewlen(g_pserver->repl_backlog + j, thislen)); - len -= thislen; - j = 0; - } - return g_pserver->repl_backlog_histlen - skip; + /* Set the start and end offsets for the replica so that a future + * writeToClient will send the backlog from the given offset to + * the current end of the backlog to said replica */ + c->repl_curr_off = offset - 1; + c->repl_end_off = g_pserver->master_repl_offset; + + /* Force the partial sync to be queued */ + prepareClientToWrite(c); + c->fPendingReplicaWrite = true; + + return len; } /* Return the offset to provide as reply to the PSYNC command received @@ -734,6 +757,10 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) { replica->psync_initial_offset = offset; replica->replstate = SLAVE_STATE_WAIT_BGSAVE_END; + + replica->repl_curr_off = offset; + replica->repl_end_off = g_pserver->master_repl_offset; + /* We are going to accumulate the incremental changes for this * replica as well. Set replicaseldb to -1 in order to force to re-emit * a SELECT statement in the replication stream. */ @@ -1356,6 +1383,7 @@ void replconfCommand(client *c) { * 4) Update the count of "good replicas". */ void putSlaveOnline(client *replica) { replica->replstate = SLAVE_STATE_ONLINE; + replica->repl_put_online_on_ack = 0; replica->repl_ack_time = g_pserver->unixtime; /* Prevent false timeout. 
*/ @@ -3058,6 +3086,11 @@ void syncWithMaster(connection *conn) { if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); + /* Reset the bulklen information in case it is lingering from the last connection + * The partial sync will start from the beginning of a command so these should be reset */ + mi->master->reqtype = 0; + mi->master->multibulklen = 0; + mi->master->bulklen = -1; if (cserver.supervised_mode == SUPERVISED_SYSTEMD) { redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n"); } @@ -4897,14 +4930,18 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long } void _clientAsyncReplyBufferReserve(client *c, size_t len); + void flushReplBacklogToClients() { serverAssert(GlobalLocksAcquired()); + /* If we have the repl backlog lock, we will deadlock */ + serverAssert(!g_pserver->repl_backlog_lock.fOwnLock()); if (g_pserver->repl_batch_offStart < 0) return; if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) { bool fAsyncWrite = false; + long long min_offset = LONG_LONG_MAX; // Ensure no overflow serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset); serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size); @@ -4913,33 +4950,36 @@ void flushReplBacklogToClients() listIter li; listNode *ln; listRewind(g_pserver->slaves, &li); + /* We don't actually write any data in this function since we send data + * directly from the replication backlog to replicas in writeToClient. + * + * What we do however, is set the end offset of each replica here. This way, + * future calls to writeToClient will know up to where in the replication + * backlog is valid for writing. 
*/ while ((ln = listNext(&li))) { client *replica = (client*)listNodeValue(ln); if (!canFeedReplicaReplBuffer(replica)) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue; - std::unique_lock ul(replica->lock, std::defer_lock); - if (FCorrectThread(replica)) - ul.lock(); - else + std::unique_lock ul(replica->lock); + if (!FCorrectThread(replica)) fAsyncWrite = true; - - if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) { - long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart; - serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy); - serverAssert((g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart) >= (cbCopy)); - serverAssert((g_pserver->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size); - - addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy); - } else { - auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; - if (fAsyncWrite) - _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx); - addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); - addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); - serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart)); + + /* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */ + serverAssert(replica->repl_curr_off != -1); + + min_offset = std::min(min_offset, replica->repl_curr_off); + + replica->repl_end_off = g_pserver->master_repl_offset; + + /* Only if there isn't already a pending write do we prepare the client to write */ + if (!replica->fPendingReplicaWrite){ + serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset); + prepareClientToWrite(replica); + replica->fPendingReplicaWrite = true; } + } if (fAsyncWrite) ProcessPendingAsyncWrites(); @@ -4947,7 
+4987,8 @@ void flushReplBacklogToClients() // This may be called multiple times per "frame" so update with our progress flushing to clients g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - } + g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst); + } } diff --git a/src/server.cpp b/src/server.cpp index 0d540c98b..b51634364 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2021,7 +2021,6 @@ void clientsCron(int iel) { while(listLength(g_pserver->clients) && iterations--) { client *c; listNode *head; - /* Rotate the list, take the current head, process. * This way if the client must be removed from the list it's the * first element and we don't incur into O(N) computation. */ @@ -3245,6 +3244,7 @@ void initServerConfig(void) { g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER; g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; g_pserver->master_repl_offset = 0; + g_pserver->repl_lowest_off.store(-1, std::memory_order_seq_cst); /* Replication partial resync backlog */ g_pserver->repl_backlog = NULL; diff --git a/src/server.h b/src/server.h index 129a3d716..0d6f766ce 100644 --- a/src/server.h +++ b/src/server.h @@ -1589,6 +1589,13 @@ struct client { long long psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this replica output buffer should use. */ + + long long repl_curr_off = -1;/* Replication offset of the replica, also where in the backlog we need to start from + * when sending data to this replica. */ + long long repl_end_off = -1; /* Replication offset to write to, stored in the replica, as opposed to using the global offset + * to prevent needing the global lock */ + int fPendingReplicaWrite; /* Is there a write queued for this replica? */ + char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). 
*/ int slave_listening_port; /* As configured with: REPLCONF listening-port */ char *slave_addr; /* Optionally given by REPLCONF ip-address */ @@ -2356,6 +2363,9 @@ struct redisServer { that is the next byte will'll write to.*/ long long repl_backlog_off; /* Replication "master offset" of first byte in the replication backlog buffer.*/ + long long repl_backlog_start; /* Used to compute indices from offsets + basically, index = (offset - start) % size */ + fastlock repl_backlog_lock {"replication backlog"}; time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */ time_t repl_no_slaves_since; /* We have no slaves since that time. @@ -2367,6 +2377,8 @@ struct redisServer { int repl_diskless_load; /* Slave parse RDB directly from the socket. * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ + std::atomic repl_lowest_off; /* The lowest offset amongst all replicas + -1 if there are no replicas */ /* Replication (replica) */ list *masters; int enable_multimaster; @@ -3712,6 +3724,8 @@ void mixDigest(unsigned char *digest, const void *ptr, size_t len); void xorDigest(unsigned char *digest, const void *ptr, size_t len); int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags); + + int moduleGILAcquiredByModule(void); extern int g_fInCrash; static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate @@ -3779,6 +3793,7 @@ void tlsCleanup(void); int tlsConfigure(redisTLSContextConfig *ctx_config); + class ShutdownException {}; @@ -3790,3 +3805,5 @@ class ShutdownException int iAmMaster(void); #endif + + diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index e57c7e1e5..d1db6cc57 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -33,7 +33,8 @@ start_server {tags {"maxmemory"}} { # Get the current memory limit and calculate a new limit. 
# We just add 100k to the current memory size so that it is # fast for us to reach that limit. - set used [s used_memory] + set overhead [s mem_not_counted_for_evict] + set used [expr [s used_memory] - $overhead] set limit [expr {$used+100*1024}] r config set maxmemory $limit r config set maxmemory-policy $policy @@ -42,7 +43,7 @@ start_server {tags {"maxmemory"}} { while 1 { r setex [randomKey] 10000 x incr numkeys - if {[s used_memory]+4096 > $limit} { + if {[expr {[s used_memory] - $overhead + 4096}] > $limit} { assert {$numkeys > 10} break } @@ -52,7 +53,8 @@ start_server {tags {"maxmemory"}} { for {set j 0} {$j < $numkeys} {incr j} { r setex [randomKey] 10000 x } - assert {[s used_memory] < ($limit+4096)} + set used_amt [expr [s used_memory] - $overhead] + assert {$used_amt < ($limit+4096)} } } @@ -65,7 +67,8 @@ start_server {tags {"maxmemory"}} { # Get the current memory limit and calculate a new limit. # We just add 100k to the current memory size so that it is # fast for us to reach that limit. - set used [s used_memory] + set overhead [s mem_not_counted_for_evict] + set used [expr [s used_memory] - $overhead] set limit [expr {$used+100*1024}] r config set maxmemory $limit r config set maxmemory-policy $policy @@ -74,7 +77,7 @@ start_server {tags {"maxmemory"}} { while 1 { r set [randomKey] x incr numkeys - if {[s used_memory]+4096 > $limit} { + if {[expr [s used_memory] - $overhead]+4096 > $limit} { assert {$numkeys > 10} break } @@ -91,7 +94,7 @@ start_server {tags {"maxmemory"}} { } } if {[string match allkeys-* $policy]} { - assert {[s used_memory] < ($limit+4096)} + assert {[expr [s used_memory] - $overhead] < ($limit+4096)} } else { assert {$err == 1} } @@ -107,7 +110,8 @@ start_server {tags {"maxmemory"}} { # Get the current memory limit and calculate a new limit. # We just add 100k to the current memory size so that it is # fast for us to reach that limit. 
- set used [s used_memory] + set overhead [s mem_not_counted_for_evict] + set used [expr [s used_memory] - $overhead] set limit [expr {$used+100*1024}] r config set maxmemory $limit r config set maxmemory-policy $policy @@ -121,7 +125,7 @@ start_server {tags {"maxmemory"}} { } else { r set "key:$numkeys" x } - if {[s used_memory]+4096 > $limit} { + if {[expr [s used_memory] - $overhead]+4096 > $limit} { assert {$numkeys > 10} break } @@ -135,7 +139,7 @@ start_server {tags {"maxmemory"}} { catch {r setex "foo:$j" 10000 x} } # We should still be under the limit. - assert {[s used_memory] < ($limit+4096)} + assert {[expr [s used_memory] - $overhead] < ($limit+4096)} # However all our non volatile keys should be here. for {set j 0} {$j < $numkeys} {incr j 2} { assert {[r exists "key:$j"]} @@ -305,7 +309,8 @@ start_server {tags {"maxmemory"} overrides {server-threads 1}} { # we need to make sure to evict keynames of a total size of more than # 16kb since the (PROTO_REPLY_CHUNK_BYTES), only after that the # invalidation messages have a chance to trigger further eviction. 
- set used [s used_memory] + set overhead [s mem_not_counted_for_evict] + set used [expr [s used_memory] - $overhead] set limit [expr {$used - 40000}] r config set maxmemory $limit From f477186faa02565b50f32086ceb466dee1705196 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 25 Jun 2021 06:10:13 +0000 Subject: [PATCH 2/4] Reenable LTO Former-commit-id: e7c1e1c9d8021f48c4081a9dfb84dba9da2521fc --- src/Makefile | 12 +++++++++--- src/motd.cpp | 6 ++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/Makefile b/src/Makefile index 92bb346f4..e2ae02720 100644 --- a/src/Makefile +++ b/src/Makefile @@ -15,7 +15,7 @@ release_hdr := $(shell sh -c './mkreleasehdr.sh') uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not') -OPTIMIZATION?=-O2 +OPTIMIZATION?=-O2 -flto DEPENDENCY_TARGETS=hiredis linenoise lua hdr_histogram rocksdb NODEPS:=clean distclean @@ -349,9 +349,9 @@ endif REDIS_SERVER_NAME=keydb-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=keydb-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o 
StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd_server.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli$(PROG_SUFFIX) -REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) +REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd_client.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX) REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o release.o crcspeed.o crc64.o siphash.o redis-benchmark.o storage-lite.o fastlock.o new.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) REDIS_CHECK_RDB_NAME=keydb-check-rdb$(PROG_SUFFIX) @@ -435,6 +435,12 @@ DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ # Because the jemalloc.h header 
is generated as a part of the jemalloc build, # building it should complete before building any other object. Instead of # depending on a single artifact, build all dependencies first. +motd_client.o: motd.cpp .make-prerequisites + $(REDIS_CXX) -MMD -o motd_client.o -c $< -DCLIENT -fno-lto + +motd_server.o: motd.cpp .make-prerequisites + $(REDIS_CXX) -MMD -o motd_server.o -c $< -DSERVER + %.o: %.c .make-prerequisites $(REDIS_CC) -MMD -o $@ -c $< diff --git a/src/motd.cpp b/src/motd.cpp index 370a11e68..795281734 100644 --- a/src/motd.cpp +++ b/src/motd.cpp @@ -1,7 +1,11 @@ +#ifdef CLIENT extern "C" { #include #include } +#else +#include "sds.h" +#endif #include #include #include @@ -15,6 +19,7 @@ extern "C" { #ifdef MOTD #include +#ifdef CLIENT extern "C" { __attribute__ ((weak)) hisds hi_sdscatlen(hisds s, const void *t, size_t len) { return sdscatlen(s, t, len); @@ -23,6 +28,7 @@ __attribute__ ((weak)) hisds hi_sdscat(hisds s, const char *t) { return sdscat(s, t); } } +#endif static const char *szMotdCachePath() { From d5246a79aa6b868714cfb517081230ec9be89fe2 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 25 Jun 2021 06:11:01 +0000 Subject: [PATCH 3/4] OPTIMIZATION: Only notify the condition variable when needed Former-commit-id: 11f07b49c613f54cef682da1e3c8fc54918809b0 --- src/server.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index b51634364..e7a1e1aaf 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6996,9 +6996,10 @@ void OnTerminate() void wakeTimeThread() { updateCachedTime(); std::lock_guard lock(time_thread_mutex); + if (sleeping_threads >= cserver.cthreads) + time_thread_cv.notify_one(); sleeping_threads--; serverAssert(sleeping_threads >= 0); - time_thread_cv.notify_one(); } void *timeThreadMain(void*) { From ae94ce343838069a9d576fc56d95f31dbaf68069 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 25 Jun 2021 06:11:14 +0000 Subject: [PATCH 4/4] Avoid taking locks when we don't 
need to Former-commit-id: 0d8d3ee9e217cd1f1366a117e6e212f610a028e1 --- src/db.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/db.cpp b/src/db.cpp index 71cb27a03..ecda7e55b 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2726,6 +2726,8 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) serverAssert(sdsKey != nullptr); serverAssert(FImplies(*pde != nullptr, dictGetVal(*pde) != nullptr)); // early versions set a NULL object, this is no longer valid serverAssert(m_refCount == 0); + if (m_pdbSnapshot == nullptr && g_pserver->m_pstorageFactory == nullptr) + return; std::unique_lock ul(g_expireLock); // First see if the key can be obtained from a snapshot