Implement an error handler so bug #125 can't happen

Former-commit-id: 16a019dba053fd0654116ff98a2ad0b66a9ed4e6
This commit is contained in:
John Sully 2020-02-11 01:41:00 -05:00
parent fef9925b7f
commit d4c1e98124
4 changed files with 23 additions and 11 deletions

View File

@ -124,6 +124,21 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
}
}
void installAofRewriteEvent()
{
serverTL->fRetrySetAofEvent = false;
if (!g_pserver->aof_rewrite_pending) {
g_pserver->aof_rewrite_pending = true;
int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] {
g_pserver->aof_rewrite_pending = false;
if (g_pserver->aof_pipe_write_data_to_child >= 0)
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL);
});
if (res != AE_OK)
serverTL->fRetrySetAofEvent = true;
}
}
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks);
@ -165,15 +180,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
/* Install a file event to send data to the rewrite child if there is
* not one already. */
if (!g_pserver->aof_rewrite_pending) {
g_pserver->aof_rewrite_pending = true;
int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] {
g_pserver->aof_rewrite_pending = false;
if (g_pserver->aof_pipe_write_data_to_child >= 0)
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL);
});
serverAssert(res == AE_OK); // we can't handle an error here
}
installAofRewriteEvent();
}
/* Write the buffer (possibly composed of multiple blocks) into the specified
@ -349,6 +356,9 @@ void flushAppendOnlyFile(int force) {
int sync_in_progress = 0;
mstime_t latency;
if (serverTL->fRetrySetAofEvent)
installAofRewriteEvent();
if (sdslen(g_pserver->aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is

View File

@ -1003,7 +1003,7 @@ int clientHasPendingReplies(client *c) {
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
}
int chooseBestThreadForAccept(int ielCur)
int chooseBestThreadForAccept()
{
listIter li;
listNode *ln;
@ -1133,7 +1133,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
if (!g_fTestMode)
{
{
int ielTarget = chooseBestThreadForAccept(ielCur);
int ielTarget = chooseBestThreadForAccept();
if (ielTarget != ielCur)
{
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);

View File

@ -2884,6 +2884,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
pvar->current_client = nullptr;
pvar->clients_paused = 0;
pvar->fRetrySetAofEvent = false;
if (pvar->el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",

View File

@ -1526,6 +1526,7 @@ struct redisServerThreadVars {
struct fastlock lockPendingWrite { "thread pending write" };
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
long unsigned commandsExecuted = 0;
bool fRetrySetAofEvent = false;
};
struct redisMaster {