Added more synchronization and fixed some data races

Former-commit-id: 183e015dac6f85df1c94d0761e89bc23d9f53319
This commit is contained in:
VivekSainiEQ 2021-05-27 18:57:23 +00:00
parent 8a7cf8a608
commit 4d85f3f6b0
5 changed files with 105 additions and 99 deletions

View File

@ -237,6 +237,8 @@ void execCommand(client *c) {
* backlog with the final EXEC. */ * backlog with the final EXEC. */
if (g_pserver->repl_backlog && was_master && !is_master) { if (g_pserver->repl_backlog && was_master && !is_master) {
const char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; const char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
updateLowestOffsetAmongReplicas();
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
feedReplicationBacklog(execcmd,strlen(execcmd)); feedReplicationBacklog(execcmd,strlen(execcmd));
} }
} }

View File

@ -125,6 +125,7 @@ client *createClient(connection *conn, int iel) {
client_id = g_pserver->next_client_id.fetch_add(1); client_id = g_pserver->next_client_id.fetch_add(1);
c->iel = iel; c->iel = iel;
c->id = client_id; c->id = client_id;
sprintf(c->lock.szName, "client %lu", client_id);
c->resp = 2; c->resp = 2;
c->conn = conn; c->conn = conn;
c->name = NULL; c->name = NULL;
@ -1677,8 +1678,7 @@ int writeToClient(client *c, int handler_installed) {
serverAssertDebug(FCorrectThread(c)); serverAssertDebug(FCorrectThread(c));
std::unique_lock<decltype(c->lock)> lock(c->lock); std::unique_lock<decltype(c->lock)> lock(c->lock);
// serverLog(LL_NOTICE, "acq client");
while(clientHasPendingReplies(c)) { while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) { if (c->bufpos > 0) {
@ -1736,82 +1736,87 @@ int writeToClient(client *c, int handler_installed) {
!(c->flags & CLIENT_SLAVE)) break; !(c->flags & CLIENT_SLAVE)) break;
} }
/* If there are no more pending replies, then we have transmitted the RDB. /* We can only directly read from the replication backlog if the client
* This means further replication commands will be taken straight from the is a replica, so only attempt to do so if that's the case. */
* replication backlog from now on. */ if (c->flags & CLIENT_SLAVE) {
/* If there are no more pending replies, then we have transmitted the RDB.
* This means further replication commands will be taken straight from the
* replication backlog from now on. */
std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock); if (c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){
c->transmittedRDB = true;
}
bool transmittedRDB = c->transmittedRDB;
tRDBLock.unlock();
if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){ /* For replicas, we don't store all the information in the client buffer
c->transmittedRDB = true; * Most of the time (aside from immediately after synchronizing), we read
} * from the replication backlog directly */
bool transmittedRDB = c->transmittedRDB; if (c->repl_curr_idx != -1 && transmittedRDB){
tRDBLock.unlock(); std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
/* if this is a write to a replica, it's coming straight from the replication backlog */ /* copy global variables into local scope so if they change in between we don't care */
long long repl_backlog_idx = g_pserver->repl_backlog_idx; long long repl_backlog_idx = g_pserver->repl_backlog_idx;
long long repl_backlog_size = g_pserver->repl_backlog_size;
long long nwrittenPart2 = 0;
/* For replicas, we don't store all the information in the client buffer ssize_t nrequested; /* The number of bytes requested to write */
* Most of the time (aside from immediately after synchronizing), we read /* normal case with no wrap around */
* from the replication backlog directly */ if (repl_backlog_idx >= c->repl_curr_idx){
if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1 && transmittedRDB){ nrequested = repl_backlog_idx - c->repl_curr_idx;
/* copy global variables into local scope so if they change in between we don't care */ nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_idx - c->repl_curr_idx);
long long repl_backlog_size = g_pserver->repl_backlog_size; /* wrap around case, v. rare */
long long nwrittenPart2 = 0; /* also v. buggy so there's that */
} else {
nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx;
nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx);
/* only attempt wrapping if we write the correct number of bytes */
if (nwritten == repl_backlog_size - c->repl_curr_idx){
long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx);
if (nwrittenPart2 != -1)
nwritten += nwrittenPart2;
ssize_t nrequested; /* The number of bytes requested to write */ }
/* normal case with no wrap around */ }
if (repl_backlog_idx >= c->repl_curr_idx){
nrequested = repl_backlog_idx - c->repl_curr_idx;
nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_idx - c->repl_curr_idx);
/* wrap around case, v. rare */
/* also v. buggy so there's that */
} else {
nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx;
nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx);
/* only attempt wrapping if we write the correct number of bytes */
if (nwritten == repl_backlog_size - c->repl_curr_idx){
long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx);
if (nwrittenPart2 != -1)
nwritten += nwrittenPart2;
} /* only update the replica's current index if bytes were sent */
// if (nrequested != nwritten){
// serverLog(LL_NOTICE, "-----------------------------------------");
// serverLog(LL_NOTICE, "AFTER THE FACT");
// serverLog(LL_NOTICE, "requested to write: %ld", nrequested);
// serverLog(LL_NOTICE, "actually written: %ld", nwritten);
// serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
// serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
// serverLog(LL_NOTICE, "-----------------------------------------");
// }
if (nwritten == nrequested && g_pserver->repl_backlog_idx == repl_backlog_idx){
c->repl_curr_idx = -1; /* -1 denotes no more replica writes */
}
else if (nwritten > 0)
c->repl_curr_idx = (c->repl_curr_idx + nwritten) % repl_backlog_size;
serverAssert(c->repl_curr_idx < repl_backlog_size);
/* only increment bytes if an error didn't occur */
if (nwritten > 0){
totwritten += nwritten;
c->repl_curr_off += nwritten;
}
/* If the second part of a write didn't go through, we still need to register that */
if (nwrittenPart2 == -1) nwritten = -1;
} }
/* only update the replica's current index if bytes were sent */ if (c->flags & CLIENT_SLAVE && handler_installed)
serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed);
// if (nrequested != nwritten){
// serverLog(LL_NOTICE, "-----------------------------------------");
// serverLog(LL_NOTICE, "AFTER THE FACT");
// serverLog(LL_NOTICE, "requested to write: %ld", nrequested);
// serverLog(LL_NOTICE, "actually written: %ld", nwritten);
// serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
// serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
// serverLog(LL_NOTICE, "-----------------------------------------");
// }
if (nwritten == nrequested && g_pserver->repl_backlog_idx == repl_backlog_idx){
c->repl_curr_idx = -1; /* -1 denotes no more replica writes */
}
else if (nwritten > 0)
c->repl_curr_idx = (c->repl_curr_idx + nwritten) % repl_backlog_size;
serverAssert(c->repl_curr_idx < repl_backlog_size);
/* only increment bytes if an error didn't occur */
if (nwritten > 0){
totwritten += nwritten;
c->repl_curr_off += nwritten;
}
/* If the second part of a write didn't go through, we still need to register that */
if (nwrittenPart2 == -1) nwritten = -1;
} }
if (c->flags & CLIENT_SLAVE && handler_installed) // serverLog(LL_NOTICE, "rel client");
serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed);
g_pserver->stat_net_output_bytes += totwritten; g_pserver->stat_net_output_bytes += totwritten;
if (nwritten == -1) { if (nwritten == -1) {
if (connGetState(c->conn) == CONN_STATE_CONNECTED) { if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
@ -1834,7 +1839,7 @@ int writeToClient(client *c, int handler_installed) {
if (!clientHasPendingReplies(c) && c->repl_curr_idx == -1) { if (!clientHasPendingReplies(c) && c->repl_curr_idx == -1) {
if(c->flags & CLIENT_SLAVE && handler_installed){ if(c->flags & CLIENT_SLAVE && handler_installed){
serverLog(LL_NOTICE, "Uninstalling handler"); serverLog(LL_NOTICE, "Uninstalling handler");
serverLog(LL_NOTICE, "handler repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size); serverLog(LL_NOTICE, "handler repl_curr_idx: %lld, repl_backlog_size: %lld", c->repl_curr_idx, g_pserver->repl_backlog_size);
serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
} }
c->sentlen = 0; c->sentlen = 0;

View File

@ -56,9 +56,11 @@ void putSlaveOnline(client *replica);
int cancelReplicationHandshake(redisMaster *mi); int cancelReplicationHandshake(redisMaster *mi);
static void propagateMasterStaleKeys(); static void propagateMasterStaleKeys();
/* gets the lowest offset amongst all of the replicas */ /* gets the lowest offset amongst all of the replicas and stores it globally*/
long long getLowestOffsetAmongReplicas(){ void updateLowestOffsetAmongReplicas(){
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
serverAssert(!g_pserver->repl_backlog_lock.fOwnLock());
// serverLog(LL_NOTICE, "off- have repl");
long long min_offset = LONG_LONG_MAX; long long min_offset = LONG_LONG_MAX;
listIter li; listIter li;
listNode *ln; listNode *ln;
@ -69,16 +71,15 @@ long long getLowestOffsetAmongReplicas(){
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (replica->flags & CLIENT_CLOSE_ASAP) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue;
if (replica->repl_curr_idx == -1) continue;
std::unique_lock<fastlock> ul(replica->lock, std::defer_lock); std::unique_lock<fastlock> ul(replica->lock);
if (FCorrectThread(replica)) // serverLog(LL_NOTICE, "off- acq client");
ul.lock();
min_offset = std::min(min_offset, replica->repl_curr_off); min_offset = std::min(min_offset, replica->repl_curr_off);
// serverLog(LL_NOTICE, "off- rel client");
} }
/* return -1 if no other minimum was found */ /* return -1 if no other minimum was found */
return min_offset == LONG_LONG_MAX ? -1 : min_offset; g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst);
} }
/* We take a global flag to remember if this instance generated an RDB /* We take a global flag to remember if this instance generated an RDB
* because of replication, so that we can remove the RDB file in case * because of replication, so that we can remove the RDB file in case
@ -412,11 +413,12 @@ void freeReplicationBacklog(void) {
* the backlog without incrementing the offset. */ * the backlog without incrementing the offset. */
void feedReplicationBacklog(const void *ptr, size_t len) { void feedReplicationBacklog(const void *ptr, size_t len) {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
serverAssert(g_pserver->repl_backlog_lock.fOwnLock());
const unsigned char *p = (const unsigned char*)ptr; const unsigned char *p = (const unsigned char*)ptr;
if (g_pserver->repl_batch_idxStart >= 0) { if (g_pserver->repl_batch_idxStart >= 0) {
/* we are lower bounded by the lower client offset or the offStart if all the clients are up to date */ /* we are lower bounded by the lower client offset or the offStart if all the clients are up to date */
long long lower_bound = getLowestOffsetAmongReplicas(); long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst);
if (lower_bound == -1) if (lower_bound == -1)
lower_bound = g_pserver->repl_batch_offStart; lower_bound = g_pserver->repl_batch_offStart;
long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
@ -441,10 +443,9 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
g_pserver->master_repl_offset += len; g_pserver->master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every /* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */ * iteration and rewind the "idx" index if we reach the limit. */
while(len) { while(len) {
size_t thislen = g_pserver->repl_backlog_size - g_pserver->repl_backlog_idx; size_t thislen = g_pserver->repl_backlog_size - g_pserver->repl_backlog_idx;
if (thislen > len) thislen = len; if (thislen > len) thislen = len;
@ -598,6 +599,8 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc)
serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL)); serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL));
bool fSendRaw = !g_pserver->fActiveReplica; bool fSendRaw = !g_pserver->fActiveReplica;
updateLowestOffsetAmongReplicas();
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
/* Send SELECT command to every replica if needed. */ /* Send SELECT command to every replica if needed. */
if (g_pserver->replicaseldb != dictid) { if (g_pserver->replicaseldb != dictid) {
@ -619,7 +622,9 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc)
/* Add the SELECT command into the backlog. */ /* Add the SELECT command into the backlog. */
/* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */ /* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */
if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd); if (g_pserver->repl_backlog && fSendRaw) {
feedReplicationBacklogWithObject(selectcmd);
}
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd); decrRefCount(selectcmd);
@ -632,7 +637,6 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc)
if (fSendRaw) if (fSendRaw)
{ {
char aux[LONG_STR_SIZE+3]; char aux[LONG_STR_SIZE+3];
/* Add the multi bulk reply length. */ /* Add the multi bulk reply length. */
aux[0] = '*'; aux[0] = '*';
int multilen = ll2string(aux+1,sizeof(aux)-1,argc); int multilen = ll2string(aux+1,sizeof(aux)-1,argc);
@ -759,7 +763,11 @@ void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) {
printf("\n"); printf("\n");
} }
if (g_pserver->repl_backlog) feedReplicationBacklog(buf,buflen); if (g_pserver->repl_backlog){
updateLowestOffsetAmongReplicas();
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
feedReplicationBacklog(buf,buflen);
}
} }
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
@ -4662,6 +4670,9 @@ void flushReplBacklogToClients()
#ifdef BYPASS_BUFFER #ifdef BYPASS_BUFFER
{ {
std::unique_lock<fastlock> asyncUl(replica->lock, std::defer_lock);
if (!FCorrectThread(replica))
asyncUl.lock();
/* If we are online and the RDB has been sent, there is no need to feed the client buffer /* If we are online and the RDB has been sent, there is no need to feed the client buffer
* We will send our replies directly from the replication backlog instead */ * We will send our replies directly from the replication backlog instead */
std::unique_lock<fastlock> tRDBLock (replica->transmittedRDBLock); std::unique_lock<fastlock> tRDBLock (replica->transmittedRDBLock);
@ -4694,21 +4705,5 @@ void flushReplBacklogToClients()
// This may be called multiple times per "frame" so update with our progress flushing to clients // This may be called multiple times per "frame" so update with our progress flushing to clients
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
} else if (getLowestOffsetAmongReplicas() != -1){ }
listIter li;
listNode *ln;
listRewind(g_pserver->slaves, &li);
while ((ln = listNext(&li))) {
client *replica = (client*)listNodeValue(ln);
std::unique_lock<fastlock> ul(replica->lock, std::defer_lock);
if (FCorrectThread(replica))
ul.lock();
/* try to force prepare client to write i guess? */
if (replica->repl_curr_idx != -1){
if (prepareClientToWrite(replica) != C_OK) continue;
}
}
}
} }

