During AOF reload we can erroneously read incorrect aof_state values, so this variable must be read with the global lock acquired
Former-commit-id: 6ff9d23fd4541a011d754209d9fda3ef3af4a7f9
This commit is contained in:
parent
32b028b9cb
commit
822f64ed2f
@ -1856,7 +1856,7 @@ void ProcessPendingAsyncWrites()
|
|||||||
* we can just write the replies to the client output buffer without any
|
* we can just write the replies to the client output buffer without any
|
||||||
* need to use a syscall in order to install the writable event handler,
|
* need to use a syscall in order to install the writable event handler,
|
||||||
* get it called, and so forth. */
|
* get it called, and so forth. */
|
||||||
int handleClientsWithPendingWrites(int iel) {
|
int handleClientsWithPendingWrites(int iel, int aof_state) {
|
||||||
std::unique_lock<fastlock> lockf(g_pserver->rgthreadvar[iel].lockPendingWrite);
|
std::unique_lock<fastlock> lockf(g_pserver->rgthreadvar[iel].lockPendingWrite);
|
||||||
auto &vec = g_pserver->rgthreadvar[iel].clients_pending_write;
|
auto &vec = g_pserver->rgthreadvar[iel].clients_pending_write;
|
||||||
int processed = (int)vec.size();
|
int processed = (int)vec.size();
|
||||||
@ -1868,7 +1868,7 @@ int handleClientsWithPendingWrites(int iel) {
|
|||||||
* so that in the middle of receiving the query, and serving it
|
* so that in the middle of receiving the query, and serving it
|
||||||
* to the client, we'll call beforeSleep() that will do the
|
* to the client, we'll call beforeSleep() that will do the
|
||||||
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
|
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
|
||||||
if (g_pserver->aof_state == AOF_ON &&
|
if (aof_state == AOF_ON &&
|
||||||
g_pserver->aof_fsync == AOF_FSYNC_ALWAYS)
|
g_pserver->aof_fsync == AOF_FSYNC_ALWAYS)
|
||||||
{
|
{
|
||||||
ae_flags |= AE_BARRIER;
|
ae_flags |= AE_BARRIER;
|
||||||
@ -3359,6 +3359,7 @@ int processEventsWhileBlocked(int iel) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int aof_state = g_pserver->aof_state;
|
||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
serverAssertDebug(!GlobalLocksAcquired());
|
serverAssertDebug(!GlobalLocksAcquired());
|
||||||
try
|
try
|
||||||
@ -3366,7 +3367,7 @@ int processEventsWhileBlocked(int iel) {
|
|||||||
while (iterations--) {
|
while (iterations--) {
|
||||||
int events = 0;
|
int events = 0;
|
||||||
events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT);
|
events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT);
|
||||||
events += handleClientsWithPendingWrites(iel);
|
events += handleClientsWithPendingWrites(iel, aof_state);
|
||||||
if (!events) break;
|
if (!events) break;
|
||||||
count += events;
|
count += events;
|
||||||
}
|
}
|
||||||
|
@ -2191,8 +2191,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
flushAppendOnlyFile(0);
|
flushAppendOnlyFile(0);
|
||||||
|
|
||||||
/* Handle writes with pending output buffers. */
|
/* Handle writes with pending output buffers. */
|
||||||
|
int aof_state = g_pserver->aof_state;
|
||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN);
|
handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN, aof_state);
|
||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
|
|
||||||
/* Close clients that need to be closed asynchronous */
|
/* Close clients that need to be closed asynchronous */
|
||||||
@ -2217,10 +2218,11 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
|
|||||||
/* Check if there are clients unblocked by modules that implement
|
/* Check if there are clients unblocked by modules that implement
|
||||||
* blocking commands. */
|
* blocking commands. */
|
||||||
if (moduleCount()) moduleHandleBlockedClients(ielFromEventLoop(eventLoop));
|
if (moduleCount()) moduleHandleBlockedClients(ielFromEventLoop(eventLoop));
|
||||||
|
int aof_state = g_pserver->aof_state;
|
||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
|
|
||||||
/* Handle writes with pending output buffers. */
|
/* Handle writes with pending output buffers. */
|
||||||
handleClientsWithPendingWrites(iel);
|
handleClientsWithPendingWrites(iel, aof_state);
|
||||||
|
|
||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
/* Close clients that need to be closed asynchronous */
|
/* Close clients that need to be closed asynchronous */
|
||||||
|
@ -2203,7 +2203,7 @@ void pauseClients(mstime_t duration);
|
|||||||
int clientsArePaused(void);
|
int clientsArePaused(void);
|
||||||
void unpauseClientsIfNecessary();
|
void unpauseClientsIfNecessary();
|
||||||
int processEventsWhileBlocked(int iel);
|
int processEventsWhileBlocked(int iel);
|
||||||
int handleClientsWithPendingWrites(int iel);
|
int handleClientsWithPendingWrites(int iel, int aof_state);
|
||||||
int clientHasPendingReplies(client *c);
|
int clientHasPendingReplies(client *c);
|
||||||
void unlinkClient(client *c);
|
void unlinkClient(client *c);
|
||||||
int writeToClient(client *c, int handler_installed);
|
int writeToClient(client *c, int handler_installed);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user