diff --git a/src/evict.cpp b/src/evict.cpp index ba426f0ee..d336bc8b8 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -354,6 +354,8 @@ unsigned long LFUDecrAndReturn(robj_roptr o) { return counter; } +unsigned long getClientReplicationBacklogSharedUsage(client *c); + /* We don't want to count AOF buffers and slaves output buffers as * used memory: the eviction should use mostly data size. This function * returns the sum of AOF and slaves buffer. */ diff --git a/src/networking.cpp b/src/networking.cpp index 07312a9ee..767fe9c2b 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1765,15 +1765,7 @@ client *lookupClientByID(uint64_t id) { return (c == raxNotFound) ? NULL : c; } -/* Compute the corresponding index from a replication backlog offset - * by taking the distance between the input offset and the replication backlog offset - * and applying that to the replication backlog index, wrapping around if the index - * becomes negative. - * TODO: Rewrite comment for new logic */ -long long getReplIndexFromOffset(long long offset){ - long long index = (offset - g_pserver->repl_backlog_start) % g_pserver->repl_backlog_size; - return index; -} +long long getReplIndexFromOffset(long long offset); /* Write data in output buffers to client. Return C_OK if the client * is still valid after the call, C_ERR if it was freed because of some @@ -1832,35 +1824,31 @@ int writeToClient(client *c, int handler_installed) { } } /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT - * bytes, in a single threaded server it's a good idea to serve - * other clients as well, even if a very large request comes from - * super fast link that is always able to accept data (in real world - * scenario think about 'KEYS *' against the loopback interface). - * - * However if we are over the maxmemory limit we ignore that and - * just deliver as much data as it is possible to deliver. - * - * Moreover, we also send as much as possible if the client is - * a replica or a monitor (otherwise, on high-speed traffic, the - * replication/output buffer will grow indefinitely) */ + * bytes, in a single threaded server it's a good idea to serve + * other clients as well, even if a very large request comes from + * super fast link that is always able to accept data (in real world + * scenario think about 'KEYS *' against the loopback interface). + * + * However if we are over the maxmemory limit we ignore that and + * just deliver as much data as it is possible to deliver. + * + * Moreover, we also send as much as possible if the client is + * a replica or a monitor (otherwise, on high-speed traffic, the + * replication/output buffer will grow indefinitely) */ if (totwritten > NET_MAX_WRITES_PER_EVENT && (g_pserver->maxmemory == 0 || - zmalloc_used_memory() < g_pserver->maxmemory) && + zmalloc_used_memory() < g_pserver->maxmemory) && !(c->flags & CLIENT_SLAVE)) break; } /* We can only directly read from the replication backlog if the client is a replica, so only attempt to do so if that's the case. */ if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { - /* For replicas, we don't store all the information in the client buffer - * We always read from the replication backlog directly */ + std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); - - // serverLog(LL_NOTICE, "written to handler"); - long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); - serverAssert(c->repl_curr_off != -1); + if (c->repl_curr_off != c->repl_end_off){ long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off); long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog @@ -1884,14 +1872,9 @@ int writeToClient(client *c, int handler_installed) { totwritten += nwritten; c->repl_curr_off += nwritten; serverAssert(c->repl_curr_off <= c->repl_end_off); - /* If the client offset matches the global offset, we wrote all we needed to, - * in which case, there is no pending write */ - + /* If the client's current offset matches the last offset it can read from, there is no pending write */ if (c->repl_curr_off == c->repl_end_off){ - // serverLog(LL_NOTICE, "Successfully wrote up until %lld", c->repl_end_off); c->fPendingReplicaWrite = false; - } else { - // serverLog(LL_NOTICE, "Wrote to %lld out of %lld", c->repl_curr_off, c->repl_end_off); } } @@ -3719,8 +3702,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } } -/* In the case of a replica client, it is possible (and very likely) - * that writes to said replica are using data from the replication backlog +/* In the case of a replica client, writes to said replica are using data from the replication backlog * as opposed to it's own internal buffer, this number should keep track of that */ unsigned long getClientReplicationBacklogSharedUsage(client *c) { return (!(c->flags & CLIENT_SLAVE) || !c->fPendingReplicaWrite ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off; diff --git a/src/replication.cpp b/src/replication.cpp index cb0b562b1..b9465680e 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -47,8 +47,6 @@ #include #include -#define BYPASS_BUFFER - void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, connection *conn); void replicationSendAck(redisMaster *mi); @@ -61,8 +59,6 @@ static void propagateMasterStaleKeys(); * the instance is configured to have no persistence. */ int RDBGeneratedByReplication = 0; -void resizeReplicationBacklogForClients(long long newsize); - /* --------------------------- Utility functions ---------------------------- */ /* Return the pointer to a string representing the replica ip:listening_port @@ -205,7 +201,14 @@ void createReplicationBacklog(void) { g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; } -long long getReplIndexFromOffset(long long offset); +/* Compute the corresponding index from a replication backlog offset + * Since this computation needs the size of the replication backlog, + * you need to have the repl_backlog_lock in order to call it */ +long long getReplIndexFromOffset(long long offset){ + serverAssert(g_pserver->repl_backlog_lock.fOwnLock()); + long long index = (offset - g_pserver->repl_backlog_start) % g_pserver->repl_backlog_size; + return index; +} /* This function is called when the user modifies the replication backlog * size at runtime. It is up to the function to both update the @@ -293,7 +296,7 @@ void feedReplicationBacklog(const void *ptr, size_t len) { if (g_pserver->repl_batch_idxStart >= 0) { - /* we are lower bounded by the lower client offset or the offStart if all the clients are up to date */ + /* We are lower bounded by the lowest replica offset, or the batch offset start if not applicable */ long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst); if (lower_bound == -1) lower_bound = g_pserver->repl_batch_offStart; @@ -306,9 +309,6 @@ void feedReplicationBacklog(const void *ptr, size_t len) { minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; - serverLog(LL_NOTICE, "minimumsize: %lld, g_pserver->master_repl_offset: %lld, len: %lu, lower_bound: %lld", - minimumsize, g_pserver->master_repl_offset, len, lower_bound); - if (minimumsize > g_pserver->repl_backlog_size) { // This is an emergency overflow, we better resize to fit long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize); @@ -635,9 +635,7 @@ void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) { printf("\n"); } - if (g_pserver->repl_backlog){ - feedReplicationBacklog(buf,buflen); - } + if (g_pserver->repl_backlog) feedReplicationBacklog(buf,buflen); } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { @@ -689,13 +687,12 @@ int prepareClientToWrite(client *c); /* Feed the replica 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ long long addReplyReplicationBacklog(client *c, long long offset) { - long long j, skip, len; + long long skip, len; serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); if (g_pserver->repl_backlog_histlen == 0) { serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); - serverLog(LL_NOTICE, "REOAD TO RESIST"); c->repl_curr_off = g_pserver->master_repl_offset; c->repl_end_off = g_pserver->master_repl_offset; return 0; @@ -714,30 +711,20 @@ long long addReplyReplicationBacklog(client *c, long long offset) { skip = offset - g_pserver->repl_backlog_off; serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); - /* Point j to the oldest byte, that is actually our - * g_pserver->repl_backlog_off byte. */ - j = (g_pserver->repl_backlog_idx + - (g_pserver->repl_backlog_size-g_pserver->repl_backlog_histlen)) % - g_pserver->repl_backlog_size; - serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j); - - /* Discard the amount of data to seek to the specified 'offset'. */ - j = (j + skip) % g_pserver->repl_backlog_size; - - /* Feed replica with data. Since it is a circular buffer we have to - * split the reply in two parts if we are cross-boundary. */ len = g_pserver->repl_backlog_histlen - skip; serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); + /* Set the start and end offsets for the replica so that a future + * writeToClient will send the backlog from the given offset to + * the current end of the backlog to said replica */ c->repl_curr_off = offset - 1; - // serverLog(LL_NOTICE, "Client %s, replica offset %lld in psync", replicationGetSlaveName(c), c->repl_curr_off); c->repl_end_off = g_pserver->master_repl_offset; /* Force the partial sync to be queued */ prepareClientToWrite(c); c->fPendingReplicaWrite = true; - return g_pserver->repl_backlog_histlen - skip; + return len; } /* Return the offset to provide as reply to the PSYNC command received @@ -4963,14 +4950,18 @@ void flushReplBacklogToClients() listIter li; listNode *ln; listRewind(g_pserver->slaves, &li); + /* We don't actually write any data in this function since we send data + * directly from the replication backlog to replicas in writeToClient. + * + * What we do however, is set the end offset of each replica here. This way, + * future calls to writeToClient will know up to where in the replication + * backlog is valid for writing. */ while ((ln = listNext(&li))) { client *replica = (client*)listNodeValue(ln); if (!canFeedReplicaReplBuffer(replica)) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue; - // serverLog(LL_NOTICE, "Client %s, replica offset %lld", replicationGetSlaveName(replica), replica->repl_curr_off); - std::unique_lock ul(replica->lock); if (!FCorrectThread(replica)) fAsyncWrite = true; diff --git a/src/server.h b/src/server.h index cb3973969..0d6f766ce 100644 --- a/src/server.h +++ b/src/server.h @@ -1590,9 +1590,11 @@ struct client { copying this replica output buffer should use. */ - long long repl_curr_off = -1; /* Replication offset of the client, only if it's a replica*/ - long long repl_end_off = -1; /* Replication offset to write to */ - int fPendingReplicaWrite; + long long repl_curr_off = -1;/* Replication offset of the replica, also where in the backlog we need to start from + * when sending data to this replica. */ + long long repl_end_off = -1; /* Replication offset to write to, stored in the replica, as opposed to using the global offset + * to prevent needing the global lock */ + int fPendingReplicaWrite; /* Is there a write queued for this replica? */ char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */ int slave_listening_port; /* As configured with: REPLCONF listening-port */ @@ -2375,8 +2377,8 @@ struct redisServer { int repl_diskless_load; /* Slave parse RDB directly from the socket. * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ - std::atomic repl_lowest_off; /* The lowest offset amongst all clients - Updated before calls to feed the replication backlog */ + std::atomic repl_lowest_off; /* The lowest offset amongst all replicas + -1 if there are no replicas */ /* Replication (replica) */ list *masters; int enable_multimaster; @@ -2825,7 +2827,6 @@ sds getAllClientsInfoString(int type); void rewriteClientCommandVector(client *c, int argc, ...); void rewriteClientCommandArgument(client *c, int i, robj *newval); void replaceClientCommandVector(client *c, int argc, robj **argv); -unsigned long getClientReplicationBacklogSharedUsage(client *c); unsigned long getClientOutputBufferMemoryUsage(client *c); int freeClientsInAsyncFreeQueue(int iel); void asyncCloseClientOnOutputBufferLimitReached(client *c); @@ -3017,7 +3018,6 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, void rdbPipeWriteHandlerConnRemoved(struct connection *conn); void replicationNotifyLoadedKey(redisDb *db, robj_roptr key, robj_roptr val, long long expire); void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long long expire); -void updateLowestOffsetAmongReplicas(void); void clearFailoverState(void); void updateFailoverStatus(void); void abortFailover(redisMaster *mi, const char *err);