Seems to pass multithreaded test cases, thank the lord
Former-commit-id: 6cbf70cfff5735f3d4ef2e980945b4b1a1f85971
This commit is contained in:
parent
09c4939287
commit
9e344f2577
@ -277,8 +277,9 @@ void clientInstallAsyncWriteHandler(client *c) {
|
||||
int prepareClientToWrite(client *c) {
|
||||
bool fAsync = !FCorrectThread(c); // Not async if we're on the right thread
|
||||
|
||||
if (c->flags & CLIENT_SLAVE)
|
||||
if (c->flags & CLIENT_SLAVE){
|
||||
serverLog(LL_NOTICE, "got into prepareClientToWrite");
|
||||
}
|
||||
|
||||
if (!fAsync) {
|
||||
serverAssert(c->conn == nullptr || c->lock.fOwnLock());
|
||||
@ -1758,7 +1759,7 @@ int writeToClient(client *c, int handler_installed) {
|
||||
|
||||
/* We can only directly read from the replication backlog if the client
|
||||
is a replica, so only attempt to do so if that's the case. */
|
||||
if (c->flags & CLIENT_SLAVE) {
|
||||
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
|
||||
/* For replicas, we don't store all the information in the client buffer
|
||||
* We always read from the replication backlog directly */
|
||||
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
|
||||
@ -1766,14 +1767,12 @@ int writeToClient(client *c, int handler_installed) {
|
||||
/* Right now, we're bringing in the offStart into the scope
|
||||
* If repl_batch_offStart is equal to -1, that means the mechanism is disabled
|
||||
* which implies there is no data to flush and that the global offset is accurate */
|
||||
long long offStart = g_pserver->repl_batch_offStart == -1 ? g_pserver->master_repl_offset : g_pserver->repl_batch_offStart;
|
||||
// long long offStart = g_pserver->repl_batch_offStart == -1 ? g_pserver->master_repl_offset : g_pserver->repl_batch_offStart;
|
||||
long long offStart = c->repl_end_off;
|
||||
long long idxStart = getReplIndexFromOffset(offStart);
|
||||
if (g_pserver->repl_batch_offStart != -1)
|
||||
serverAssert(idxStart == g_pserver->repl_batch_idxStart);
|
||||
else
|
||||
serverAssert(idxStart == g_pserver->repl_backlog_idx);
|
||||
|
||||
if (c->repl_curr_off != -1 && c->repl_curr_off != offStart){
|
||||
serverAssert(c->repl_curr_off != -1);
|
||||
if (c->repl_curr_off != offStart){
|
||||
serverLog(LL_NOTICE, "printing the stats for client %lu: c->repl_curr_off: %lld, repl_batch_offStart: %lld, nwritten: %ld, offStart: %lld",
|
||||
c->id, c->repl_curr_off, g_pserver->repl_batch_offStart, nwritten, offStart);
|
||||
|
||||
@ -1846,7 +1845,7 @@ int writeToClient(client *c, int handler_installed) {
|
||||
if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) {
|
||||
// if(c->flags & CLIENT_SLAVE && handler_installed){
|
||||
// serverLog(LL_NOTICE, "Uninstalling handler");
|
||||
// serverLog(LL_NOTICE, "handler repl_curr_idx: %lld, repl_backlog_size: %lld", c->repl_curr_idx, g_pserver->repl_backlog_size);
|
||||
// serverLog(LL_NOTICE, "repl_backlog_size: %lld", g_pserver->repl_backlog_size);
|
||||
// serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
|
||||
// }
|
||||
c->sentlen = 0;
|
||||
|
@ -382,7 +382,9 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
|
||||
lower_bound = g_pserver->repl_batch_offStart;
|
||||
long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
|
||||
if (minimumsize > g_pserver->repl_backlog_size) {
|
||||
g_pserver->repl_backlog_lock.unlock();
|
||||
flushReplBacklogToClients();
|
||||
g_pserver->repl_backlog_lock.lock();
|
||||
minimumsize = g_pserver->master_repl_offset + len - lower_bound +1;
|
||||
|
||||
if (minimumsize > g_pserver->repl_backlog_size) {
|
||||
@ -809,6 +811,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
|
||||
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
|
||||
#ifdef BYPASS_PSYNC
|
||||
c->repl_curr_off = offset - 1;
|
||||
c->repl_end_off = g_pserver->master_repl_offset;
|
||||
serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", c->id, getClientPeerId(c), c->repl_curr_off);
|
||||
|
||||
/* Force the partial sync to be queued */
|
||||
@ -861,6 +864,7 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) {
|
||||
replica->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
|
||||
|
||||
replica->repl_curr_off = offset;
|
||||
replica->repl_end_off = g_pserver->master_repl_offset;
|
||||
|
||||
serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", replica->id, getClientPeerId(replica), replica->repl_curr_off);
|
||||
|
||||
@ -4634,19 +4638,18 @@ void flushReplBacklogToClients()
|
||||
fAsyncWrite = true;
|
||||
|
||||
|
||||
/* If we are online and the RDB has been sent, there is no need to feed the client buffer
|
||||
* We will send our replies directly from the replication backlog instead */
|
||||
#ifdef BYPASS_BUFFER
|
||||
{
|
||||
std::unique_lock<fastlock> asyncUl(replica->lock, std::defer_lock);
|
||||
if (!FCorrectThread(replica))
|
||||
asyncUl.lock();
|
||||
/* If we are online and the RDB has been sent, there is no need to feed the client buffer
|
||||
* We will send our replies directly from the replication backlog instead */
|
||||
if (replica->repl_curr_off == -1){
|
||||
replica->repl_curr_off = g_pserver->repl_batch_offStart;
|
||||
|
||||
serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", replica->id, getClientPeerId(replica), replica->repl_curr_off);
|
||||
/* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */
|
||||
serverAssert(replica->repl_curr_off != -1);
|
||||
|
||||
}
|
||||
replica->repl_end_off = g_pserver->master_repl_offset;
|
||||
|
||||
/* Only if the there isn't already a pending write do we prepare the client to write */
|
||||
if (!replica->fPendingReplicaWrite){
|
||||
|
@ -1518,6 +1518,7 @@ struct client {
|
||||
should use. */
|
||||
|
||||
long long repl_curr_off = -1; /* Replication offset of the client, only if it's a replica*/
|
||||
long long repl_end_off = -1; /* Replication offset to write to */
|
||||
int fPendingReplicaWrite;
|
||||
|
||||
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
|
||||
|
Loading…
x
Reference in New Issue
Block a user