From 05fe41b33a3b1ebc25a5c17748a5c00a3e0a2984 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Mon, 26 Apr 2021 22:13:32 +0000 Subject: [PATCH] Primitive implementation of bypassing client buffer, stats are all messed up and print statements everywhere Former-commit-id: 8ae310fb0f7b53add826f76891da333b63860001 --- src/networking.cpp | 159 +++++++++++++++++++++++++++++++++++++++----- src/replication.cpp | 156 +++++++++++++++++++++++++++++++++++++++++++ src/server.h | 11 +++ 3 files changed, 308 insertions(+), 18 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 574a8bc6c..18ab382bd 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -224,6 +224,7 @@ void clientInstallWriteHandler(client *c) { (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) { + // serverLog(LL_NOTICE, "we installing boyz"); AssertCorrectThread(c); serverAssert(c->lock.fOwnLock()); /* Here instead of installing the write handler, we just flag the @@ -301,7 +302,7 @@ int prepareClientToWrite(client *c) { /* Schedule the client to write the output buffers to the socket, unless * it should already be setup to do so (it has already pending data). */ - if (!fAsync && !clientHasPendingReplies(c)) clientInstallWriteHandler(c); + if (!fAsync && !clientHasPendingReplies(c) && c->repl_curr_idx == -1) clientInstallWriteHandler(c); if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ @@ -1676,15 +1677,33 @@ int writeToClient(client *c, int handler_installed) { std::unique_locklock)> lock(c->lock); + /* if this is a write to a replica, it's coming straight from the replication backlog */ + long long repl_backlog_idx = g_pserver->repl_backlog_idx; + + bool wroteFromClientBuffer = false; /* True if you wrote from the client buffer in this function call */ + while(clientHasPendingReplies(c)) { + wroteFromClientBuffer = true; + if (c->flags & CLIENT_SLAVE && listLength(c->reply) % 10 == 0){ + + serverLog(LL_NOTICE, "-----------------------------------------"); + serverLog(LL_NOTICE, "replica w/ pending replies, with a reply list size of: %lu", listLength(c->reply)); + serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); + serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); + serverLog(LL_NOTICE, "-----------------------------------------"); + + } if (c->bufpos > 0) { + // serverLog(LL_NOTICE, "Sending reply %d", x); + // serverLog(LL_NOTICE, "SUSSUS AMOGUS, %ld", c->bufpos); nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; /* If the buffer was sent, set bufpos to zero to continue with - * the remainder of the reply. */ + * the remainder of the reply. */ + // serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen); if ((int)c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; @@ -1714,23 +1733,112 @@ int writeToClient(client *c, int handler_installed) { } } /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT - * bytes, in a single threaded server it's a good idea to serve - * other clients as well, even if a very large request comes from - * super fast link that is always able to accept data (in real world - * scenario think about 'KEYS *' against the loopback interface). - * - * However if we are over the maxmemory limit we ignore that and - * just deliver as much data as it is possible to deliver. - * - * Moreover, we also send as much as possible if the client is - * a replica or a monitor (otherwise, on high-speed traffic, the - * replication/output buffer will grow indefinitely) */ + * bytes, in a single threaded server it's a good idea to serve + * other clients as well, even if a very large request comes from + * super fast link that is always able to accept data (in real world + * scenario think about 'KEYS *' against the loopback interface). + * + * However if we are over the maxmemory limit we ignore that and + * just deliver as much data as it is possible to deliver. + * + * Moreover, we also send as much as possible if the client is + * a replica or a monitor (otherwise, on high-speed traffic, the + * replication/output buffer will grow indefinitely) */ if (totwritten > NET_MAX_WRITES_PER_EVENT && (g_pserver->maxmemory == 0 || - zmalloc_used_memory() < g_pserver->maxmemory) && + zmalloc_used_memory() < g_pserver->maxmemory) && !(c->flags & CLIENT_SLAVE)) break; } - + + /* If there are no more pending replies, then we have transmitted the RDB. + * This means further replication commands will be taken straight from the + * replication backlog from now on. */ + if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c)){ + if (!c->transmittedRDB) + serverLog(LL_NOTICE, "---------->>>>>>>> TRANSMISSION OF THE RDB HAS COMPLETED <<<<<<<<----------"); + c->transmittedRDB = true; + } + + /* For replicas, we don't store all the information in the client buffer + * Most of the time (aside from immediately after synchronizing), we read + * from the replication backlog directly */ + if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1 && c->transmittedRDB){ + /* copy global variables into local scope so if they change in between we don't care */ + long long repl_backlog_size = g_pserver->repl_backlog_size; + long long nwrittenPart2 = 0; + + ssize_t nrequested; /* The number of bytes requested to write */ + /* normal case with no wrap around */ + if (repl_backlog_idx >= c->repl_curr_idx){ + nrequested = repl_backlog_idx - c->repl_curr_idx; + nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_idx - c->repl_curr_idx); + /* wrap around case, v. rare */ + /* also v. buggy so there's that */ + } else { + serverLog(LL_NOTICE, "WRAP CASE"); + serverLog(LL_NOTICE, "-----------------------------------------"); + serverLog(LL_NOTICE, "requested to write: %ld", nrequested); + serverLog(LL_NOTICE, "actually written: %ld", nwritten); + serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); + serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen); + serverLog(LL_NOTICE, "nwritten: %ld", nwritten); + serverLog(LL_NOTICE, "-----------------------------------------"); + + nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx; + nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx); + /* only attempt wrapping if we write the correct number of bytes */ + if (nwritten == repl_backlog_size - c->repl_curr_idx){ + serverLog(LL_NOTICE, "SECOND STAGE"); + serverLog(LL_NOTICE, "-----------------------------------------"); + serverLog(LL_NOTICE, "requested to write: %ld", nrequested); + serverLog(LL_NOTICE, "actually written: %ld", nwritten); + serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); + serverLog(LL_NOTICE, "buf pos: %d, sentlen: %ld", c->bufpos, c->sentlen); + serverLog(LL_NOTICE, "-----------------------------------------"); + + long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx); + if (nwrittenPart2 != -1) + nwritten += nwrittenPart2; + + serverLog(LL_NOTICE, "nwrittenPart2: %lld", nwrittenPart2); + serverLog(LL_NOTICE, "-----------------------------------------"); + } else { + serverLog(LL_NOTICE, "SUPER SHORT"); + } + + } + + /* only update the replica's current index if bytes were sent */ + + // if (nrequested != nwritten){ + serverLog(LL_NOTICE, "-----------------------------------------"); + serverLog(LL_NOTICE, "AFTER THE FACT"); + serverLog(LL_NOTICE, "requested to write: %ld", nrequested); + serverLog(LL_NOTICE, "actually written: %ld", nwritten); + serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); + serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); + serverLog(LL_NOTICE, "-----------------------------------------"); + // } + + + if (nwritten == nrequested){ + c->repl_curr_idx = -1; /* -1 denotes no more replica writes */ + } + else if (nwritten > 0) + c->repl_curr_idx = (c->repl_curr_idx + nwritten) % repl_backlog_size; + + serverAssert(c->repl_curr_idx < repl_backlog_size); + + /* only increment bytes if an error didn't occur */ + if (nwritten > 0){ + totwritten += nwritten; + c->repl_curr_off += nwritten; + } + + /* If the second part of a write didn't go through, we still need to register that */ + if (nwrittenPart2 == -1) nwritten = -1; + } + g_pserver->stat_net_output_bytes += totwritten; if (nwritten == -1) { if (connGetState(c->conn) == CONN_STATE_CONNECTED) { @@ -1750,7 +1858,7 @@ int writeToClient(client *c, int handler_installed) { * We just rely on data / pings received for timeout detection. */ if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime; } - if (!clientHasPendingReplies(c)) { + if (!clientHasPendingReplies(c) && c->repl_curr_idx == -1) { c->sentlen = 0; if (handler_installed) connSetWriteHandler(c->conn, NULL); @@ -1904,6 +2012,12 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* Don't write to clients that are going to be closed anyway. */ if (c->flags & CLIENT_CLOSE_ASAP) continue; + // if (c->flags & CLIENT_SLAVE){ + // if(clientHasPendingReplies(c)) + // serverLog(LL_NOTICE, "somehow the client buffer has these values: %s", c->buf); + // serverLog(LL_NOTICE, "LOL"); + // } + /* Try to write buffers to the client socket. */ if (writeToClient(c,0) == C_ERR) { @@ -1920,7 +2034,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ - if (clientHasPendingReplies(c)) { + if (clientHasPendingReplies(c) || c->repl_curr_idx != -1) { if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) freeClientAsync(c); } @@ -3268,6 +3382,13 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { } } +/* In the case of a replica client, it is possible (and very likely) + * that writes to said replica are using data from the replication backlog + * as opposed to it's own internal buffer, this number should keep track of that */ +unsigned long getClientReplicationBacklogSharedUsage(client *c) { + return (c->repl_curr_idx == -1 && c->flags & CLIENT_SLAVE) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off; +} + /* This function returns the number of bytes that Redis is * using to store the reply still not read by the client. * @@ -3276,9 +3397,11 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * enforcing the client output length limits. */ unsigned long getClientOutputBufferMemoryUsage(client *c) { unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); - return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0); + return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0) + getClientReplicationBacklogSharedUsage(c); } + + /* Get the class of a client, used in order to enforce limits to different * classes of clients. * diff --git a/src/replication.cpp b/src/replication.cpp index 2533bae52..ccb538a69 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -47,6 +47,9 @@ #include #include +#define BYPASS_BUFFER +// #define BYPASS_PSYNC + void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, connection *conn); void replicationSendAck(redisMaster *mi); @@ -59,6 +62,18 @@ static void propagateMasterStaleKeys(); * the instance is configured to have no persistence. */ int RDBGeneratedByReplication = 0; +void resizeReplicationBacklogForClients(long long newsize); + +void setReplIdx(client *c, long long idx, long long off){ + if (prepareClientToWrite(c) != C_OK) return; + // serverLog(LL_NOTICE, "calling this garbage function w/ idx and off: %lld, %lld, %lld", idx, off, off-idx); + // serverLog(LL_NOTICE, "What is this value? %lld", c->repl_curr_idx); + if (c->repl_curr_idx == -1){ + c->repl_curr_idx = idx; + c->repl_curr_off = off; + } +} + /* --------------------------- Utility functions ---------------------------- */ /* Return the pointer to a string representing the replica ip:listening_port @@ -213,6 +228,8 @@ void resizeReplicationBacklog(long long newsize) { newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (g_pserver->repl_backlog_size == newsize) return; + serverLog(LL_NOTICE, "WE HAD TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize); + if (g_pserver->repl_backlog != NULL) { /* What we actually do is to flush the old buffer and realloc a new * empty one. It will refill with new data incrementally. @@ -253,6 +270,80 @@ void resizeReplicationBacklog(long long newsize) { g_pserver->repl_backlog_size = newsize; } + +/* The above but for when clients need extra replication backlog because ??? */ +void resizeReplicationBacklogForClients(long long newsize) { + if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE) + newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; + if (g_pserver->repl_backlog_size == newsize) return; + + serverLog(LL_NOTICE, "WE HAD TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize); + /* get the critical client size, i.e. the size of the data unflushed to clients */ + long long earliest_off = LONG_LONG_MAX; + long long earliest_idx = -1; + listIter li; + listNode *ln; + listRewind(g_pserver->slaves, &li); + while ((ln = listNext(&li))) { + client *replica = (client*)listNodeValue(ln); + if (replica->repl_curr_off != -1 && replica->repl_curr_off < earliest_off){ + earliest_off = replica->repl_curr_off; + earliest_idx = replica->repl_curr_idx; + } + } + + if (g_pserver->repl_backlog != NULL) { + /* What we actually do is to flush the old buffer and realloc a new + * empty one. It will refill with new data incrementally. + * The reason is that copying a few gigabytes adds latency and even + * worse often we need to alloc additional space before freeing the + * old buffer. */ + + if (earliest_idx >= 0) { + // We need to keep critical data so we can't shrink less than the hot data in the buffer + newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off); + char *backlog = (char*)zmalloc(newsize); + g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off; + + if (g_pserver->repl_backlog_idx >= earliest_idx) { + auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx; + memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog); + serverLog(LL_NOTICE, "g_pserver->master_repl_offset: %lld, earliest_off: %lld, g_pserver->repl_backlog_idx: %lld, earliest_idx: %lld", + g_pserver->master_repl_offset, earliest_off, g_pserver->repl_backlog_idx, earliest_idx); + serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); + } else { + auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx; + memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbPhase1); + memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); + auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx; + serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); + } + zfree(g_pserver->repl_backlog); + g_pserver->repl_backlog = backlog; + g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen; + listRewind(g_pserver->slaves, &li); + /* Go through the clients and update their replication indicies */ + while ((ln = listNext(&li))) { + client *replica = (client*)listNodeValue(ln); + if (replica->repl_curr_idx != -1){ + replica->repl_curr_idx -= earliest_idx; + if (replica->repl_curr_idx < 0) + replica->repl_curr_idx += g_pserver->repl_backlog_size; + } + } + g_pserver->repl_batch_idxStart = 0; + } else { + zfree(g_pserver->repl_backlog); + g_pserver->repl_backlog = (char*)zmalloc(newsize); + g_pserver->repl_backlog_histlen = 0; + g_pserver->repl_backlog_idx = 0; + /* Next byte we have is... the next since the buffer is empty. */ + g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; + } + } + g_pserver->repl_backlog_size = newsize; +} + void freeReplicationBacklog(void) { serverAssert(GlobalLocksAcquired()); listIter li; @@ -683,6 +774,10 @@ long long addReplyReplicationBacklog(client *c, long long offset) { * split the reply in two parts if we are cross-boundary. */ len = g_pserver->repl_backlog_histlen - skip; serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); + serverLog(LL_NOTICE, "Coming through from the addReplicationBacklog"); +#ifdef BYPASS_PSYNC + setReplIdx(c, j, offset); +#else while(len) { long long thislen = ((g_pserver->repl_backlog_size - j) < len) ? @@ -693,6 +788,8 @@ long long addReplyReplicationBacklog(client *c, long long offset) { len -= thislen; j = 0; } +#endif + serverLog(LL_NOTICE, "rdb transmitted? %d, pending replies? %d", c->transmittedRDB, clientHasPendingReplies(c)); return g_pserver->repl_backlog_histlen - skip; } @@ -731,6 +828,8 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) { * a SELECT statement in the replication stream. */ g_pserver->replicaseldb = -1; + serverLog(LL_NOTICE, "We are setting up here lad"); + /* Don't send this reply to slaves that approached us with * the old SYNC command. */ if (!(replica->flags & CLIENT_PRE_PSYNC)) { @@ -989,6 +1088,7 @@ void syncCommand(client *c) { if (!strcasecmp((const char*)ptrFromObj(c->argv[0]),"psync")) { if (masterTryPartialResynchronization(c) == C_OK) { g_pserver->stat_sync_partial_ok++; + // c->repl_curr_idx = g_pserver->repl_backlog_idx; return; /* No full resync needed, return. */ } else { char *master_replid = (char*)ptrFromObj(c->argv[1]); @@ -1016,6 +1116,7 @@ void syncCommand(client *c) { connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ c->repldbfd = -1; c->flags |= CLIENT_SLAVE; + // c->repl_curr_idx = g_pserver->repl_backlog_idx; listAddNodeTail(g_pserver->slaves,c); /* Create the replication backlog if needed. */ @@ -1035,6 +1136,7 @@ void syncCommand(client *c) { if (g_pserver->FRdbSaveInProgress() && g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK) { + serverLog(LL_NOTICE, "case 1"); /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another replica that is * registering differences since the server forked to save. */ @@ -1066,6 +1168,7 @@ void syncCommand(client *c) { } else if (g_pserver->FRdbSaveInProgress() && g_pserver->rdb_child_type == RDB_CHILD_TYPE_SOCKET) { + serverLog(LL_NOTICE, "case 2"); /* There is an RDB child process but it is writing directly to * children sockets. We need to wait for the next BGSAVE * in order to synchronize. */ @@ -1073,6 +1176,7 @@ void syncCommand(client *c) { /* CASE 3: There is no BGSAVE is progress. */ } else { + serverLog(LL_NOTICE, "case 3"); if (g_pserver->repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) { /* Diskless replication RDB child is created inside * replicationCron() since we want to delay its start a @@ -1278,6 +1382,7 @@ void replconfCommand(client *c) { * 3) Update the count of "good replicas". */ void putSlaveOnline(client *replica) { replica->replstate = SLAVE_STATE_ONLINE; + replica->repl_put_online_on_ack = 0; replica->repl_ack_time = g_pserver->unixtime; /* Prevent false timeout. */ if (connSetWriteHandler(replica->conn, sendReplyToClient, true) == C_ERR) { @@ -1415,11 +1520,13 @@ void sendBulkToSlave(connection *conn) { replica->repldboff += nwritten; g_pserver->stat_net_output_bytes += nwritten; + // replica->repl_curr_idx = g_pserver->repl_backlog_idx; if (replica->repldboff == replica->repldbsize) { close(replica->repldbfd); replica->repldbfd = -1; connSetWriteHandler(replica->conn,NULL); putSlaveOnline(replica); + serverLog(LL_NOTICE, "ABOUT TO DIE HERE"); } } @@ -4450,6 +4557,21 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long } void _clientAsyncReplyBufferReserve(client *c, size_t len); + +/* Has the end of the replication backlog overflowed past the beginning? */ +bool replOverflowHasOccured(){ + if (g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart){ + long long repl_idx_difference = g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart ? + g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart : + (g_pserver->repl_backlog_size + g_pserver->repl_backlog_idx) - g_pserver->repl_batch_idxStart; + + return g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > repl_idx_difference; + } + return false; +} + +thread_local int transmittedCount = 0; + void flushReplBacklogToClients() { serverAssert(GlobalLocksAcquired()); @@ -4463,11 +4585,31 @@ void flushReplBacklogToClients() serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size); serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx); + serverAssert(!replOverflowHasOccured()); listIter li; listNode *ln; listRewind(g_pserver->slaves, &li); + +#if 0 + // check for potential overflow first while ((ln = listNext(&li))) { client *replica = (client*)listNodeValue(ln); + // serverLog(LL_NOTICE, "replica state: %d", replica->replstate); + + if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (replica->flags & CLIENT_CLOSE_ASAP) continue; + if (replica->repl_curr_idx == -1) continue; + + std::unique_lock ul(replica->lock, std::defer_lock); + if (FCorrectThread(replica)) + ul.lock(); + else + fAsyncWrite = true; +#endif + + while ((ln = listNext(&li))) { + client *replica = (client*)listNodeValue(ln); + // serverLog(LL_NOTICE, "replica state: %d", replica->replstate); if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue; @@ -4478,6 +4620,19 @@ void flushReplBacklogToClients() else fAsyncWrite = true; + +#ifdef BYPASS_BUFFER + /* If we are online and the RDB has been sent, there is no need to feed the client buffer + * We will send our replies directly from the replication backlog instead */ + if (replica->replstate == SLAVE_STATE_ONLINE && replica->transmittedRDB){ + setReplIdx(replica, g_pserver->repl_batch_idxStart, g_pserver->repl_batch_offStart); + continue; + } +#else + if (replica->replstate == SLAVE_STATE_ONLINE){ + // serverLog(LL_NOTICE, "would be calling this garbage function w/ offset: %lld", g_pserver->repl_batch_idxStart); + } +#endif if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) { long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart; serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy); @@ -4491,6 +4646,7 @@ void flushReplBacklogToClients() _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx); addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); + serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart)); } } diff --git a/src/server.h b/src/server.h index 878154bc5..cfd6c34a0 100644 --- a/src/server.h +++ b/src/server.h @@ -1516,6 +1516,8 @@ struct client { long long psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this replica output buffer should use. */ + long long repl_curr_idx = -1; /* Replication index sent, if this is a replica */ + long long repl_curr_off = -1; char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */ int slave_listening_port; /* As configured with: REPLCONF listening-port */ char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */ @@ -1575,6 +1577,9 @@ struct client { robj **argv; size_t argv_len_sumActive = 0; + bool transmittedRDB = false; /* Have we finished transmitting the RDB to this replica? */ + /* If so, we can read from the replication backlog instead of the client buffer */ + // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); size_t argv_len_sum() const; @@ -3470,6 +3475,8 @@ void mixDigest(unsigned char *digest, const void *ptr, size_t len); void xorDigest(unsigned char *digest, const void *ptr, size_t len); int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags); + + int moduleGILAcquiredByModule(void); extern int g_fInCrash; static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate @@ -3526,6 +3533,8 @@ void tlsInit(void); void tlsInitThread(); int tlsConfigure(redisTLSContextConfig *ctx_config); +int prepareClientToWrite(client *c); + class ShutdownException {}; @@ -3538,3 +3547,5 @@ class ShutdownException int iAmMaster(void); #endif + +