diff --git a/src/multi.cpp b/src/multi.cpp index 9df72383d..9fd5206fb 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -237,6 +237,8 @@ void execCommand(client *c) { * backlog with the final EXEC. */ if (g_pserver->repl_backlog && was_master && !is_master) { const char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; + updateLowestOffsetAmongReplicas(); + std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); feedReplicationBacklog(execcmd,strlen(execcmd)); } } diff --git a/src/networking.cpp b/src/networking.cpp index 176693501..caefd6d1e 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -125,6 +125,7 @@ client *createClient(connection *conn, int iel) { client_id = g_pserver->next_client_id.fetch_add(1); c->iel = iel; c->id = client_id; + sprintf(c->lock.szName, "client %lu", client_id); c->resp = 2; c->conn = conn; c->name = NULL; @@ -1677,8 +1678,7 @@ int writeToClient(client *c, int handler_installed) { serverAssertDebug(FCorrectThread(c)); std::unique_locklock)> lock(c->lock); - - + // serverLog(LL_NOTICE, "acq client"); while(clientHasPendingReplies(c)) { if (c->bufpos > 0) { @@ -1736,82 +1736,87 @@ int writeToClient(client *c, int handler_installed) { !(c->flags & CLIENT_SLAVE)) break; } - /* If there are no more pending replies, then we have transmitted the RDB. - * This means further replication commands will be taken straight from the - * replication backlog from now on. */ + /* We can only directly read from the replication backlog if the client + is a replica, so only attempt to do so if that's the case. */ + if (c->flags & CLIENT_SLAVE) { + /* If there are no more pending replies, then we have transmitted the RDB. + * This means further replication commands will be taken straight from the + * replication backlog from now on. */ + std::unique_lock tRDBLock (c->transmittedRDBLock); - std::unique_lock tRDBLock (c->transmittedRDBLock); + if (c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){ + c->transmittedRDB = true; + } + bool transmittedRDB = c->transmittedRDB; + tRDBLock.unlock(); - if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){ - c->transmittedRDB = true; - } - bool transmittedRDB = c->transmittedRDB; - tRDBLock.unlock(); + /* For replicas, we don't store all the information in the client buffer + * Most of the time (aside from immediately after synchronizing), we read + * from the replication backlog directly */ + if (c->repl_curr_idx != -1 && transmittedRDB){ + std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); - /* if this is a write to a replica, it's coming straight from the replication backlog */ - long long repl_backlog_idx = g_pserver->repl_backlog_idx; + /* copy global variables into local scope so if they change in between we don't care */ + long long repl_backlog_idx = g_pserver->repl_backlog_idx; + long long repl_backlog_size = g_pserver->repl_backlog_size; + long long nwrittenPart2 = 0; - /* For replicas, we don't store all the information in the client buffer - * Most of the time (aside from immediately after synchronizing), we read - * from the replication backlog directly */ - if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1 && transmittedRDB){ - /* copy global variables into local scope so if they change in between we don't care */ - long long repl_backlog_size = g_pserver->repl_backlog_size; - long long nwrittenPart2 = 0; + ssize_t nrequested; /* The number of bytes requested to write */ + /* normal case with no wrap around */ + if (repl_backlog_idx >= c->repl_curr_idx){ + nrequested = repl_backlog_idx - c->repl_curr_idx; + nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_idx - c->repl_curr_idx); + /* wrap around case, v. rare */ + /* also v. buggy so there's that */ + } else { + nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx; + nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx); + /* only attempt wrapping if we write the correct number of bytes */ + if (nwritten == repl_backlog_size - c->repl_curr_idx){ + long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx); + if (nwrittenPart2 != -1) + nwritten += nwrittenPart2; - ssize_t nrequested; /* The number of bytes requested to write */ - /* normal case with no wrap around */ - if (repl_backlog_idx >= c->repl_curr_idx){ - nrequested = repl_backlog_idx - c->repl_curr_idx; - nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_idx - c->repl_curr_idx); - /* wrap around case, v. rare */ - /* also v. buggy so there's that */ - } else { - nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx; - nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx); - /* only attempt wrapping if we write the correct number of bytes */ - if (nwritten == repl_backlog_size - c->repl_curr_idx){ - long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx); - if (nwrittenPart2 != -1) - nwritten += nwrittenPart2; + } + } - } + /* only update the replica's current index if bytes were sent */ + + // if (nrequested != nwritten){ + // serverLog(LL_NOTICE, "-----------------------------------------"); + // serverLog(LL_NOTICE, "AFTER THE FACT"); + // serverLog(LL_NOTICE, "requested to write: %ld", nrequested); + // serverLog(LL_NOTICE, "actually written: %ld", nwritten); + // serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); + // serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); + // serverLog(LL_NOTICE, "-----------------------------------------"); + // } + + + if (nwritten == nrequested && g_pserver->repl_backlog_idx == repl_backlog_idx){ + c->repl_curr_idx = -1; /* -1 denotes no more replica writes */ + } + else if (nwritten > 0) + c->repl_curr_idx = (c->repl_curr_idx + nwritten) % repl_backlog_size; + + serverAssert(c->repl_curr_idx < repl_backlog_size); + + /* only increment bytes if an error didn't occur */ + if (nwritten > 0){ + totwritten += nwritten; + c->repl_curr_off += nwritten; + } + + /* If the second part of a write didn't go through, we still need to register that */ + if (nwrittenPart2 == -1) nwritten = -1; } - /* only update the replica's current index if bytes were sent */ + if (c->flags & CLIENT_SLAVE && handler_installed) + serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed); - // if (nrequested != nwritten){ - // serverLog(LL_NOTICE, "-----------------------------------------"); - // serverLog(LL_NOTICE, "AFTER THE FACT"); - // serverLog(LL_NOTICE, "requested to write: %ld", nrequested); - // serverLog(LL_NOTICE, "actually written: %ld", nwritten); - // serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); - // serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); - // serverLog(LL_NOTICE, "-----------------------------------------"); - // } - - - if (nwritten == nrequested && g_pserver->repl_backlog_idx == repl_backlog_idx){ - c->repl_curr_idx = -1; /* -1 denotes no more replica writes */ - } - else if (nwritten > 0) - c->repl_curr_idx = (c->repl_curr_idx + nwritten) % repl_backlog_size; - - serverAssert(c->repl_curr_idx < repl_backlog_size); - - /* only increment bytes if an error didn't occur */ - if (nwritten > 0){ - totwritten += nwritten; - c->repl_curr_off += nwritten; - } - - /* If the second part of a write didn't go through, we still need to register that */ - if (nwrittenPart2 == -1) nwritten = -1; } - if (c->flags & CLIENT_SLAVE && handler_installed) - serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed); - + // serverLog(LL_NOTICE, "rel client"); g_pserver->stat_net_output_bytes += totwritten; if (nwritten == -1) { if (connGetState(c->conn) == CONN_STATE_CONNECTED) { @@ -1834,7 +1839,7 @@ int writeToClient(client *c, int handler_installed) { if (!clientHasPendingReplies(c) && c->repl_curr_idx == -1) { if(c->flags & CLIENT_SLAVE && handler_installed){ serverLog(LL_NOTICE, "Uninstalling handler"); - serverLog(LL_NOTICE, "handler repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); + serverLog(LL_NOTICE, "handler repl_curr_idx: %lld, repl_backlog_size: %lld", c->repl_curr_idx, g_pserver->repl_backlog_size); serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); } c->sentlen = 0; diff --git a/src/replication.cpp b/src/replication.cpp index ad79f4887..d1181bdf4 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -56,9 +56,11 @@ void putSlaveOnline(client *replica); int cancelReplicationHandshake(redisMaster *mi); static void propagateMasterStaleKeys(); -/* gets the lowest offset amongst all of the replicas */ -long long getLowestOffsetAmongReplicas(){ +/* gets the lowest offset amongst all of the replicas and stores it globally*/ +void updateLowestOffsetAmongReplicas(){ serverAssert(GlobalLocksAcquired()); + serverAssert(!g_pserver->repl_backlog_lock.fOwnLock()); + // serverLog(LL_NOTICE, "off- have repl"); long long min_offset = LONG_LONG_MAX; listIter li; listNode *ln; @@ -69,16 +71,15 @@ long long getLowestOffsetAmongReplicas(){ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue; - if (replica->repl_curr_idx == -1) continue; - std::unique_lock ul(replica->lock, std::defer_lock); - if (FCorrectThread(replica)) - ul.lock(); + std::unique_lock ul(replica->lock); + // serverLog(LL_NOTICE, "off- acq client"); - min_offset = std::min(min_offset, replica->repl_curr_off); + min_offset = std::min(min_offset, replica->repl_curr_off); + // serverLog(LL_NOTICE, "off- rel client"); } /* return -1 if no other minimum was found */ - return min_offset == LONG_LONG_MAX ? -1 : min_offset; + g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst); } /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case @@ -412,11 +413,12 @@ void freeReplicationBacklog(void) { * the backlog without incrementing the offset. */ void feedReplicationBacklog(const void *ptr, size_t len) { serverAssert(GlobalLocksAcquired()); + serverAssert(g_pserver->repl_backlog_lock.fOwnLock()); const unsigned char *p = (const unsigned char*)ptr; if (g_pserver->repl_batch_idxStart >= 0) { /* we are lower bounded by the lower client offset or the offStart if all the clients are up to date */ - long long lower_bound = getLowestOffsetAmongReplicas(); + long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst); if (lower_bound == -1) lower_bound = g_pserver->repl_batch_offStart; long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; @@ -441,10 +443,9 @@ void feedReplicationBacklog(const void *ptr, size_t len) { g_pserver->master_repl_offset += len; - - /* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ + while(len) { size_t thislen = g_pserver->repl_backlog_size - g_pserver->repl_backlog_idx; if (thislen > len) thislen = len; @@ -598,6 +599,8 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL)); bool fSendRaw = !g_pserver->fActiveReplica; + updateLowestOffsetAmongReplicas(); + std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); /* Send SELECT command to every replica if needed. */ if (g_pserver->replicaseldb != dictid) { @@ -619,7 +622,9 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) /* Add the SELECT command into the backlog. */ /* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */ - if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd); + if (g_pserver->repl_backlog && fSendRaw) { + feedReplicationBacklogWithObject(selectcmd); + } if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); @@ -632,7 +637,6 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) if (fSendRaw) { char aux[LONG_STR_SIZE+3]; - /* Add the multi bulk reply length. */ aux[0] = '*'; int multilen = ll2string(aux+1,sizeof(aux)-1,argc); @@ -759,7 +763,11 @@ void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) { printf("\n"); } - if (g_pserver->repl_backlog) feedReplicationBacklog(buf,buflen); + if (g_pserver->repl_backlog){ + updateLowestOffsetAmongReplicas(); + std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); + feedReplicationBacklog(buf,buflen); + } } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { @@ -4662,6 +4670,9 @@ void flushReplBacklogToClients() #ifdef BYPASS_BUFFER { + std::unique_lock asyncUl(replica->lock, std::defer_lock); + if (!FCorrectThread(replica)) + asyncUl.lock(); /* If we are online and the RDB has been sent, there is no need to feed the client buffer * We will send our replies directly from the replication backlog instead */ std::unique_lock tRDBLock (replica->transmittedRDBLock); @@ -4694,21 +4705,5 @@ void flushReplBacklogToClients() // This may be called multiple times per "frame" so update with our progress flushing to clients g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - } else if (getLowestOffsetAmongReplicas() != -1){ - listIter li; - listNode *ln; - listRewind(g_pserver->slaves, &li); - while ((ln = listNext(&li))) { - client *replica = (client*)listNodeValue(ln); - - std::unique_lock ul(replica->lock, std::defer_lock); - if (FCorrectThread(replica)) - ul.lock(); - - /* try to force prepare client to write i guess? */ - if (replica->repl_curr_idx != -1){ - if (prepareClientToWrite(replica) != C_OK) continue; - } - } - } + } } diff --git a/src/server.cpp b/src/server.cpp index 9664a4a6b..439e1aeff 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2924,6 +2924,7 @@ void initServerConfig(void) { g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER; g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; g_pserver->master_repl_offset = 0; + g_pserver->repl_lowest_off.store(-1, std::memory_order_seq_cst); /* Replication partial resync backlog */ g_pserver->repl_backlog = NULL; diff --git a/src/server.h b/src/server.h index 14005e7d5..da1fce52e 100644 --- a/src/server.h +++ b/src/server.h @@ -2241,6 +2241,8 @@ struct redisServer { int repl_diskless_load; /* Slave parse RDB directly from the socket. * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ + std::atomic repl_lowest_off; /* The lowest offset amongst all clients + Updated before calls to feed the replication backlog */ /* Replication (replica) */ list *masters; int enable_multimaster; @@ -2838,6 +2840,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, void rdbPipeWriteHandlerConnRemoved(struct connection *conn); void replicationNotifyLoadedKey(redisDb *db, robj_roptr key, robj_roptr val, long long expire); void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long long expire); +void updateLowestOffsetAmongReplicas(void); /* Generic persistence functions */ void startLoadingFile(FILE* fp, const char * filename, int rdbflags);