Durability enhancement for appendfsync=always policy (#9678)

Durability of database is a big and old topic, in this regard Redis use AOF to
support it, and `appendfsync=alwasys` policy is the most strict level, guarantee
all data is both written and synced on disk before reply success to client.

But there are some cases have been overlooked, and could lead to durability broken.

1. The most clear one is about threaded-io mode
   we should also set client's write handler with `ae_barrier` in
   `handleClientsWithPendingWritesUsingThreads`, or the write handler would be
   called after read handler in the next event loop, it means the write command result
   could be replied to client before flush to AOF.
2. About blocked client (mostly by module)
   in `beforeSleep()`, `handleClientsBlockedOnKeys()` should be called before
   `flushAppendOnlyFile()`, in case the unblocked clients modify data without persistence
   but send reply.
3. When handling `ProcessingEventsWhileBlocked`
   normally it takes place when lua/function/module timeout, and we give a chance to users
   to kill the slow operation, but we should call `flushAppendOnlyFile()` before
   `handleClientsWithPendingWrites()`, in case the other clients in the last event loop get
   acknowledge before data persistence.
   for a instance:
   ```
   in the same event loop
   client A executes set foo bar
   client B executes eval "for var=1,10000000,1 do end" 0
   ```
   after the script timeout, client A will get `OK` but lose data after restart (kill redis when
   timeout) if we don't flush the write command to AOF.
4. A more complex case about `ProcessingEventsWhileBlocked`
   it is lua timeout in transaction, for example
   `MULTI; set foo bar; eval "for var=1,10000000,1 do end" 0; EXEC`, then client will get set
   command's result before the whole transaction done, that breaks atomicity too.
   fortunately, it's already fixed by #5428 (although it's not the original purpose just a side
   effect : )), but module timeout should be fixed too.

case 1, 2, 3 are fixed in this commit, the module issue in case 4 needs a followup PR.
This commit is contained in:
zhaozhao.zz 2022-04-11 16:08:39 +08:00 committed by GitHub
parent 574ed6b0ce
commit 1a7765cb7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 43 additions and 33 deletions

View File

@ -215,6 +215,23 @@ client *createClient(connection *conn) {
return c;
}
void installClientWriteHandler(client *c) {
int ae_barrier = 0;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. the write barrier ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_barrier = 1;
}
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
freeClientAsync(c);
}
}
/* This function puts the client in the queue of clients that should write
* their output buffers to the socket. Note that it does not *yet* install
* the write handler, to start clients are put in a queue of clients that need
@ -222,7 +239,7 @@ client *createClient(connection *conn) {
* handleClientsWithPendingWrites() function).
* If we fail and there is more data to write, compared to what the socket
* buffers can hold, then we'll really install the handler. */
void clientInstallWriteHandler(client *c) {
void putClientInPendingWriteQueue(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive
* writes at this stage. */
@ -285,11 +302,11 @@ int prepareClientToWrite(client *c) {
* it should already be setup to do so (it has already pending data).
*
* If CLIENT_PENDING_READ is set, we're in an IO thread and should
* not install a write handler. Instead, it will be done by
* handleClientsWithPendingReadsUsingThreads() upon return.
* not put the client in pending write queue. Instead, it will be
* done by handleClientsWithPendingReadsUsingThreads() upon return.
*/
if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
clientInstallWriteHandler(c);
putClientInPendingWriteQueue(c);
/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
@ -1995,20 +2012,7 @@ int handleClientsWithPendingWrites(void) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_barrier = 0;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. the write barrier ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_barrier = 1;
}
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
freeClientAsync(c);
}
installClientWriteHandler(c);
}
}
return processed;
@ -2075,7 +2079,7 @@ void unprotectClient(client *c) {
c->flags &= ~CLIENT_PROTECTED;
if (c->conn) {
connSetReadHandler(c->conn,readQueryFromClient);
if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
}
}
}
@ -4212,10 +4216,8 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c);
}
}
listEmpty(server.clients_pending_write);
@ -4327,10 +4329,10 @@ int handleClientsWithPendingReadsUsingThreads(void) {
}
/* We may have pending replies if a thread readQueryFromClient() produced
* replies and did not install a write handler (it can't).
* replies and did not put the client in pending write queue (it can't).
*/
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
clientInstallWriteHandler(c);
putClientInPendingWriteQueue(c);
}
/* Update processed count on server */

View File

@ -1285,7 +1285,7 @@ void replicaStartCommandStream(client *slave) {
return;
}
clientInstallWriteHandler(slave);
putClientInPendingWriteQueue(slave);
}
/* We call this function periodically to remove an RDB file that was

View File

@ -1509,6 +1509,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
uint64_t processed = 0;
processed += handleClientsWithPendingReadsUsingThreads();
processed += tlsProcessPendingData();
if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
flushAppendOnlyFile(0);
processed += handleClientsWithPendingWrites();
processed += freeClientsInAsyncFreeQueue();
server.events_processed_while_blocked += processed;
@ -1584,15 +1586,21 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
/* Write the AOF buffer on disk */
/* Try to process blocked clients every once in while.
*
* Example: A module calls RM_SignalKeyAsReady from within a timer callback
* (So we don't visit processCommand() at all).
*
* must be done before flushAppendOnlyFile, in case of appendfsync=always,
* since the unblocked clients may write data. */
handleClientsBlockedOnKeys();
/* Write the AOF buffer on disk,
* must be done before handleClientsWithPendingWritesUsingThreads,
* in case of appendfsync=always. */
if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
flushAppendOnlyFile(0);
/* Try to process blocked clients every once in while. Example: A module
* calls RM_SignalKeyAsReady from within a timer callback (So we don't
* visit processCommand() at all). */
handleClientsBlockedOnKeys();
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();

View File

@ -2510,7 +2510,7 @@ void unprotectClient(client *c);
void initThreadedIO(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
void clientInstallWriteHandler(client *c);
void putClientInPendingWriteQueue(client *c);
#ifdef __GNUC__
void addReplyErrorFormatEx(client *c, int flags, const char *fmt, ...)