Two fixes: 1) Remove race conditions by not locking clients owned by other threads when async writing. 2) Don't dereference dangling pointers in the lambda.
Former-commit-id: cb93752aff4c67d4475e9ed17833335716c45744
parent 1e952f28c2
commit bd9ea70609
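The locking change in the hunks below swaps std::lock_guard for a deferred std::unique_lock, so the per-client lock is only taken when the feeding thread actually owns the client; for clients owned by other threads the global lock plus the AddReply*Async() path is relied on instead. Below is a minimal sketch of that pattern outside of KeyDB, assuming a simplified FakeClient struct with a plain std::mutex and an OnOwnerThread() helper standing in for FCorrectThread():

#include <mutex>
#include <thread>

// Hypothetical stand-in for the client struct: each client remembers the
// id of the thread that owns it and carries its own lock.
struct FakeClient {
    std::thread::id owner = std::this_thread::get_id();
    std::mutex lock;
};

// Stand-in for FCorrectThread(): true when we run on the client's owner thread.
static bool OnOwnerThread(const FakeClient &c) {
    return c.owner == std::this_thread::get_id();
}

void feedClient(FakeClient &c /*, payload... */) {
    // Construct the lock deferred so the constructor does not block.
    std::unique_lock<std::mutex> lk(c.lock, std::defer_lock);
    // Only take the per-client lock on the owning thread; otherwise the
    // (already held) global lock is assumed to protect the async reply path.
    if (OnOwnerThread(c))
        lk.lock();
    // ... append to the client's reply buffer here ...
}   // lk unlocks automatically only if it was actually locked

std::lock_guard cannot express this because it always locks in its constructor, which is why the replica and monitor paths in the diff switch to std::unique_lock with std::defer_lock.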
@@ -248,7 +248,11 @@ void clientInstallAsyncWriteHandler(client *c) {
 int prepareClientToWrite(client *c, bool fAsync) {
     fAsync = fAsync && !FCorrectThread(c);  // Not async if we're on the right thread
     serverAssert(FCorrectThread(c) || fAsync);
-    serverAssert(c->fd <= 0 || c->lock.fOwnLock());
+    if (FCorrectThread(c)) {
+        serverAssert(c->fd <= 0 || c->lock.fOwnLock());
+    } else {
+        serverAssert(GlobalLocksAcquired());
+    }
 
     if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer.
                                                     // do not install a write handler
@@ -385,7 +385,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
         /* Don't feed slaves that are still waiting for BGSAVE to start */
         if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
         if (replica->flags & CLIENT_CLOSE_ASAP) continue;
-        std::unique_lock<decltype(replica->lock)> lock(replica->lock);
+        std::unique_lock<decltype(replica->lock)> lock(replica->lock, std::defer_lock);
+        // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
+        if (FCorrectThread(replica))
+            lock.lock();
         if (serverTL->current_client && FSameHost(serverTL->current_client, replica))
         {
             replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start;
@@ -434,7 +437,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
 
     while((ln = listNext(&li))) {
         client *replica = (client*)ln->value;
-        std::lock_guard<decltype(replica->lock)> ulock(replica->lock);
+        std::unique_lock<decltype(replica->lock)> ulock(replica->lock, std::defer_lock);
+        if (FCorrectThread(replica))
+            ulock.lock();
         if (FMasterHost(replica))
             continue; // Active Active case, don't feed back
 
@@ -483,7 +488,10 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
     listRewind(monitors,&li);
     while((ln = listNext(&li))) {
         client *monitor = (client*)ln->value;
-        std::lock_guard<decltype(monitor->lock)> lock(monitor->lock);
+        std::unique_lock<decltype(monitor->lock)> lock(monitor->lock, std::defer_lock);
+        // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
+        if (FCorrectThread(c))
+            lock.lock();
         addReplyAsync(monitor,cmdobj);
     }
     decrRefCount(cmdobj);
@@ -1207,6 +1215,20 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
             else
             {
                 aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] {
+                    // Because the client could have been closed while the lambda waited to run we need to
+                    // verify the replica is still connected
+                    listIter li;
+                    listNode *ln;
+                    listRewind(g_pserver->slaves,&li);
+                    bool fFound = false;
+                    while ((ln = listNext(&li))) {
+                        if (listNodeValue(ln) == replica) {
+                            fFound = true;
+                            break;
+                        }
+                    }
+                    if (!fFound)
+                        return;
                     aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE);
                     if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) {
                         freeClient(replica);