Cluster multithreading fixes
Former-commit-id: 3dd78a6101df0a980e520dcb55d80651bfc5a3a7
This commit is contained in:
parent
b8cc7e2b9c
commit
a098681bbf
@ -617,6 +617,17 @@ clusterLink *createClusterLink(clusterNode *node) {
|
|||||||
* This function will just make sure that the original node associated
|
* This function will just make sure that the original node associated
|
||||||
* with this link will have the 'link' field set to NULL. */
|
* with this link will have the 'link' field set to NULL. */
|
||||||
void freeClusterLink(clusterLink *link) {
|
void freeClusterLink(clusterLink *link) {
|
||||||
|
if (ielFromEventLoop(serverTL->el) != IDX_EVENT_LOOP_MAIN)
|
||||||
|
{
|
||||||
|
// we can't perform this operation on this thread, queue it on the main thread
|
||||||
|
if (link->node)
|
||||||
|
link->node->link = NULL;
|
||||||
|
link->node = nullptr;
|
||||||
|
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
|
||||||
|
freeClusterLink(link);
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (link->fd != -1) {
|
if (link->fd != -1) {
|
||||||
aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_READABLE|AE_WRITABLE);
|
aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_READABLE|AE_WRITABLE);
|
||||||
}
|
}
|
||||||
@ -2139,21 +2150,35 @@ void handleLinkIOError(clusterLink *link) {
|
|||||||
* consumed by write(). We don't try to optimize this for speed too much
|
* consumed by write(). We don't try to optimize this for speed too much
|
||||||
* as this is a very low traffic channel. */
|
* as this is a very low traffic channel. */
|
||||||
void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||||
|
serverAssert(ielFromEventLoop(el) == IDX_EVENT_LOOP_MAIN);
|
||||||
clusterLink *link = (clusterLink*) privdata;
|
clusterLink *link = (clusterLink*) privdata;
|
||||||
ssize_t nwritten;
|
ssize_t nwritten;
|
||||||
UNUSED(el);
|
UNUSED(el);
|
||||||
UNUSED(mask);
|
UNUSED(mask);
|
||||||
|
|
||||||
nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
|
// We're about to release the lock, so the link's sndbuf needs to be owned fully by us
|
||||||
|
// allocate a new one in case anyone tries to write while we're waiting
|
||||||
|
sds sndbuf = link->sndbuf;
|
||||||
|
link->sndbuf = sdsempty();
|
||||||
|
|
||||||
|
aeReleaseLock();
|
||||||
|
nwritten = write(fd, sndbuf, sdslen(sndbuf));
|
||||||
|
aeAcquireLock();
|
||||||
|
|
||||||
if (nwritten <= 0) {
|
if (nwritten <= 0) {
|
||||||
serverLog(LL_DEBUG,"I/O error writing to node link: %s",
|
serverLog(LL_DEBUG,"I/O error writing to node link: %s",
|
||||||
(nwritten == -1) ? strerror(errno) : "short write");
|
(nwritten == -1) ? strerror(errno) : "short write");
|
||||||
|
sdsfree(sndbuf);
|
||||||
handleLinkIOError(link);
|
handleLinkIOError(link);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
sdsrange(link->sndbuf,nwritten,-1);
|
sdsrange(sndbuf,nwritten,-1);
|
||||||
|
// Restore our send buffer, ensuring any unsent data is first
|
||||||
|
sndbuf = sdscat(sndbuf, link->sndbuf);
|
||||||
|
sdsfree(link->sndbuf);
|
||||||
|
link->sndbuf = sndbuf;
|
||||||
if (sdslen(link->sndbuf) == 0)
|
if (sdslen(link->sndbuf) == 0)
|
||||||
aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_WRITABLE);
|
aeDeleteFileEvent(el, link->fd, AE_WRITABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Read data. Try to read the first field of the header first to check the
|
/* Read data. Try to read the first field of the header first to check the
|
||||||
@ -2228,9 +2253,10 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
* the link to be invalidated, so it is safe to call this function
|
* the link to be invalidated, so it is safe to call this function
|
||||||
* from event handlers that will do stuff with the same link later. */
|
* from event handlers that will do stuff with the same link later. */
|
||||||
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
||||||
|
serverAssert(GlobalLocksAcquired());
|
||||||
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
||||||
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER,
|
aeCreateRemoteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER,
|
||||||
clusterWriteHandler,link);
|
clusterWriteHandler,link,false);
|
||||||
|
|
||||||
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
||||||
|
|
||||||
@ -3284,7 +3310,7 @@ void clusterHandleSlaveMigration(int max_slaves) {
|
|||||||
void resetManualFailover(void) {
|
void resetManualFailover(void) {
|
||||||
if (g_pserver->cluster->mf_end && clientsArePaused()) {
|
if (g_pserver->cluster->mf_end && clientsArePaused()) {
|
||||||
g_pserver->clients_pause_end_time = 0;
|
g_pserver->clients_pause_end_time = 0;
|
||||||
clientsArePaused(); /* Just use the side effect of the function. */
|
unpauseClientsIfNecessary();
|
||||||
}
|
}
|
||||||
g_pserver->cluster->mf_end = 0; /* No manual failover in progress. */
|
g_pserver->cluster->mf_end = 0; /* No manual failover in progress. */
|
||||||
g_pserver->cluster->mf_can_start = 0;
|
g_pserver->cluster->mf_can_start = 0;
|
||||||
|
@ -2960,38 +2960,48 @@ void flushSlavesOutputBuffers(void) {
|
|||||||
* than the time left for the previous pause, no change is made to the
|
* than the time left for the previous pause, no change is made to the
|
||||||
* left duration. */
|
* left duration. */
|
||||||
void pauseClients(mstime_t end) {
|
void pauseClients(mstime_t end) {
|
||||||
if (!g_pserver->clients_paused || end > g_pserver->clients_pause_end_time)
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
if (!serverTL->clients_paused || end > g_pserver->clients_pause_end_time)
|
||||||
g_pserver->clients_pause_end_time = end;
|
g_pserver->clients_pause_end_time = end;
|
||||||
g_pserver->clients_paused = 1;
|
|
||||||
|
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||||
|
{
|
||||||
|
g_pserver->rgthreadvar[iel].clients_paused = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Return non-zero if clients are currently paused. As a side effect the
|
/* Return non-zero if clients are currently paused. As a side effect the
|
||||||
* function checks if the pause time was reached and clears it. */
|
* function checks if the pause time was reached and clears it. */
|
||||||
int clientsArePaused(void) {
|
int clientsArePaused(void) {
|
||||||
if (g_pserver->clients_paused &&
|
return serverTL->clients_paused;
|
||||||
|
}
|
||||||
|
|
||||||
|
void unpauseClientsIfNecessary()
|
||||||
|
{
|
||||||
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
if (serverTL->clients_paused &&
|
||||||
g_pserver->clients_pause_end_time < g_pserver->mstime)
|
g_pserver->clients_pause_end_time < g_pserver->mstime)
|
||||||
{
|
{
|
||||||
aeAcquireLock();
|
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listIter li;
|
listIter li;
|
||||||
client *c;
|
client *c;
|
||||||
|
|
||||||
g_pserver->clients_paused = 0;
|
serverTL->clients_paused = 0;
|
||||||
|
|
||||||
/* Put all the clients in the unblocked clients queue in order to
|
/* Put all the clients in the unblocked clients queue in order to
|
||||||
* force the re-processing of the input buffer if any. */
|
* force the re-processing of the input buffer if any. */
|
||||||
listRewind(g_pserver->clients,&li);
|
listRewind(g_pserver->clients,&li);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
c = (client*)listNodeValue(ln);
|
c = (client*)listNodeValue(ln);
|
||||||
|
if (!FCorrectThread(c))
|
||||||
|
continue;
|
||||||
|
|
||||||
/* Don't touch slaves and blocked clients.
|
/* Don't touch slaves and blocked clients.
|
||||||
* The latter pending requests will be processed when unblocked. */
|
* The latter pending requests will be processed when unblocked. */
|
||||||
if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
|
if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
|
||||||
queueClientForReprocessing(c);
|
queueClientForReprocessing(c);
|
||||||
}
|
}
|
||||||
aeReleaseLock();
|
|
||||||
}
|
}
|
||||||
return g_pserver->clients_paused;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called by Redis in order to process a few events from
|
/* This function is called by Redis in order to process a few events from
|
||||||
|
@ -1692,6 +1692,9 @@ void clientsCron(int iel) {
|
|||||||
fastlock_unlock(&c->lock);
|
fastlock_unlock(&c->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Free any pending clients */
|
||||||
|
freeClientsInAsyncFreeQueue(iel);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function handles 'background' operations we are required to do
|
/* This function handles 'background' operations we are required to do
|
||||||
@ -1812,6 +1815,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
/* Update the time cache. */
|
/* Update the time cache. */
|
||||||
updateCachedTime();
|
updateCachedTime();
|
||||||
|
|
||||||
|
/* Unpause clients if enough time has elapsed */
|
||||||
|
unpauseClientsIfNecessary();
|
||||||
|
|
||||||
g_pserver->hz = g_pserver->config_hz;
|
g_pserver->hz = g_pserver->config_hz;
|
||||||
/* Adapt the g_pserver->hz value to the number of configured clients. If we have
|
/* Adapt the g_pserver->hz value to the number of configured clients. If we have
|
||||||
* many clients, we want to call serverCron() with a higher frequency. */
|
* many clients, we want to call serverCron() with a higher frequency. */
|
||||||
@ -1819,7 +1825,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
while (listLength(g_pserver->clients) / g_pserver->hz >
|
while (listLength(g_pserver->clients) / g_pserver->hz >
|
||||||
MAX_CLIENTS_PER_CLOCK_TICK)
|
MAX_CLIENTS_PER_CLOCK_TICK)
|
||||||
{
|
{
|
||||||
g_pserver->hz *= 2;
|
g_pserver->hz += g_pserver->hz; // *= 2
|
||||||
if (g_pserver->hz > CONFIG_MAX_HZ) {
|
if (g_pserver->hz > CONFIG_MAX_HZ) {
|
||||||
g_pserver->hz = CONFIG_MAX_HZ;
|
g_pserver->hz = CONFIG_MAX_HZ;
|
||||||
break;
|
break;
|
||||||
@ -2019,9 +2025,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
flushAppendOnlyFile(0);
|
flushAppendOnlyFile(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Clear the paused clients flag if needed. */
|
|
||||||
clientsArePaused(); /* Don't check return value, just use the side effect.*/
|
|
||||||
|
|
||||||
/* Replication cron function -- used to reconnect to master,
|
/* Replication cron function -- used to reconnect to master,
|
||||||
* detect transfer failures, start background RDB transfers and so forth. */
|
* detect transfer failures, start background RDB transfers and so forth. */
|
||||||
run_with_period(1000) replicationCron();
|
run_with_period(1000) replicationCron();
|
||||||
@ -2079,6 +2082,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
|
|||||||
processUnblockedClients(iel);
|
processUnblockedClients(iel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Unpause clients if enough time has elapsed */
|
||||||
|
unpauseClientsIfNecessary();
|
||||||
|
|
||||||
ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves
|
ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves
|
||||||
clientsCron(iel);
|
clientsCron(iel);
|
||||||
|
|
||||||
@ -2871,6 +2877,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
|
|||||||
pvar->cclients = 0;
|
pvar->cclients = 0;
|
||||||
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
|
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
|
||||||
pvar->current_client = nullptr;
|
pvar->current_client = nullptr;
|
||||||
|
pvar->clients_paused = 0;
|
||||||
if (pvar->el == NULL) {
|
if (pvar->el == NULL) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
"Failed creating the event loop. Error message: '%s'",
|
"Failed creating the event loop. Error message: '%s'",
|
||||||
@ -2967,7 +2974,6 @@ void initServer(void) {
|
|||||||
g_pserver->ready_keys = listCreate();
|
g_pserver->ready_keys = listCreate();
|
||||||
g_pserver->clients_waiting_acks = listCreate();
|
g_pserver->clients_waiting_acks = listCreate();
|
||||||
g_pserver->get_ack_from_slaves = 0;
|
g_pserver->get_ack_from_slaves = 0;
|
||||||
g_pserver->clients_paused = 0;
|
|
||||||
cserver.system_memory_size = zmalloc_get_memory_size();
|
cserver.system_memory_size = zmalloc_get_memory_size();
|
||||||
|
|
||||||
createSharedObjects();
|
createSharedObjects();
|
||||||
@ -4088,7 +4094,7 @@ sds genRedisInfoString(const char *section) {
|
|||||||
g_pserver->port,
|
g_pserver->port,
|
||||||
(intmax_t)uptime,
|
(intmax_t)uptime,
|
||||||
(intmax_t)(uptime/(3600*24)),
|
(intmax_t)(uptime/(3600*24)),
|
||||||
g_pserver->hz,
|
g_pserver->hz.load(),
|
||||||
g_pserver->config_hz,
|
g_pserver->config_hz,
|
||||||
(unsigned long) lruclock,
|
(unsigned long) lruclock,
|
||||||
cserver.executable ? cserver.executable : "",
|
cserver.executable ? cserver.executable : "",
|
||||||
|
@ -1426,6 +1426,7 @@ struct redisServerThreadVars {
|
|||||||
aeEventLoop *el;
|
aeEventLoop *el;
|
||||||
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
|
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
|
||||||
int ipfd_count; /* Used slots in ipfd[] */
|
int ipfd_count; /* Used slots in ipfd[] */
|
||||||
|
int clients_paused; /* True if clients are currently paused */
|
||||||
std::vector<client*> clients_pending_write; /* Clients with data to write or a write handler to install. */
|
std::vector<client*> clients_pending_write; /* Clients with data to write or a write handler to install. */
|
||||||
list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */
|
list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */
|
||||||
list *clients_pending_asyncwrite;
|
list *clients_pending_asyncwrite;
|
||||||
@ -1518,7 +1519,7 @@ struct redisServer {
|
|||||||
int config_hz; /* Configured HZ value. May be different than
|
int config_hz; /* Configured HZ value. May be different than
|
||||||
the actual 'hz' field value if dynamic-hz
|
the actual 'hz' field value if dynamic-hz
|
||||||
is enabled. */
|
is enabled. */
|
||||||
int hz; /* serverCron() calls frequency in hertz */
|
std::atomic<int> hz; /* serverCron() calls frequency in hertz */
|
||||||
redisDb *db;
|
redisDb *db;
|
||||||
dict *commands; /* Command table */
|
dict *commands; /* Command table */
|
||||||
dict *orig_commands; /* Command table before command renaming. */
|
dict *orig_commands; /* Command table before command renaming. */
|
||||||
@ -1553,7 +1554,6 @@ struct redisServer {
|
|||||||
list *clients_to_close; /* Clients to close asynchronously */
|
list *clients_to_close; /* Clients to close asynchronously */
|
||||||
list *slaves, *monitors; /* List of slaves and MONITORs */
|
list *slaves, *monitors; /* List of slaves and MONITORs */
|
||||||
rax *clients_index; /* Active clients dictionary by client ID. */
|
rax *clients_index; /* Active clients dictionary by client ID. */
|
||||||
int clients_paused; /* True if clients are currently paused */
|
|
||||||
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
|
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
|
||||||
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
|
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
|
||||||
std::atomic<uint64_t> next_client_id; /* Next client unique ID. Incremental. */
|
std::atomic<uint64_t> next_client_id; /* Next client unique ID. Incremental. */
|
||||||
@ -2042,6 +2042,7 @@ void disconnectSlavesExcept(unsigned char *uuid);
|
|||||||
int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListen);
|
int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListen);
|
||||||
void pauseClients(mstime_t duration);
|
void pauseClients(mstime_t duration);
|
||||||
int clientsArePaused(void);
|
int clientsArePaused(void);
|
||||||
|
void unpauseClientsIfNecessary();
|
||||||
int processEventsWhileBlocked(int iel);
|
int processEventsWhileBlocked(int iel);
|
||||||
int handleClientsWithPendingWrites(int iel);
|
int handleClientsWithPendingWrites(int iel);
|
||||||
int clientHasPendingReplies(client *c);
|
int clientHasPendingReplies(client *c);
|
||||||
|
@ -14,6 +14,8 @@ proc main {} {
|
|||||||
spawn_instance redis $::redis_base_port $::instances_count {
|
spawn_instance redis $::redis_base_port $::instances_count {
|
||||||
"cluster-enabled yes"
|
"cluster-enabled yes"
|
||||||
"appendonly yes"
|
"appendonly yes"
|
||||||
|
"testmode yes"
|
||||||
|
"server-threads 3"
|
||||||
}
|
}
|
||||||
run_tests
|
run_tests
|
||||||
cleanup
|
cleanup
|
||||||
|
Loading…
x
Reference in New Issue
Block a user