Fix replica buffer overflows
Former-commit-id: 738c782f02517744662991091beb3f724661317e
This commit is contained in:
parent
9c65bd8f3d
commit
91388bd42d
@ -1911,12 +1911,13 @@ void ProcessPendingAsyncWrites()
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (!c->fPendingAsyncWriteHandler) {
|
bool expected = false;
|
||||||
c->fPendingAsyncWriteHandler = true;
|
if (c->fPendingAsyncWriteHandler.compare_exchange_strong(expected, true)) {
|
||||||
bool fResult = c->postFunction([](client *c) {
|
bool fResult = c->postFunction([](client *c) {
|
||||||
c->fPendingAsyncWriteHandler = false;
|
c->fPendingAsyncWriteHandler = false;
|
||||||
connSetWriteHandler(c->conn, sendReplyToClient, true);
|
clientInstallWriteHandler(c);
|
||||||
});
|
handleClientsWithPendingWrites(c->iel, g_pserver->aof_state);
|
||||||
|
}, false);
|
||||||
|
|
||||||
if (!fResult)
|
if (!fResult)
|
||||||
c->fPendingAsyncWriteHandler = false; // if we failed to set the handler then prevent this from never being reset
|
c->fPendingAsyncWriteHandler = false; // if we failed to set the handler then prevent this from never being reset
|
||||||
|
@ -3992,13 +3992,13 @@ int processCommand(client *c, int callFlags) {
|
|||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool client::postFunction(std::function<void(client *)> fn) {
|
bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
|
||||||
this->casyncOpsPending++;
|
this->casyncOpsPending++;
|
||||||
return aePostFunction(g_pserver->rgthreadvar[this->iel].el, [this, fn]{
|
return aePostFunction(g_pserver->rgthreadvar[this->iel].el, [this, fn]{
|
||||||
std::lock_guard<decltype(this->lock)> lock(this->lock);
|
std::lock_guard<decltype(this->lock)> lock(this->lock);
|
||||||
--casyncOpsPending;
|
--casyncOpsPending;
|
||||||
fn(this);
|
fn(this);
|
||||||
}) == AE_OK;
|
}, false, fLock) == AE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*================================== Shutdown =============================== */
|
/*================================== Shutdown =============================== */
|
||||||
|
@ -1083,7 +1083,7 @@ typedef struct client {
|
|||||||
std::atomic<uint64_t> flags; /* Client flags: CLIENT_* macros. */
|
std::atomic<uint64_t> flags; /* Client flags: CLIENT_* macros. */
|
||||||
int casyncOpsPending;
|
int casyncOpsPending;
|
||||||
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
|
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
|
||||||
int fPendingAsyncWriteHandler;
|
std::atomic<bool> fPendingAsyncWriteHandler;
|
||||||
int authenticated; /* Needed when the default user requires auth. */
|
int authenticated; /* Needed when the default user requires auth. */
|
||||||
int replstate; /* Replication state if this is a replica. */
|
int replstate; /* Replication state if this is a replica. */
|
||||||
int repl_put_online_on_ack; /* Install replica write handler on ACK. */
|
int repl_put_online_on_ack; /* Install replica write handler on ACK. */
|
||||||
@ -1153,7 +1153,7 @@ typedef struct client {
|
|||||||
int master_error;
|
int master_error;
|
||||||
|
|
||||||
// post a function from a non-client thread to run on its client thread
|
// post a function from a non-client thread to run on its client thread
|
||||||
bool postFunction(std::function<void(client *)> fn);
|
bool postFunction(std::function<void(client *)> fn, bool fLock = true);
|
||||||
} client;
|
} client;
|
||||||
|
|
||||||
struct saveparam {
|
struct saveparam {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user