diff --git a/src/evict.cpp b/src/evict.cpp
index 7ec223f6d..54153dc27 100644
--- a/src/evict.cpp
+++ b/src/evict.cpp
@@ -522,7 +522,6 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
     if (g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION)
         goto cant_free; /* We need to free memory, but policy forbids. */

-    serverLog(LL_NOTICE, "evicting i guess lol, the overhead was %ld, the repl_backlog_size, %lld", freeMemoryGetNotCountedMemory(), g_pserver->repl_backlog_size);
     while (mem_freed < mem_tofree) {
         int j, k, i;
         static unsigned int next_db = 0;
diff --git a/src/networking.cpp b/src/networking.cpp
index cead76998..aba1f1705 100644
--- a/src/networking.cpp
+++ b/src/networking.cpp
@@ -223,9 +223,6 @@ void clientInstallWriteHandler(client *c) {
      * if not already done and, for slaves, if the replica can actually receive
      * writes at this stage. */
-    if (c->flags & CLIENT_SLAVE)
-        serverLog(LL_NOTICE, "installing write handler");
-
     if (!(c->flags & CLIENT_PENDING_WRITE) &&
         (c->replstate == REPL_STATE_NONE ||
          (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
@@ -277,10 +274,6 @@ void clientInstallAsyncWriteHandler(client *c) {
 int prepareClientToWrite(client *c) {
     bool fAsync = !FCorrectThread(c); // Not async if we're on the right thread

-    if (c->flags & CLIENT_SLAVE){
-        serverLog(LL_NOTICE, "got into prepareClientToWrite");
-    }
-
     if (!fAsync) {
         serverAssert(c->conn == nullptr || c->lock.fOwnLock());
     } else {
@@ -1695,10 +1688,6 @@ int writeToClient(client *c, int handler_installed) {
     serverAssertDebug(FCorrectThread(c));
     std::unique_lock<decltype(c->lock)> lock(c->lock);

-    // serverLog(LL_NOTICE, "acq client");
-
-    if (c->flags & CLIENT_SLAVE)
-        serverLog(LL_NOTICE, "writeToClient has happened");
     while(clientHasPendingReplies(c)) {
         serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
@@ -1710,7 +1699,6 @@ int writeToClient(client *c, int handler_installed) {

             /* If the buffer was sent, set bufpos to zero to continue with
              * the remainder of the reply. */
-            // serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen);
             if ((int)c->sentlen == c->bufpos) {
                 c->bufpos = 0;
                 c->sentlen = 0;
@@ -1764,33 +1752,24 @@ int writeToClient(client *c, int handler_installed) {
          * We always read from the replication backlog directly */
         std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock);

-        /* Right now, we're bringing in the offStart into the scope
-         * If repl_batch_offStart is equal to -1, that means the mechanism is disabled
-         * which implies there is no data to flush and that the global offset is accurate */
-        // long long offStart = g_pserver->repl_batch_offStart == -1 ? g_pserver->master_repl_offset : g_pserver->repl_batch_offStart;
-        long long offStart = c->repl_end_off;
-        long long idxStart = getReplIndexFromOffset(offStart);
+        long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
         serverAssert(c->repl_curr_off != -1);

-        if (c->repl_curr_off != offStart){
-            serverLog(LL_NOTICE, "printing the stats for client %lu: c->repl_curr_off: %lld, repl_batch_offStart: %lld, nwritten: %ld, offStart: %lld",
-                c->id, c->repl_curr_off, g_pserver->repl_batch_offStart, nwritten, offStart);
-
-            long long curr_idx = getReplIndexFromOffset(c->repl_curr_off);
-            long long nwrittenPart2 = 0;
+        if (c->repl_curr_off != c->repl_end_off){
+            long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
+            long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
+                                             * in the event of a wrap around write */

             /* normal case with no wrap around */
-            if (idxStart >= curr_idx){
-                nwritten = connWrite(c->conn, g_pserver->repl_backlog + curr_idx, idxStart - curr_idx);
-            /* wrap around case, v. rare */
-            /* also v. buggy so there's that */
+            if (repl_end_idx >= repl_curr_idx){
+                nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, repl_end_idx - repl_curr_idx);
+            /* wrap around case */
             } else {
-                serverLog(LL_NOTICE, "ROAD OF RESISTANCE");
-                nwritten = connWrite(c->conn, g_pserver->repl_backlog + curr_idx, g_pserver->repl_backlog_size - curr_idx);
+                nwritten = connWrite(c->conn, g_pserver->repl_backlog + repl_curr_idx, g_pserver->repl_backlog_size - repl_curr_idx);
                 /* only attempt wrapping if we write the correct number of bytes */
-                if (nwritten == g_pserver->repl_backlog_size - curr_idx){
-                    long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, idxStart);
-                    if (nwrittenPart2 != -1)
-                        nwritten += nwrittenPart2;
+                if (nwritten == g_pserver->repl_backlog_size - repl_curr_idx){
+                    nwritten2ndStage = connWrite(c->conn, g_pserver->repl_backlog, repl_end_idx);
+                    if (nwritten2ndStage != -1)
+                        nwritten += nwritten2ndStage;
                 }
             }
@@ -1798,31 +1777,19 @@ int writeToClient(client *c, int handler_installed) {
         if (nwritten > 0){
             totwritten += nwritten;
             c->repl_curr_off += nwritten;
-            if (1){
-                serverLog(LL_NOTICE, "printing the stats for client %lu: c->repl_curr_off: %lld, repl_batch_offStart: %lld, nwritten: %ld, offStart: %lld",
-                    c->id, c->repl_curr_off, g_pserver->repl_batch_offStart, nwritten, offStart);
-            }
-            serverAssert(c->repl_curr_off <= offStart);
+            serverAssert(c->repl_curr_off <= c->repl_end_off);

             /* If the client offset matches the global offset, we wrote all we needed to,
              * in which case, there is no pending write */
-            if (c->repl_curr_off == offStart){
-                serverLog(LL_NOTICE, "good, %lld", offStart);
+            if (c->repl_curr_off == c->repl_end_off){
                 c->fPendingReplicaWrite = false;
-            } else {
-                serverLog(LL_NOTICE, "mismatch between repl_curr_off (%lld) and offStart (%lld)", c->repl_curr_off, offStart);
             }
         }

         /* If the second part of a write didn't go through, we still need to register that */
-        if (nwrittenPart2 == -1) nwritten = -1;
+        if (nwritten2ndStage == -1) nwritten = -1;
     }
-
-    // if (c->flags & CLIENT_SLAVE && handler_installed)
-    //     serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed);
-
     }
-    // serverLog(LL_NOTICE, "rel client");
     g_pserver->stat_net_output_bytes += totwritten;
     if (nwritten == -1) {
         if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
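
/*
 * Editor's sketch (illustrative only, not part of the patch): the writeToClient()
 * hunks above drop the offStart/idxStart locals and stream a replica's pending
 * window straight out of the circular replication backlog, splitting the write in
 * two when the window wraps past the end of the buffer.  The stand-alone helper
 * below models only that arithmetic; the names (backlog, backlog_size, writeFn)
 * are hypothetical and this is not KeyDB code.
 */
#include <cstddef>
#include <functional>

/* Write the bytes in [curr_idx, end_idx) of a circular buffer, possibly in two
 * parts.  Returns -1 if nothing could be written, otherwise the number of bytes
 * accepted by writeFn (which may be short if the sink backs up). */
static long long writeCircularRange(const char *backlog, long long backlog_size,
                                    long long curr_idx, long long end_idx,
                                    const std::function<long long(const char *, size_t)> &writeFn)
{
    if (end_idx >= curr_idx)                      /* contiguous case */
        return writeFn(backlog + curr_idx, static_cast<size_t>(end_idx - curr_idx));

    /* Wrapped case: drain to the physical end of the buffer first. */
    long long part1 = writeFn(backlog + curr_idx, static_cast<size_t>(backlog_size - curr_idx));
    if (part1 != backlog_size - curr_idx)         /* error or short write: stop here */
        return part1;

    /* Then continue from the start of the buffer up to end_idx. */
    long long part2 = writeFn(backlog, static_cast<size_t>(end_idx));
    return (part2 == -1) ? part1 : part1 + part2;
}
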
@@ -1843,11 +1810,6 @@
         if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime;
     }
     if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) {
-        // if(c->flags & CLIENT_SLAVE && handler_installed){
-        //     serverLog(LL_NOTICE, "Uninstalling handler");
-        //     serverLog(LL_NOTICE, "repl_backlog_size: %lld", g_pserver->repl_backlog_size);
-        //     serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
-        // }
         c->sentlen = 0;
         if (handler_installed) connSetWriteHandler(c->conn, NULL);

@@ -1863,7 +1825,6 @@
 /* Write event handler. Just send data to the client. */
 void sendReplyToClient(connection *conn) {
     client *c = (client*)connGetPrivateData(conn);
-    // serverLog(LL_NOTICE, "called the sendreplytoclient");
     if (writeToClient(c,1) == C_ERR) {
         AeLocker ae;
@@ -1997,7 +1958,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
     auto vec = std::move(g_pserver->rgthreadvar[iel].clients_pending_write);
     processed += (int)vec.size();
-    // serverLog(LL_NOTICE, "entered handleClientsWithPendingWrites");

     for (client *c : vec) {
         serverAssertDebug(FCorrectThread(c));
@@ -2013,12 +1973,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
         /* Don't write to clients that are going to be closed anyway. */
         if (c->flags & CLIENT_CLOSE_ASAP) continue;

-        // if (c->flags & CLIENT_SLAVE){
-        //     if(clientHasPendingReplies(c))
-        //         serverLog(LL_NOTICE, "somehow the client buffer has these values: %s", c->buf);
-        //     serverLog(LL_NOTICE, "LOL");
-        // }
-
         /* Try to write buffers to the client socket. */
         if (writeToClient(c,0) == C_ERR) {
@@ -2216,34 +2170,6 @@ static void setProtocolError(const char *errstr, client *c) {
     c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR);
 }

-static void printQueryBuffer(client *c) {
-    if (cserver.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) {
-        sds client = catClientInfoString(sdsempty(),c);
-
-        /* Sample some protocol to given an idea about what was inside. */
-        char buf[PROTO_DUMP_LEN*2];
-        if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
-            snprintf(buf,sizeof(buf),"%s", c->querybuf+c->qb_pos);
-        } else {
-            snprintf(buf,sizeof(buf),"%.*s (... more %zu bytes ...) %.*s", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
-        }
-
-        /* Remove non printable chars. */
-        char *p = buf;
-        while (*p != '\0') {
-            if (!isprint(*p)) *p = '.';
-            p++;
-        }
-
-        /* Log all the client and protocol info. */
-        int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING :
-                                                    LL_VERBOSE;
-        serverLog(loglevel,
-                  "Query buffer from client %lu: %s. %s", c->id, client, buf);
-        sdsfree(client);
-    }
-}
-
 /* Process the query buffer for client 'c', setting up the client argument
  * vector for command execution. Returns C_OK if after running the function
  * the client has a well-formed ready to be processed command, otherwise
@@ -2498,8 +2424,6 @@ void parseClientCommandBuffer(client *c) {
         }

         size_t cqueriesStart = c->vecqueuedcmd.size();
-        // if (c->flags & CLIENT_MASTER)
-        //     printQueryBuffer(c);
         if (c->reqtype == PROTO_REQ_INLINE) {
             if (processInlineBuffer(c) != C_OK) break;
         } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
diff --git a/src/replication.cpp b/src/replication.cpp
index 96bf161f9..ebdb8af78 100644
--- a/src/replication.cpp
+++ b/src/replication.cpp
@@ -60,7 +60,6 @@ static void propagateMasterStaleKeys();
 void updateLowestOffsetAmongReplicas(){
     serverAssert(GlobalLocksAcquired());
     serverAssert(!g_pserver->repl_backlog_lock.fOwnLock());
-    // serverLog(LL_NOTICE, "off- have repl");
     long long min_offset = LONG_LONG_MAX;
     listIter li;
     listNode *ln;
@@ -73,14 +72,13 @@ void updateLowestOffsetAmongReplicas(){
         if (replica->flags & CLIENT_CLOSE_ASAP) continue;

         std::unique_lock ul(replica->lock);
-        // serverLog(LL_NOTICE, "off- acq client");
-        min_offset = std::min(min_offset, replica->repl_curr_off);
-        // serverLog(LL_NOTICE, "off- rel client");
+        min_offset = std::min(min_offset, replica->repl_curr_off);
     }
     /* return -1 if no other minimum was found */
     g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst);
 }
+
 /* We take a global flag to remember if this instance generated an RDB
  * because of replication, so that we can remove the RDB file in case
  * the instance is configured to have no persistence. */
@@ -232,6 +230,8 @@ void createReplicationBacklog(void) {
     g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
 }

+long long getReplIndexFromOffset(long long offset);
+
 /* This function is called when the user modifies the replication backlog
  * size at runtime. It is up to the function to both update the
  * g_pserver->repl_backlog_size and to resize the buffer and setup it so that
@@ -243,8 +243,6 @@ void resizeReplicationBacklog(long long newsize) {
         newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
     if (g_pserver->repl_backlog_size == newsize) return;

-    serverLog(LL_NOTICE, "WE HAD TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize);
-
     if (g_pserver->repl_backlog != NULL) {
         /* What we actually do is to flush the old buffer and realloc a new
          * empty one. It will refill with new data incrementally.
         * The reason is that copying a few gigabytes adds latency and even
         * worse often we need to alloc additional space before freeing the
         * old buffer. */
-        if (g_pserver->repl_batch_idxStart >= 0) {
-            // We need to keep critical data so we can't shrink less than the hot data in the buffer
-            newsize = std::max(newsize, g_pserver->master_repl_offset - g_pserver->repl_batch_offStart);
-            char *backlog = (char*)zmalloc(newsize);
-            g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - g_pserver->repl_batch_offStart;
-
-            if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
-                auto cbActiveBacklog = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
-                memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbActiveBacklog);
-                serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
-            } else {
-                auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
-                memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
-                memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
-                auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
-                serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
-            }
-            zfree(g_pserver->repl_backlog);
-            g_pserver->repl_backlog = backlog;
-            g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
-            g_pserver->repl_batch_idxStart = 0;
-            g_pserver->repl_backlog_start = g_pserver->master_repl_offset;
-        } else {
-            zfree(g_pserver->repl_backlog);
-            g_pserver->repl_backlog = (char*)zmalloc(newsize);
-            g_pserver->repl_backlog_histlen = 0;
-            g_pserver->repl_backlog_idx = 0;
-            /* Next byte we have is... the next since the buffer is empty. */
-            g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1;
-        }
-    }
-    g_pserver->repl_backlog_size = newsize;
-}
-
-long long getReplIndexFromOffset(long long offset);
-
-/* The above but for when clients need extra replication backlog because ??? */
-void resizeReplicationBacklogForClients(long long newsize) {
-    if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
-        newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
-    if (g_pserver->repl_backlog_size == newsize) return;
-
-    serverLog(LL_NOTICE, "WE HAVE TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize);
-    /* get the critical client size, i.e. the size of the data unflushed to clients */
-    long long earliest_off = g_pserver->repl_lowest_off.load();
-
-
-    if (g_pserver->repl_backlog != NULL) {
-        /* What we actually do is to flush the old buffer and realloc a new
-         * empty one. It will refill with new data incrementally.
-         * The reason is that copying a few gigabytes adds latency and even
-         * worse often we need to alloc additional space before freeing the
-         * old buffer. */
+        /* get the critical client size, i.e. the size of the data unflushed to clients */
+        long long earliest_off = g_pserver->repl_lowest_off.load();

         if (earliest_off != -1) {
             // We need to keep critical data so we can't shrink less than the hot data in the buffer
@@ -316,8 +263,6 @@ void resizeReplicationBacklogForClients(long long newsize) {
             if (g_pserver->repl_backlog_idx >= earliest_idx) {
                 auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx;
                 memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog);
-                serverLog(LL_NOTICE, "g_pserver->master_repl_offset: %lld, earliest_off: %lld, g_pserver->repl_backlog_idx: %lld, earliest_idx: %lld, repl_backlog_start: %lld",
-                    g_pserver->master_repl_offset, earliest_off, g_pserver->repl_backlog_idx, earliest_idx, g_pserver->repl_backlog_start);
                 serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
             } else {
                 auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx;
@@ -344,14 +289,9 @@ void resizeReplicationBacklogForClients(long long newsize) {
         }
     }
     g_pserver->repl_backlog_size = newsize;
-
-    serverLog(LL_NOTICE, "We are ending with: master_repl_offset: %lld, repl_batch_offStart: %lld, new_off: %lld, "
-        "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, new_idx: %lld, repl_backlog_size: %lld",
-            g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, 0LL,
-            g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, 0LL, g_pserver->repl_backlog_size
-    );
 }

+
 void freeReplicationBacklog(void) {
     serverAssert(GlobalLocksAcquired());
     listIter li;
@@ -391,17 +331,11 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
                 // This is an emergency overflow, we better resize to fit
                 long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
                 serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
-                resizeReplicationBacklogForClients(newsize);
+                resizeReplicationBacklog(newsize);
             }
         }
     }

-    // serverLog(LL_NOTICE, "Pt2 start with: master_repl_offset: %lld, repl_batch_offStart: %lld, "
-    //     "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld",
-    //     g_pserver->master_repl_offset, g_pserver->repl_batch_offStart,
-    //     g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size
-    // );
-
     g_pserver->master_repl_offset += len;

     /* This is a circular buffer, so write as much data we can at every
@@ -423,12 +357,6 @@ void feedReplicationBacklog(const void *ptr, size_t len) {

     /* Set the offset of the first byte we have in the backlog. */
     g_pserver->repl_backlog_off = g_pserver->master_repl_offset - g_pserver->repl_backlog_histlen + 1;
-
-    // serverLog(LL_NOTICE, "Pt2 end with: master_repl_offset: %lld, repl_batch_offStart: %lld, "
-    //     "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld",
-    //     g_pserver->master_repl_offset, g_pserver->repl_batch_offStart,
-    //     g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size
-    // );
 }

 /* Wrapper for feedReplicationBacklog() that takes Redis string objects
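
/*
 * Editor's sketch (illustrative only, not part of the patch): after merging
 * resizeReplicationBacklogForClients() into resizeReplicationBacklog(), a resize
 * has to preserve whatever the slowest replica has not read yet (everything from
 * g_pserver->repl_lowest_off onwards), copying it out of the old circular buffer
 * in one or two phases depending on whether it wraps.  The stand-alone model
 * below shows that copy under assumed names (oldBuf, oldSize, earliestIdx,
 * headIdx, histlen); it is not the actual KeyDB implementation.
 */
#include <algorithm>
#include <cassert>
#include <cstring>
#include <vector>

/* Copy the live region [earliestIdx, headIdx) of a circular buffer of oldSize
 * bytes into a fresh flat buffer of at least newSize bytes.  histlen is the
 * number of live bytes and doubles as the new write index afterwards. */
static std::vector<char> rebaseCircularBuffer(const char *oldBuf, long long oldSize,
                                              long long earliestIdx, long long headIdx,
                                              long long histlen, long long newSize)
{
    std::vector<char> fresh(static_cast<size_t>(std::max(newSize, histlen)));
    if (headIdx >= earliestIdx) {
        /* Live data is contiguous: a single copy suffices. */
        std::memcpy(fresh.data(), oldBuf + earliestIdx, static_cast<size_t>(headIdx - earliestIdx));
        assert(histlen == headIdx - earliestIdx);
    } else {
        /* Live data wraps: copy the tail of the old buffer, then its head. */
        long long phase1 = oldSize - earliestIdx;
        std::memcpy(fresh.data(), oldBuf + earliestIdx, static_cast<size_t>(phase1));
        std::memcpy(fresh.data() + phase1, oldBuf, static_cast<size_t>(headIdx));
        assert(histlen == phase1 + headIdx);
    }
    /* The preserved bytes now start at index 0, mirroring how the resized
     * backlog restarts its write index at repl_backlog_histlen. */
    return fresh;
}
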
@@ -578,9 +506,7 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc)

         /* Add the SELECT command into the backlog. */
         /* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */
-        if (g_pserver->repl_backlog && fSendRaw) {
-            feedReplicationBacklogWithObject(selectcmd);
-        }
+        if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd);

         if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
             decrRefCount(selectcmd);
@@ -678,7 +604,7 @@ void replicationFeedSlaves(list *replicas, int dictid, robj **argv, int argc) {
 void showLatestBacklog(void) {
     if (g_pserver->repl_backlog == NULL) return;

-    long long dumplen = 1024;
+    long long dumplen = 256;
     if (g_pserver->repl_backlog_histlen < dumplen)
         dumplen = g_pserver->repl_backlog_histlen;
@@ -769,7 +695,9 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
         }
         decrRefCount(cmdobj);
     }
-#define BYPASS_PSYNC
+
+int prepareClientToWrite(client *c);
+
 /* Feed the replica 'c' with the replication backlog starting from the
  * specified 'offset' up to the end of the backlog. */
 long long addReplyReplicationBacklog(client *c, long long offset) {
@@ -809,26 +737,14 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
      * split the reply in two parts if we are cross-boundary. */
     len = g_pserver->repl_backlog_histlen - skip;
     serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
-#ifdef BYPASS_PSYNC
+
     c->repl_curr_off = offset - 1;
     c->repl_end_off = g_pserver->master_repl_offset;
-    serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", c->id, getClientPeerId(c), c->repl_curr_off);

     /* Force the partial sync to be queued */
     prepareClientToWrite(c);
-    c->fPendingReplicaWrite = true;
-#else
-    while(len) {
-        long long thislen =
-            ((g_pserver->repl_backlog_size - j) < len) ?
-            (g_pserver->repl_backlog_size - j) : len;
+    c->fPendingReplicaWrite = true;

-        serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
-        addReplySds(c,sdsnewlen(g_pserver->repl_backlog + j, thislen));
-        len -= thislen;
-        j = 0;
-    }
-#endif
     return g_pserver->repl_backlog_histlen - skip;
 }
@@ -866,15 +782,11 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) {
     replica->repl_curr_off = offset;
     replica->repl_end_off = g_pserver->master_repl_offset;

-    serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", replica->id, getClientPeerId(replica), replica->repl_curr_off);
-
     /* We are going to accumulate the incremental changes for this
      * replica as well. Set replicaseldb to -1 in order to force to re-emit
      * a SELECT statement in the replication stream. */
     g_pserver->replicaseldb = -1;

-    serverLog(LL_NOTICE, "We are setting up here lad");
-
     /* Don't send this reply to slaves that approached us with
      * the old SYNC command. */
     if (!(replica->flags & CLIENT_PRE_PSYNC)) {
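
/*
 * Editor's sketch (illustrative only, not part of the patch): with the BYPASS_PSYNC
 * path removed above, a partial resync no longer copies backlog chunks into the
 * reply buffer; it records the window the replica still has to receive
 * (repl_curr_off up to repl_end_off) and marks fPendingReplicaWrite so that
 * writeToClient() later drains the window straight from the backlog.  The toy
 * bookkeeping below restates that invariant with hypothetical names; it is a
 * simplified model, not KeyDB code.
 */
#include <cassert>

struct ReplicaWindow {
    long long repl_curr_off = -1;       /* last replication offset already sent */
    long long repl_end_off = -1;        /* offset the replica still has to reach */
    bool fPendingReplicaWrite = false;  /* true while repl_curr_off < repl_end_off */
};

/* New backlog data was produced (or a PSYNC was accepted): extend the window to
 * the current master offset and queue a write if none is pending yet. */
static void scheduleReplicaWrite(ReplicaWindow &r, long long master_repl_offset)
{
    assert(r.repl_curr_off != -1);      /* set when the replica synchronized */
    r.repl_end_off = master_repl_offset;
    if (!r.fPendingReplicaWrite && r.repl_curr_off != master_repl_offset)
        r.fPendingReplicaWrite = true;  /* the write path will pick this up */
}

/* n bytes were written from the backlog on behalf of this replica. */
static void onReplicaBytesWritten(ReplicaWindow &r, long long n)
{
    r.repl_curr_off += n;
    assert(r.repl_curr_off <= r.repl_end_off);
    if (r.repl_curr_off == r.repl_end_off)
        r.fPendingReplicaWrite = false; /* window drained, nothing left to stream */
}
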
@@ -1179,7 +1091,6 @@ void syncCommand(client *c) {
     if (g_pserver->FRdbSaveInProgress() &&
         g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK)
     {
-        serverLog(LL_NOTICE, "case 1");
         /* Ok a background save is in progress. Let's check if it is a good
          * one for replication, i.e. if there is another replica that is
          * registering differences since the server forked to save. */
@@ -1211,7 +1122,6 @@ void syncCommand(client *c) {
     } else if (g_pserver->FRdbSaveInProgress() &&
                g_pserver->rdb_child_type == RDB_CHILD_TYPE_SOCKET)
     {
-        serverLog(LL_NOTICE, "case 2");
         /* There is an RDB child process but it is writing directly to
          * children sockets. We need to wait for the next BGSAVE
          * in order to synchronize. */
@@ -1219,7 +1129,6 @@
     /* CASE 3: There is no BGSAVE is progress. */
     } else {
-        serverLog(LL_NOTICE, "case 3");
         if (g_pserver->repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
             /* Diskless replication RDB child is created inside
              * replicationCron() since we want to delay its start a
@@ -4606,9 +4515,10 @@ void _clientAsyncReplyBufferReserve(client *c, size_t len);
 void flushReplBacklogToClients()
 {
     serverAssert(GlobalLocksAcquired());
+    /* If we have the repl backlog lock, we will deadlock */
+    serverAssert(!g_pserver->repl_backlog_lock.fOwnLock());

     if (g_pserver->repl_batch_offStart < 0)
         return;
-
     if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
         bool fAsyncWrite = false;
@@ -4617,66 +4527,31 @@ void flushReplBacklogToClients()
         serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
         serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);

-        serverLog(LL_NOTICE, "the master repl offset is %lld", g_pserver->master_repl_offset);
-        showLatestBacklog();
         listIter li;
         listNode *ln;
         listRewind(g_pserver->slaves, &li);
         while ((ln = listNext(&li))) {
             client *replica = (client*)listNodeValue(ln);

-            // serverLog(LL_NOTICE, "client %lu is in the party", replica->id);
-
-            // serverLog(LL_NOTICE, "is there a write pending for %lu, %d", replica->id, replica->fPendingReplicaWrite);
             if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
             if (replica->flags & CLIENT_CLOSE_ASAP) continue;

-            std::unique_lock ul(replica->lock, std::defer_lock);
-            if (FCorrectThread(replica))
-                ul.lock();
-            else
+            std::unique_lock ul(replica->lock);
+            if (!FCorrectThread(replica))
                 fAsyncWrite = true;
-
-            /* If we are online and the RDB has been sent, there is no need to feed the client buffer
-             * We will send our replies directly from the replication backlog instead */
-#ifdef BYPASS_BUFFER
-            {
-                std::unique_lock asyncUl(replica->lock, std::defer_lock);
-                if (!FCorrectThread(replica))
-                    asyncUl.lock();
+            /* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */
+            serverAssert(replica->repl_curr_off != -1);

-                /* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */
-                serverAssert(replica->repl_curr_off != -1);
+            replica->repl_end_off = g_pserver->master_repl_offset;

-                replica->repl_end_off = g_pserver->master_repl_offset;
-
-                /* Only if the there isn't already a pending write do we prepare the client to write */
-                if (!replica->fPendingReplicaWrite){
-                    serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset);
-                    prepareClientToWrite(replica);
-                    replica->fPendingReplicaWrite = true;
-                }
-
-                continue;
+            /* Only if the there isn't already a pending write do we prepare the client to write */
+            if (!replica->fPendingReplicaWrite){
+                serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset);
+                prepareClientToWrite(replica);
+                replica->fPendingReplicaWrite = true;
             }
-#endif
-            if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
-                long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
-                serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy);
-                serverAssert((g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart) >= (cbCopy));
-                serverAssert((g_pserver->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size);
-
-                addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy);
-            } else {
-                auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
-                if (fAsyncWrite)
-                    _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx);
-                addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
-                addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
-                serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));
-            }
         }
         if (fAsyncWrite)
             ProcessPendingAsyncWrites();
diff --git a/src/server.cpp b/src/server.cpp
index 439e1aeff..362569bfa 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -1796,7 +1796,6 @@ int clientsCronTrackClientsMemUsage(client *c) {
     mem += zmalloc_size(c);
     mem += c->argv_len_sum();
     if (c->argv) mem += zmalloc_size(c->argv);
-    // serverLog(LL_NOTICE, "Mem here is : %lu", mem);
     /* Now that we have the memory used by the client, remove the old
      * value from the old category, and add it back. */
     g_pserver->stat_clients_type_memory[c->client_cron_last_memory_type] -=
@@ -1855,7 +1854,6 @@ void clientsCron(int iel) {
     while(listLength(g_pserver->clients) && iterations--) {
         client *c;
         listNode *head;
-        // serverLog(LL_NOTICE, "we are at iteration: %d", iterations);
         /* Rotate the list, take the current head, process.
          * This way if the client must be removed from the list it's the
          * first element and we don't incur into O(N) computation. */
diff --git a/src/server.h b/src/server.h
index 64a2ca515..0fcd8f5ef 100644
--- a/src/server.h
+++ b/src/server.h
@@ -3540,7 +3540,6 @@ void tlsInit(void);
 void tlsInitThread();
 int tlsConfigure(redisTLSContextConfig *ctx_config);

-int prepareClientToWrite(client *c);
 class ShutdownException
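
/*
 * Editor's sketch (illustrative only, not part of the patch): several hunks above
 * rely on getReplIndexFromOffset() to turn a global replication offset into an
 * index inside the circular backlog, but its definition is outside this diff.
 * The model below assumes the conventional mapping -- offset relative to the
 * replication offset of the buffer's first byte, modulo the buffer size -- purely
 * to illustrate how repl_curr_off and repl_end_off land inside the buffer in
 * writeToClient().  Both the name and the formula are assumptions, not the
 * confirmed KeyDB implementation.
 */

/* offset        : global replication offset to translate
 * backlog_start : replication offset corresponding to index 0 (assumed meaning)
 * backlog_size  : size of the circular backlog in bytes */
static long long replIndexFromOffsetModel(long long offset, long long backlog_start,
                                          long long backlog_size)
{
    long long idx = (offset - backlog_start) % backlog_size;
    return idx < 0 ? idx + backlog_size : idx;  /* normalize into [0, backlog_size) */
}

/* Example: the two-part write sketched earlier would compare
 * replIndexFromOffsetModel(repl_curr_off, ...) with
 * replIndexFromOffsetModel(repl_end_off, ...) to decide whether it wraps. */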