From d4c1e981247cb8ac2c33d64dcef9937ee422f451 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 01:41:00 -0500 Subject: [PATCH] Implement an error handler so bug #125 can't happen Former-commit-id: 16a019dba053fd0654116ff98a2ad0b66a9ed4e6 --- src/aof.cpp | 28 +++++++++++++++++++--------- src/networking.cpp | 4 ++-- src/server.cpp | 1 + src/server.h | 1 + 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index 65647bbba..de8a8260e 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -124,6 +124,21 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { } } +void installAofRewriteEvent() +{ + serverTL->fRetrySetAofEvent = false; + if (!g_pserver->aof_rewrite_pending) { + g_pserver->aof_rewrite_pending = true; + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + g_pserver->aof_rewrite_pending = false; + if (g_pserver->aof_pipe_write_data_to_child >= 0) + aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); + }); + if (res != AE_OK) + serverTL->fRetrySetAofEvent = true; + } +} + /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks); @@ -165,15 +180,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - if (!g_pserver->aof_rewrite_pending) { - g_pserver->aof_rewrite_pending = true; - int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { - g_pserver->aof_rewrite_pending = false; - if (g_pserver->aof_pipe_write_data_to_child >= 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); - }); - serverAssert(res == AE_OK); // we can't handle an error here - } + installAofRewriteEvent(); } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -349,6 +356,9 @@ void flushAppendOnlyFile(int force) { int sync_in_progress = 0; mstime_t latency; + if (serverTL->fRetrySetAofEvent) + installAofRewriteEvent(); + if (sdslen(g_pserver->aof_buf) == 0) { /* Check if we need to do fsync even the aof buffer is empty, * because previously in AOF_FSYNC_EVERYSEC mode, fsync is diff --git a/src/networking.cpp b/src/networking.cpp index 097df0f87..54e04406f 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,7 +1003,7 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } -int chooseBestThreadForAccept(int ielCur) +int chooseBestThreadForAccept() { listIter li; listNode *ln; @@ -1133,7 +1133,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (!g_fTestMode) { { - int ielTarget = chooseBestThreadForAccept(ielCur); + int ielTarget = chooseBestThreadForAccept(); if (ielTarget != ielCur) { char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); diff --git a/src/server.cpp b/src/server.cpp index 2d6027348..15f52ab52 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2884,6 +2884,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); pvar->current_client = nullptr; pvar->clients_paused = 0; + pvar->fRetrySetAofEvent = false; if (pvar->el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", diff --git a/src/server.h b/src/server.h index 3ff023677..3ec2e8948 100644 --- a/src/server.h +++ b/src/server.h @@ -1526,6 +1526,7 @@ struct redisServerThreadVars { struct fastlock lockPendingWrite { "thread pending write" }; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; + bool fRetrySetAofEvent = false; }; struct redisMaster {