Updated resize logic

Former-commit-id: e6d892ef21b7fc6f51433f32b01198038e555419
This commit is contained in:
VivekSainiEQ 2021-04-29 17:01:06 +00:00
parent 5dfac7172f
commit 089cbfa580
3 changed files with 151 additions and 79 deletions

View File

@ -1680,22 +1680,8 @@ int writeToClient(client *c, int handler_installed) {
/* if this is a write to a replica, it's coming straight from the replication backlog */ /* if this is a write to a replica, it's coming straight from the replication backlog */
long long repl_backlog_idx = g_pserver->repl_backlog_idx; long long repl_backlog_idx = g_pserver->repl_backlog_idx;
bool wroteFromClientBuffer = false; /* True if you wrote from the client buffer in this function call */
while(clientHasPendingReplies(c)) { while(clientHasPendingReplies(c)) {
wroteFromClientBuffer = true;
if (c->flags & CLIENT_SLAVE && listLength(c->reply) % 10 == 0){
serverLog(LL_NOTICE, "-----------------------------------------");
serverLog(LL_NOTICE, "replica w/ pending replies, with a reply list size of: %lu", listLength(c->reply));
serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
serverLog(LL_NOTICE, "-----------------------------------------");
}
if (c->bufpos > 0) { if (c->bufpos > 0) {
// serverLog(LL_NOTICE, "Sending reply %d", x);
// serverLog(LL_NOTICE, "SUSSUS AMOGUS, %ld", c->bufpos);
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break; if (nwritten <= 0) break;
c->sentlen += nwritten; c->sentlen += nwritten;
@ -1753,9 +1739,7 @@ int writeToClient(client *c, int handler_installed) {
/* If there are no more pending replies, then we have transmitted the RDB. /* If there are no more pending replies, then we have transmitted the RDB.
* This means further replication commands will be taken straight from the * This means further replication commands will be taken straight from the
* replication backlog from now on. */ * replication backlog from now on. */
if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c)){ if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){
if (!c->transmittedRDB)
serverLog(LL_NOTICE, "---------->>>>>>>> TRANSMISSION OF THE RDB HAS COMPLETED <<<<<<<<----------");
c->transmittedRDB = true; c->transmittedRDB = true;
} }
@ -1775,49 +1759,27 @@ int writeToClient(client *c, int handler_installed) {
/* wrap around case, v. rare */ /* wrap around case, v. rare */
/* also v. buggy so there's that */ /* also v. buggy so there's that */
} else { } else {
serverLog(LL_NOTICE, "WRAP CASE");
serverLog(LL_NOTICE, "-----------------------------------------");
serverLog(LL_NOTICE, "requested to write: %ld", nrequested);
serverLog(LL_NOTICE, "actually written: %ld", nwritten);
serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen);
serverLog(LL_NOTICE, "nwritten: %ld", nwritten);
serverLog(LL_NOTICE, "-----------------------------------------");
nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx; nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx;
nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx); nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx);
/* only attempt wrapping if we write the correct number of bytes */ /* only attempt wrapping if we write the correct number of bytes */
if (nwritten == repl_backlog_size - c->repl_curr_idx){ if (nwritten == repl_backlog_size - c->repl_curr_idx){
serverLog(LL_NOTICE, "SECOND STAGE");
serverLog(LL_NOTICE, "-----------------------------------------");
serverLog(LL_NOTICE, "requested to write: %ld", nrequested);
serverLog(LL_NOTICE, "actually written: %ld", nwritten);
serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen);
serverLog(LL_NOTICE, "-----------------------------------------");
long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx); long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx);
if (nwrittenPart2 != -1) if (nwrittenPart2 != -1)
nwritten += nwrittenPart2; nwritten += nwrittenPart2;
serverLog(LL_NOTICE, "nwrittenPart2: %lld", nwrittenPart2); }
serverLog(LL_NOTICE, "-----------------------------------------");
} else {
serverLog(LL_NOTICE, "SUPER SHORT");
}
} }
/* only update the replica's current index if bytes were sent */ /* only update the replica's current index if bytes were sent */
// if (nrequested != nwritten){ // if (nrequested != nwritten){
serverLog(LL_NOTICE, "-----------------------------------------"); // serverLog(LL_NOTICE, "-----------------------------------------");
serverLog(LL_NOTICE, "AFTER THE FACT"); // serverLog(LL_NOTICE, "AFTER THE FACT");
serverLog(LL_NOTICE, "requested to write: %ld", nrequested); // serverLog(LL_NOTICE, "requested to write: %ld", nrequested);
serverLog(LL_NOTICE, "actually written: %ld", nwritten); // serverLog(LL_NOTICE, "actually written: %ld", nwritten);
serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); // serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); // serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
serverLog(LL_NOTICE, "-----------------------------------------"); // serverLog(LL_NOTICE, "-----------------------------------------");
// } // }
@ -1902,25 +1864,36 @@ void ProcessPendingAsyncWrites()
serverAssert(c->fPendingAsyncWrite); serverAssert(c->fPendingAsyncWrite);
if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY)) if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY))
{ {
zfree(c->replyAsync); if (c->replyAsync != nullptr){
c->replyAsync = nullptr; zfree(c->replyAsync);
c->replyAsync = nullptr;
}
c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWrite = FALSE;
continue; continue;
} }
int size = c->replyAsync->used; /* since writes from master to replica can come directly from the replication backlog,
* writes may have been signalled without having been copied to the replyAsync buffer,
* thus causing the buffer to be NULL */
if (c->replyAsync != nullptr){
int size = c->replyAsync->used;
if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) { if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) {
memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size); memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size);
c->bufpos += size; c->bufpos += size;
} else { } else {
c->reply_bytes += c->replyAsync->size; c->reply_bytes += c->replyAsync->size;
listAddNodeTail(c->reply, c->replyAsync); listAddNodeTail(c->reply, c->replyAsync);
c->replyAsync = nullptr;
}
zfree(c->replyAsync);
c->replyAsync = nullptr; c->replyAsync = nullptr;
} else {
/* Only replicas should have empty async reply buffers */
serverAssert(c->flags & CLIENT_SLAVE);
} }
zfree(c->replyAsync);
c->replyAsync = nullptr;
c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWrite = FALSE;
// Now install the write event handler // Now install the write event handler
@ -1935,17 +1908,17 @@ void ProcessPendingAsyncWrites()
{ {
ae_flags |= AE_BARRIER; ae_flags |= AE_BARRIER;
} }
if (!((c->replstate == REPL_STATE_NONE || if (!((c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))))
continue; continue;
asyncCloseClientOnOutputBufferLimitReached(c); asyncCloseClientOnOutputBufferLimitReached(c);
if (c->flags & CLIENT_CLOSE_ASAP) if (c->flags & CLIENT_CLOSE_ASAP)
continue; // we will never write this so don't post an op continue; // we will never write this so don't post an op
std::atomic_thread_fence(std::memory_order_seq_cst); std::atomic_thread_fence(std::memory_order_seq_cst);
if (FCorrectThread(c)) if (FCorrectThread(c))
{ {
prepareClientToWrite(c); // queue an event prepareClientToWrite(c); // queue an event
@ -3386,7 +3359,12 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* that writes to said replica are using data from the replication backlog * that writes to said replica are using data from the replication backlog
* as opposed to it's own internal buffer, this number should keep track of that */ * as opposed to it's own internal buffer, this number should keep track of that */
unsigned long getClientReplicationBacklogSharedUsage(client *c) { unsigned long getClientReplicationBacklogSharedUsage(client *c) {
return (c->repl_curr_idx == -1 && c->flags & CLIENT_SLAVE) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off; if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1){
// serverLog(LL_NOTICE, "repl_backlog_size %lld, repl_backlog_idx %lld, master_repl_offset %lld, repl_curr_idx %lld, repl_curr_off %lld",
// g_pserver->repl_backlog_size, g_pserver->repl_backlog_idx, g_pserver->master_repl_offset, c->repl_curr_idx, c->repl_curr_off);
}
return (!(c->flags & CLIENT_SLAVE) || c->repl_curr_idx == -1) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off;
} }
/* This function returns the number of bytes that Redis is /* This function returns the number of bytes that Redis is

View File

@ -47,8 +47,8 @@
#include <unordered_map> #include <unordered_map>
#include <string> #include <string>
#define BYPASS_BUFFER // #define BYPASS_BUFFER
// #define BYPASS_PSYNC // #define RESIZE_BACKLOG
void replicationDiscardCachedMaster(redisMaster *mi); void replicationDiscardCachedMaster(redisMaster *mi);
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn); void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
@ -57,6 +57,30 @@ void putSlaveOnline(client *replica);
int cancelReplicationHandshake(redisMaster *mi); int cancelReplicationHandshake(redisMaster *mi);
static void propagateMasterStaleKeys(); static void propagateMasterStaleKeys();
/* gets the lowest offset amongst all of the replicas */
long long getLowestOffsetAmongReplicas(){
serverAssert(GlobalLocksAcquired());
long long min_offset = LONG_LONG_MAX;
listIter li;
listNode *ln;
listRewind(g_pserver->slaves, &li);
// check for potential overflow first
while ((ln = listNext(&li))) {
client *replica = (client*)listNodeValue(ln);
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
if (replica->repl_curr_idx == -1) continue;
std::unique_lock<fastlock> ul(replica->lock, std::defer_lock);
if (FCorrectThread(replica))
ul.lock();
min_offset = std::min(min_offset, replica->repl_curr_off);
}
/* return -1 if no other minimum was found */
return min_offset == LONG_LONG_MAX ? -1 : min_offset;
}
/* We take a global flag to remember if this instance generated an RDB /* We take a global flag to remember if this instance generated an RDB
* because of replication, so that we can remove the RDB file in case * because of replication, so that we can remove the RDB file in case
* the instance is configured to have no persistence. */ * the instance is configured to have no persistence. */
@ -67,11 +91,13 @@ void resizeReplicationBacklogForClients(long long newsize);
void setReplIdx(client *c, long long idx, long long off){ void setReplIdx(client *c, long long idx, long long off){
if (prepareClientToWrite(c) != C_OK) return; if (prepareClientToWrite(c) != C_OK) return;
// serverLog(LL_NOTICE, "calling this garbage function w/ idx and off: %lld, %lld, %lld", idx, off, off-idx); // serverLog(LL_NOTICE, "calling this garbage function w/ idx and off: %lld, %lld, %lld", idx, off, off-idx);
// serverLog(LL_NOTICE, "What is this value? %lld", c->repl_curr_idx); // serverLog(LL_NOTICE, "Repl Index started at: %lld", c->repl_curr_idx);
if (c->repl_curr_idx == -1){ if (c->repl_curr_idx == -1){
c->repl_curr_idx = idx; c->repl_curr_idx = idx;
c->repl_curr_off = off; c->repl_curr_off = off;
} }
// serverLog(LL_NOTICE, "Repl Index has become: %lld", c->repl_curr_idx);
} }
/* --------------------------- Utility functions ---------------------------- */ /* --------------------------- Utility functions ---------------------------- */
@ -277,7 +303,7 @@ void resizeReplicationBacklogForClients(long long newsize) {
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
if (g_pserver->repl_backlog_size == newsize) return; if (g_pserver->repl_backlog_size == newsize) return;
serverLog(LL_NOTICE, "WE HAD TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize); serverLog(LL_NOTICE, "WE HAVE TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize);
/* get the critical client size, i.e. the size of the data unflushed to clients */ /* get the critical client size, i.e. the size of the data unflushed to clients */
long long earliest_off = LONG_LONG_MAX; long long earliest_off = LONG_LONG_MAX;
long long earliest_idx = -1; long long earliest_idx = -1;
@ -290,6 +316,20 @@ void resizeReplicationBacklogForClients(long long newsize) {
earliest_off = replica->repl_curr_off; earliest_off = replica->repl_curr_off;
earliest_idx = replica->repl_curr_idx; earliest_idx = replica->repl_curr_idx;
} }
serverLog(LL_NOTICE, "repl_curr_idx: %lld, earlistidx: %lld", replica->repl_curr_idx, earliest_idx);
}
serverLog(LL_NOTICE, "We are starting with: master_repl_offset: %lld, repl_batch_offStart: %lld, earliest_off: %lld, "
"repl_backlog_idx: %lld, repl_batch_idxStart: %lld, earliest_idx: %lld, repl_backlog_size: %lld",
g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, earliest_off,
g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, earliest_idx, g_pserver->repl_backlog_size
);
long long new_off = 0, new_idx = 0;
/* if no earliest offset is found amongst the clients, they are all up to date with the flushed index */
if (earliest_off == LONG_LONG_MAX && earliest_idx == -1){
earliest_idx = g_pserver->repl_batch_idxStart;
earliest_off = g_pserver->repl_batch_offStart;
} }
if (g_pserver->repl_backlog != NULL) { if (g_pserver->repl_backlog != NULL) {
@ -330,8 +370,11 @@ void resizeReplicationBacklogForClients(long long newsize) {
if (replica->repl_curr_idx < 0) if (replica->repl_curr_idx < 0)
replica->repl_curr_idx += g_pserver->repl_backlog_size; replica->repl_curr_idx += g_pserver->repl_backlog_size;
} }
new_idx = replica->repl_curr_idx;
} }
g_pserver->repl_batch_idxStart = 0; g_pserver->repl_batch_idxStart -= earliest_idx;
if (g_pserver->repl_batch_idxStart < 0)
g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size;
} else { } else {
zfree(g_pserver->repl_backlog); zfree(g_pserver->repl_backlog);
g_pserver->repl_backlog = (char*)zmalloc(newsize); g_pserver->repl_backlog = (char*)zmalloc(newsize);
@ -342,6 +385,12 @@ void resizeReplicationBacklogForClients(long long newsize) {
} }
} }
g_pserver->repl_backlog_size = newsize; g_pserver->repl_backlog_size = newsize;
serverLog(LL_NOTICE, "We are ending with: master_repl_offset: %lld, repl_batch_offStart: %lld, new_off: %lld, "
"repl_backlog_idx: %lld, repl_batch_idxStart: %lld, new_idx: %lld, repl_backlog_size: %lld",
g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, new_off,
g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, new_idx, g_pserver->repl_backlog_size
);
} }
void freeReplicationBacklog(void) { void freeReplicationBacklog(void) {
@ -367,20 +416,41 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
const unsigned char *p = (const unsigned char*)ptr; const unsigned char *p = (const unsigned char*)ptr;
if (g_pserver->repl_batch_idxStart >= 0) { if (g_pserver->repl_batch_idxStart >= 0) {
long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; /* we are lower bounded by the lower client offset or the offStart if all the clients are up to date */
long long lower_bound = getLowestOffsetAmongReplicas();
if (lower_bound == -1)
lower_bound = g_pserver->repl_batch_offStart;
long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
if (minimumsize > g_pserver->repl_backlog_size) { if (minimumsize > g_pserver->repl_backlog_size) {
flushReplBacklogToClients(); flushReplBacklogToClients();
minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; minimumsize = g_pserver->master_repl_offset + len - lower_bound +1;
if (minimumsize > g_pserver->repl_backlog_size) { if (minimumsize > g_pserver->repl_backlog_size) {
// This is an emergency overflow, we better resize to fit // This is an emergency overflow, we better resize to fit
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize); long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize); serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
resizeReplicationBacklog(newsize); resizeReplicationBacklogForClients(newsize);
} }
} }
#ifdef RESIZE_BACKLOG
long long lowest_replica_offset = getLowestOffsetAmongReplicas();
minimumsize = g_pserver->master_repl_offset + len - lowest_replica_offset;
if (lowest_replica_offset != -1 && minimumsize > g_pserver->repl_backlog_size){
serverLog(LL_WARNING, "THE REPLICATION BACKLOG SIZE IS TOO SMALL, THIS IS A PROBLEM");
long long oldsize = g_pserver->repl_backlog_size;
resizeReplicationBacklogForClients(std::max(g_pserver->repl_backlog_size * 2, minimumsize));
serverLog(LL_WARNING, "changed size from %lld to %lld", oldsize, g_pserver->repl_backlog_size);
flushReplBacklogToClients();
}
#endif
} }
// serverLog(LL_NOTICE, "Pt2 start with: master_repl_offset: %lld, repl_batch_offStart: %lld, "
// "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld",
// g_pserver->master_repl_offset, g_pserver->repl_batch_offStart,
// g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size
// );
g_pserver->master_repl_offset += len; g_pserver->master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every /* This is a circular buffer, so write as much data we can at every
@ -395,12 +465,23 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
len -= thislen; len -= thislen;
p += thislen; p += thislen;
g_pserver->repl_backlog_histlen += thislen; g_pserver->repl_backlog_histlen += thislen;
// serverLog(LL_NOTICE, "Pt2 intermediate with: master_repl_offset: %lld, repl_batch_offStart: %lld, "
// "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld",
// g_pserver->master_repl_offset, g_pserver->repl_batch_offStart,
// g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size
// );
} }
if (g_pserver->repl_backlog_histlen > g_pserver->repl_backlog_size) if (g_pserver->repl_backlog_histlen > g_pserver->repl_backlog_size)
g_pserver->repl_backlog_histlen = g_pserver->repl_backlog_size; g_pserver->repl_backlog_histlen = g_pserver->repl_backlog_size;
/* Set the offset of the first byte we have in the backlog. */ /* Set the offset of the first byte we have in the backlog. */
g_pserver->repl_backlog_off = g_pserver->master_repl_offset - g_pserver->repl_backlog_off = g_pserver->master_repl_offset -
g_pserver->repl_backlog_histlen + 1; g_pserver->repl_backlog_histlen + 1;
// serverLog(LL_NOTICE, "Pt2 end with: master_repl_offset: %lld, repl_batch_offStart: %lld, "
// "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld",
// g_pserver->master_repl_offset, g_pserver->repl_batch_offStart,
// g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size
// );
} }
/* Wrapper for feedReplicationBacklog() that takes Redis string objects /* Wrapper for feedReplicationBacklog() that takes Redis string objects
@ -774,7 +855,6 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
* split the reply in two parts if we are cross-boundary. */ * split the reply in two parts if we are cross-boundary. */
len = g_pserver->repl_backlog_histlen - skip; len = g_pserver->repl_backlog_histlen - skip;
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
serverLog(LL_NOTICE, "Coming through from the addReplicationBacklog");
#ifdef BYPASS_PSYNC #ifdef BYPASS_PSYNC
setReplIdx(c, j, offset); setReplIdx(c, j, offset);
#else #else
@ -789,7 +869,6 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
j = 0; j = 0;
} }
#endif #endif
serverLog(LL_NOTICE, "rdb transmitted? %d, pending replies? %d", c->transmittedRDB, clientHasPendingReplies(c));
return g_pserver->repl_backlog_histlen - skip; return g_pserver->repl_backlog_histlen - skip;
} }
@ -1520,13 +1599,11 @@ void sendBulkToSlave(connection *conn) {
replica->repldboff += nwritten; replica->repldboff += nwritten;
g_pserver->stat_net_output_bytes += nwritten; g_pserver->stat_net_output_bytes += nwritten;
// replica->repl_curr_idx = g_pserver->repl_backlog_idx;
if (replica->repldboff == replica->repldbsize) { if (replica->repldboff == replica->repldbsize) {
close(replica->repldbfd); close(replica->repldbfd);
replica->repldbfd = -1; replica->repldbfd = -1;
connSetWriteHandler(replica->conn,NULL); connSetWriteHandler(replica->conn,NULL);
putSlaveOnline(replica); putSlaveOnline(replica);
serverLog(LL_NOTICE, "ABOUT TO DIE HERE");
} }
} }
@ -4560,6 +4637,7 @@ void _clientAsyncReplyBufferReserve(client *c, size_t len);
/* Has the end of the replication backlog overflowed past the beginning? */ /* Has the end of the replication backlog overflowed past the beginning? */
bool replOverflowHasOccured(){ bool replOverflowHasOccured(){
if (g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart){ if (g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart){
long long repl_idx_difference = g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart ? long long repl_idx_difference = g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart ?
g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart : g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart :
@ -4575,8 +4653,13 @@ thread_local int transmittedCount = 0;
void flushReplBacklogToClients() void flushReplBacklogToClients()
{ {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
if (g_pserver->repl_batch_offStart < 0) if (g_pserver->repl_batch_offStart < 0){
if (getLowestOffsetAmongReplicas() == -1){
serverLog(LL_NOTICE, "this is a case i probably have to handle");
}
return; return;
}
if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) { if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
bool fAsyncWrite = false; bool fAsyncWrite = false;
@ -4585,7 +4668,7 @@ void flushReplBacklogToClients()
serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size); serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx); serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
serverAssert(!replOverflowHasOccured()); // serverAssert(!replOverflowHasOccured());
listIter li; listIter li;
listNode *ln; listNode *ln;
listRewind(g_pserver->slaves, &li); listRewind(g_pserver->slaves, &li);
@ -4605,11 +4688,21 @@ void flushReplBacklogToClients()
ul.lock(); ul.lock();
else else
fAsyncWrite = true; fAsyncWrite = true;
if (g_pserver->master_repl_offset - replica->repl_curr_off > g_pserver->repl_backlog_size){
serverLog(LL_WARNING, "THE REPLICATION BACKLOG SIZE IS TOO SMALL, THIS IS A PROBLEM");
long long oldsize = g_pserver->repl_backlog_size;
resizeReplicationBacklogForClients(std::max(g_pserver->repl_backlog_size * 2, g_pserver->master_repl_offset - replica->repl_curr_off));
serverLog(LL_WARNING, "changing size from %lld to %lld", oldsize, g_pserver->repl_backlog_size);
}
}
listRewind(g_pserver->slaves, &li);
#endif #endif
while ((ln = listNext(&li))) { while ((ln = listNext(&li))) {
client *replica = (client*)listNodeValue(ln); client *replica = (client*)listNodeValue(ln);
// serverLog(LL_NOTICE, "replica state: %d", replica->replstate);
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (replica->flags & CLIENT_CLOSE_ASAP) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue;

View File

@ -1796,6 +1796,7 @@ int clientsCronTrackClientsMemUsage(client *c) {
mem += zmalloc_size(c); mem += zmalloc_size(c);
mem += c->argv_len_sum(); mem += c->argv_len_sum();
if (c->argv) mem += zmalloc_size(c->argv); if (c->argv) mem += zmalloc_size(c->argv);
// serverLog(LL_NOTICE, "Mem here is : %lu", mem);
/* Now that we have the memory used by the client, remove the old /* Now that we have the memory used by the client, remove the old
* value from the old category, and add it back. */ * value from the old category, and add it back. */
g_pserver->stat_clients_type_memory[c->client_cron_last_memory_type] -= g_pserver->stat_clients_type_memory[c->client_cron_last_memory_type] -=
@ -1854,7 +1855,7 @@ void clientsCron(int iel) {
while(listLength(g_pserver->clients) && iterations--) { while(listLength(g_pserver->clients) && iterations--) {
client *c; client *c;
listNode *head; listNode *head;
// serverLog(LL_NOTICE, "we are at iteration: %d", iterations);
/* Rotate the list, take the current head, process. /* Rotate the list, take the current head, process.
* This way if the client must be removed from the list it's the * This way if the client must be removed from the list it's the
* first element and we don't incur into O(N) computation. */ * first element and we don't incur into O(N) computation. */