Cleanup lock contention, and ensure clients are written to in an unsafe way when the global lock is released
This commit is contained in:
parent
627c19e02f
commit
8e8abb2ff9
@ -641,6 +641,7 @@ struct client *createFakeClient(void) {
|
||||
c->argv = NULL;
|
||||
c->bufpos = 0;
|
||||
c->flags = 0;
|
||||
c->fPendingAsyncWrite = FALSE;
|
||||
c->btype = BLOCKED_NONE;
|
||||
/* We set the fake client as a slave waiting for the synchronization
|
||||
* so that Redis will not try to send replies to this client. */
|
||||
|
@ -124,6 +124,8 @@ void processUnblockedClients(int iel) {
|
||||
c = ln->value;
|
||||
listDelNode(unblocked_clients,ln);
|
||||
AssertCorrectThread(c);
|
||||
|
||||
fastlock_lock(&c->lock);
|
||||
c->flags &= ~CLIENT_UNBLOCKED;
|
||||
|
||||
/* Process remaining data in the input buffer, unless the client
|
||||
@ -135,6 +137,7 @@ void processUnblockedClients(int iel) {
|
||||
processInputBufferAndReplicate(c);
|
||||
}
|
||||
}
|
||||
fastlock_unlock(&c->lock);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,15 +31,16 @@ extern "C" void fastlock_lock(struct fastlock *lock)
|
||||
return;
|
||||
}
|
||||
|
||||
int cloops = 1;
|
||||
while (!__sync_bool_compare_and_swap(&lock->m_lock, 0, 1))
|
||||
{
|
||||
sched_yield();
|
||||
if ((++cloops % 1024*1024) == 0)
|
||||
sched_yield();
|
||||
}
|
||||
}
|
||||
|
||||
lock->m_depth = 1;
|
||||
lock->m_pidOwner = gettid();
|
||||
__sync_synchronize();
|
||||
}
|
||||
|
||||
extern "C" void fastlock_unlock(struct fastlock *lock)
|
||||
|
BIN
src/john@18.191.254.20
Executable file
BIN
src/john@18.191.254.20
Executable file
Binary file not shown.
@ -3700,6 +3700,7 @@ void moduleHandleBlockedClients(void) {
|
||||
!(c->flags & CLIENT_PENDING_WRITE))
|
||||
{
|
||||
c->flags |= CLIENT_PENDING_WRITE;
|
||||
AssertCorrectThread(c);
|
||||
listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c);
|
||||
}
|
||||
}
|
||||
|
@ -33,6 +33,7 @@
|
||||
#include <math.h>
|
||||
#include <ctype.h>
|
||||
#include <vector>
|
||||
#include <mutex>
|
||||
|
||||
static void setProtocolError(const char *errstr, client *c);
|
||||
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);
|
||||
@ -145,6 +146,7 @@ client *createClient(int fd, int iel) {
|
||||
uint64_t client_id;
|
||||
atomicGetIncr(server.next_client_id,client_id,1);
|
||||
c->iel = iel;
|
||||
fastlock_init(&c->lock);
|
||||
c->id = client_id;
|
||||
c->resp = 2;
|
||||
c->fd = fd;
|
||||
@ -164,6 +166,7 @@ client *createClient(int fd, int iel) {
|
||||
c->sentlen = 0;
|
||||
c->sentlenAsync = 0;
|
||||
c->flags = 0;
|
||||
c->fPendingAsyncWrite = FALSE;
|
||||
c->ctime = c->lastinteraction = server.unixtime;
|
||||
/* If the default user does not require authentication, the user is
|
||||
* directly authenticated. */
|
||||
@ -223,7 +226,6 @@ void clientInstallWriteHandler(client *c) {
|
||||
/* Schedule the client to write the output buffers to the socket only
|
||||
* if not already done and, for slaves, if the slave can actually receive
|
||||
* writes at this stage. */
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
if (!(c->flags & CLIENT_PENDING_WRITE) &&
|
||||
(c->replstate == REPL_STATE_NONE ||
|
||||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
||||
@ -242,8 +244,8 @@ void clientInstallWriteHandler(client *c) {
|
||||
|
||||
void clientInstallAsyncWriteHandler(client *c) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
if (!(c->flags & CLIENT_PENDING_ASYNCWRITE)) {
|
||||
c->flags |= CLIENT_PENDING_ASYNCWRITE;
|
||||
if (!(c->fPendingAsyncWrite)) {
|
||||
c->fPendingAsyncWrite = TRUE;
|
||||
listAddNodeHead(serverTL->clients_pending_asyncwrite,c);
|
||||
}
|
||||
}
|
||||
@ -271,8 +273,8 @@ void clientInstallAsyncWriteHandler(client *c) {
|
||||
* data to the clients output buffers. If the function returns C_ERR no
|
||||
* data should be appended to the output buffers. */
|
||||
int prepareClientToWrite(client *c, bool fAsync) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
|
||||
serverAssert(!fAsync || aeThreadOwnsLock());
|
||||
|
||||
/* If it's the Lua client we always return ok without installing any
|
||||
* handler since there is no socket at all. */
|
||||
@ -291,7 +293,7 @@ int prepareClientToWrite(client *c, bool fAsync) {
|
||||
/* Schedule the client to write the output buffers to the socket, unless
|
||||
* it should already be setup to do so (it has already pending data). */
|
||||
if (!fAsync && !clientHasPendingReplies(c, FALSE)) clientInstallWriteHandler(c);
|
||||
if (fAsync && !(c->flags & CLIENT_PENDING_ASYNCWRITE)) clientInstallAsyncWriteHandler(c);
|
||||
if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c);
|
||||
|
||||
/* Authorize the caller to queue in the output buffer of this client. */
|
||||
return C_OK;
|
||||
@ -1213,7 +1215,7 @@ void unlinkClient(client *c) {
|
||||
c->flags &= ~CLIENT_UNBLOCKED;
|
||||
}
|
||||
|
||||
if (c->flags & CLIENT_PENDING_ASYNCWRITE) {
|
||||
if (c->fPendingAsyncWrite) {
|
||||
ln = NULL;
|
||||
int iel = 0;
|
||||
for (; iel < server.cthreads; ++iel)
|
||||
@ -1224,7 +1226,7 @@ void unlinkClient(client *c) {
|
||||
}
|
||||
serverAssert(ln != NULL);
|
||||
listDelNode(server.rgthreadvar[iel].clients_pending_asyncwrite,ln);
|
||||
c->flags &= ~CLIENT_PENDING_ASYNCWRITE;
|
||||
c->fPendingAsyncWrite = FALSE;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1329,6 +1331,7 @@ void freeClient(client *c) {
|
||||
zfree(c->argv);
|
||||
freeClientMultiState(c);
|
||||
sdsfree(c->peerid);
|
||||
fastlock_free(&c->lock);
|
||||
zfree(c);
|
||||
}
|
||||
|
||||
@ -1381,6 +1384,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
||||
// a transmission on another thread while transmitting the thread local buffer, resulting in us
|
||||
// overlapping messages
|
||||
AeLocker locker(true);
|
||||
std::lock_guard<decltype(c->lock)> lock(c->lock); // To prevent deadlocks this must be after we acquire the global lock
|
||||
int fSendAsyncBuffer = listLength(c->listbufferDoneAsync) && (c->sentlen == 0 || c->sentlenAsync > 0);
|
||||
if (!fSendAsyncBuffer)
|
||||
locker.disarm();
|
||||
@ -1525,8 +1529,9 @@ void ProcessPendingAsyncWrites()
|
||||
while(listLength(serverTL->clients_pending_asyncwrite)) {
|
||||
client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite));
|
||||
listDelNode(serverTL->clients_pending_asyncwrite, listFirst(serverTL->clients_pending_asyncwrite));
|
||||
std::lock_guard<decltype(c->lock)> lock(c->lock);
|
||||
|
||||
serverAssert(c->flags & CLIENT_PENDING_ASYNCWRITE);
|
||||
serverAssert(c->fPendingAsyncWrite);
|
||||
|
||||
// TODO: Append to end of reply block?
|
||||
|
||||
@ -1541,7 +1546,7 @@ void ProcessPendingAsyncWrites()
|
||||
c->buflenAsync = 0;
|
||||
zfree(c->bufAsync);
|
||||
c->bufAsync = nullptr;
|
||||
c->flags &= ~CLIENT_PENDING_ASYNCWRITE;
|
||||
c->fPendingAsyncWrite = FALSE;
|
||||
|
||||
// Now install the write event handler
|
||||
int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
|
||||
@ -1572,8 +1577,6 @@ void ProcessPendingAsyncWrites()
|
||||
int handleClientsWithPendingWrites(int iel) {
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
||||
AeLocker locker(true);
|
||||
|
||||
list *list = server.rgthreadvar[iel].clients_pending_write;
|
||||
int processed = listLength(list);
|
||||
@ -1582,6 +1585,8 @@ int handleClientsWithPendingWrites(int iel) {
|
||||
listRewind(list,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *c = (client*)listNodeValue(ln);
|
||||
std::lock_guard<decltype(c->lock)> lock(c->lock);
|
||||
|
||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||
listDelNode(list,ln);
|
||||
AssertCorrectThread(c);
|
||||
@ -1613,6 +1618,7 @@ int handleClientsWithPendingWrites(int iel) {
|
||||
}
|
||||
}
|
||||
|
||||
AeLocker locker(true);
|
||||
ProcessPendingAsyncWrites();
|
||||
|
||||
return processed;
|
||||
@ -1921,8 +1927,8 @@ int processMultibulkBuffer(client *c) {
|
||||
* or because a client was blocked and later reactivated, so there could be
|
||||
* pending query buffer, already representing a full command, to process. */
|
||||
void processInputBuffer(client *c) {
|
||||
server.current_client = c;
|
||||
|
||||
AssertCorrectThread(c);
|
||||
|
||||
/* Keep processing while there is something in the input buffer */
|
||||
while(c->qb_pos < sdslen(c->querybuf)) {
|
||||
/* Return if clients are paused. */
|
||||
@ -1960,6 +1966,8 @@ void processInputBuffer(client *c) {
|
||||
} else {
|
||||
serverPanic("Unknown request type");
|
||||
}
|
||||
AeLocker locker(true);
|
||||
server.current_client = c;
|
||||
|
||||
/* Multibulk processing could see a <= 0 length. */
|
||||
if (c->argc == 0) {
|
||||
@ -2007,8 +2015,10 @@ void processInputBufferAndReplicate(client *c) {
|
||||
processInputBuffer(c);
|
||||
size_t applied = c->reploff - prev_offset;
|
||||
if (applied) {
|
||||
aeAcquireLock();
|
||||
replicationFeedSlavesFromMasterStream(server.slaves,
|
||||
c->pending_querybuf, applied);
|
||||
aeReleaseLock();
|
||||
sdsrange(c->pending_querybuf,applied,-1);
|
||||
}
|
||||
}
|
||||
@ -2024,6 +2034,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
serverAssert(c->iel == ielFromEventLoop(el));
|
||||
AeLocker locker;
|
||||
AssertCorrectThread(c);
|
||||
std::lock_guard<decltype(c->lock)> lock(c->lock);
|
||||
|
||||
readlen = PROTO_IOBUF_LEN;
|
||||
/* If this is a multi bulk request, and we are processing a bulk reply
|
||||
@ -2047,19 +2058,22 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
|
||||
|
||||
nread = read(fd, c->querybuf+qblen, readlen);
|
||||
locker.arm();
|
||||
|
||||
if (nread == -1) {
|
||||
if (errno == EAGAIN) {
|
||||
return;
|
||||
} else {
|
||||
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
|
||||
aeAcquireLock();
|
||||
freeClient(c);
|
||||
aeReleaseLock();
|
||||
return;
|
||||
}
|
||||
} else if (nread == 0) {
|
||||
serverLog(LL_VERBOSE, "Client closed connection");
|
||||
aeAcquireLock();
|
||||
freeClient(c);
|
||||
aeReleaseLock();
|
||||
return;
|
||||
} else if (c->flags & CLIENT_MASTER) {
|
||||
/* Append the query buffer to the pending (not applied) buffer
|
||||
@ -2080,7 +2094,9 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
|
||||
sdsfree(ci);
|
||||
sdsfree(bytes);
|
||||
aeAcquireLock();
|
||||
freeClient(c);
|
||||
aeReleaseLock();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -2091,7 +2107,9 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
* corresponding part of the replication stream, will be propagated to
|
||||
* the sub-slaves and to the replication backlog. */
|
||||
processInputBufferAndReplicate(c);
|
||||
aeAcquireLock();
|
||||
ProcessPendingAsyncWrites();
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
||||
void getClientsMaxBuffers(unsigned long *longest_output_list,
|
||||
|
@ -864,7 +864,7 @@ void putSlaveOnline(client *slave) {
|
||||
slave->repl_put_online_on_ack = 0;
|
||||
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
|
||||
//AssertCorrectThread(slave);
|
||||
if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE,
|
||||
if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE|AE_WRITE_THREADSAFE,
|
||||
sendReplyToClient, slave) == AE_ERR) {
|
||||
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
|
||||
freeClient(slave);
|
||||
@ -2288,7 +2288,7 @@ void replicationResurrectCachedMaster(int newfd) {
|
||||
/* We may also need to install the write handler as well if there is
|
||||
* pending data in the write buffers. */
|
||||
if (clientHasPendingReplies(server.master, TRUE)) {
|
||||
if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_WRITABLE,
|
||||
if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_WRITABLE|AE_WRITE_THREADSAFE,
|
||||
sendReplyToClient, server.master)) {
|
||||
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
|
||||
freeClientAsync(server.master); /* Close ASAP. */
|
||||
|
@ -4839,7 +4839,7 @@ void *workerThreadMain(void *parg)
|
||||
|
||||
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
|
||||
aeEventLoop *el = server.rgthreadvar[iel].el;
|
||||
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, /*isMainThread ? 0 : AE_SLEEP_THREADSAFE*/ 0);
|
||||
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
|
||||
aeSetAfterSleepProc(el, isMainThread ? afterSleep : NULL, 0);
|
||||
aeMain(el);
|
||||
aeDeleteEventLoop(el);
|
||||
@ -5017,7 +5017,7 @@ int main(int argc, char **argv) {
|
||||
initServer();
|
||||
|
||||
server.cthreads = 4; //testing
|
||||
initNetworking(0 /* fReusePort */);
|
||||
initNetworking(1 /* fReusePort */);
|
||||
|
||||
if (background || server.pidfile) createPidFile();
|
||||
redisSetProcTitle(argv[0]);
|
||||
|
@ -293,7 +293,6 @@ extern "C" {
|
||||
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
|
||||
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
|
||||
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
|
||||
#define CLIENT_PENDING_ASYNCWRITE (1<<29) /* client is in the async write list */
|
||||
|
||||
/* Client block type (btype field in client structure)
|
||||
* if CLIENT_BLOCKED flag is set. */
|
||||
@ -833,6 +832,7 @@ typedef struct client {
|
||||
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
||||
time_t obuf_soft_limit_reached_time;
|
||||
int flags; /* Client flags: CLIENT_* macros. */
|
||||
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
|
||||
int authenticated; /* Needed when the default user requires auth. */
|
||||
int replstate; /* Replication state if this is a slave. */
|
||||
int repl_put_online_on_ack; /* Install slave write handler on ACK. */
|
||||
@ -873,6 +873,7 @@ typedef struct client {
|
||||
list *listbufferDoneAsync;
|
||||
|
||||
int iel; /* the event loop index we're registered with */
|
||||
struct fastlock lock;
|
||||
} client;
|
||||
|
||||
struct saveparam {
|
||||
|
Loading…
x
Reference in New Issue
Block a user