diff --git a/src/networking.cpp b/src/networking.cpp index 18ab382bd..cac58ff07 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1680,22 +1680,8 @@ int writeToClient(client *c, int handler_installed) { /* if this is a write to a replica, it's coming straight from the replication backlog */ long long repl_backlog_idx = g_pserver->repl_backlog_idx; - bool wroteFromClientBuffer = false; /* True if you wrote from the client buffer in this function call */ - while(clientHasPendingReplies(c)) { - wroteFromClientBuffer = true; - if (c->flags & CLIENT_SLAVE && listLength(c->reply) % 10 == 0){ - - serverLog(LL_NOTICE, "-----------------------------------------"); - serverLog(LL_NOTICE, "replica w/ pending replies, with a reply list size of: %lu", listLength(c->reply)); - serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); - serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); - serverLog(LL_NOTICE, "-----------------------------------------"); - - } if (c->bufpos > 0) { - // serverLog(LL_NOTICE, "Sending reply %d", x); - // serverLog(LL_NOTICE, "SUSSUS AMOGUS, %ld", c->bufpos); nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; @@ -1753,9 +1739,7 @@ int writeToClient(client *c, int handler_installed) { /* If there are no more pending replies, then we have transmitted the RDB. * This means further replication commands will be taken straight from the * replication backlog from now on. */ - if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c)){ - if (!c->transmittedRDB) - serverLog(LL_NOTICE, "---------->>>>>>>> TRANSMISSION OF THE RDB HAS COMPLETED <<<<<<<<----------"); + if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){ c->transmittedRDB = true; } @@ -1775,49 +1759,27 @@ int writeToClient(client *c, int handler_installed) { /* wrap around case, v. rare */ /* also v. buggy so there's that */ } else { - serverLog(LL_NOTICE, "WRAP CASE"); - serverLog(LL_NOTICE, "-----------------------------------------"); - serverLog(LL_NOTICE, "requested to write: %ld", nrequested); - serverLog(LL_NOTICE, "actually written: %ld", nwritten); - serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); - serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen); - serverLog(LL_NOTICE, "nwritten: %ld", nwritten); - serverLog(LL_NOTICE, "-----------------------------------------"); - nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx; nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx); /* only attempt wrapping if we write the correct number of bytes */ if (nwritten == repl_backlog_size - c->repl_curr_idx){ - serverLog(LL_NOTICE, "SECOND STAGE"); - serverLog(LL_NOTICE, "-----------------------------------------"); - serverLog(LL_NOTICE, "requested to write: %ld", nrequested); - serverLog(LL_NOTICE, "actually written: %ld", nwritten); - serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); - serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen); - serverLog(LL_NOTICE, "-----------------------------------------"); - long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx); if (nwrittenPart2 != -1) nwritten += nwrittenPart2; - serverLog(LL_NOTICE, "nwrittenPart2: %lld", nwrittenPart2); - serverLog(LL_NOTICE, "-----------------------------------------"); - } else { - serverLog(LL_NOTICE, "SUPER SHORT"); - } - + } } /* only update the replica's current index if bytes were sent */ // if (nrequested != nwritten){ - serverLog(LL_NOTICE, "-----------------------------------------"); - serverLog(LL_NOTICE, "AFTER THE FACT"); - serverLog(LL_NOTICE, "requested to write: %ld", nrequested); - serverLog(LL_NOTICE, "actually written: %ld", nwritten); - serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); - serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); - serverLog(LL_NOTICE, "-----------------------------------------"); + // serverLog(LL_NOTICE, "-----------------------------------------"); + // serverLog(LL_NOTICE, "AFTER THE FACT"); + // serverLog(LL_NOTICE, "requested to write: %ld", nrequested); + // serverLog(LL_NOTICE, "actually written: %ld", nwritten); + // serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); + // serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); + // serverLog(LL_NOTICE, "-----------------------------------------"); // } @@ -1902,25 +1864,36 @@ void ProcessPendingAsyncWrites() serverAssert(c->fPendingAsyncWrite); if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY)) { - zfree(c->replyAsync); - c->replyAsync = nullptr; + if (c->replyAsync != nullptr){ + zfree(c->replyAsync); + c->replyAsync = nullptr; + } c->fPendingAsyncWrite = FALSE; continue; } - int size = c->replyAsync->used; + /* since writes from master to replica can come directly from the replication backlog, + * writes may have been signalled without having been copied to the replyAsync buffer, + * thus causing the buffer to be NULL */ + if (c->replyAsync != nullptr){ + int size = c->replyAsync->used; - if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) { - memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size); - c->bufpos += size; - } else { - c->reply_bytes += c->replyAsync->size; - listAddNodeTail(c->reply, c->replyAsync); + if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) { + memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size); + c->bufpos += size; + } else { + c->reply_bytes += c->replyAsync->size; + listAddNodeTail(c->reply, c->replyAsync); + c->replyAsync = nullptr; + } + + zfree(c->replyAsync); c->replyAsync = nullptr; + } else { + /* Only replicas should have empty async reply buffers */ + serverAssert(c->flags & CLIENT_SLAVE); } - zfree(c->replyAsync); - c->replyAsync = nullptr; c->fPendingAsyncWrite = FALSE; // Now install the write event handler @@ -1935,17 +1908,17 @@ void ProcessPendingAsyncWrites() { ae_flags |= AE_BARRIER; } - + if (!((c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) continue; - + asyncCloseClientOnOutputBufferLimitReached(c); if (c->flags & CLIENT_CLOSE_ASAP) continue; // we will never write this so don't post an op - + std::atomic_thread_fence(std::memory_order_seq_cst); - + if (FCorrectThread(c)) { prepareClientToWrite(c); // queue an event @@ -3386,7 +3359,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * that writes to said replica are using data from the replication backlog * as opposed to it's own internal buffer, this number should keep track of that */ unsigned long getClientReplicationBacklogSharedUsage(client *c) { - return (c->repl_curr_idx == -1 && c->flags & CLIENT_SLAVE) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off; + if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1){ + // serverLog(LL_NOTICE, "repl_backlog_size %lld, repl_backlog_idx %lld, master_repl_offset %lld, repl_curr_idx %lld, repl_curr_off %lld", + // g_pserver->repl_backlog_size, g_pserver->repl_backlog_idx, g_pserver->master_repl_offset, c->repl_curr_idx, c->repl_curr_off); + } + + return (!(c->flags & CLIENT_SLAVE) || c->repl_curr_idx == -1) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off; } /* This function returns the number of bytes that Redis is diff --git a/src/replication.cpp b/src/replication.cpp index ccb538a69..ef33fbfd9 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -47,8 +47,8 @@ #include #include -#define BYPASS_BUFFER -// #define BYPASS_PSYNC +// #define BYPASS_BUFFER +// #define RESIZE_BACKLOG void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, connection *conn); @@ -57,6 +57,30 @@ void putSlaveOnline(client *replica); int cancelReplicationHandshake(redisMaster *mi); static void propagateMasterStaleKeys(); +/* gets the lowest offset amongst all of the replicas */ +long long getLowestOffsetAmongReplicas(){ + serverAssert(GlobalLocksAcquired()); + long long min_offset = LONG_LONG_MAX; + listIter li; + listNode *ln; + listRewind(g_pserver->slaves, &li); + // check for potential overflow first + while ((ln = listNext(&li))) { + client *replica = (client*)listNodeValue(ln); + + if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (replica->flags & CLIENT_CLOSE_ASAP) continue; + if (replica->repl_curr_idx == -1) continue; + + std::unique_lock ul(replica->lock, std::defer_lock); + if (FCorrectThread(replica)) + ul.lock(); + + min_offset = std::min(min_offset, replica->repl_curr_off); + } + /* return -1 if no other minimum was found */ + return min_offset == LONG_LONG_MAX ? -1 : min_offset; +} /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case * the instance is configured to have no persistence. */ @@ -67,11 +91,13 @@ void resizeReplicationBacklogForClients(long long newsize); void setReplIdx(client *c, long long idx, long long off){ if (prepareClientToWrite(c) != C_OK) return; // serverLog(LL_NOTICE, "calling this garbage function w/ idx and off: %lld, %lld, %lld", idx, off, off-idx); - // serverLog(LL_NOTICE, "What is this value? %lld", c->repl_curr_idx); + // serverLog(LL_NOTICE, "Repl Index started at: %lld", c->repl_curr_idx); if (c->repl_curr_idx == -1){ c->repl_curr_idx = idx; c->repl_curr_off = off; } + // serverLog(LL_NOTICE, "Repl Index has become: %lld", c->repl_curr_idx); + } /* --------------------------- Utility functions ---------------------------- */ @@ -277,7 +303,7 @@ void resizeReplicationBacklogForClients(long long newsize) { newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (g_pserver->repl_backlog_size == newsize) return; - serverLog(LL_NOTICE, "WE HAD TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize); + serverLog(LL_NOTICE, "WE HAVE TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize); /* get the critical client size, i.e. the size of the data unflushed to clients */ long long earliest_off = LONG_LONG_MAX; long long earliest_idx = -1; @@ -290,6 +316,20 @@ void resizeReplicationBacklogForClients(long long newsize) { earliest_off = replica->repl_curr_off; earliest_idx = replica->repl_curr_idx; } + serverLog(LL_NOTICE, "repl_curr_idx: %lld, earlistidx: %lld", replica->repl_curr_idx, earliest_idx); + } + serverLog(LL_NOTICE, "We are starting with: master_repl_offset: %lld, repl_batch_offStart: %lld, earliest_off: %lld, " + "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, earliest_idx: %lld, repl_backlog_size: %lld", + g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, earliest_off, + g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, earliest_idx, g_pserver->repl_backlog_size + ); + + long long new_off = 0, new_idx = 0; + + /* if no earliest offset is found amongst the clients, they are all up to date with the flushed index */ + if (earliest_off == LONG_LONG_MAX && earliest_idx == -1){ + earliest_idx = g_pserver->repl_batch_idxStart; + earliest_off = g_pserver->repl_batch_offStart; } if (g_pserver->repl_backlog != NULL) { @@ -330,8 +370,11 @@ void resizeReplicationBacklogForClients(long long newsize) { if (replica->repl_curr_idx < 0) replica->repl_curr_idx += g_pserver->repl_backlog_size; } + new_idx = replica->repl_curr_idx; } - g_pserver->repl_batch_idxStart = 0; + g_pserver->repl_batch_idxStart -= earliest_idx; + if (g_pserver->repl_batch_idxStart < 0) + g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size; } else { zfree(g_pserver->repl_backlog); g_pserver->repl_backlog = (char*)zmalloc(newsize); @@ -342,6 +385,12 @@ void resizeReplicationBacklogForClients(long long newsize) { } } g_pserver->repl_backlog_size = newsize; + + serverLog(LL_NOTICE, "We are ending with: master_repl_offset: %lld, repl_batch_offStart: %lld, new_off: %lld, " + "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, new_idx: %lld, repl_backlog_size: %lld", + g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, new_off, + g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, new_idx, g_pserver->repl_backlog_size + ); } void freeReplicationBacklog(void) { @@ -367,20 +416,41 @@ void feedReplicationBacklog(const void *ptr, size_t len) { const unsigned char *p = (const unsigned char*)ptr; if (g_pserver->repl_batch_idxStart >= 0) { - long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; + /* we are lower bounded by the lower client offset or the offStart if all the clients are up to date */ + long long lower_bound = getLowestOffsetAmongReplicas(); + if (lower_bound == -1) + lower_bound = g_pserver->repl_batch_offStart; + long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; if (minimumsize > g_pserver->repl_backlog_size) { flushReplBacklogToClients(); - minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; + minimumsize = g_pserver->master_repl_offset + len - lower_bound +1; if (minimumsize > g_pserver->repl_backlog_size) { // This is an emergency overflow, we better resize to fit long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize); serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize); - resizeReplicationBacklog(newsize); + resizeReplicationBacklogForClients(newsize); } } +#ifdef RESIZE_BACKLOG + long long lowest_replica_offset = getLowestOffsetAmongReplicas(); + minimumsize = g_pserver->master_repl_offset + len - lowest_replica_offset; + if (lowest_replica_offset != -1 && minimumsize > g_pserver->repl_backlog_size){ + serverLog(LL_WARNING, "THE REPLICATION BACKLOG SIZE IS TOO SMALL, THIS IS A PROBLEM"); + long long oldsize = g_pserver->repl_backlog_size; + resizeReplicationBacklogForClients(std::max(g_pserver->repl_backlog_size * 2, minimumsize)); + serverLog(LL_WARNING, "changed size from %lld to %lld", oldsize, g_pserver->repl_backlog_size); + flushReplBacklogToClients(); + } +#endif } + // serverLog(LL_NOTICE, "Pt2 start with: master_repl_offset: %lld, repl_batch_offStart: %lld, " + // "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld", + // g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, + // g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size + // ); + g_pserver->master_repl_offset += len; /* This is a circular buffer, so write as much data we can at every @@ -395,12 +465,23 @@ void feedReplicationBacklog(const void *ptr, size_t len) { len -= thislen; p += thislen; g_pserver->repl_backlog_histlen += thislen; + // serverLog(LL_NOTICE, "Pt2 intermediate with: master_repl_offset: %lld, repl_batch_offStart: %lld, " + // "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld", + // g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, + // g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size + // ); } if (g_pserver->repl_backlog_histlen > g_pserver->repl_backlog_size) g_pserver->repl_backlog_histlen = g_pserver->repl_backlog_size; /* Set the offset of the first byte we have in the backlog. */ g_pserver->repl_backlog_off = g_pserver->master_repl_offset - g_pserver->repl_backlog_histlen + 1; + + // serverLog(LL_NOTICE, "Pt2 end with: master_repl_offset: %lld, repl_batch_offStart: %lld, " + // "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld", + // g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, + // g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size + // ); } /* Wrapper for feedReplicationBacklog() that takes Redis string objects @@ -774,7 +855,6 @@ long long addReplyReplicationBacklog(client *c, long long offset) { * split the reply in two parts if we are cross-boundary. */ len = g_pserver->repl_backlog_histlen - skip; serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); - serverLog(LL_NOTICE, "Coming through from the addReplicationBacklog"); #ifdef BYPASS_PSYNC setReplIdx(c, j, offset); #else @@ -789,7 +869,6 @@ long long addReplyReplicationBacklog(client *c, long long offset) { j = 0; } #endif - serverLog(LL_NOTICE, "rdb transmitted? %d, pending replies? %d", c->transmittedRDB, clientHasPendingReplies(c)); return g_pserver->repl_backlog_histlen - skip; } @@ -1520,13 +1599,11 @@ void sendBulkToSlave(connection *conn) { replica->repldboff += nwritten; g_pserver->stat_net_output_bytes += nwritten; - // replica->repl_curr_idx = g_pserver->repl_backlog_idx; if (replica->repldboff == replica->repldbsize) { close(replica->repldbfd); replica->repldbfd = -1; connSetWriteHandler(replica->conn,NULL); putSlaveOnline(replica); - serverLog(LL_NOTICE, "ABOUT TO DIE HERE"); } } @@ -4560,6 +4637,7 @@ void _clientAsyncReplyBufferReserve(client *c, size_t len); /* Has the end of the replication backlog overflowed past the beginning? */ bool replOverflowHasOccured(){ + if (g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart){ long long repl_idx_difference = g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart ? g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart : @@ -4575,8 +4653,13 @@ thread_local int transmittedCount = 0; void flushReplBacklogToClients() { serverAssert(GlobalLocksAcquired()); - if (g_pserver->repl_batch_offStart < 0) + if (g_pserver->repl_batch_offStart < 0){ + if (getLowestOffsetAmongReplicas() == -1){ + serverLog(LL_NOTICE, "this is a case i probably have to handle"); + } return; + } + if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) { bool fAsyncWrite = false; @@ -4585,7 +4668,7 @@ void flushReplBacklogToClients() serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size); serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx); - serverAssert(!replOverflowHasOccured()); + // serverAssert(!replOverflowHasOccured()); listIter li; listNode *ln; listRewind(g_pserver->slaves, &li); @@ -4605,11 +4688,21 @@ void flushReplBacklogToClients() ul.lock(); else fAsyncWrite = true; + + if (g_pserver->master_repl_offset - replica->repl_curr_off > g_pserver->repl_backlog_size){ + serverLog(LL_WARNING, "THE REPLICATION BACKLOG SIZE IS TOO SMALL, THIS IS A PROBLEM"); + long long oldsize = g_pserver->repl_backlog_size; + resizeReplicationBacklogForClients(std::max(g_pserver->repl_backlog_size * 2, g_pserver->master_repl_offset - replica->repl_curr_off)); + serverLog(LL_WARNING, "changing size from %lld to %lld", oldsize, g_pserver->repl_backlog_size); + } + + } + + listRewind(g_pserver->slaves, &li); #endif while ((ln = listNext(&li))) { client *replica = (client*)listNodeValue(ln); - // serverLog(LL_NOTICE, "replica state: %d", replica->replstate); if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue; diff --git a/src/server.cpp b/src/server.cpp index 3d547f748..9664a4a6b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1796,6 +1796,7 @@ int clientsCronTrackClientsMemUsage(client *c) { mem += zmalloc_size(c); mem += c->argv_len_sum(); if (c->argv) mem += zmalloc_size(c->argv); + // serverLog(LL_NOTICE, "Mem here is : %lu", mem); /* Now that we have the memory used by the client, remove the old * value from the old category, and add it back. */ g_pserver->stat_clients_type_memory[c->client_cron_last_memory_type] -= @@ -1854,7 +1855,7 @@ void clientsCron(int iel) { while(listLength(g_pserver->clients) && iterations--) { client *c; listNode *head; - + // serverLog(LL_NOTICE, "we are at iteration: %d", iterations); /* Rotate the list, take the current head, process. * This way if the client must be removed from the list it's the * first element and we don't incur into O(N) computation. */