Relaxed replication-backlog locking; should run faster now

Former-commit-id: 5cec4d026dc1766b9ecbade6ec4b9d0e75a94e0f
This commit is contained in:
Ubuntu 2021-06-14 19:30:49 +00:00
parent 15f6acae11
commit 80dddab0c4
3 changed files with 16 additions and 9 deletions

View File

@ -268,7 +268,6 @@ void execCommand(client *c) {
if (g_pserver->repl_backlog && was_master && !is_master) { if (g_pserver->repl_backlog && was_master && !is_master) {
const char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; const char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
updateLowestOffsetAmongReplicas(); updateLowestOffsetAmongReplicas();
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
feedReplicationBacklog(execcmd,strlen(execcmd)); feedReplicationBacklog(execcmd,strlen(execcmd));
} }
afterPropagateExec(); afterPropagateExec();

View File

@ -1856,6 +1856,8 @@ int writeToClient(client *c, int handler_installed) {
* We always read from the replication backlog directly */ * We always read from the replication backlog directly */
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock); std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
// serverLog(LL_NOTICE, "written to handler");
long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off);
serverAssert(c->repl_curr_off != -1); serverAssert(c->repl_curr_off != -1);
@ -1884,8 +1886,12 @@ int writeToClient(client *c, int handler_installed) {
serverAssert(c->repl_curr_off <= c->repl_end_off); serverAssert(c->repl_curr_off <= c->repl_end_off);
/* If the client offset matches the global offset, we wrote all we needed to, /* If the client offset matches the global offset, we wrote all we needed to,
* in which case, there is no pending write */ * in which case, there is no pending write */
if (c->repl_curr_off == c->repl_end_off){ if (c->repl_curr_off == c->repl_end_off){
// serverLog(LL_NOTICE, "Successfully wrote up until %lld", c->repl_end_off);
c->fPendingReplicaWrite = false; c->fPendingReplicaWrite = false;
} else {
// serverLog(LL_NOTICE, "Wrote to %lld out of %lld", c->repl_curr_off, c->repl_end_off);
} }
} }

View File

@ -241,6 +241,8 @@ void resizeReplicationBacklog(long long newsize) {
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
if (g_pserver->repl_backlog_size == newsize) return; if (g_pserver->repl_backlog_size == newsize) return;
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
if (g_pserver->repl_backlog != NULL) { if (g_pserver->repl_backlog != NULL) {
/* What we actually do is to flush the old buffer and realloc a new /* What we actually do is to flush the old buffer and realloc a new
* empty one. It will refill with new data incrementally. * empty one. It will refill with new data incrementally.
@ -310,9 +312,9 @@ void freeReplicationBacklog(void) {
* the backlog without incrementing the offset. */ * the backlog without incrementing the offset. */
void feedReplicationBacklog(const void *ptr, size_t len) { void feedReplicationBacklog(const void *ptr, size_t len) {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
serverAssert(g_pserver->repl_backlog_lock.fOwnLock());
const unsigned char *p = (const unsigned char*)ptr; const unsigned char *p = (const unsigned char*)ptr;
if (g_pserver->repl_batch_idxStart >= 0) { if (g_pserver->repl_batch_idxStart >= 0) {
/* we are lower bounded by the lower client offset or the offStart if all the clients are up to date */ /* we are lower bounded by the lower client offset or the offStart if all the clients are up to date */
long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst); long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst);
@ -320,10 +322,11 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
lower_bound = g_pserver->repl_batch_offStart; lower_bound = g_pserver->repl_batch_offStart;
long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
if (minimumsize > g_pserver->repl_backlog_size) { if (minimumsize > g_pserver->repl_backlog_size) {
g_pserver->repl_backlog_lock.unlock();
flushReplBacklogToClients(); flushReplBacklogToClients();
g_pserver->repl_backlog_lock.lock(); minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
minimumsize = g_pserver->master_repl_offset + len - lower_bound +1;
serverLog(LL_NOTICE, "minimumsize: %lld, g_pserver->master_repl_offset: %lld, len: %lu, lower_bound: %lld",
minimumsize, g_pserver->master_repl_offset, len, lower_bound);
if (minimumsize > g_pserver->repl_backlog_size) { if (minimumsize > g_pserver->repl_backlog_size) {
// This is an emergency overflow, we better resize to fit // This is an emergency overflow, we better resize to fit
@ -492,7 +495,6 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc)
bool fSendRaw = !g_pserver->fActiveReplica; bool fSendRaw = !g_pserver->fActiveReplica;
updateLowestOffsetAmongReplicas(); updateLowestOffsetAmongReplicas();
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
/* Send SELECT command to every replica if needed. */ /* Send SELECT command to every replica if needed. */
if (g_pserver->replicaseldb != dictid) { if (g_pserver->replicaseldb != dictid) {
@ -655,7 +657,6 @@ void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) {
if (g_pserver->repl_backlog){ if (g_pserver->repl_backlog){
updateLowestOffsetAmongReplicas(); updateLowestOffsetAmongReplicas();
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
feedReplicationBacklog(buf,buflen); feedReplicationBacklog(buf,buflen);
} }
} }
@ -750,7 +751,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
c->repl_curr_off = offset - 1; c->repl_curr_off = offset - 1;
serverLog(LL_NOTICE, "Client %s, replica offset %lld in psync", replicationGetSlaveName(c), c->repl_curr_off); // serverLog(LL_NOTICE, "Client %s, replica offset %lld in psync", replicationGetSlaveName(c), c->repl_curr_off);
c->repl_end_off = g_pserver->master_repl_offset; c->repl_end_off = g_pserver->master_repl_offset;
/* Force the partial sync to be queued */ /* Force the partial sync to be queued */
@ -4988,7 +4989,7 @@ void flushReplBacklogToClients()
if (!canFeedReplicaReplBuffer(replica)) continue; if (!canFeedReplicaReplBuffer(replica)) continue;
if (replica->flags & CLIENT_CLOSE_ASAP) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue;
serverLog(LL_NOTICE, "Client %s, replica offset %lld", replicationGetSlaveName(replica), replica->repl_curr_off); // serverLog(LL_NOTICE, "Client %s, replica offset %lld", replicationGetSlaveName(replica), replica->repl_curr_off);
std::unique_lock<fastlock> ul(replica->lock); std::unique_lock<fastlock> ul(replica->lock);
if (!FCorrectThread(replica)) if (!FCorrectThread(replica))
@ -5013,6 +5014,7 @@ void flushReplBacklogToClients()
// This may be called multiple times per "frame" so update with our progress flushing to clients // This may be called multiple times per "frame" so update with our progress flushing to clients
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
updateLowestOffsetAmongReplicas();
} }
} }