More code cleanup

Former-commit-id: 8e9962b9b7b9093399451bf93d30e5b5d26e3d33
This commit is contained in:
VivekSainiEQ 2021-06-16 19:41:55 +00:00
parent 03709d475a
commit 4ac475ea20
4 changed files with 47 additions and 72 deletions

View File

@ -354,6 +354,8 @@ unsigned long LFUDecrAndReturn(robj_roptr o) {
return counter;
}
unsigned long getClientReplicationBacklogSharedUsage(client *c);
/* We don't want to count AOF buffers and slaves output buffers as
* used memory: the eviction should use mostly data size. This function
* returns the sum of AOF and slaves buffer. */

View File

@ -1765,15 +1765,7 @@ client *lookupClientByID(uint64_t id) {
return (c == raxNotFound) ? NULL : c;
}
/* Compute the corresponding index from a replication backlog offset
* by taking the distance between the input offset and the replication backlog offset
* and applying that to the replication backlog index, wrapping around if the index
* becomes negative.
* TODO: Rewrite comment for new logic */
long long getReplIndexFromOffset(long long offset){
long long index = (offset - g_pserver->repl_backlog_start) % g_pserver->repl_backlog_size;
return index;
}
long long getReplIndexFromOffset(long long offset);
/* Write data in output buffers to client. Return C_OK if the client
* is still valid after the call, C_ERR if it was freed because of some
@ -1832,35 +1824,31 @@ int writeToClient(client *c, int handler_installed) {
}
}
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world
* scenario think about 'KEYS *' against the loopback interface).
*
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a replica or a monitor (otherwise, on high-speed traffic, the
* replication/output buffer will grow indefinitely) */
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world
* scenario think about 'KEYS *' against the loopback interface).
*
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a replica or a monitor (otherwise, on high-speed traffic, the
* replication/output buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(g_pserver->maxmemory == 0 ||
zmalloc_used_memory() < g_pserver->maxmemory) &&
zmalloc_used_memory() < g_pserver->maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
/* We can only directly read from the replication backlog if the client
is a replica, so only attempt to do so if that's the case. */
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
/* For replicas, we don't store all the information in the client buffer
* We always read from the replication backlog directly */
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
// serverLog(LL_NOTICE, "written to handler");
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
serverAssert(c->repl_curr_off != -1);
if (c->repl_curr_off != c->repl_end_off){
long long repl_curr_idx = getReplIndexFromOffset(c->repl_curr_off);
long long nwritten2ndStage = 0; /* How much was written from the start of the replication backlog
@ -1884,14 +1872,9 @@ int writeToClient(client *c, int handler_installed) {
totwritten += nwritten;
c->repl_curr_off += nwritten;
serverAssert(c->repl_curr_off <= c->repl_end_off);
/* If the client offset matches the global offset, we wrote all we needed to,
* in which case, there is no pending write */
/* If the client's current offset matches the last offset it can read from, there is no pending write */
if (c->repl_curr_off == c->repl_end_off){
// serverLog(LL_NOTICE, "Successfully wrote up until %lld", c->repl_end_off);
c->fPendingReplicaWrite = false;
} else {
// serverLog(LL_NOTICE, "Wrote to %lld out of %lld", c->repl_curr_off, c->repl_end_off);
}
}
@ -3719,8 +3702,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
}
}
/* In the case of a replica client, it is possible (and very likely)
* that writes to said replica are using data from the replication backlog
/* In the case of a replica client, writes to said replica are using data from the replication backlog
* as opposed to it's own internal buffer, this number should keep track of that */
unsigned long getClientReplicationBacklogSharedUsage(client *c) {
return (!(c->flags & CLIENT_SLAVE) || !c->fPendingReplicaWrite ) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off;

View File

@ -47,8 +47,6 @@
#include <unordered_map>
#include <string>
#define BYPASS_BUFFER
void replicationDiscardCachedMaster(redisMaster *mi);
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
void replicationSendAck(redisMaster *mi);
@ -61,8 +59,6 @@ static void propagateMasterStaleKeys();
* the instance is configured to have no persistence. */
int RDBGeneratedByReplication = 0;
void resizeReplicationBacklogForClients(long long newsize);
/* --------------------------- Utility functions ---------------------------- */
/* Return the pointer to a string representing the replica ip:listening_port
@ -205,7 +201,14 @@ void createReplicationBacklog(void) {
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
}
long long getReplIndexFromOffset(long long offset);
/* Compute the corresponding index from a replication backlog offset
* Since this computation needs the size of the replication backlog,
* you need to have the repl_backlog_lock in order to call it */
long long getReplIndexFromOffset(long long offset){
serverAssert(g_pserver->repl_backlog_lock.fOwnLock());
long long index = (offset - g_pserver->repl_backlog_start) % g_pserver->repl_backlog_size;
return index;
}
/* This function is called when the user modifies the replication backlog
* size at runtime. It is up to the function to both update the
@ -293,7 +296,7 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
if (g_pserver->repl_batch_idxStart >= 0) {
/* we are lower bounded by the lower client offset or the offStart if all the clients are up to date */
/* We are lower bounded by the lowest replica offset, or the batch offset start if not applicable */
long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst);
if (lower_bound == -1)
lower_bound = g_pserver->repl_batch_offStart;
@ -306,9 +309,6 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
serverLog(LL_NOTICE, "minimumsize: %lld, g_pserver->master_repl_offset: %lld, len: %lu, lower_bound: %lld",
minimumsize, g_pserver->master_repl_offset, len, lower_bound);
if (minimumsize > g_pserver->repl_backlog_size) {
// This is an emergency overflow, we better resize to fit
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
@ -635,9 +635,7 @@ void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) {
printf("\n");
}
if (g_pserver->repl_backlog){
feedReplicationBacklog(buf,buflen);
}
if (g_pserver->repl_backlog) feedReplicationBacklog(buf,buflen);
}
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
@ -689,13 +687,12 @@ int prepareClientToWrite(client *c);
/* Feed the replica 'c' with the replication backlog starting from the
* specified 'offset' up to the end of the backlog. */
long long addReplyReplicationBacklog(client *c, long long offset) {
long long j, skip, len;
long long skip, len;
serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
if (g_pserver->repl_backlog_histlen == 0) {
serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
serverLog(LL_NOTICE, "REOAD TO RESIST");
c->repl_curr_off = g_pserver->master_repl_offset;
c->repl_end_off = g_pserver->master_repl_offset;
return 0;
@ -714,30 +711,20 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
skip = offset - g_pserver->repl_backlog_off;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
/* Point j to the oldest byte, that is actually our
* g_pserver->repl_backlog_off byte. */
j = (g_pserver->repl_backlog_idx +
(g_pserver->repl_backlog_size-g_pserver->repl_backlog_histlen)) %
g_pserver->repl_backlog_size;
serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
/* Discard the amount of data to seek to the specified 'offset'. */
j = (j + skip) % g_pserver->repl_backlog_size;
/* Feed replica with data. Since it is a circular buffer we have to
* split the reply in two parts if we are cross-boundary. */
len = g_pserver->repl_backlog_histlen - skip;
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
/* Set the start and end offsets for the replica so that a future
* writeToClient will send the backlog from the given offset to
* the current end of the backlog to said replica */
c->repl_curr_off = offset - 1;
// serverLog(LL_NOTICE, "Client %s, replica offset %lld in psync", replicationGetSlaveName(c), c->repl_curr_off);
c->repl_end_off = g_pserver->master_repl_offset;
/* Force the partial sync to be queued */
prepareClientToWrite(c);
c->fPendingReplicaWrite = true;
return g_pserver->repl_backlog_histlen - skip;
return len;
}
/* Return the offset to provide as reply to the PSYNC command received
@ -4963,14 +4950,18 @@ void flushReplBacklogToClients()
listIter li;
listNode *ln;
listRewind(g_pserver->slaves, &li);
/* We don't actually write any data in this function since we send data
* directly from the replication backlog to replicas in writeToClient.
*
* What we do however, is set the end offset of each replica here. This way,
* future calls to writeToClient will know up to where in the replication
* backlog is valid for writing. */
while ((ln = listNext(&li))) {
client *replica = (client*)listNodeValue(ln);
if (!canFeedReplicaReplBuffer(replica)) continue;
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
// serverLog(LL_NOTICE, "Client %s, replica offset %lld", replicationGetSlaveName(replica), replica->repl_curr_off);
std::unique_lock<fastlock> ul(replica->lock);
if (!FCorrectThread(replica))
fAsyncWrite = true;

View File

@ -1590,9 +1590,11 @@ struct client {
copying this replica output buffer
should use. */
long long repl_curr_off = -1; /* Replication offset of the client, only if it's a replica*/
long long repl_end_off = -1; /* Replication offset to write to */
int fPendingReplicaWrite;
long long repl_curr_off = -1;/* Replication offset of the replica, also where in the backlog we need to start from
* when sending data to this replica. */
long long repl_end_off = -1; /* Replication offset to write to, stored in the replica, as opposed to using the global offset
* to prevent needing the global lock */
int fPendingReplicaWrite; /* Is there a write queued for this replica? */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
@ -2375,8 +2377,8 @@ struct redisServer {
int repl_diskless_load; /* Slave parse RDB directly from the socket.
* see REPL_DISKLESS_LOAD_* enum */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
std::atomic <long long> repl_lowest_off; /* The lowest offset amongst all clients
Updated before calls to feed the replication backlog */
std::atomic <long long> repl_lowest_off; /* The lowest offset amongst all replicas
-1 if there are no replicas */
/* Replication (replica) */
list *masters;
int enable_multimaster;
@ -2825,7 +2827,6 @@ sds getAllClientsInfoString(int type);
void rewriteClientCommandVector(client *c, int argc, ...);
void rewriteClientCommandArgument(client *c, int i, robj *newval);
void replaceClientCommandVector(client *c, int argc, robj **argv);
unsigned long getClientReplicationBacklogSharedUsage(client *c);
unsigned long getClientOutputBufferMemoryUsage(client *c);
int freeClientsInAsyncFreeQueue(int iel);
void asyncCloseClientOnOutputBufferLimitReached(client *c);
@ -3017,7 +3018,6 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
void replicationNotifyLoadedKey(redisDb *db, robj_roptr key, robj_roptr val, long long expire);
void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long long expire);
void updateLowestOffsetAmongReplicas(void);
void clearFailoverState(void);
void updateFailoverStatus(void);
void abortFailover(redisMaster *mi, const char *err);