Sync works properly single-threaded; passes all but one test case (which hangs)
Former-commit-id: 9a6ca3a5d906b9d87fe70652d218decbb2775ac1
parent 4d85f3f6b0
commit 615fbeb10f
@@ -15,7 +15,7 @@
 release_hdr := $(shell sh -c './mkreleasehdr.sh')
 uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
 uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not')
-OPTIMIZATION?=-O2 -flto
+OPTIMIZATION?=-O2
 DEPENDENCY_TARGETS=hiredis linenoise lua rocksdb
 NODEPS:=clean distclean
 
@@ -146,6 +146,7 @@ client *createClient(connection *conn, int iel) {
     c->flags = 0;
     c->fPendingAsyncWrite = FALSE;
     c->fPendingAsyncWriteHandler = FALSE;
+    c->fPendingReplicaWrite = FALSE;
     c->ctime = c->lastinteraction = g_pserver->unixtime;
     /* If the default user does not require authentication, the user is
      * directly authenticated. */
@@ -221,6 +222,10 @@ void clientInstallWriteHandler(client *c) {
     /* Schedule the client to write the output buffers to the socket only
      * if not already done and, for slaves, if the replica can actually receive
      * writes at this stage. */
+
+    if (c->flags & CLIENT_SLAVE)
+        serverLog(LL_NOTICE, "installing write handler");
+
     if (!(c->flags & CLIENT_PENDING_WRITE) &&
         (c->replstate == REPL_STATE_NONE ||
          (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
@@ -272,6 +277,9 @@ void clientInstallAsyncWriteHandler(client *c) {
 int prepareClientToWrite(client *c) {
     bool fAsync = !FCorrectThread(c); // Not async if we're on the right thread
 
+    if (c->flags & CLIENT_SLAVE)
+        serverLog(LL_NOTICE, "got into prepareClientToWrite");
+
     if (!fAsync) {
         serverAssert(c->conn == nullptr || c->lock.fOwnLock());
     } else {
@@ -302,7 +310,7 @@ int prepareClientToWrite(client *c) {
 
     /* Schedule the client to write the output buffers to the socket, unless
      * it should already be setup to do so (it has already pending data). */
-    if (!fAsync && !clientHasPendingReplies(c) && c->repl_curr_idx == -1) clientInstallWriteHandler(c);
+    if (!fAsync && !clientHasPendingReplies(c) && !c->fPendingReplicaWrite) clientInstallWriteHandler(c);
     if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c);
 
     /* Authorize the caller to queue in the output buffer of this client. */
@@ -320,7 +328,6 @@ void _clientAsyncReplyBufferReserve(client *c, size_t len) {
     clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize);
     replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock);
     replyNew->used = 0;
-    std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
     c->replyAsync = replyNew;
 }
 
@@ -334,7 +341,6 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) {
     if (fAsync)
     {
         serverAssert(GlobalLocksAcquired());
-        std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
         if (c->replyAsync == nullptr || (c->replyAsync->size - c->replyAsync->used) < len)
         {
             if (c->replyAsync == nullptr) {
@@ -1661,6 +1667,16 @@ client *lookupClientByID(uint64_t id) {
     return (c == raxNotFound) ? NULL : c;
 }
 
+/* Compute the corresponding index from a replication backlog offset
+ * by taking the distance between the input offset and the replication backlog offset
+ * and applying that to the replication backlog index, wrapping around if the index
+ * becomes negative.
+ * TODO: Rewrite comment for new logic */
+long long getReplIndexFromOffset(long long offset){
+    long long index = (offset - g_pserver->repl_backlog_start) % g_pserver->repl_backlog_size;
+    return index;
+}
+
 /* Write data in output buffers to client. Return C_OK if the client
  * is still valid after the call, C_ERR if it was freed because of some
  * error. If handler_installed is set, it will attempt to clear the
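The helper above maps a master replication offset to a position inside the circular backlog. As an aside, a minimal standalone sketch of that arithmetic (hypothetical values, not part of this commit, and assuming offsets never fall behind repl_backlog_start):

    #include <cassert>

    // index = (offset - start) % size: distance from the first byte the backlog
    // represents, wrapped into the circular buffer.
    long long replIndexFromOffset(long long offset, long long start, long long size) {
        return (offset - start) % size;
    }

    int main() {
        const long long start = 1000, size = 64;
        assert(replIndexFromOffset(1000, start, size) == 0);  // first byte -> index 0
        assert(replIndexFromOffset(1010, start, size) == 10); // 10 bytes in
        assert(replIndexFromOffset(1070, start, size) == 6);  // 70 % 64: wrapped around
        return 0;
    }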
@@ -1680,7 +1696,11 @@ int writeToClient(client *c, int handler_installed) {
     std::unique_lock<decltype(c->lock)> lock(c->lock);
     // serverLog(LL_NOTICE, "acq client");
 
+    if (c->flags & CLIENT_SLAVE)
+        serverLog(LL_NOTICE, "writeToClient has happened");
+
     while(clientHasPendingReplies(c)) {
+        serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
         if (c->bufpos > 0) {
             nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
             if (nwritten <= 0) break;
@@ -1739,80 +1759,67 @@ int writeToClient(client *c, int handler_installed) {
     /* We can only directly read from the replication backlog if the client
        is a replica, so only attempt to do so if that's the case. */
     if (c->flags & CLIENT_SLAVE) {
-        /* If there are no more pending replies, then we have transmitted the RDB.
-         * This means further replication commands will be taken straight from the
-         * replication backlog from now on. */
-        std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
-
-        if (c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){
-            c->transmittedRDB = true;
-        }
-        bool transmittedRDB = c->transmittedRDB;
-        tRDBLock.unlock();
-
         /* For replicas, we don't store all the information in the client buffer
-         * Most of the time (aside from immediately after synchronizing), we read
-         * from the replication backlog directly */
-        if (c->repl_curr_idx != -1 && transmittedRDB){
-            std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
+         * We always read from the replication backlog directly */
+        std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
 
-            /* copy global variables into local scope so if they change in between we don't care */
-            long long repl_backlog_idx = g_pserver->repl_backlog_idx;
-            long long repl_backlog_size = g_pserver->repl_backlog_size;
-            long long nwrittenPart2 = 0;
+        /* Right now, we're bringing in the offStart into the scope
+         * If repl_batch_offStart is equal to -1, that means the mechanism is disabled
+         * which implies there is no data to flush and that the global offset is accurate */
+        long long offStart = g_pserver->repl_batch_offStart == -1 ? g_pserver->master_repl_offset : g_pserver->repl_batch_offStart;
+        long long idxStart = getReplIndexFromOffset(offStart);
+        if (g_pserver->repl_batch_offStart != -1)
+            serverAssert(idxStart == g_pserver->repl_batch_idxStart);
+        else
+            serverAssert(idxStart == g_pserver->repl_backlog_idx);
 
-            ssize_t nrequested; /* The number of bytes requested to write */
+        if (c->repl_curr_off != -1 && c->repl_curr_off != offStart){
+            serverLog(LL_NOTICE, "printing the stats for client %lu: c->repl_curr_off: %lld, repl_batch_offStart: %lld, nwritten: %ld, offStart: %lld",
+                c->id, c->repl_curr_off, g_pserver->repl_batch_offStart, nwritten, offStart);
+
+            long long curr_idx = getReplIndexFromOffset(c->repl_curr_off);
+            long long nwrittenPart2 = 0;
             /* normal case with no wrap around */
-            if (repl_backlog_idx >= c->repl_curr_idx){
-                nrequested = repl_backlog_idx - c->repl_curr_idx;
-                nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_idx - c->repl_curr_idx);
+            if (idxStart >= curr_idx){
+                nwritten = connWrite(c->conn, g_pserver->repl_backlog + curr_idx, idxStart - curr_idx);
             /* wrap around case, v. rare */
             /* also v. buggy so there's that */
             } else {
-                nrequested = repl_backlog_size + repl_backlog_idx - c->repl_curr_idx;
-                nwritten = connWrite(c->conn, g_pserver->repl_backlog + c->repl_curr_idx, repl_backlog_size - c->repl_curr_idx);
+                serverLog(LL_NOTICE, "ROAD OF RESISTANCE");
+                nwritten = connWrite(c->conn, g_pserver->repl_backlog + curr_idx, g_pserver->repl_backlog_size - curr_idx);
                 /* only attempt wrapping if we write the correct number of bytes */
-                if (nwritten == repl_backlog_size - c->repl_curr_idx){
-                    long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, repl_backlog_idx);
+                if (nwritten == g_pserver->repl_backlog_size - curr_idx){
+                    long long nwrittenPart2 = connWrite(c->conn, g_pserver->repl_backlog, idxStart);
                     if (nwrittenPart2 != -1)
                         nwritten += nwrittenPart2;
 
                 }
             }
 
-            /* only update the replica's current index if bytes were sent */
-
-            // if (nrequested != nwritten){
-            //     serverLog(LL_NOTICE, "-----------------------------------------");
-            //     serverLog(LL_NOTICE, "AFTER THE FACT");
-            //     serverLog(LL_NOTICE, "requested to write: %ld", nrequested);
-            //     serverLog(LL_NOTICE, "actually written: %ld", nwritten);
-            //     serverLog(LL_NOTICE, "repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
-            //     serverLog(LL_NOTICE, "repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
-            //     serverLog(LL_NOTICE, "-----------------------------------------");
-            // }
-
-            if (nwritten == nrequested && g_pserver->repl_backlog_idx == repl_backlog_idx){
-                c->repl_curr_idx = -1; /* -1 denotes no more replica writes */
-            }
-            else if (nwritten > 0)
-                c->repl_curr_idx = (c->repl_curr_idx + nwritten) % repl_backlog_size;
-
-            serverAssert(c->repl_curr_idx < repl_backlog_size);
-
             /* only increment bytes if an error didn't occur */
             if (nwritten > 0){
                 totwritten += nwritten;
                 c->repl_curr_off += nwritten;
+                if (1){
+                    serverLog(LL_NOTICE, "printing the stats for client %lu: c->repl_curr_off: %lld, repl_batch_offStart: %lld, nwritten: %ld, offStart: %lld",
+                        c->id, c->repl_curr_off, g_pserver->repl_batch_offStart, nwritten, offStart);
+                }
+                serverAssert(c->repl_curr_off <= offStart);
+                /* If the client offset matches the global offset, we wrote all we needed to,
+                 * in which case, there is no pending write */
+                if (c->repl_curr_off == offStart){
+                    serverLog(LL_NOTICE, "good, %lld", offStart);
+                    c->fPendingReplicaWrite = false;
+                } else {
+                    serverLog(LL_NOTICE, "mismatch between repl_curr_off (%lld) and offStart (%lld)", c->repl_curr_off, offStart);
+                }
             }
 
            /* If the second part of a write didn't go through, we still need to register that */
            if (nwrittenPart2 == -1) nwritten = -1;
         }
 
-        if (c->flags & CLIENT_SLAVE && handler_installed)
-            serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed);
+        // if (c->flags & CLIENT_SLAVE && handler_installed)
+        //     serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed);
 
     }
 
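The replica branch above is essentially a circular-buffer drain: send from the replica's current index up to the flush point, and if that region wraps past the end of the backlog, issue a second write starting at index 0 (only when the first part was written in full, since connWrite may write fewer bytes than requested). A simplified sketch of the same pattern, with illustrative names and a std::vector standing in for the socket write:

    #include <vector>

    // Copy the region [curr_idx, end_idx) of a circular buffer of size `size`
    // into `out`, handling the wrap-around case. Returns bytes copied.
    long long drainCircular(const char *buf, long long size,
                            long long curr_idx, long long end_idx,
                            std::vector<char> &out) {
        if (end_idx >= curr_idx) {                      // normal case: contiguous region
            out.insert(out.end(), buf + curr_idx, buf + end_idx);
            return end_idx - curr_idx;
        }
        // wrap-around case: tail of the buffer first, then the head
        out.insert(out.end(), buf + curr_idx, buf + size);
        out.insert(out.end(), buf, buf + end_idx);
        return (size - curr_idx) + end_idx;
    }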
@@ -1836,12 +1843,12 @@ int writeToClient(client *c, int handler_installed) {
          * We just rely on data / pings received for timeout detection. */
         if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime;
     }
-    if (!clientHasPendingReplies(c) && c->repl_curr_idx == -1) {
-        if(c->flags & CLIENT_SLAVE && handler_installed){
-            serverLog(LL_NOTICE, "Uninstalling handler");
-            serverLog(LL_NOTICE, "handler repl_curr_idx: %lld, repl_backlog_size: %lld", c->repl_curr_idx, g_pserver->repl_backlog_size);
-            serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
-        }
+    if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) {
+        // if(c->flags & CLIENT_SLAVE && handler_installed){
+        //     serverLog(LL_NOTICE, "Uninstalling handler");
+        //     serverLog(LL_NOTICE, "handler repl_curr_idx: %lld, repl_backlog_size: %lld", c->repl_curr_idx, g_pserver->repl_backlog_size);
+        //     serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
+        // }
         c->sentlen = 0;
         if (handler_installed) connSetWriteHandler(c->conn, NULL);
 
@@ -1857,7 +1864,7 @@ int writeToClient(client *c, int handler_installed) {
 /* Write event handler. Just send data to the client. */
 void sendReplyToClient(connection *conn) {
     client *c = (client*)connGetPrivateData(conn);
-    serverLog(LL_NOTICE, "called the sendreplytoclient");
+    // serverLog(LL_NOTICE, "called the sendreplytoclient");
     if (writeToClient(c,1) == C_ERR)
     {
         AeLocker ae;
@@ -1886,7 +1893,6 @@ void ProcessPendingAsyncWrites()
         serverAssert(c->fPendingAsyncWrite);
         if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY))
         {
-            std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
             if (c->replyAsync != nullptr){
                 zfree(c->replyAsync);
                 c->replyAsync = nullptr;
@@ -1898,7 +1904,6 @@ void ProcessPendingAsyncWrites()
         /* since writes from master to replica can come directly from the replication backlog,
          * writes may have been signalled without having been copied to the replyAsync buffer,
          * thus causing the buffer to be NULL */
-        std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
         if (c->replyAsync != nullptr){
             int size = c->replyAsync->used;
 
@@ -1919,7 +1924,6 @@ void ProcessPendingAsyncWrites()
         }
 
         c->fPendingAsyncWrite = FALSE;
-        tRDBLock.unlock();
         // Now install the write event handler
         int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
         /* For the fsync=always policy, we want that a given FD is never
@@ -2032,8 +2036,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
 
         /* If after the synchronous writes above we still have data to
          * output to the client, we need to install the writable handler. */
-        if (clientHasPendingReplies(c) || c->repl_curr_idx != -1) {
-            serverLog(LL_NOTICE, "Setting a write handler for later");
+        if (clientHasPendingReplies(c) || c->fPendingReplicaWrite) {
             if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) {
                 freeClientAsync(c);
             }
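This mirrors the writeToClient change earlier in the diff: the connection's write handler should be installed exactly while the client still has something to send, either buffered replies or an outstanding replica write. A minimal sketch of that predicate (hypothetical helper, not KeyDB code):

    // Stand-ins for clientHasPendingReplies(c) and the new fPendingReplicaWrite flag.
    struct ClientState {
        bool hasPendingReplies;
        bool fPendingReplicaWrite;
    };

    // Install the write handler while this returns true; uninstall once it turns false.
    static bool needsWriteHandler(const ClientState &c) {
        return c.hasPendingReplies || c.fPendingReplicaWrite;
    }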
@@ -2214,6 +2217,34 @@ static void setProtocolError(const char *errstr, client *c) {
     c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR);
 }
 
+static void printQueryBuffer(client *c) {
+    if (cserver.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) {
+        sds client = catClientInfoString(sdsempty(),c);
+
+        /* Sample some protocol to given an idea about what was inside. */
+        char buf[PROTO_DUMP_LEN*2];
+        if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) {
+            snprintf(buf,sizeof(buf),"%s", c->querybuf+c->qb_pos);
+        } else {
+            snprintf(buf,sizeof(buf),"%.*s (... more %zu bytes ...) %.*s", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2);
+        }
+
+        /* Remove non printable chars. */
+        char *p = buf;
+        while (*p != '\0') {
+            if (!isprint(*p)) *p = '.';
+            p++;
+        }
+
+        /* Log all the client and protocol info. */
+        int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING :
+                                                    LL_VERBOSE;
+        serverLog(loglevel,
+            "Query buffer from client %lu: %s. %s", c->id, client, buf);
+        sdsfree(client);
+    }
+}
+
 /* Process the query buffer for client 'c', setting up the client argument
  * vector for command execution. Returns C_OK if after running the function
  * the client has a well-formed ready to be processed command, otherwise
|
|||||||
}
|
}
|
||||||
|
|
||||||
size_t cqueriesStart = c->vecqueuedcmd.size();
|
size_t cqueriesStart = c->vecqueuedcmd.size();
|
||||||
|
// if (c->flags & CLIENT_MASTER)
|
||||||
|
// printQueryBuffer(c);
|
||||||
if (c->reqtype == PROTO_REQ_INLINE) {
|
if (c->reqtype == PROTO_REQ_INLINE) {
|
||||||
if (processInlineBuffer(c) != C_OK) break;
|
if (processInlineBuffer(c) != C_OK) break;
|
||||||
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
|
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
|
||||||
|
@@ -88,18 +88,6 @@ int RDBGeneratedByReplication = 0;
 
 void resizeReplicationBacklogForClients(long long newsize);
 
-void setReplIdx(client *c, long long idx, long long off){
-    // serverLog(LL_NOTICE, "calling this garbage function w/ idx and off: %lld, %lld, %lld", idx, off, off-idx);
-    // serverLog(LL_NOTICE, "Repl Index started at: %lld", c->repl_curr_idx);
-    if (c->repl_curr_idx == -1 && off >= c->repl_curr_off){
-        if (prepareClientToWrite(c) != C_OK) return;
-        c->repl_curr_idx = idx;
-        c->repl_curr_off = off;
-    }
-    // serverLog(LL_NOTICE, "Repl Index has become: %lld", c->repl_curr_idx);
-
-}
-
 /* --------------------------- Utility functions ---------------------------- */
 
 /* Return the pointer to a string representing the replica ip:listening_port
@@ -232,6 +220,7 @@ void createReplicationBacklog(void) {
     g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL);
     g_pserver->repl_backlog_histlen = 0;
     g_pserver->repl_backlog_idx = 0;
+    g_pserver->repl_backlog_start = g_pserver->master_repl_offset;
 
     /* We don't have any data inside our buffer, but virtually the first
      * byte we have is the next byte that will be generated for the
@@ -284,6 +273,7 @@ void resizeReplicationBacklog(long long newsize) {
         g_pserver->repl_backlog = backlog;
         g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
         g_pserver->repl_batch_idxStart = 0;
+        g_pserver->repl_backlog_start = g_pserver->master_repl_offset;
     } else {
         zfree(g_pserver->repl_backlog);
         g_pserver->repl_backlog = (char*)zmalloc(newsize);
@@ -296,6 +286,7 @@ void resizeReplicationBacklog(long long newsize) {
     g_pserver->repl_backlog_size = newsize;
 }
 
+long long getReplIndexFromOffset(long long offset);
 
 /* The above but for when clients need extra replication backlog because ??? */
 void resizeReplicationBacklogForClients(long long newsize) {
@@ -305,32 +296,8 @@ void resizeReplicationBacklogForClients(long long newsize) {
 
     serverLog(LL_NOTICE, "WE HAVE TO RESIZE from %lld to %lld", g_pserver->repl_backlog_size, newsize);
     /* get the critical client size, i.e. the size of the data unflushed to clients */
-    long long earliest_off = LONG_LONG_MAX;
-    long long earliest_idx = -1;
-    listIter li;
-    listNode *ln;
-    listRewind(g_pserver->slaves, &li);
-    while ((ln = listNext(&li))) {
-        client *replica = (client*)listNodeValue(ln);
-        if (replica->repl_curr_off != -1 && replica->repl_curr_off < earliest_off){
-            earliest_off = replica->repl_curr_off;
-            earliest_idx = replica->repl_curr_idx;
-        }
-        serverLog(LL_NOTICE, "repl_curr_idx: %lld, earlistidx: %lld", replica->repl_curr_idx, earliest_idx);
-    }
-    serverLog(LL_NOTICE, "We are starting with: master_repl_offset: %lld, repl_batch_offStart: %lld, earliest_off: %lld, "
-        "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, earliest_idx: %lld, repl_backlog_size: %lld",
-        g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, earliest_off,
-        g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, earliest_idx, g_pserver->repl_backlog_size
-    );
-
-    long long new_off = 0, new_idx = 0;
-
-    /* if no earliest offset is found amongst the clients, they are all up to date with the flushed index */
-    if (earliest_off == LONG_LONG_MAX && earliest_idx == -1){
-        earliest_idx = g_pserver->repl_batch_idxStart;
-        earliest_off = g_pserver->repl_batch_offStart;
-    }
+    long long earliest_off = g_pserver->repl_lowest_off.load();
 
     if (g_pserver->repl_backlog != NULL) {
         /* What we actually do is to flush the old buffer and realloc a new
@@ -339,17 +306,18 @@ void resizeReplicationBacklogForClients(long long newsize) {
          * worse often we need to alloc additional space before freeing the
          * old buffer. */
 
-    if (earliest_idx >= 0) {
+    if (earliest_off != -1) {
         // We need to keep critical data so we can't shrink less than the hot data in the buffer
         newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off);
         char *backlog = (char*)zmalloc(newsize);
         g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off;
+        long long earliest_idx = getReplIndexFromOffset(earliest_off);
 
         if (g_pserver->repl_backlog_idx >= earliest_idx) {
             auto cbActiveBacklog = g_pserver->repl_backlog_idx - earliest_idx;
             memcpy(backlog, g_pserver->repl_backlog + earliest_idx, cbActiveBacklog);
-            serverLog(LL_NOTICE, "g_pserver->master_repl_offset: %lld, earliest_off: %lld, g_pserver->repl_backlog_idx: %lld, earliest_idx: %lld",
-                g_pserver->master_repl_offset, earliest_off, g_pserver->repl_backlog_idx, earliest_idx);
+            serverLog(LL_NOTICE, "g_pserver->master_repl_offset: %lld, earliest_off: %lld, g_pserver->repl_backlog_idx: %lld, earliest_idx: %lld, repl_backlog_start: %lld",
+                g_pserver->master_repl_offset, earliest_off, g_pserver->repl_backlog_idx, earliest_idx, g_pserver->repl_backlog_start);
             serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
         } else {
             auto cbPhase1 = g_pserver->repl_backlog_size - earliest_idx;
@@ -361,20 +329,10 @@ void resizeReplicationBacklogForClients(long long newsize) {
         zfree(g_pserver->repl_backlog);
         g_pserver->repl_backlog = backlog;
         g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
-        listRewind(g_pserver->slaves, &li);
-        /* Go through the clients and update their replication indicies */
-        while ((ln = listNext(&li))) {
-            client *replica = (client*)listNodeValue(ln);
-            if (replica->repl_curr_idx != -1){
-                replica->repl_curr_idx -= earliest_idx;
-                if (replica->repl_curr_idx < 0)
-                    replica->repl_curr_idx += g_pserver->repl_backlog_size;
-            }
-            new_idx = replica->repl_curr_idx;
-        }
         g_pserver->repl_batch_idxStart -= earliest_idx;
         if (g_pserver->repl_batch_idxStart < 0)
             g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size;
+        g_pserver->repl_backlog_start = earliest_off;
     } else {
         zfree(g_pserver->repl_backlog);
         g_pserver->repl_backlog = (char*)zmalloc(newsize);
@@ -382,14 +340,15 @@ void resizeReplicationBacklogForClients(long long newsize) {
         g_pserver->repl_backlog_idx = 0;
         /* Next byte we have is... the next since the buffer is empty. */
         g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1;
+        g_pserver->repl_backlog_start = g_pserver->master_repl_offset;
     }
     }
     g_pserver->repl_backlog_size = newsize;
 
     serverLog(LL_NOTICE, "We are ending with: master_repl_offset: %lld, repl_batch_offStart: %lld, new_off: %lld, "
         "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, new_idx: %lld, repl_backlog_size: %lld",
-        g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, new_off,
-        g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, new_idx, g_pserver->repl_backlog_size
+        g_pserver->master_repl_offset, g_pserver->repl_batch_offStart, 0LL,
+        g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, 0LL, g_pserver->repl_backlog_size
     );
 }
 
@@ -456,11 +415,6 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
         len -= thislen;
         p += thislen;
         g_pserver->repl_backlog_histlen += thislen;
-        // serverLog(LL_NOTICE, "Pt2 intermediate with: master_repl_offset: %lld, repl_batch_offStart: %lld, "
-        //     "repl_backlog_idx: %lld, repl_batch_idxStart: %lld, repl_backlog_size: %lld",
-        //     g_pserver->master_repl_offset, g_pserver->repl_batch_offStart,
-        //     g_pserver->repl_backlog_idx, g_pserver->repl_batch_idxStart, g_pserver->repl_backlog_size
-        // );
     }
     if (g_pserver->repl_backlog_histlen > g_pserver->repl_backlog_size)
         g_pserver->repl_backlog_histlen = g_pserver->repl_backlog_size;
@@ -722,7 +676,7 @@ void replicationFeedSlaves(list *replicas, int dictid, robj **argv, int argc) {
 void showLatestBacklog(void) {
     if (g_pserver->repl_backlog == NULL) return;
 
-    long long dumplen = 256;
+    long long dumplen = 1024;
     if (g_pserver->repl_backlog_histlen < dumplen)
         dumplen = g_pserver->repl_backlog_histlen;
 
@@ -813,7 +767,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
     }
     decrRefCount(cmdobj);
 }
-
+#define BYPASS_PSYNC
 /* Feed the replica 'c' with the replication backlog starting from the
  * specified 'offset' up to the end of the backlog. */
 long long addReplyReplicationBacklog(client *c, long long offset) {
@@ -854,7 +808,8 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
     len = g_pserver->repl_backlog_histlen - skip;
     serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
 #ifdef BYPASS_PSYNC
-    setReplIdx(c, j, offset);
+    c->repl_curr_off = offset - 1;
+    serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", c->id, getClientPeerId(c), c->repl_curr_off);
 #else
     while(len) {
         long long thislen =
@@ -900,6 +855,11 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) {
 
     replica->psync_initial_offset = offset;
     replica->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
+
+    replica->repl_curr_off = offset;
+
+    serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", replica->id, getClientPeerId(replica), replica->repl_curr_off);
+
     /* We are going to accumulate the incremental changes for this
      * replica as well. Set replicaseldb to -1 in order to force to re-emit
      * a SELECT statement in the replication stream. */
@@ -2006,7 +1966,6 @@ void replicationCreateMasterClient(redisMaster *mi, connection *conn, int dbid)
     mi->master->reploff_skipped = 0;
     mi->master->read_reploff = mi->master->reploff;
     mi->master->puser = NULL; /* This client can do everything. */
-
     memcpy(mi->master->uuid, mi->master_uuid, UUID_BINARY_LEN);
     memset(mi->master_uuid, 0, UUID_BINARY_LEN); // make sure people don't use this temp storage buffer
 
@@ -4652,12 +4611,17 @@ void flushReplBacklogToClients()
     serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
     serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
 
+    serverLog(LL_NOTICE, "the master repl offset is %lld", g_pserver->master_repl_offset);
+    showLatestBacklog();
     listIter li;
     listNode *ln;
     listRewind(g_pserver->slaves, &li);
     while ((ln = listNext(&li))) {
         client *replica = (client*)listNodeValue(ln);
 
+        // serverLog(LL_NOTICE, "client %lu is in the party", replica->id);
+
+        // serverLog(LL_NOTICE, "is there a write pending for %lu, %d", replica->id, replica->fPendingReplicaWrite);
         if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
         if (replica->flags & CLIENT_CLOSE_ASAP) continue;
 
@@ -4675,11 +4639,21 @@ void flushReplBacklogToClients()
         asyncUl.lock();
         /* If we are online and the RDB has been sent, there is no need to feed the client buffer
          * We will send our replies directly from the replication backlog instead */
-        std::unique_lock<fastlock> tRDBLock (replica->transmittedRDBLock);
-        if (replica->replstate == SLAVE_STATE_ONLINE && replica->transmittedRDB){
-            setReplIdx(replica, g_pserver->repl_batch_idxStart, g_pserver->repl_batch_offStart);
-            continue;
+        if (replica->repl_curr_off == -1){
+            replica->repl_curr_off = g_pserver->repl_batch_offStart;
+
+            serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", replica->id, getClientPeerId(replica), replica->repl_curr_off);
         }
+
+        /* Only if the there isn't already a pending write do we prepare the client to write */
+        if (!replica->fPendingReplicaWrite){
+            serverAssert(replica->repl_curr_off != g_pserver->master_repl_offset);
+            prepareClientToWrite(replica);
+            replica->fPendingReplicaWrite = true;
+        }
+
+        continue;
     }
 #endif
         if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
@@ -1516,8 +1516,11 @@ struct client {
     long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                        copying this replica output buffer
                                        should use. */
+
     long long repl_curr_idx = -1; /* Replication index sent, if this is a replica */
     long long repl_curr_off = -1;
+    int fPendingReplicaWrite;
+
     char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
     int slave_listening_port; /* As configured with: REPLCONF listening-port */
     char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
@@ -1577,12 +1580,8 @@ struct client {
     robj **argv;
    size_t argv_len_sumActive = 0;
 
-    bool transmittedRDB = false; /* Have we finished transmitting the RDB to this replica? */
-    /* If so, we can read from the replication backlog instead of the client buffer */
-
     // post a function from a non-client thread to run on its client thread
     bool postFunction(std::function<void(client *)> fn, bool fLock = true);
-    fastlock transmittedRDBLock {"transmittedRDB"};
     size_t argv_len_sum() const;
 };
 
@@ -2229,6 +2228,8 @@ struct redisServer {
                                        that is the next byte will'll write to.*/
     long long repl_backlog_off;     /* Replication "master offset" of first
                                        byte in the replication backlog buffer.*/
+    long long repl_backlog_start;   /* Used to compute indicies from offsets
+                                       basically, index = (offset - start) % size */
     fastlock repl_backlog_lock {"replication backlog"};
     time_t repl_backlog_time_limit; /* Time without slaves after the backlog
                                        gets released. */
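Taken together, the networking.cpp and replication.cpp changes replace the old per-replica repl_curr_idx/transmittedRDB bookkeeping with an absolute repl_curr_off plus a single fPendingReplicaWrite flag. A condensed, hedged sketch of that handshake (illustrative names only; the real logic lives in flushReplBacklogToClients and writeToClient and is guarded by locks omitted here):

    struct ReplicaLike {
        long long repl_curr_off = -1;      // how far this replica has been sent
        bool fPendingReplicaWrite = false;
    };

    // Producer side: after the backlog is flushed, point new replicas at the flush
    // start and mark anyone who is behind as having a pending write.
    void markPending(ReplicaLike &r, long long offStart, long long master_repl_offset) {
        if (r.repl_curr_off == -1) r.repl_curr_off = offStart;
        if (!r.fPendingReplicaWrite && r.repl_curr_off != master_repl_offset)
            r.fPendingReplicaWrite = true;
    }

    // Consumer side: after a successful socket write advances repl_curr_off,
    // clear the flag once the replica has caught up to the flush point.
    void onBytesWritten(ReplicaLike &r, long long nwritten, long long offStart) {
        r.repl_curr_off += nwritten;
        if (r.repl_curr_off == offStart) r.fPendingReplicaWrite = false;
    }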