Merge branch 'free_clients_during_loading' into unstable
This commit is contained in:
commit
c38fd1f661
10
src/ae.c
10
src/ae.c
@ -370,6 +370,7 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
|
|||||||
* if flags has AE_DONT_WAIT set the function returns ASAP until all
|
* if flags has AE_DONT_WAIT set the function returns ASAP until all
|
||||||
* the events that's possible to process without to wait are processed.
|
* the events that's possible to process without to wait are processed.
|
||||||
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
|
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
|
||||||
|
* if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
|
||||||
*
|
*
|
||||||
* The function returns the number of events processed. */
|
* The function returns the number of events processed. */
|
||||||
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
||||||
@ -428,6 +429,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
|
|||||||
tvp = &tv;
|
tvp = &tv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
|
||||||
|
eventLoop->beforesleep(eventLoop);
|
||||||
|
|
||||||
/* Call the multiplexing API, will return only on timeout or when
|
/* Call the multiplexing API, will return only on timeout or when
|
||||||
* some event fires. */
|
* some event fires. */
|
||||||
numevents = aeApiPoll(eventLoop, tvp);
|
numevents = aeApiPoll(eventLoop, tvp);
|
||||||
@ -522,9 +526,9 @@ int aeWait(int fd, int mask, long long milliseconds) {
|
|||||||
void aeMain(aeEventLoop *eventLoop) {
|
void aeMain(aeEventLoop *eventLoop) {
|
||||||
eventLoop->stop = 0;
|
eventLoop->stop = 0;
|
||||||
while (!eventLoop->stop) {
|
while (!eventLoop->stop) {
|
||||||
if (eventLoop->beforesleep != NULL)
|
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
|
||||||
eventLoop->beforesleep(eventLoop);
|
AE_CALL_BEFORE_SLEEP|
|
||||||
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
|
AE_CALL_AFTER_SLEEP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
9
src/ae.h
9
src/ae.h
@ -47,11 +47,12 @@
|
|||||||
things to disk before sending replies, and want
|
things to disk before sending replies, and want
|
||||||
to do that in a group fashion. */
|
to do that in a group fashion. */
|
||||||
|
|
||||||
#define AE_FILE_EVENTS 1
|
#define AE_FILE_EVENTS (1<<0)
|
||||||
#define AE_TIME_EVENTS 2
|
#define AE_TIME_EVENTS (1<<1)
|
||||||
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
|
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
|
||||||
#define AE_DONT_WAIT 4
|
#define AE_DONT_WAIT (1<<2)
|
||||||
#define AE_CALL_AFTER_SLEEP 8
|
#define AE_CALL_BEFORE_SLEEP (1<<3)
|
||||||
|
#define AE_CALL_AFTER_SLEEP (1<<4)
|
||||||
|
|
||||||
#define AE_NOMORE -1
|
#define AE_NOMORE -1
|
||||||
#define AE_DELETED_EVENT_ID -1
|
#define AE_DELETED_EVENT_ID -1
|
||||||
|
@ -222,6 +222,6 @@ const char *connGetInfo(connection *conn, char *buf, size_t buf_len);
|
|||||||
|
|
||||||
/* Helpers for tls special considerations */
|
/* Helpers for tls special considerations */
|
||||||
int tlsHasPendingData();
|
int tlsHasPendingData();
|
||||||
void tlsProcessPendingData();
|
int tlsProcessPendingData();
|
||||||
|
|
||||||
#endif /* __REDIS_CONNECTION_H */
|
#endif /* __REDIS_CONNECTION_H */
|
||||||
|
@ -1257,7 +1257,10 @@ void freeClientAsync(client *c) {
|
|||||||
pthread_mutex_unlock(&async_free_queue_mutex);
|
pthread_mutex_unlock(&async_free_queue_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
void freeClientsInAsyncFreeQueue(void) {
|
/* Free the clietns marked as CLOSE_ASAP, return the number of clients
|
||||||
|
* freed. */
|
||||||
|
int freeClientsInAsyncFreeQueue(void) {
|
||||||
|
int freed = listLength(server.clients_to_close);
|
||||||
while (listLength(server.clients_to_close)) {
|
while (listLength(server.clients_to_close)) {
|
||||||
listNode *ln = listFirst(server.clients_to_close);
|
listNode *ln = listFirst(server.clients_to_close);
|
||||||
client *c = listNodeValue(ln);
|
client *c = listNodeValue(ln);
|
||||||
@ -1266,6 +1269,7 @@ void freeClientsInAsyncFreeQueue(void) {
|
|||||||
freeClient(c);
|
freeClient(c);
|
||||||
listDelNode(server.clients_to_close,ln);
|
listDelNode(server.clients_to_close,ln);
|
||||||
}
|
}
|
||||||
|
return freed;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Return a client by ID, or NULL if the client ID is not in the set
|
/* Return a client by ID, or NULL if the client ID is not in the set
|
||||||
@ -2852,9 +2856,8 @@ int clientsArePaused(void) {
|
|||||||
* write, close sequence needed to serve a client.
|
* write, close sequence needed to serve a client.
|
||||||
*
|
*
|
||||||
* The function returns the total number of events processed. */
|
* The function returns the total number of events processed. */
|
||||||
int processEventsWhileBlocked(void) {
|
void processEventsWhileBlocked(void) {
|
||||||
int iterations = 4; /* See the function top-comment. */
|
int iterations = 4; /* See the function top-comment. */
|
||||||
int count = 0;
|
|
||||||
|
|
||||||
/* Note: when we are processing events while blocked (for instance during
|
/* Note: when we are processing events while blocked (for instance during
|
||||||
* busy Lua scripts), we set a global flag. When such flag is set, we
|
* busy Lua scripts), we set a global flag. When such flag is set, we
|
||||||
@ -2862,14 +2865,17 @@ int processEventsWhileBlocked(void) {
|
|||||||
* See https://github.com/antirez/redis/issues/6988 for more info. */
|
* See https://github.com/antirez/redis/issues/6988 for more info. */
|
||||||
ProcessingEventsWhileBlocked = 1;
|
ProcessingEventsWhileBlocked = 1;
|
||||||
while (iterations--) {
|
while (iterations--) {
|
||||||
int events = 0;
|
long long startval = server.events_processed_while_blocked;
|
||||||
events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
|
long long ae_events = aeProcessEvents(server.el,
|
||||||
events += handleClientsWithPendingWrites();
|
AE_FILE_EVENTS|AE_DONT_WAIT|
|
||||||
|
AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
|
||||||
|
/* Note that server.events_processed_while_blocked will also get
|
||||||
|
* incremeted by callbacks called by the event loop handlers. */
|
||||||
|
server.events_processed_while_blocked += ae_events;
|
||||||
|
long long events = server.events_processed_while_blocked - startval;
|
||||||
if (!events) break;
|
if (!events) break;
|
||||||
count += events;
|
|
||||||
}
|
}
|
||||||
ProcessingEventsWhileBlocked = 0;
|
ProcessingEventsWhileBlocked = 0;
|
||||||
return count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ==========================================================================
|
/* ==========================================================================
|
||||||
|
43
src/server.c
43
src/server.c
@ -2087,12 +2087,40 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
return 1000/server.hz;
|
return 1000/server.hz;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern int ProcessingEventsWhileBlocked;
|
||||||
|
|
||||||
/* This function gets called every time Redis is entering the
|
/* This function gets called every time Redis is entering the
|
||||||
* main loop of the event driven library, that is, before to sleep
|
* main loop of the event driven library, that is, before to sleep
|
||||||
* for ready file descriptors. */
|
* for ready file descriptors.
|
||||||
|
*
|
||||||
|
* Note: This function is (currently) called from two functions:
|
||||||
|
* 1. aeMain - The main server loop
|
||||||
|
* 2. processEventsWhileBlocked - Process clients during RDB/AOF load
|
||||||
|
*
|
||||||
|
* If it was called from processEventsWhileBlocked we don't want
|
||||||
|
* to perform all actions (For example, we don't want to expire
|
||||||
|
* keys), but we do need to perform some actions.
|
||||||
|
*
|
||||||
|
* The most important is freeClientsInAsyncFreeQueue but we also
|
||||||
|
* call some other low-risk functions. */
|
||||||
void beforeSleep(struct aeEventLoop *eventLoop) {
|
void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||||
UNUSED(eventLoop);
|
UNUSED(eventLoop);
|
||||||
|
|
||||||
|
/* Just call a subset of vital functions in case we are re-entering
|
||||||
|
* the event loop from processEventsWhileBlocked(). Note that in this
|
||||||
|
* case we keep track of the number of events we are processing, since
|
||||||
|
* processEventsWhileBlocked() wants to stop ASAP if there are no longer
|
||||||
|
* events to handle. */
|
||||||
|
if (ProcessingEventsWhileBlocked) {
|
||||||
|
uint64_t processed = 0;
|
||||||
|
processed += handleClientsWithPendingReadsUsingThreads();
|
||||||
|
processed += tlsProcessPendingData();
|
||||||
|
processed += handleClientsWithPendingWrites();
|
||||||
|
processed += freeClientsInAsyncFreeQueue();
|
||||||
|
server.events_processed_while_blocked += processed;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/* Handle precise timeouts of blocked clients. */
|
/* Handle precise timeouts of blocked clients. */
|
||||||
handleBlockedClientsTimeout();
|
handleBlockedClientsTimeout();
|
||||||
|
|
||||||
@ -2171,7 +2199,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
* the different events callbacks. */
|
* the different events callbacks. */
|
||||||
void afterSleep(struct aeEventLoop *eventLoop) {
|
void afterSleep(struct aeEventLoop *eventLoop) {
|
||||||
UNUSED(eventLoop);
|
UNUSED(eventLoop);
|
||||||
if (moduleCount()) moduleAcquireGIL();
|
|
||||||
|
if (!ProcessingEventsWhileBlocked) {
|
||||||
|
if (moduleCount()) moduleAcquireGIL();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* =========================== Server initialization ======================== */
|
/* =========================== Server initialization ======================== */
|
||||||
@ -2737,6 +2768,7 @@ void initServer(void) {
|
|||||||
server.clients_waiting_acks = listCreate();
|
server.clients_waiting_acks = listCreate();
|
||||||
server.get_ack_from_slaves = 0;
|
server.get_ack_from_slaves = 0;
|
||||||
server.clients_paused = 0;
|
server.clients_paused = 0;
|
||||||
|
server.events_processed_while_blocked = 0;
|
||||||
server.system_memory_size = zmalloc_get_memory_size();
|
server.system_memory_size = zmalloc_get_memory_size();
|
||||||
|
|
||||||
if (server.tls_port && tlsConfigure(&server.tls_ctx_config) == C_ERR) {
|
if (server.tls_port && tlsConfigure(&server.tls_ctx_config) == C_ERR) {
|
||||||
@ -2879,6 +2911,11 @@ void initServer(void) {
|
|||||||
"blocked clients subsystem.");
|
"blocked clients subsystem.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Register before and after sleep handlers (note this needs to be done
|
||||||
|
* before loading persistence since it is used by processEventsWhileBlocked. */
|
||||||
|
aeSetBeforeSleepProc(server.el,beforeSleep);
|
||||||
|
aeSetAfterSleepProc(server.el,afterSleep);
|
||||||
|
|
||||||
/* Open the AOF file if needed. */
|
/* Open the AOF file if needed. */
|
||||||
if (server.aof_state == AOF_ON) {
|
if (server.aof_state == AOF_ON) {
|
||||||
server.aof_fd = open(server.aof_filename,
|
server.aof_fd = open(server.aof_filename,
|
||||||
@ -5135,8 +5172,6 @@ int main(int argc, char **argv) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
redisSetCpuAffinity(server.server_cpulist);
|
redisSetCpuAffinity(server.server_cpulist);
|
||||||
aeSetBeforeSleepProc(server.el,beforeSleep);
|
|
||||||
aeSetAfterSleepProc(server.el,afterSleep);
|
|
||||||
aeMain(server.el);
|
aeMain(server.el);
|
||||||
aeDeleteEventLoop(server.el);
|
aeDeleteEventLoop(server.el);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1095,6 +1095,7 @@ struct redisServer {
|
|||||||
queries. Will still serve RESP2 queries. */
|
queries. Will still serve RESP2 queries. */
|
||||||
int io_threads_num; /* Number of IO threads to use. */
|
int io_threads_num; /* Number of IO threads to use. */
|
||||||
int io_threads_do_reads; /* Read and parse from IO threads? */
|
int io_threads_do_reads; /* Read and parse from IO threads? */
|
||||||
|
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
|
||||||
|
|
||||||
/* RDB / AOF loading information */
|
/* RDB / AOF loading information */
|
||||||
int loading; /* We are loading data from disk if true */
|
int loading; /* We are loading data from disk if true */
|
||||||
@ -1652,7 +1653,7 @@ void rewriteClientCommandVector(client *c, int argc, ...);
|
|||||||
void rewriteClientCommandArgument(client *c, int i, robj *newval);
|
void rewriteClientCommandArgument(client *c, int i, robj *newval);
|
||||||
void replaceClientCommandVector(client *c, int argc, robj **argv);
|
void replaceClientCommandVector(client *c, int argc, robj **argv);
|
||||||
unsigned long getClientOutputBufferMemoryUsage(client *c);
|
unsigned long getClientOutputBufferMemoryUsage(client *c);
|
||||||
void freeClientsInAsyncFreeQueue(void);
|
int freeClientsInAsyncFreeQueue(void);
|
||||||
void asyncCloseClientOnOutputBufferLimitReached(client *c);
|
void asyncCloseClientOnOutputBufferLimitReached(client *c);
|
||||||
int getClientType(client *c);
|
int getClientType(client *c);
|
||||||
int getClientTypeByName(char *name);
|
int getClientTypeByName(char *name);
|
||||||
@ -1662,7 +1663,7 @@ void disconnectSlaves(void);
|
|||||||
int listenToPort(int port, int *fds, int *count);
|
int listenToPort(int port, int *fds, int *count);
|
||||||
void pauseClients(mstime_t duration);
|
void pauseClients(mstime_t duration);
|
||||||
int clientsArePaused(void);
|
int clientsArePaused(void);
|
||||||
int processEventsWhileBlocked(void);
|
void processEventsWhileBlocked(void);
|
||||||
int handleClientsWithPendingWrites(void);
|
int handleClientsWithPendingWrites(void);
|
||||||
int handleClientsWithPendingWritesUsingThreads(void);
|
int handleClientsWithPendingWritesUsingThreads(void);
|
||||||
int handleClientsWithPendingReadsUsingThreads(void);
|
int handleClientsWithPendingReadsUsingThreads(void);
|
||||||
|
@ -768,15 +768,17 @@ int tlsHasPendingData() {
|
|||||||
return listLength(pending_list) > 0;
|
return listLength(pending_list) > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tlsProcessPendingData() {
|
int tlsProcessPendingData() {
|
||||||
listIter li;
|
listIter li;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
|
|
||||||
|
int processed = listLength(pending_list);
|
||||||
listRewind(pending_list,&li);
|
listRewind(pending_list,&li);
|
||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
tls_connection *conn = listNodeValue(ln);
|
tls_connection *conn = listNodeValue(ln);
|
||||||
tlsHandleEvent(conn, AE_READABLE);
|
tlsHandleEvent(conn, AE_READABLE);
|
||||||
}
|
}
|
||||||
|
return processed;
|
||||||
}
|
}
|
||||||
|
|
||||||
#else /* USE_OPENSSL */
|
#else /* USE_OPENSSL */
|
||||||
@ -804,7 +806,8 @@ int tlsHasPendingData() {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tlsProcessPendingData() {
|
int tlsProcessPendingData() {
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -129,3 +129,55 @@ start_server {} {
|
|||||||
r set x xx
|
r set x xx
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {client freed during loading} {
|
||||||
|
start_server [list overrides [list key-load-delay 10 rdbcompression no]] {
|
||||||
|
# create a big rdb that will take long to load. it is important
|
||||||
|
# for keys to be big since the server processes events only once in 2mb.
|
||||||
|
# 100mb of rdb, 100k keys will load in more than 1 second
|
||||||
|
r debug populate 100000 key 1000
|
||||||
|
|
||||||
|
catch {
|
||||||
|
r debug restart
|
||||||
|
}
|
||||||
|
|
||||||
|
set stdout [srv 0 stdout]
|
||||||
|
while 1 {
|
||||||
|
# check that the new server actually started and is ready for connections
|
||||||
|
if {[exec grep -i "Server initialized" | wc -l < $stdout] > 1} {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
after 10
|
||||||
|
}
|
||||||
|
# make sure it's still loading
|
||||||
|
assert_equal [s loading] 1
|
||||||
|
|
||||||
|
# connect and disconnect 10 clients
|
||||||
|
set clients {}
|
||||||
|
for {set j 0} {$j < 10} {incr j} {
|
||||||
|
lappend clients [redis_deferring_client]
|
||||||
|
}
|
||||||
|
foreach rd $clients {
|
||||||
|
$rd debug log bla
|
||||||
|
}
|
||||||
|
foreach rd $clients {
|
||||||
|
$rd read
|
||||||
|
}
|
||||||
|
foreach rd $clients {
|
||||||
|
$rd close
|
||||||
|
}
|
||||||
|
|
||||||
|
# make sure the server freed the clients
|
||||||
|
wait_for_condition 100 100 {
|
||||||
|
[s connected_clients] < 3
|
||||||
|
} else {
|
||||||
|
fail "clients didn't disconnect"
|
||||||
|
}
|
||||||
|
|
||||||
|
# make sure it's still loading
|
||||||
|
assert_equal [s loading] 1
|
||||||
|
|
||||||
|
# no need to keep waiting for loading to complete
|
||||||
|
exec kill [srv 0 pid]
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user