Performance optimizations
Former-commit-id: 7fd83d467784d293f7da78b74f9b9763ce387238
This commit is contained in:
parent
089cbfa580
commit
f0728a7ead
@ -47,8 +47,7 @@
|
|||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
// #define BYPASS_BUFFER
|
#define BYPASS_BUFFER
|
||||||
// #define RESIZE_BACKLOG
|
|
||||||
|
|
||||||
void replicationDiscardCachedMaster(redisMaster *mi);
|
void replicationDiscardCachedMaster(redisMaster *mi);
|
||||||
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
|
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
|
||||||
@ -89,10 +88,10 @@ int RDBGeneratedByReplication = 0;
|
|||||||
void resizeReplicationBacklogForClients(long long newsize);
|
void resizeReplicationBacklogForClients(long long newsize);
|
||||||
|
|
||||||
void setReplIdx(client *c, long long idx, long long off){
|
void setReplIdx(client *c, long long idx, long long off){
|
||||||
if (prepareClientToWrite(c) != C_OK) return;
|
|
||||||
// serverLog(LL_NOTICE, "calling this garbage function w/ idx and off: %lld, %lld, %lld", idx, off, off-idx);
|
// serverLog(LL_NOTICE, "calling this garbage function w/ idx and off: %lld, %lld, %lld", idx, off, off-idx);
|
||||||
// serverLog(LL_NOTICE, "Repl Index started at: %lld", c->repl_curr_idx);
|
// serverLog(LL_NOTICE, "Repl Index started at: %lld", c->repl_curr_idx);
|
||||||
if (c->repl_curr_idx == -1){
|
if (c->repl_curr_idx == -1){
|
||||||
|
if (prepareClientToWrite(c) != C_OK) return;
|
||||||
c->repl_curr_idx = idx;
|
c->repl_curr_idx = idx;
|
||||||
c->repl_curr_off = off;
|
c->repl_curr_off = off;
|
||||||
}
|
}
|
||||||
@ -432,17 +431,6 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
|
|||||||
resizeReplicationBacklogForClients(newsize);
|
resizeReplicationBacklogForClients(newsize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#ifdef RESIZE_BACKLOG
|
|
||||||
long long lowest_replica_offset = getLowestOffsetAmongReplicas();
|
|
||||||
minimumsize = g_pserver->master_repl_offset + len - lowest_replica_offset;
|
|
||||||
if (lowest_replica_offset != -1 && minimumsize > g_pserver->repl_backlog_size){
|
|
||||||
serverLog(LL_WARNING, "THE REPLICATION BACKLOG SIZE IS TOO SMALL, THIS IS A PROBLEM");
|
|
||||||
long long oldsize = g_pserver->repl_backlog_size;
|
|
||||||
resizeReplicationBacklogForClients(std::max(g_pserver->repl_backlog_size * 2, minimumsize));
|
|
||||||
serverLog(LL_WARNING, "changed size from %lld to %lld", oldsize, g_pserver->repl_backlog_size);
|
|
||||||
flushReplBacklogToClients();
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// serverLog(LL_NOTICE, "Pt2 start with: master_repl_offset: %lld, repl_batch_offStart: %lld, "
|
// serverLog(LL_NOTICE, "Pt2 start with: master_repl_offset: %lld, repl_batch_offStart: %lld, "
|
||||||
@ -4635,30 +4623,11 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long
|
|||||||
|
|
||||||
void _clientAsyncReplyBufferReserve(client *c, size_t len);
|
void _clientAsyncReplyBufferReserve(client *c, size_t len);
|
||||||
|
|
||||||
/* Has the end of the replication backlog overflowed past the beginning? */
|
|
||||||
bool replOverflowHasOccured(){
|
|
||||||
|
|
||||||
if (g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart){
|
|
||||||
long long repl_idx_difference = g_pserver->repl_backlog_idx > g_pserver->repl_batch_idxStart ?
|
|
||||||
g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart :
|
|
||||||
(g_pserver->repl_backlog_size + g_pserver->repl_backlog_idx) - g_pserver->repl_batch_idxStart;
|
|
||||||
|
|
||||||
return g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > repl_idx_difference;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
thread_local int transmittedCount = 0;
|
|
||||||
|
|
||||||
void flushReplBacklogToClients()
|
void flushReplBacklogToClients()
|
||||||
{
|
{
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
if (g_pserver->repl_batch_offStart < 0){
|
if (g_pserver->repl_batch_offStart < 0)
|
||||||
if (getLowestOffsetAmongReplicas() == -1){
|
|
||||||
serverLog(LL_NOTICE, "this is a case i probably have to handle");
|
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
|
if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
|
||||||
@ -4668,39 +4637,9 @@ void flushReplBacklogToClients()
|
|||||||
serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
|
serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
|
||||||
serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
|
serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
|
||||||
|
|
||||||
// serverAssert(!replOverflowHasOccured());
|
|
||||||
listIter li;
|
listIter li;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listRewind(g_pserver->slaves, &li);
|
listRewind(g_pserver->slaves, &li);
|
||||||
|
|
||||||
#if 0
|
|
||||||
// check for potential overflow first
|
|
||||||
while ((ln = listNext(&li))) {
|
|
||||||
client *replica = (client*)listNodeValue(ln);
|
|
||||||
// serverLog(LL_NOTICE, "replica state: %d", replica->replstate);
|
|
||||||
|
|
||||||
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
|
||||||
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
|
||||||
if (replica->repl_curr_idx == -1) continue;
|
|
||||||
|
|
||||||
std::unique_lock<fastlock> ul(replica->lock, std::defer_lock);
|
|
||||||
if (FCorrectThread(replica))
|
|
||||||
ul.lock();
|
|
||||||
else
|
|
||||||
fAsyncWrite = true;
|
|
||||||
|
|
||||||
if (g_pserver->master_repl_offset - replica->repl_curr_off > g_pserver->repl_backlog_size){
|
|
||||||
serverLog(LL_WARNING, "THE REPLICATION BACKLOG SIZE IS TOO SMALL, THIS IS A PROBLEM");
|
|
||||||
long long oldsize = g_pserver->repl_backlog_size;
|
|
||||||
resizeReplicationBacklogForClients(std::max(g_pserver->repl_backlog_size * 2, g_pserver->master_repl_offset - replica->repl_curr_off));
|
|
||||||
serverLog(LL_WARNING, "changing size from %lld to %lld", oldsize, g_pserver->repl_backlog_size);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
listRewind(g_pserver->slaves, &li);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
while ((ln = listNext(&li))) {
|
while ((ln = listNext(&li))) {
|
||||||
client *replica = (client*)listNodeValue(ln);
|
client *replica = (client*)listNodeValue(ln);
|
||||||
|
|
||||||
@ -4721,10 +4660,6 @@ void flushReplBacklogToClients()
|
|||||||
setReplIdx(replica, g_pserver->repl_batch_idxStart, g_pserver->repl_batch_offStart);
|
setReplIdx(replica, g_pserver->repl_batch_idxStart, g_pserver->repl_batch_offStart);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
#else
|
|
||||||
if (replica->replstate == SLAVE_STATE_ONLINE){
|
|
||||||
// serverLog(LL_NOTICE, "would be calling this garbage function w/ offset: %lld", g_pserver->repl_batch_idxStart);
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
|
if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
|
||||||
long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
|
long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user