View File

@ -2924,6 +2924,7 @@ void initServerConfig(void) {
g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER; g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER;
g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
g_pserver->master_repl_offset = 0; g_pserver->master_repl_offset = 0;
g_pserver->repl_lowest_off.store(-1, std::memory_order_seq_cst);
/* Replication partial resync backlog */ /* Replication partial resync backlog */
g_pserver->repl_backlog = NULL; g_pserver->repl_backlog = NULL;

View File

@ -2241,6 +2241,8 @@ struct redisServer {
int repl_diskless_load; /* Slave parse RDB directly from the socket. int repl_diskless_load; /* Slave parse RDB directly from the socket.
* see REPL_DISKLESS_LOAD_* enum */ * see REPL_DISKLESS_LOAD_* enum */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
std::atomic <long long> repl_lowest_off; /* The lowest offset amongst all clients
Updated before calls to feed the replication backlog */
/* Replication (replica) */ /* Replication (replica) */
list *masters; list *masters;
int enable_multimaster; int enable_multimaster;
@ -2838,6 +2840,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
void rdbPipeWriteHandlerConnRemoved(struct connection *conn); void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
void replicationNotifyLoadedKey(redisDb *db, robj_roptr key, robj_roptr val, long long expire); void replicationNotifyLoadedKey(redisDb *db, robj_roptr key, robj_roptr val, long long expire);
void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long long expire); void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long long expire);
void updateLowestOffsetAmongReplicas(void);
/* Generic persistence functions */ /* Generic persistence functions */
void startLoadingFile(FILE* fp, const char * filename, int rdbflags); void startLoadingFile(FILE* fp, const char * filename, int rdbflags);