Now tracks memory and resizes 'accurately', need to fix cluster

Former-commit-id: 5f0e01cc199427ab6dfd7f8f28321f6a1f34fd1c
This commit is contained in:
VivekSainiEQ 2021-05-03 16:29:11 +00:00
parent 7ef58a333f
commit f6305ed15b
4 changed files with 39 additions and 8 deletions

View File

@ -2347,6 +2347,7 @@ static int updateReplBacklogSize(long long val, long long prev, const char **err
UNUSED(err);
g_pserver->repl_backlog_size = prev;
resizeReplicationBacklog(val);
g_pserver->repl_backlog_config_size = g_pserver->repl_backlog_size;
return 1;
}

View File

@ -392,9 +392,16 @@ size_t freeMemoryGetNotCountedMemory(void) {
while((ln = listNext(&li))) {
client *replica = (client*)listNodeValue(ln);
std::unique_lock<fastlock>(replica->lock);
overhead += getClientOutputBufferMemoryUsage(replica);
/* we don't wish to multiple count the replication backlog shared usage */
overhead += (getClientOutputBufferMemoryUsage(replica) - getClientReplicationBacklogSharedUsage(replica));
}
}
/* also don't count the replication backlog memory
* that's where the replication clients get their memory from */
overhead += (g_pserver->repl_backlog_size - g_pserver->repl_backlog_config_size);
if (g_pserver->aof_state != AOF_OFF) {
overhead += sdsalloc(g_pserver->aof_buf)+aofRewriteBufferSize();
}
@ -516,6 +523,7 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
if (g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION)
goto cant_free; /* We need to free memory, but policy forbids. */
serverLog(LL_NOTICE, "evicting i guess lol, the overhead was %ld, the repl_backlog_size, %lld", freeMemoryGetNotCountedMemory(), g_pserver->repl_backlog_size);
while (mem_freed < mem_tofree) {
int j, k, i;
static unsigned int next_db = 0;

View File

@ -224,7 +224,6 @@ void clientInstallWriteHandler(client *c) {
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
// serverLog(LL_NOTICE, "we installing boyz");
AssertCorrectThread(c);
serverAssert(c->lock.fOwnLock());
/* Here instead of installing the write handler, we just flag the
@ -1801,6 +1800,9 @@ int writeToClient(client *c, int handler_installed) {
if (nwrittenPart2 == -1) nwritten = -1;
}
if (c->flags & CLIENT_SLAVE && handler_installed)
serverLog(LL_NOTICE, "Total bytes written, %ld, write handler installed?: %d", totwritten, handler_installed);
g_pserver->stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
@ -1821,6 +1823,11 @@ int writeToClient(client *c, int handler_installed) {
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = g_pserver->unixtime;
}
if (!clientHasPendingReplies(c) && c->repl_curr_idx == -1) {
if(c->flags & CLIENT_SLAVE && handler_installed){
serverLog(LL_NOTICE, "Uninstalling handler");
serverLog(LL_NOTICE, "handler repl_backlog_idx: %lld, repl_curr_idx: %lld, repl_backlog_size: %lld", repl_backlog_idx, c->repl_curr_idx, g_pserver->repl_backlog_size);
serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset);
}
c->sentlen = 0;
if (handler_installed) connSetWriteHandler(c->conn, NULL);
@ -1836,6 +1843,7 @@ int writeToClient(client *c, int handler_installed) {
/* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {
client *c = (client*)connGetPrivateData(conn);
serverLog(LL_NOTICE, "called the sendreplytoclient");
if (writeToClient(c,1) == C_ERR)
{
AeLocker ae;
@ -1970,6 +1978,7 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
auto vec = std::move(g_pserver->rgthreadvar[iel].clients_pending_write);
processed += (int)vec.size();
// serverLog(LL_NOTICE, "entered handleClientsWithPendingWrites");
for (client *c : vec) {
serverAssertDebug(FCorrectThread(c));
@ -2008,8 +2017,10 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c) || c->repl_curr_idx != -1) {
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR)
serverLog(LL_NOTICE, "Setting a write handler for later");
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_flags, true) == C_ERR) {
freeClientAsync(c);
}
}
}
@ -3359,11 +3370,6 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* that writes to said replica are using data from the replication backlog
* as opposed to it's own internal buffer, this number should keep track of that */
unsigned long getClientReplicationBacklogSharedUsage(client *c) {
if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1){
// serverLog(LL_NOTICE, "repl_backlog_size %lld, repl_backlog_idx %lld, master_repl_offset %lld, repl_curr_idx %lld, repl_curr_off %lld",
// g_pserver->repl_backlog_size, g_pserver->repl_backlog_idx, g_pserver->master_repl_offset, c->repl_curr_idx, c->repl_curr_off);
}
return (!(c->flags & CLIENT_SLAVE) || c->repl_curr_idx == -1) ? 0 : g_pserver->master_repl_offset - c->repl_curr_off;
}

View File

@ -4684,5 +4684,21 @@ void flushReplBacklogToClients()
// This may be called multiple times per "frame" so update with our progress flushing to clients
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
} else if (getLowestOffsetAmongReplicas() != -1){
listIter li;
listNode *ln;
listRewind(g_pserver->slaves, &li);
while ((ln = listNext(&li))) {
client *replica = (client*)listNodeValue(ln);
std::unique_lock<fastlock> ul(replica->lock, std::defer_lock);
if (FCorrectThread(replica))
ul.lock();
/* try to force prepare client to write i guess? */
if (replica->repl_curr_idx != -1){
if (prepareClientToWrite(replica) != C_OK) continue;
}
}
}
}