diff --git a/src/config.cpp b/src/config.cpp index 9d7f14007..b546ef607 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2347,6 +2347,7 @@ static int updateReplBacklogSize(long long val, long long prev, const char **err UNUSED(err); g_pserver->repl_backlog_size = prev; resizeReplicationBacklog(val); + g_pserver->repl_backlog_config_size = g_pserver->repl_backlog_size; return 1; } diff --git a/src/evict.cpp b/src/evict.cpp index 31cadeae5..36837e17d 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -392,9 +392,16 @@ size_t freeMemoryGetNotCountedMemory(void) { while((ln = listNext(&li))) { client *replica = (client*)listNodeValue(ln); std::unique_lock(replica->lock); - overhead += getClientOutputBufferMemoryUsage(replica); + /* we don't wish to multiple count the replication backlog shared usage */ + overhead += (getClientOutputBufferMemoryUsage(replica) - getClientReplicationBacklogSharedUsage(replica)); } } + + /* also don't count the replication backlog memory + * that's where the replication clients get their memory from */ + overhead += (g_pserver->repl_backlog_size - g_pserver->repl_backlog_config_size); + + if (g_pserver->aof_state != AOF_OFF) { overhead += sdsalloc(g_pserver->aof_buf)+aofRewriteBufferSize(); } @@ -516,6 +523,7 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) { if (g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION) goto cant_free; /* We need to free memory, but policy forbids. */ + serverLog(LL_NOTICE, "evicting i guess lol, the overhead was %ld, the repl_backlog_size, %lld", freeMemoryGetNotCountedMemory(), g_pserver->repl_backlog_size); while (mem_freed < mem_tofree) { int j, k, i; static unsigned int next_db = 0; diff --git a/src/networking.cpp b/src/networking.cpp index cac58ff07..c51a02a1d 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -224,7 +224,6 @@ void clientInstallWriteHandler(client *c) { (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) { - // serverLog(LL_NOTICE, "we installing boyz"); AssertCorrectThread(c); serverAssert(c->lock.fOwnLock()); /* Here instead of installing the write handler, we just flag the @@ -1801,6 +1800,9 @@ int writeToClient(client *c, int handler_installed) { if (nwrittenPart2 == -1) nwritten = -1; } + if (c->flags & CLIENT_SLAVE && handler_installed) + serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed); + g_pserver->stat_net_output_bytes += totwritten; if (nwritten == -1) { if (connGetState(c->conn) == CONN_STATE_CONNECTED) { @@ -1821,6 +1823,11 @@ int writeToClient(client *c, int handler_installed) { if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime; } if (!clientHasPendingReplies(c) && c->repl_curr_idx == -1) { + if(c->flags & CLIENT_SLAVE && handler_installed){ + serverLog(LL_NOTICE, "Uninstalling handler"); + serverLog(LL_NOTICE, "handler repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); + serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); + } c->sentlen = 0; if (handler_installed) connSetWriteHandler(c->conn, NULL); @@ -1836,6 +1843,7 @@ int writeToClient(client *c, int handler_installed) { /* Write event handler. Just send data to the client. */ void sendReplyToClient(connection *conn) { client *c = (client*)connGetPrivateData(conn); + serverLog(LL_NOTICE, "called the sendreplytoclient"); if (writeToClient(c,1) == C_ERR) { AeLocker ae; @@ -1970,6 +1978,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { auto vec = std::move(g_pserver->rgthreadvar[iel].clients_pending_write); processed += (int)vec.size(); + // serverLog(LL_NOTICE, "entered handleClientsWithPendingWrites"); for (client *c : vec) { serverAssertDebug(FCorrectThread(c)); @@ -2008,8 +2017,10 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ if (clientHasPendingReplies(c) || c->repl_curr_idx != -1) { - if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) + serverLog(LL_NOTICE, "Setting a write handler for later"); + if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) { freeClientAsync(c); + } } } @@ -3359,11 +3370,6 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * that writes to said replica are using data from the replication backlog * as opposed to it's own internal buffer, this number should keep track of that */ unsigned long getClientReplicationBacklogSharedUsage(client *c) { - if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1){ - // serverLog(LL_NOTICE, "repl_backlog_size %lld, repl_backlog_idx %lld, master_repl_offset %lld, repl_curr_idx %lld, repl_curr_off %lld", - // g_pserver->repl_backlog_size, g_pserver->repl_backlog_idx, g_pserver->master_repl_offset, c->repl_curr_idx, c->repl_curr_off); - } - return (!(c->flags & CLIENT_SLAVE) || c->repl_curr_idx == -1) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off; } diff --git a/src/replication.cpp b/src/replication.cpp index 1bae2773a..60f25052a 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -4684,5 +4684,21 @@ void flushReplBacklogToClients() // This may be called multiple times per "frame" so update with our progress flushing to clients g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + } else if (getLowestOffsetAmongReplicas() != -1){ + listIter li; + listNode *ln; + listRewind(g_pserver->slaves, &li); + while ((ln = listNext(&li))) { + client *replica = (client*)listNodeValue(ln); + + std::unique_lock ul(replica->lock, std::defer_lock); + if (FCorrectThread(replica)) + ul.lock(); + + /* try to force prepare client to write i guess? */ + if (replica->repl_curr_idx != -1){ + if (prepareClientToWrite(replica) != C_OK) continue; + } + } } }