From 615fbeb10f973c0eda57a45d504d24c0f8765b22 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Wed, 2 Jun 2021 23:41:36 +0000 Subject: [PATCH] Sync works single threaded properly, passes all but one testcase (which hangs) Former-commit-id: 9a6ca3a5d906b9d87fe70652d218decbb2775ac1 --- src/Makefile | 2 +- src/networking.cpp | 165 ++++++++++++++++++++++++++------------------ src/replication.cpp | 106 +++++++++++----------------- src/server.h | 9 +-- 4 files changed, 145 insertions(+), 137 deletions(-) diff --git a/src/Makefile b/src/Makefile index 966ce4400..a0ee5fe2a 100644 --- a/src/Makefile +++ b/src/Makefile @@ -15,7 +15,7 @@ release_hdr := $(shell sh -c './mkreleasehdr.sh') uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not') -OPTIMIZATION?=-O2 -flto +OPTIMIZATION?=-O2 DEPENDENCY_TARGETS=hiredis linenoise lua rocksdb NODEPS:=clean distclean diff --git a/src/networking.cpp b/src/networking.cpp index caefd6d1e..80120d0ca 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -146,6 +146,7 @@ client *createClient(connection *conn, int iel) { c->flags = 0; c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWriteHandler = FALSE; + c->fPendingReplicaWrite = FALSE; c->ctime = c->lastinteraction = g_pserver->unixtime; /* If the default user does not require authentication, the user is * directly authenticated. */ @@ -221,6 +222,10 @@ void clientInstallWriteHandler(client *c) { /* Schedule the client to write the output buffers to the socket only * if not already done and, for slaves, if the replica can actually receive * writes at this stage. */ + + if (c->flags & CLIENT_SLAVE) + serverLog(LL_NOTICE, "installing write handler"); + if (!(c->flags & CLIENT_PENDING_WRITE) && (c->replstate == REPL_STATE_NONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) @@ -272,6 +277,9 @@ void clientInstallAsyncWriteHandler(client *c) { int prepareClientToWrite(client *c) { bool fAsync = !FCorrectThread(c); // Not async if we're on the right thread + if (c->flags & CLIENT_SLAVE) + serverLog(LL_NOTICE, "got into prepareClientToWrite"); + if (!fAsync) { serverAssert(c->conn == nullptr || c->lock.fOwnLock()); } else { @@ -302,7 +310,7 @@ int prepareClientToWrite(client *c) { /* Schedule the client to write the output buffers to the socket, unless * it should already be setup to do so (it has already pending data). */ - if (!fAsync && !clientHasPendingReplies(c) && c->repl_curr_idx == -1) clientInstallWriteHandler(c); + if (!fAsync && !clientHasPendingReplies(c) && !c->fPendingReplicaWrite) clientInstallWriteHandler(c); if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ @@ -320,7 +328,6 @@ void _clientAsyncReplyBufferReserve(client *c, size_t len) { clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); replyNew->used = 0; - std::unique_lock tRDBLock (c->transmittedRDBLock); c->replyAsync = replyNew; } @@ -334,7 +341,6 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) { if (fAsync) { serverAssert(GlobalLocksAcquired()); - std::unique_lock tRDBLock (c->transmittedRDBLock); if (c->replyAsync == nullptr || (c->replyAsync->size - c->replyAsync->used) < len) { if (c->replyAsync == nullptr) { @@ -1661,6 +1667,16 @@ client *lookupClientByID(uint64_t id) { return (c == raxNotFound) ? 
NULL : c;
}

+/* Compute the replication backlog index that corresponds to a replication
+ * offset: take the distance between the given offset and the offset at which
+ * the backlog starts (repl_backlog_start), modulo the backlog size. The
+ * result is the position within the circular backlog buffer of the byte
+ * written at that offset, provided the backlog still covers it. */
+long long getReplIndexFromOffset(long long offset){
+    long long index = (offset - g_pserver->repl_backlog_start) % g_pserver->repl_backlog_size;
+    return index;
+}
+
 /* Write data in output buffers to client. Return C_OK if the client
  * is still valid after the call, C_ERR if it was freed because of some
  * error. If handler_installed is set, it will attempt to clear the
@@ -1680,7 +1696,11 @@ int writeToClient(client *c, int handler_installed) {
     std::unique_lock<decltype(c->lock)> lock(c->lock);
     // serverLog(LL_NOTICE, "acq client");

+    if (c->flags & CLIENT_SLAVE)
+        serverLog(LL_NOTICE, "writeToClient has happened");
+
     while(clientHasPendingReplies(c)) {
+        serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
         if (c->bufpos > 0) {
             nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
             if (nwritten <= 0) break;
@@ -1739,80 +1759,67 @@ int writeToClient(client *c, int handler_installed) {
     /* We can only directly read from the replication backlog if the client
        is a replica, so only attempt to do so if that's the case. */
     if (c->flags & CLIENT_SLAVE) {
-        /* If there are no more pending replies, then we have transmitted the RDB.
-         * This means further replication commands will be taken straight from the
-         * replication backlog from now on. */
-        std::unique_lock tRDBLock (c->transmittedRDBLock);
-
-        if (c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){
-            c->transmittedRDB = true;
-        }
-        bool transmittedRDB = c->transmittedRDB;
-        tRDBLock.unlock();
-
         /* For replicas, we don't store all the information in the client buffer
-         * Most of the time (aside from immediately after synchronizing), we read
-         * from the replication backlog directly */
-        if (c->repl_curr_idx != -1 && transmittedRDB){
-            std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock);
+         * We always read from the replication backlog directly */
+        std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock);

-            /* copy global variables into local scope so if they change in between we don't care */
-            long long repl_backlog_idx = g_pserver->repl_backlog_idx;
-            long long repl_backlog_size = g_pserver->repl_backlog_size;
+        /* Bring offStart into local scope: if repl_batch_offStart is -1, the
+         * batching mechanism is disabled, which implies there is no data to
+         * flush and that the global offset is accurate */
+        long long offStart = g_pserver->repl_batch_offStart == -1 ?
g_pserver->master_repl_offset : g_pserver->repl_batch_offStart; + long long idxStart = getReplIndexFromOffset(offStart); + if (g_pserver->repl_batch_offStart != -1) + serverAssert(idxStart == g_pserver->repl_batch_idxStart); + else + serverAssert(idxStart == g_pserver->repl_backlog_idx); + + if (c->repl_curr_off != -1 && c->repl_curr_off != offStart){ + serverLog(LL_NOTICE, "printing the stats for client %lu: c->repl_curr_off: %lld, repl_batch_offStart: %lld, nwritten: %ld, offStart: %lld", + c->id, c->repl_curr_off, g_pserver->repl_batch_offStart, nwritten, offStart); + + long long curr_idx = getReplIndexFromOffset(c->repl_curr_off); long long nwrittenPart2 = 0; - - ssize_t nrequested; /* The number of bytes requested to write */ /* normal case with no wrap around */ - if (repl_backlog_idx >= c->repl_curr_idx){ - nrequested = repl_backlog_idx - c->repl_curr_idx; - nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_idx - c->repl_curr_idx); + if (idxStart >= curr_idx){ + nwritten = connWrite(c->conn, g_pserver->repl_backlog + curr_idx, idxStart - curr_idx); /* wrap around case, v. rare */ /* also v. buggy so there's that */ } else { - nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx; - nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx); + serverLog(LL_NOTICE, "ROAD OF RESISTANCE"); + nwritten = connWrite(c->conn, g_pserver->repl_backlog + curr_idx, g_pserver->repl_backlog_size - curr_idx); /* only attempt wrapping if we write the correct number of bytes */ - if (nwritten == repl_backlog_size - c->repl_curr_idx){ - long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx); + if (nwritten == g_pserver->repl_backlog_size - curr_idx){ + long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, idxStart); if (nwrittenPart2 != -1) nwritten += nwrittenPart2; - } } - /* only update the replica's current index if bytes were sent */ - - // if (nrequested != nwritten){ - // serverLog(LL_NOTICE, "-----------------------------------------"); - // serverLog(LL_NOTICE, "AFTER THE FACT"); - // serverLog(LL_NOTICE, "requested to write: %ld", nrequested); - // serverLog(LL_NOTICE, "actually written: %ld", nwritten); - // serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); - // serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); - // serverLog(LL_NOTICE, "-----------------------------------------"); - // } - - - if (nwritten == nrequested && g_pserver->repl_backlog_idx == repl_backlog_idx){ - c->repl_curr_idx = -1; /* -1 denotes no more replica writes */ - } - else if (nwritten > 0) - c->repl_curr_idx = (c->repl_curr_idx + nwritten) % repl_backlog_size; - - serverAssert(c->repl_curr_idx < repl_backlog_size); - /* only increment bytes if an error didn't occur */ if (nwritten > 0){ totwritten += nwritten; c->repl_curr_off += nwritten; + if (1){ + serverLog(LL_NOTICE, "printing the stats for client %lu: c->repl_curr_off: %lld, repl_batch_offStart: %lld, nwritten: %ld, offStart: %lld", + c->id, c->repl_curr_off, g_pserver->repl_batch_offStart, nwritten, offStart); + } + serverAssert(c->repl_curr_off <= offStart); + /* If the client offset matches the global offset, we wrote all we needed to, + * in which case, there is no pending write */ + if (c->repl_curr_off == offStart){ + 
serverLog(LL_NOTICE, "good, %lld", offStart); + c->fPendingReplicaWrite = false; + } else { + serverLog(LL_NOTICE, "mismatch between repl_curr_off (%lld) and offStart (%lld)", c->repl_curr_off, offStart); + } } /* If the second part of a write didn't go through, we still need to register that */ if (nwrittenPart2 == -1) nwritten = -1; } - if (c->flags & CLIENT_SLAVE && handler_installed) - serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed); + // if (c->flags & CLIENT_SLAVE && handler_installed) + // serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed); } @@ -1836,12 +1843,12 @@ int writeToClient(client *c, int handler_installed) { * We just rely on data / pings received for timeout detection. */ if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime; } - if (!clientHasPendingReplies(c) && c->repl_curr_idx == -1) { - if(c->flags & CLIENT_SLAVE && handler_installed){ - serverLog(LL_NOTICE, "Uninstalling handler"); - serverLog(LL_NOTICE, "handler repl_curr_idx: %lld, repl_backlog_size: %lld", c->repl_curr_idx, g_pserver->repl_backlog_size); - serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); - } + if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) { + // if(c->flags & CLIENT_SLAVE && handler_installed){ + // serverLog(LL_NOTICE, "Uninstalling handler"); + // serverLog(LL_NOTICE, "handler repl_curr_idx: %lld, repl_backlog_size: %lld", c->repl_curr_idx, g_pserver->repl_backlog_size); + // serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); + // } c->sentlen = 0; if (handler_installed) connSetWriteHandler(c->conn, NULL); @@ -1857,7 +1864,7 @@ int writeToClient(client *c, int handler_installed) { /* Write event handler. Just send data to the client. */ void sendReplyToClient(connection *conn) { client *c = (client*)connGetPrivateData(conn); - serverLog(LL_NOTICE, "called the sendreplytoclient"); + // serverLog(LL_NOTICE, "called the sendreplytoclient"); if (writeToClient(c,1) == C_ERR) { AeLocker ae; @@ -1886,7 +1893,6 @@ void ProcessPendingAsyncWrites() serverAssert(c->fPendingAsyncWrite); if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY)) { - std::unique_lock tRDBLock (c->transmittedRDBLock); if (c->replyAsync != nullptr){ zfree(c->replyAsync); c->replyAsync = nullptr; @@ -1898,7 +1904,6 @@ void ProcessPendingAsyncWrites() /* since writes from master to replica can come directly from the replication backlog, * writes may have been signalled without having been copied to the replyAsync buffer, * thus causing the buffer to be NULL */ - std::unique_lock tRDBLock (c->transmittedRDBLock); if (c->replyAsync != nullptr){ int size = c->replyAsync->used; @@ -1919,7 +1924,6 @@ void ProcessPendingAsyncWrites() } c->fPendingAsyncWrite = FALSE; - tRDBLock.unlock(); // Now install the write event handler int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; /* For the fsync=always policy, we want that a given FD is never @@ -2032,8 +2036,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. 
*/ - if (clientHasPendingReplies(c) || c->repl_curr_idx != -1) { - serverLog(LL_NOTICE, "Setting a write handler for later"); + if (clientHasPendingReplies(c) || c->fPendingReplicaWrite) { if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) { freeClientAsync(c); } @@ -2214,6 +2217,34 @@ static void setProtocolError(const char *errstr, client *c) { c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR); } +static void printQueryBuffer(client *c) { + if (cserver.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) { + sds client = catClientInfoString(sdsempty(),c); + + /* Sample some protocol to given an idea about what was inside. */ + char buf[PROTO_DUMP_LEN*2]; + if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) { + snprintf(buf,sizeof(buf),"%s", c->querybuf+c->qb_pos); + } else { + snprintf(buf,sizeof(buf),"%.*s (... more %zu bytes ...) %.*s", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2); + } + + /* Remove non printable chars. */ + char *p = buf; + while (*p != '\0') { + if (!isprint(*p)) *p = '.'; + p++; + } + + /* Log all the client and protocol info. */ + int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING : + LL_VERBOSE; + serverLog(loglevel, + "Query buffer from client %lu: %s. %s", c->id, client, buf); + sdsfree(client); + } +} + /* Process the query buffer for client 'c', setting up the client argument * vector for command execution. Returns C_OK if after running the function * the client has a well-formed ready to be processed command, otherwise @@ -2468,6 +2499,8 @@ void parseClientCommandBuffer(client *c) { } size_t cqueriesStart = c->vecqueuedcmd.size(); + // if (c->flags & CLIENT_MASTER) + // printQueryBuffer(c); if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } else if (c->reqtype == PROTO_REQ_MULTIBULK) { diff --git a/src/replication.cpp b/src/replication.cpp index d1181bdf4..97638e833 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -88,18 +88,6 @@ int RDBGeneratedByReplication = 0; void resizeReplicationBacklogForClients(long long newsize); -void setReplIdx(client *c, long long idx, long long off){ - // serverLog(LL_NOTICE, "calling this garbage function w/ idx and off: %lld, %lld, %lld", idx, off, off-idx); - // serverLog(LL_NOTICE, "Repl Index started at: %lld", c->repl_curr_idx); - if (c->repl_curr_idx == -1 && off >= c->repl_curr_off){ - if (prepareClientToWrite(c) != C_OK) return; - c->repl_curr_idx = idx; - c->repl_curr_off = off; - } - // serverLog(LL_NOTICE, "Repl Index has become: %lld", c->repl_curr_idx); - -} - /* --------------------------- Utility functions ---------------------------- */ /* Return the pointer to a string representing the replica ip:listening_port @@ -232,6 +220,7 @@ void createReplicationBacklog(void) { g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL); g_pserver->repl_backlog_histlen = 0; g_pserver->repl_backlog_idx = 0; + g_pserver->repl_backlog_start = g_pserver->master_repl_offset; /* We don't have any data inside our buffer, but virtually the first * byte we have is the next byte that will be generated for the @@ -284,6 +273,7 @@ void resizeReplicationBacklog(long long newsize) { g_pserver->repl_backlog = backlog; g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen; g_pserver->repl_batch_idxStart = 0; + g_pserver->repl_backlog_start = g_pserver->master_repl_offset; } else { 
zfree(g_pserver->repl_backlog); g_pserver->repl_backlog = (char*)zmalloc(newsize); @@ -296,6 +286,7 @@ void resizeReplicationBacklog(long long newsize) { g_pserver->repl_backlog_size = newsize; } +long long getReplIndexFromOffset(long long offset); /* The above but for when clients need extra replication backlog because ??? */ void resizeReplicationBacklogForClients(long long newsize) { @@ -305,32 +296,8 @@ void resizeReplicationBacklogForClients(long long newsize) { serverLog(LL_NOTICE, "WE HAVE TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize); /* get the critical client size, i.e. the size of the data unflushed to clients */ - long long earliest_off = LONG_LONG_MAX; - long long earliest_idx = -1; - listIter li; - listNode *ln; - listRewind(g_pserver->slaves, &li); - while ((ln = listNext(&li))) { - client *replica = (client*)listNodeValue(ln); - if (replica->repl_curr_off != -1 && replica->repl_curr_off < earliest_off){ - earliest_off = replica->repl_curr_off; - earliest_idx = replica->repl_curr_idx; - } - serverLog(LL_NOTICE, "repl_curr_idx: %lld, earlistidx: %lld", replica->repl_curr_idx, earliest_idx); - } - serverLog(LL_NOTICE, "We are starting with: master_repl_offset: %lld, repl_batch_offStart: %lld, earliest_off: %lld, " - "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, earliest_idx: %lld, repl_backlog_size: %lld", - g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, earliest_off, - g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, earliest_idx, g_pserver->repl_backlog_size - ); + long long earliest_off = g_pserver->repl_lowest_off.load(); - long long new_off = 0, new_idx = 0; - - /* if no earliest offset is found amongst the clients, they are all up to date with the flushed index */ - if (earliest_off == LONG_LONG_MAX && earliest_idx == -1){ - earliest_idx = g_pserver->repl_batch_idxStart; - earliest_off = g_pserver->repl_batch_offStart; - } if (g_pserver->repl_backlog != NULL) { /* What we actually do is to flush the old buffer and realloc a new @@ -339,17 +306,18 @@ void resizeReplicationBacklogForClients(long long newsize) { * worse often we need to alloc additional space before freeing the * old buffer. 
*/ - if (earliest_idx >= 0) { + if (earliest_off != -1) { // We need to keep critical data so we can't shrink less than the hot data in the buffer newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off); char *backlog = (char*)zmalloc(newsize); g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off; + long long earliest_idx = getReplIndexFromOffset(earliest_off); if (g_pserver->repl_backlog_idx >= earliest_idx) { auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx; memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog); - serverLog(LL_NOTICE, "g_pserver->master_repl_offset: %lld, earliest_off: %lld, g_pserver->repl_backlog_idx: %lld, earliest_idx: %lld", - g_pserver->master_repl_offset, earliest_off, g_pserver->repl_backlog_idx, earliest_idx); + serverLog(LL_NOTICE, "g_pserver->master_repl_offset: %lld, earliest_off: %lld, g_pserver->repl_backlog_idx: %lld, earliest_idx: %lld, repl_backlog_start: %lld", + g_pserver->master_repl_offset, earliest_off, g_pserver->repl_backlog_idx, earliest_idx, g_pserver->repl_backlog_start); serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); } else { auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx; @@ -361,20 +329,10 @@ void resizeReplicationBacklogForClients(long long newsize) { zfree(g_pserver->repl_backlog); g_pserver->repl_backlog = backlog; g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen; - listRewind(g_pserver->slaves, &li); - /* Go through the clients and update their replication indicies */ - while ((ln = listNext(&li))) { - client *replica = (client*)listNodeValue(ln); - if (replica->repl_curr_idx != -1){ - replica->repl_curr_idx -= earliest_idx; - if (replica->repl_curr_idx < 0) - replica->repl_curr_idx += g_pserver->repl_backlog_size; - } - new_idx = replica->repl_curr_idx; - } g_pserver->repl_batch_idxStart -= earliest_idx; if (g_pserver->repl_batch_idxStart < 0) g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size; + g_pserver->repl_backlog_start = earliest_off; } else { zfree(g_pserver->repl_backlog); g_pserver->repl_backlog = (char*)zmalloc(newsize); @@ -382,14 +340,15 @@ void resizeReplicationBacklogForClients(long long newsize) { g_pserver->repl_backlog_idx = 0; /* Next byte we have is... the next since the buffer is empty. 
*/ g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; + g_pserver->repl_backlog_start = g_pserver->master_repl_offset; } } g_pserver->repl_backlog_size = newsize; serverLog(LL_NOTICE, "We are ending with: master_repl_offset: %lld, repl_batch_offStart: %lld, new_off: %lld, " "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, new_idx: %lld, repl_backlog_size: %lld", - g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, new_off, - g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, new_idx, g_pserver->repl_backlog_size + g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, 0LL, + g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, 0LL, g_pserver->repl_backlog_size ); } @@ -456,11 +415,6 @@ void feedReplicationBacklog(const void *ptr, size_t len) { len -= thislen; p += thislen; g_pserver->repl_backlog_histlen += thislen; - // serverLog(LL_NOTICE, "Pt2 intermediate with: master_repl_offset: %lld, repl_batch_offStart: %lld, " - // "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld", - // g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, - // g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size - // ); } if (g_pserver->repl_backlog_histlen > g_pserver->repl_backlog_size) g_pserver->repl_backlog_histlen = g_pserver->repl_backlog_size; @@ -722,7 +676,7 @@ void replicationFeedSlaves(list *replicas, int dictid, robj **argv, int argc) { void showLatestBacklog(void) { if (g_pserver->repl_backlog == NULL) return; - long long dumplen = 256; + long long dumplen = 1024; if (g_pserver->repl_backlog_histlen < dumplen) dumplen = g_pserver->repl_backlog_histlen; @@ -813,7 +767,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, } decrRefCount(cmdobj); } - +#define BYPASS_PSYNC /* Feed the replica 'c' with the replication backlog starting from the * specified 'offset' up to the end of the backlog. */ long long addReplyReplicationBacklog(client *c, long long offset) { @@ -854,7 +808,8 @@ long long addReplyReplicationBacklog(client *c, long long offset) { len = g_pserver->repl_backlog_histlen - skip; serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); #ifdef BYPASS_PSYNC - setReplIdx(c, j, offset); + c->repl_curr_off = offset - 1; + serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", c->id, getClientPeerId(c), c->repl_curr_off); #else while(len) { long long thislen = @@ -900,6 +855,11 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) { replica->psync_initial_offset = offset; replica->replstate = SLAVE_STATE_WAIT_BGSAVE_END; + + replica->repl_curr_off = offset; + + serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", replica->id, getClientPeerId(replica), replica->repl_curr_off); + /* We are going to accumulate the incremental changes for this * replica as well. Set replicaseldb to -1 in order to force to re-emit * a SELECT statement in the replication stream. */ @@ -2006,7 +1966,6 @@ void replicationCreateMasterClient(redisMaster *mi, connection *conn, int dbid) mi->master->reploff_skipped = 0; mi->master->read_reploff = mi->master->reploff; mi->master->puser = NULL; /* This client can do everything. 
*/
-
     memcpy(mi->master->uuid, mi->master_uuid, UUID_BINARY_LEN);
     memset(mi->master_uuid, 0, UUID_BINARY_LEN); // make sure people don't use this temp storage buffer

@@ -4652,12 +4611,17 @@ void flushReplBacklogToClients()
         serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
         serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);

+        serverLog(LL_NOTICE, "the master repl offset is %lld", g_pserver->master_repl_offset);
+        showLatestBacklog();
         listIter li;
         listNode *ln;
         listRewind(g_pserver->slaves, &li);
         while ((ln = listNext(&li))) {
             client *replica = (client*)listNodeValue(ln);

+            // serverLog(LL_NOTICE, "client %lu is in the party", replica->id);
+
+            // serverLog(LL_NOTICE, "is there a write pending for %lu, %d", replica->id, replica->fPendingReplicaWrite);
             if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
             if (replica->flags & CLIENT_CLOSE_ASAP) continue;
@@ -4675,11 +4639,21 @@ void flushReplBacklogToClients()
                 asyncUl.lock();

             /* If we are online and the RDB has been sent, there is no need to feed the client buffer
              * We will send our replies directly from the replication backlog instead */
-            std::unique_lock tRDBLock (replica->transmittedRDBLock);
-            if (replica->replstate == SLAVE_STATE_ONLINE && replica->transmittedRDB){
-                setReplIdx(replica, g_pserver->repl_batch_idxStart, g_pserver->repl_batch_offStart);
-                continue;
+            if (replica->repl_curr_off == -1){
+                replica->repl_curr_off = g_pserver->repl_batch_offStart;
+
+                serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", replica->id, getClientPeerId(replica), replica->repl_curr_off);
+            }
+
+            /* Only prepare the client to write if there isn't already a pending write */
+            if (!replica->fPendingReplicaWrite){
+                serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset);
+                prepareClientToWrite(replica);
+                replica->fPendingReplicaWrite = true;
+            }
+
+            continue;
             }
 #endif
             if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
diff --git a/src/server.h b/src/server.h
index da1fce52e..9fdf5e0ef 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1516,8 +1516,11 @@ struct client {
     long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                        copying this replica output buffer
                                        should use. */
+    long long repl_curr_idx = -1; /* Replication index sent, if this is a replica */
     long long repl_curr_off = -1;
+    int fPendingReplicaWrite;
+
     char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
     int slave_listening_port; /* As configured with: REPLCONF listening-port */
     char slave_ip[NET_IP_STR_LEN];   /* Optionally given by REPLCONF ip-address */
@@ -1577,12 +1580,8 @@ struct client {
     robj **argv;
     size_t argv_len_sumActive = 0;

-    bool transmittedRDB = false; /* Have we finished transmitting the RDB to this replica?
*/
-                                 /* If so, we can read from the replication backlog instead of the client buffer */
-
     // post a function from a non-client thread to run on its client thread
     bool postFunction(std::function<void(client *)> fn, bool fLock = true);
-    fastlock transmittedRDBLock {"transmittedRDB"};
     size_t argv_len_sum() const;
 };
@@ -2229,6 +2228,8 @@ struct redisServer {
                                    that is the next byte we'll write to.*/
     long long repl_backlog_off;     /* Replication "master offset" of first
                                        byte in the replication backlog buffer.*/
+    long long repl_backlog_start;   /* Used to compute indices from offsets
+                                       basically, index = (offset - start) % size */
     fastlock repl_backlog_lock {"replication backlog"};
     time_t repl_backlog_time_limit; /* Time without slaves after the backlog
                                        gets released. */
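
For readers following the offset-based bookkeeping above, here is a minimal standalone C++ sketch of the idea. The names below (Backlog, feed, copyRange) are illustrative stand-ins rather than KeyDB's real types; the arithmetic mirrors getReplIndexFromOffset() and the wrap-around copy performed in writeToClient().

// Standalone sketch (illustrative only): offset-to-index mapping over a
// circular replication backlog, plus a range copy that handles wrap-around.
#include <cassert>
#include <iostream>
#include <string>
#include <vector>

struct Backlog {
    std::vector<char> buf;       // circular storage, like repl_backlog
    long long start = 0;         // offset of buf[0], like repl_backlog_start
    long long masterOffset = 0;  // next offset to write, like master_repl_offset

    // Same idea as getReplIndexFromOffset(): distance from the offset at which
    // the backlog starts, modulo the backlog size.
    long long indexFromOffset(long long offset) const {
        return (offset - start) % (long long)buf.size();
    }

    // Append bytes, wrapping around the circular buffer (a much simplified
    // feedReplicationBacklog(): no resizing, old history is simply overwritten).
    void feed(const std::string &data) {
        for (char ch : data) {
            buf[indexFromOffset(masterOffset)] = ch;
            ++masterOffset;
        }
    }

    // Copy the bytes in [fromOffset, toOffset), handling the wrap-around case
    // the same way writeToClient() does when idxStart < curr_idx.
    std::string copyRange(long long fromOffset, long long toOffset) const {
        assert(toOffset - fromOffset < (long long)buf.size());
        long long curr = indexFromOffset(fromOffset);
        long long end  = indexFromOffset(toOffset);
        if (end >= curr)                               // normal case, no wrap
            return std::string(buf.data() + curr, end - curr);
        std::string out(buf.data() + curr, buf.size() - curr);  // tail part
        out.append(buf.data(), end);                             // wrapped head
        return out;
    }
};

int main() {
    Backlog bl;
    bl.buf.resize(8);
    bl.feed("abcdef");               // offsets 0..5 now live in the backlog
    long long replicaOff = 2;        // like client->repl_curr_off after PSYNC
    std::cout << bl.copyRange(replicaOff, bl.masterOffset) << "\n"; // "cdef"
    bl.feed("ghij");                 // wraps around the 8-byte buffer
    std::cout << bl.copyRange(6, bl.masterOffset) << "\n";          // "ghij"
}

In the patch itself, a replica only tracks repl_curr_off plus the fPendingReplicaWrite flag; the backlog index is always derived on demand with this modulo arithmetic rather than stored per client.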