Primitive implementation of bypassing client buffer, stats are all messed up and print statements everywhere
Former-commit-id: 8ae310fb0f7b53add826f76891da333b63860001
This commit is contained in:
parent
7f68765981
commit
05fe41b33a
@ -224,6 +224,7 @@ void clientInstallWriteHandler(client *c) {
|
||||
(c->replstate == REPL_STATE_NONE ||
|
||||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
||||
{
|
||||
// serverLog(LL_NOTICE, "we installing boyz");
|
||||
AssertCorrectThread(c);
|
||||
serverAssert(c->lock.fOwnLock());
|
||||
/* Here instead of installing the write handler, we just flag the
|
||||
@ -301,7 +302,7 @@ int prepareClientToWrite(client *c) {
|
||||
|
||||
/* Schedule the client to write the output buffers to the socket, unless
|
||||
* it should already be setup to do so (it has already pending data). */
|
||||
if (!fAsync && !clientHasPendingReplies(c)) clientInstallWriteHandler(c);
|
||||
if (!fAsync && !clientHasPendingReplies(c) && c->repl_curr_idx == -1) clientInstallWriteHandler(c);
|
||||
if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c);
|
||||
|
||||
/* Authorize the caller to queue in the output buffer of this client. */
|
||||
@ -1676,15 +1677,33 @@ int writeToClient(client *c, int handler_installed) {
|
||||
|
||||
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
||||
|
||||
/* if this is a write to a replica, it's coming straight from the replication backlog */
|
||||
long long repl_backlog_idx = g_pserver->repl_backlog_idx;
|
||||
|
||||
bool wroteFromClientBuffer = false; /* True if you wrote from the client buffer in this function call */
|
||||
|
||||
while(clientHasPendingReplies(c)) {
|
||||
wroteFromClientBuffer = true;
|
||||
if (c->flags & CLIENT_SLAVE && listLength(c->reply) % 10 == 0){
|
||||
|
||||
serverLog(LL_NOTICE, "-----------------------------------------");
|
||||
serverLog(LL_NOTICE, "replica w/ pending replies, with a reply list size of: %lu", listLength(c->reply));
|
||||
serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
|
||||
serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
|
||||
serverLog(LL_NOTICE, "-----------------------------------------");
|
||||
|
||||
}
|
||||
if (c->bufpos > 0) {
|
||||
// serverLog(LL_NOTICE, "Sending reply %d", x);
|
||||
// serverLog(LL_NOTICE, "SUSSUS AMOGUS, %ld", c->bufpos);
|
||||
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
|
||||
if (nwritten <= 0) break;
|
||||
c->sentlen += nwritten;
|
||||
totwritten += nwritten;
|
||||
|
||||
/* If the buffer was sent, set bufpos to zero to continue with
|
||||
* the remainder of the reply. */
|
||||
* the remainder of the reply. */
|
||||
// serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen);
|
||||
if ((int)c->sentlen == c->bufpos) {
|
||||
c->bufpos = 0;
|
||||
c->sentlen = 0;
|
||||
@ -1714,23 +1733,112 @@ int writeToClient(client *c, int handler_installed) {
|
||||
}
|
||||
}
|
||||
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
|
||||
* bytes, in a single threaded server it's a good idea to serve
|
||||
* other clients as well, even if a very large request comes from
|
||||
* super fast link that is always able to accept data (in real world
|
||||
* scenario think about 'KEYS *' against the loopback interface).
|
||||
*
|
||||
* However if we are over the maxmemory limit we ignore that and
|
||||
* just deliver as much data as it is possible to deliver.
|
||||
*
|
||||
* Moreover, we also send as much as possible if the client is
|
||||
* a replica or a monitor (otherwise, on high-speed traffic, the
|
||||
* replication/output buffer will grow indefinitely) */
|
||||
* bytes, in a single threaded server it's a good idea to serve
|
||||
* other clients as well, even if a very large request comes from
|
||||
* super fast link that is always able to accept data (in real world
|
||||
* scenario think about 'KEYS *' against the loopback interface).
|
||||
*
|
||||
* However if we are over the maxmemory limit we ignore that and
|
||||
* just deliver as much data as it is possible to deliver.
|
||||
*
|
||||
* Moreover, we also send as much as possible if the client is
|
||||
* a replica or a monitor (otherwise, on high-speed traffic, the
|
||||
* replication/output buffer will grow indefinitely) */
|
||||
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
|
||||
(g_pserver->maxmemory == 0 ||
|
||||
zmalloc_used_memory() < g_pserver->maxmemory) &&
|
||||
zmalloc_used_memory() < g_pserver->maxmemory) &&
|
||||
!(c->flags & CLIENT_SLAVE)) break;
|
||||
}
|
||||
|
||||
|
||||
/* If there are no more pending replies, then we have transmitted the RDB.
|
||||
* This means further replication commands will be taken straight from the
|
||||
* replication backlog from now on. */
|
||||
if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c)){
|
||||
if (!c->transmittedRDB)
|
||||
serverLog(LL_NOTICE, "---------->>>>>>>> TRANSMISSION OF THE RDB HAS COMPLETED <<<<<<<<----------");
|
||||
c->transmittedRDB = true;
|
||||
}
|
||||
|
||||
/* For replicas, we don't store all the information in the client buffer
|
||||
* Most of the time (aside from immediately after synchronizing), we read
|
||||
* from the replication backlog directly */
|
||||
if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1 && c->transmittedRDB){
|
||||
/* copy global variables into local scope so if they change in between we don't care */
|
||||
long long repl_backlog_size = g_pserver->repl_backlog_size;
|
||||
long long nwrittenPart2 = 0;
|
||||
|
||||
ssize_t nrequested; /* The number of bytes requested to write */
|
||||
/* normal case with no wrap around */
|
||||
if (repl_backlog_idx >= c->repl_curr_idx){
|
||||
nrequested = repl_backlog_idx - c->repl_curr_idx;
|
||||
nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_idx - c->repl_curr_idx);
|
||||
/* wrap around case, v. rare */
|
||||
/* also v. buggy so there's that */
|
||||
} else {
|
||||
serverLog(LL_NOTICE, "WRAP CASE");
|
||||
serverLog(LL_NOTICE, "-----------------------------------------");
|
||||
serverLog(LL_NOTICE, "requested to write: %ld", nrequested);
|
||||
serverLog(LL_NOTICE, "actually written: %ld", nwritten);
|
||||
serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
|
||||
serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen);
|
||||
serverLog(LL_NOTICE, "nwritten: %ld", nwritten);
|
||||
serverLog(LL_NOTICE, "-----------------------------------------");
|
||||
|
||||
nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx;
|
||||
nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx);
|
||||
/* only attempt wrapping if we write the correct number of bytes */
|
||||
if (nwritten == repl_backlog_size - c->repl_curr_idx){
|
||||
serverLog(LL_NOTICE, "SECOND STAGE");
|
||||
serverLog(LL_NOTICE, "-----------------------------------------");
|
||||
serverLog(LL_NOTICE, "requested to write: %ld", nrequested);
|
||||
serverLog(LL_NOTICE, "actually written: %ld", nwritten);
|
||||
serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
|
||||
serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen);
|
||||
serverLog(LL_NOTICE, "-----------------------------------------");
|
||||
|
||||
long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx);
|
||||
if (nwrittenPart2 != -1)
|
||||
nwritten += nwrittenPart2;
|
||||
|
||||
serverLog(LL_NOTICE, "nwrittenPart2: %lld", nwrittenPart2);
|
||||
serverLog(LL_NOTICE, "-----------------------------------------");
|
||||
} else {
|
||||
serverLog(LL_NOTICE, "SUPER SHORT");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/* only update the replica's current index if bytes were sent */
|
||||
|
||||
// if (nrequested != nwritten){
|
||||
serverLog(LL_NOTICE, "-----------------------------------------");
|
||||
serverLog(LL_NOTICE, "AFTER THE FACT");
|
||||
serverLog(LL_NOTICE, "requested to write: %ld", nrequested);
|
||||
serverLog(LL_NOTICE, "actually written: %ld", nwritten);
|
||||
serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
|
||||
serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
|
||||
serverLog(LL_NOTICE, "-----------------------------------------");
|
||||
// }
|
||||
|
||||
|
||||
if (nwritten == nrequested){
|
||||
c->repl_curr_idx = -1; /* -1 denotes no more replica writes */
|
||||
}
|
||||
else if (nwritten > 0)
|
||||
c->repl_curr_idx = (c->repl_curr_idx + nwritten) % repl_backlog_size;
|
||||
|
||||
serverAssert(c->repl_curr_idx < repl_backlog_size);
|
||||
|
||||
/* only increment bytes if an error didn't occur */
|
||||
if (nwritten > 0){
|
||||
totwritten += nwritten;
|
||||
c->repl_curr_off += nwritten;
|
||||
}
|
||||
|
||||
/* If the second part of a write didn't go through, we still need to register that */
|
||||
if (nwrittenPart2 == -1) nwritten = -1;
|
||||
}
|
||||
|
||||
g_pserver->stat_net_output_bytes += totwritten;
|
||||
if (nwritten == -1) {
|
||||
if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
|
||||
@ -1750,7 +1858,7 @@ int writeToClient(client *c, int handler_installed) {
|
||||
* We just rely on data / pings received for timeout detection. */
|
||||
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime;
|
||||
}
|
||||
if (!clientHasPendingReplies(c)) {
|
||||
if (!clientHasPendingReplies(c) && c->repl_curr_idx == -1) {
|
||||
c->sentlen = 0;
|
||||
if (handler_installed) connSetWriteHandler(c->conn, NULL);
|
||||
|
||||
@ -1904,6 +2012,12 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
|
||||
/* Don't write to clients that are going to be closed anyway. */
|
||||
if (c->flags & CLIENT_CLOSE_ASAP) continue;
|
||||
|
||||
// if (c->flags & CLIENT_SLAVE){
|
||||
// if(clientHasPendingReplies(c))
|
||||
// serverLog(LL_NOTICE, "somehow the client buffer has these values: %s", c->buf);
|
||||
// serverLog(LL_NOTICE, "LOL");
|
||||
// }
|
||||
|
||||
/* Try to write buffers to the client socket. */
|
||||
if (writeToClient(c,0) == C_ERR)
|
||||
{
|
||||
@ -1920,7 +2034,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
|
||||
|
||||
/* If after the synchronous writes above we still have data to
|
||||
* output to the client, we need to install the writable handler. */
|
||||
if (clientHasPendingReplies(c)) {
|
||||
if (clientHasPendingReplies(c) || c->repl_curr_idx != -1) {
|
||||
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR)
|
||||
freeClientAsync(c);
|
||||
}
|
||||
@ -3268,6 +3382,13 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
|
||||
}
|
||||
}
|
||||
|
||||
/* In the case of a replica client, it is possible (and very likely)
|
||||
* that writes to said replica are using data from the replication backlog
|
||||
* as opposed to it's own internal buffer, this number should keep track of that */
|
||||
unsigned long getClientReplicationBacklogSharedUsage(client *c) {
|
||||
return (c->repl_curr_idx == -1 && c->flags & CLIENT_SLAVE) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off;
|
||||
}
|
||||
|
||||
/* This function returns the number of bytes that Redis is
|
||||
* using to store the reply still not read by the client.
|
||||
*
|
||||
@ -3276,9 +3397,11 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
|
||||
* enforcing the client output length limits. */
|
||||
unsigned long getClientOutputBufferMemoryUsage(client *c) {
|
||||
unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
|
||||
return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0);
|
||||
return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0) + getClientReplicationBacklogSharedUsage(c);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* Get the class of a client, used in order to enforce limits to different
|
||||
* classes of clients.
|
||||
*
|
||||
|
@ -47,6 +47,9 @@
|
||||
#include <unordered_map>
|
||||
#include <string>
|
||||
|
||||
#define BYPASS_BUFFER
|
||||
// #define BYPASS_PSYNC
|
||||
|
||||
void replicationDiscardCachedMaster(redisMaster *mi);
|
||||
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
|
||||
void replicationSendAck(redisMaster *mi);
|
||||
@ -59,6 +62,18 @@ static void propagateMasterStaleKeys();
|
||||
* the instance is configured to have no persistence. */
|
||||
int RDBGeneratedByReplication = 0;
|
||||
|
||||
void resizeReplicationBacklogForClients(long long newsize);
|
||||
|
||||
void setReplIdx(client *c, long long idx, long long off){
|
||||
if (prepareClientToWrite(c) != C_OK) return;
|
||||
// serverLog(LL_NOTICE, "calling this garbage function w/ idx and off: %lld, %lld, %lld", idx, off, off-idx);
|
||||
// serverLog(LL_NOTICE, "What is this value? %lld", c->repl_curr_idx);
|
||||
if (c->repl_curr_idx == -1){
|
||||
c->repl_curr_idx = idx;
|
||||
c->repl_curr_off = off;
|
||||
}
|
||||
}
|
||||
|
||||
/* --------------------------- Utility functions ---------------------------- */
|
||||
|
||||
/* Return the pointer to a string representing the replica ip:listening_port
|
||||
@ -213,6 +228,8 @@ void resizeReplicationBacklog(long long newsize) {
|
||||
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
|
||||
if (g_pserver->repl_backlog_size == newsize) return;
|
||||
|
||||
serverLog(LL_NOTICE, "WE HAD TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize);
|
||||
|
||||
if (g_pserver->repl_backlog != NULL) {
|
||||
/* What we actually do is to flush the old buffer and realloc a new
|
||||
* empty one. It will refill with new data incrementally.
|
||||
@ -253,6 +270,80 @@ void resizeReplicationBacklog(long long newsize) {
|
||||
g_pserver->repl_backlog_size = newsize;
|
||||
}
|
||||
|
||||
|
||||
/* The above but for when clients need extra replication backlog because ??? */
|
||||
void resizeReplicationBacklogForClients(long long newsize) {
|
||||
if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
|
||||
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
|
||||
if (g_pserver->repl_backlog_size == newsize) return;
|
||||
|
||||
serverLog(LL_NOTICE, "WE HAD TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize);
|
||||
/* get the critical client size, i.e. the size of the data unflushed to clients */
|
||||
long long earliest_off = LONG_LONG_MAX;
|
||||
long long earliest_idx = -1;
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(g_pserver->slaves, &li);
|
||||
while ((ln = listNext(&li))) {
|
||||
client *replica = (client*)listNodeValue(ln);
|
||||
if (replica->repl_curr_off != -1 && replica->repl_curr_off < earliest_off){
|
||||
earliest_off = replica->repl_curr_off;
|
||||
earliest_idx = replica->repl_curr_idx;
|
||||
}
|
||||
}
|
||||
|
||||
if (g_pserver->repl_backlog != NULL) {
|
||||
/* What we actually do is to flush the old buffer and realloc a new
|
||||
* empty one. It will refill with new data incrementally.
|
||||
* The reason is that copying a few gigabytes adds latency and even
|
||||
* worse often we need to alloc additional space before freeing the
|
||||
* old buffer. */
|
||||
|
||||
if (earliest_idx >= 0) {
|
||||
// We need to keep critical data so we can't shrink less than the hot data in the buffer
|
||||
newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off);
|
||||
char *backlog = (char*)zmalloc(newsize);
|
||||
g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off;
|
||||
|
||||
if (g_pserver->repl_backlog_idx >= earliest_idx) {
|
||||
auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx;
|
||||
memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog);
|
||||
serverLog(LL_NOTICE, "g_pserver->master_repl_offset: %lld, earliest_off: %lld, g_pserver->repl_backlog_idx: %lld, earliest_idx: %lld",
|
||||
g_pserver->master_repl_offset, earliest_off, g_pserver->repl_backlog_idx, earliest_idx);
|
||||
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
|
||||
} else {
|
||||
auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx;
|
||||
memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbPhase1);
|
||||
memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
|
||||
auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
|
||||
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
|
||||
}
|
||||
zfree(g_pserver->repl_backlog);
|
||||
g_pserver->repl_backlog = backlog;
|
||||
g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
|
||||
listRewind(g_pserver->slaves, &li);
|
||||
/* Go through the clients and update their replication indicies */
|
||||
while ((ln = listNext(&li))) {
|
||||
client *replica = (client*)listNodeValue(ln);
|
||||
if (replica->repl_curr_idx != -1){
|
||||
replica->repl_curr_idx -= earliest_idx;
|
||||
if (replica->repl_curr_idx < 0)
|
||||
replica->repl_curr_idx += g_pserver->repl_backlog_size;
|
||||
}
|
||||
}
|
||||
g_pserver->repl_batch_idxStart = 0;
|
||||
} else {
|
||||
zfree(g_pserver->repl_backlog);
|
||||
g_pserver->repl_backlog = (char*)zmalloc(newsize);
|
||||
g_pserver->repl_backlog_histlen = 0;
|
||||
g_pserver->repl_backlog_idx = 0;
|
||||
/* Next byte we have is... the next since the buffer is empty. */
|
||||
g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1;
|
||||
}
|
||||
}
|
||||
g_pserver->repl_backlog_size = newsize;
|
||||
}
|
||||
|
||||
void freeReplicationBacklog(void) {
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
listIter li;
|
||||
@ -683,6 +774,10 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
|
||||
* split the reply in two parts if we are cross-boundary. */
|
||||
len = g_pserver->repl_backlog_histlen - skip;
|
||||
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
|
||||
serverLog(LL_NOTICE, "Coming through from the addReplicationBacklog");
|
||||
#ifdef BYPASS_PSYNC
|
||||
setReplIdx(c, j, offset);
|
||||
#else
|
||||
while(len) {
|
||||
long long thislen =
|
||||
((g_pserver->repl_backlog_size - j) < len) ?
|
||||
@ -693,6 +788,8 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
|
||||
len -= thislen;
|
||||
j = 0;
|
||||
}
|
||||
#endif
|
||||
serverLog(LL_NOTICE, "rdb transmitted? %d, pending replies? %d", c->transmittedRDB, clientHasPendingReplies(c));
|
||||
return g_pserver->repl_backlog_histlen - skip;
|
||||
}
|
||||
|
||||
@ -731,6 +828,8 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) {
|
||||
* a SELECT statement in the replication stream. */
|
||||
g_pserver->replicaseldb = -1;
|
||||
|
||||
serverLog(LL_NOTICE, "We are setting up here lad");
|
||||
|
||||
/* Don't send this reply to slaves that approached us with
|
||||
* the old SYNC command. */
|
||||
if (!(replica->flags & CLIENT_PRE_PSYNC)) {
|
||||
@ -989,6 +1088,7 @@ void syncCommand(client *c) {
|
||||
if (!strcasecmp((const char*)ptrFromObj(c->argv[0]),"psync")) {
|
||||
if (masterTryPartialResynchronization(c) == C_OK) {
|
||||
g_pserver->stat_sync_partial_ok++;
|
||||
// c->repl_curr_idx = g_pserver->repl_backlog_idx;
|
||||
return; /* No full resync needed, return. */
|
||||
} else {
|
||||
char *master_replid = (char*)ptrFromObj(c->argv[1]);
|
||||
@ -1016,6 +1116,7 @@ void syncCommand(client *c) {
|
||||
connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
|
||||
c->repldbfd = -1;
|
||||
c->flags |= CLIENT_SLAVE;
|
||||
// c->repl_curr_idx = g_pserver->repl_backlog_idx;
|
||||
listAddNodeTail(g_pserver->slaves,c);
|
||||
|
||||
/* Create the replication backlog if needed. */
|
||||
@ -1035,6 +1136,7 @@ void syncCommand(client *c) {
|
||||
if (g_pserver->FRdbSaveInProgress() &&
|
||||
g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK)
|
||||
{
|
||||
serverLog(LL_NOTICE, "case 1");
|
||||
/* Ok a background save is in progress. Let's check if it is a good
|
||||
* one for replication, i.e. if there is another replica that is
|
||||
* registering differences since the server forked to save. */
|
||||
@ -1066,6 +1168,7 @@ void syncCommand(client *c) {
|
||||
} else if (g_pserver->FRdbSaveInProgress() &&
|
||||
g_pserver->rdb_child_type == RDB_CHILD_TYPE_SOCKET)
|
||||
{
|
||||
serverLog(LL_NOTICE, "case 2");
|
||||
/* There is an RDB child process but it is writing directly to
|
||||
* children sockets. We need to wait for the next BGSAVE
|
||||
* in order to synchronize. */
|
||||
@ -1073,6 +1176,7 @@ void syncCommand(client *c) {
|
||||
|
||||
/* CASE 3: There is no BGSAVE is progress. */
|
||||
} else {
|
||||
serverLog(LL_NOTICE, "case 3");
|
||||
if (g_pserver->repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
|
||||
/* Diskless replication RDB child is created inside
|
||||
* replicationCron() since we want to delay its start a
|
||||
@ -1278,6 +1382,7 @@ void replconfCommand(client *c) {
|
||||
* 3) Update the count of "good replicas". */
|
||||
void putSlaveOnline(client *replica) {
|
||||
replica->replstate = SLAVE_STATE_ONLINE;
|
||||
|
||||
replica->repl_put_online_on_ack = 0;
|
||||
replica->repl_ack_time = g_pserver->unixtime; /* Prevent false timeout. */
|
||||
if (connSetWriteHandler(replica->conn, sendReplyToClient, true) == C_ERR) {
|
||||
@ -1415,11 +1520,13 @@ void sendBulkToSlave(connection *conn) {
|
||||
|
||||
replica->repldboff += nwritten;
|
||||
g_pserver->stat_net_output_bytes += nwritten;
|
||||
// replica->repl_curr_idx = g_pserver->repl_backlog_idx;
|
||||
if (replica->repldboff == replica->repldbsize) {
|
||||
close(replica->repldbfd);
|
||||
replica->repldbfd = -1;
|
||||
connSetWriteHandler(replica->conn,NULL);
|
||||
putSlaveOnline(replica);
|
||||
serverLog(LL_NOTICE, "ABOUT TO DIE HERE");
|
||||
}
|
||||
}
|
||||
|
||||
@ -4450,6 +4557,21 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long
|
||||
}
|
||||
|
||||
void _clientAsyncReplyBufferReserve(client *c, size_t len);
|
||||
|
||||
/* Has the end of the replication backlog overflowed past the beginning? */
|
||||
bool replOverflowHasOccured(){
|
||||
if (g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart){
|
||||
long long repl_idx_difference = g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart ?
|
||||
g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart :
|
||||
(g_pserver->repl_backlog_size + g_pserver->repl_backlog_idx) - g_pserver->repl_batch_idxStart;
|
||||
|
||||
return g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > repl_idx_difference;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
thread_local int transmittedCount = 0;
|
||||
|
||||
void flushReplBacklogToClients()
|
||||
{
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
@ -4463,11 +4585,31 @@ void flushReplBacklogToClients()
|
||||
serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
|
||||
serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
|
||||
|
||||
serverAssert(!replOverflowHasOccured());
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(g_pserver->slaves, &li);
|
||||
|
||||
#if 0
|
||||
// check for potential overflow first
|
||||
while ((ln = listNext(&li))) {
|
||||
client *replica = (client*)listNodeValue(ln);
|
||||
// serverLog(LL_NOTICE, "replica state: %d", replica->replstate);
|
||||
|
||||
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
||||
if (replica->repl_curr_idx == -1) continue;
|
||||
|
||||
std::unique_lock<fastlock> ul(replica->lock, std::defer_lock);
|
||||
if (FCorrectThread(replica))
|
||||
ul.lock();
|
||||
else
|
||||
fAsyncWrite = true;
|
||||
#endif
|
||||
|
||||
while ((ln = listNext(&li))) {
|
||||
client *replica = (client*)listNodeValue(ln);
|
||||
// serverLog(LL_NOTICE, "replica state: %d", replica->replstate);
|
||||
|
||||
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
||||
@ -4478,6 +4620,19 @@ void flushReplBacklogToClients()
|
||||
else
|
||||
fAsyncWrite = true;
|
||||
|
||||
|
||||
#ifdef BYPASS_BUFFER
|
||||
/* If we are online and the RDB has been sent, there is no need to feed the client buffer
|
||||
* We will send our replies directly from the replication backlog instead */
|
||||
if (replica->replstate == SLAVE_STATE_ONLINE && replica->transmittedRDB){
|
||||
setReplIdx(replica, g_pserver->repl_batch_idxStart, g_pserver->repl_batch_offStart);
|
||||
continue;
|
||||
}
|
||||
#else
|
||||
if (replica->replstate == SLAVE_STATE_ONLINE){
|
||||
// serverLog(LL_NOTICE, "would be calling this garbage function w/ offset: %lld", g_pserver->repl_batch_idxStart);
|
||||
}
|
||||
#endif
|
||||
if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
|
||||
long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
|
||||
serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy);
|
||||
@ -4491,6 +4646,7 @@ void flushReplBacklogToClients()
|
||||
_clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx);
|
||||
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
|
||||
addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
|
||||
|
||||
serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));
|
||||
}
|
||||
}
|
||||
|
11
src/server.h
11
src/server.h
@ -1516,6 +1516,8 @@ struct client {
|
||||
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
|
||||
copying this replica output buffer
|
||||
should use. */
|
||||
long long repl_curr_idx = -1; /* Replication index sent, if this is a replica */
|
||||
long long repl_curr_off = -1;
|
||||
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
|
||||
int slave_listening_port; /* As configured with: REPLCONF listening-port */
|
||||
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
|
||||
@ -1575,6 +1577,9 @@ struct client {
|
||||
robj **argv;
|
||||
size_t argv_len_sumActive = 0;
|
||||
|
||||
bool transmittedRDB = false; /* Have we finished transmitting the RDB to this replica? */
|
||||
/* If so, we can read from the replication backlog instead of the client buffer */
|
||||
|
||||
// post a function from a non-client thread to run on its client thread
|
||||
bool postFunction(std::function<void(client *)> fn, bool fLock = true);
|
||||
size_t argv_len_sum() const;
|
||||
@ -3470,6 +3475,8 @@ void mixDigest(unsigned char *digest, const void *ptr, size_t len);
|
||||
void xorDigest(unsigned char *digest, const void *ptr, size_t len);
|
||||
int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags);
|
||||
|
||||
|
||||
|
||||
int moduleGILAcquiredByModule(void);
|
||||
extern int g_fInCrash;
|
||||
static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate
|
||||
@ -3526,6 +3533,8 @@ void tlsInit(void);
|
||||
void tlsInitThread();
|
||||
int tlsConfigure(redisTLSContextConfig *ctx_config);
|
||||
|
||||
int prepareClientToWrite(client *c);
|
||||
|
||||
|
||||
class ShutdownException
|
||||
{};
|
||||
@ -3538,3 +3547,5 @@ class ShutdownException
|
||||
int iAmMaster(void);
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user