Show threading configuration in INFO output (#7446)

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
Arun Ranganathan 2020-07-29 01:46:44 -04:00 committed by GitHub
parent 63dae52324
commit f6cad30bb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 46 additions and 14 deletions

View File

@ -1295,6 +1295,9 @@ client *lookupClientByID(uint64_t id) {
* set to 0. So when handler_installed is set to 0 the function must be * set to 0. So when handler_installed is set to 0 the function must be
* thread safe. */ * thread safe. */
int writeToClient(client *c, int handler_installed) { int writeToClient(client *c, int handler_installed) {
/* Update total number of writes on server */
server.stat_total_writes_processed++;
ssize_t nwritten = 0, totwritten = 0; ssize_t nwritten = 0, totwritten = 0;
size_t objlen; size_t objlen;
clientReplyBlock *o; clientReplyBlock *o;
@ -1910,6 +1913,9 @@ void readQueryFromClient(connection *conn) {
* the event loop. This is the case if threaded I/O is enabled. */ * the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return; if (postponeClientRead(c)) return;
/* Update total number of reads on server */
server.stat_total_reads_processed++;
readlen = PROTO_IOBUF_LEN; readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply /* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query * that is large enough, try to maximize the probability that the query
@ -2907,7 +2913,6 @@ int tio_debug = 0;
pthread_t io_threads[IO_THREADS_MAX_NUM]; pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; _Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
int io_threads_active; /* Are the threads currently spinning waiting I/O? */
int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
/* This is the list of clients each thread will serve when threaded I/O is /* This is the list of clients each thread will serve when threaded I/O is
@ -2966,7 +2971,7 @@ void *IOThreadMain(void *myid) {
/* Initialize the data structures needed for threaded I/O. */ /* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) { void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */ server.io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread: /* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */ * we'll handle I/O directly from the main thread. */
@ -3000,10 +3005,10 @@ void initThreadedIO(void) {
void startThreadedIO(void) { void startThreadedIO(void) {
if (tio_debug) { printf("S"); fflush(stdout); } if (tio_debug) { printf("S"); fflush(stdout); }
if (tio_debug) printf("--- STARTING THREADED IO ---\n"); if (tio_debug) printf("--- STARTING THREADED IO ---\n");
serverAssert(io_threads_active == 0); serverAssert(server.io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++) for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]); pthread_mutex_unlock(&io_threads_mutex[j]);
io_threads_active = 1; server.io_threads_active = 1;
} }
void stopThreadedIO(void) { void stopThreadedIO(void) {
@ -3014,10 +3019,10 @@ void stopThreadedIO(void) {
if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n", if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n",
(int) listLength(server.clients_pending_read), (int) listLength(server.clients_pending_read),
(int) listLength(server.clients_pending_write)); (int) listLength(server.clients_pending_write));
serverAssert(io_threads_active == 1); serverAssert(server.io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++) for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]); pthread_mutex_lock(&io_threads_mutex[j]);
io_threads_active = 0; server.io_threads_active = 0;
} }
/* This function checks if there are not enough pending clients to justify /* This function checks if there are not enough pending clients to justify
@ -3036,7 +3041,7 @@ int stopThreadedIOIfNeeded(void) {
if (server.io_threads_num == 1) return 1; if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) { if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO(); if (server.io_threads_active) stopThreadedIO();
return 1; return 1;
} else { } else {
return 0; return 0;
@ -3054,7 +3059,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
} }
/* Start threads if needed. */ /* Start threads if needed. */
if (!io_threads_active) startThreadedIO(); if (!server.io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed); if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
@ -3111,6 +3116,10 @@ int handleClientsWithPendingWritesUsingThreads(void) {
} }
} }
listEmpty(server.clients_pending_write); listEmpty(server.clients_pending_write);
/* Update processed count on server */
server.stat_io_writes_processed += processed;
return processed; return processed;
} }
@ -3119,7 +3128,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
* As a side effect of calling this function the client is put in the * As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */ * pending read clients and flagged as such. */
int postponeClientRead(client *c) { int postponeClientRead(client *c) {
if (io_threads_active && if (server.io_threads_active &&
server.io_threads_do_reads && server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked && !ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
@ -3139,7 +3148,7 @@ int postponeClientRead(client *c) {
* the reads in the buffers, and also parse the first command available * the reads in the buffers, and also parse the first command available
* rendering it in the client structures. */ * rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) { int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0; if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read); int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0; if (processed == 0) return 0;
@ -3200,5 +3209,9 @@ int handleClientsWithPendingReadsUsingThreads(void) {
} }
processInputBuffer(c); processInputBuffer(c);
} }
/* Update processed count on server */
server.stat_io_reads_processed += processed;
return processed; return processed;
} }

View File

@ -2726,6 +2726,10 @@ void resetServerStats(void) {
server.stat_sync_full = 0; server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0; server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0; server.stat_sync_partial_err = 0;
server.stat_io_reads_processed = 0;
server.stat_total_reads_processed = 0;
server.stat_io_writes_processed = 0;
server.stat_total_writes_processed = 0;
for (j = 0; j < STATS_METRIC_COUNT; j++) { for (j = 0; j < STATS_METRIC_COUNT; j++) {
server.inst_metric[j].idx = 0; server.inst_metric[j].idx = 0;
server.inst_metric[j].last_sample_time = mstime(); server.inst_metric[j].last_sample_time = mstime();
@ -4071,7 +4075,8 @@ sds genRedisInfoString(const char *section) {
"configured_hz:%i\r\n" "configured_hz:%i\r\n"
"lru_clock:%u\r\n" "lru_clock:%u\r\n"
"executable:%s\r\n" "executable:%s\r\n"
"config_file:%s\r\n", "config_file:%s\r\n"
"io_threads_active:%d\r\n",
REDIS_VERSION, REDIS_VERSION,
redisGitSHA1(), redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0, strtol(redisGitDirty(),NULL,10) > 0,
@ -4095,7 +4100,8 @@ sds genRedisInfoString(const char *section) {
server.config_hz, server.config_hz,
server.lruclock, server.lruclock,
server.executable ? server.executable : "", server.executable ? server.executable : "",
server.configfile ? server.configfile : ""); server.configfile ? server.configfile : "",
server.io_threads_active);
} }
/* Clients */ /* Clients */
@ -4367,7 +4373,11 @@ sds genRedisInfoString(const char *section) {
"tracking_total_keys:%lld\r\n" "tracking_total_keys:%lld\r\n"
"tracking_total_items:%lld\r\n" "tracking_total_items:%lld\r\n"
"tracking_total_prefixes:%lld\r\n" "tracking_total_prefixes:%lld\r\n"
"unexpected_error_replies:%lld\r\n", "unexpected_error_replies:%lld\r\n"
"total_reads_processed:%lld\r\n"
"total_writes_processed:%lld\r\n"
"io_threaded_reads_processed:%lld\r\n"
"io_threaded_writes_processed:%lld\r\n",
server.stat_numconnections, server.stat_numconnections,
server.stat_numcommands, server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND), getInstantaneousMetric(STATS_METRIC_COMMAND),
@ -4398,7 +4408,11 @@ sds genRedisInfoString(const char *section) {
(unsigned long long) trackingGetTotalKeys(), (unsigned long long) trackingGetTotalKeys(),
(unsigned long long) trackingGetTotalItems(), (unsigned long long) trackingGetTotalItems(),
(unsigned long long) trackingGetTotalPrefixes(), (unsigned long long) trackingGetTotalPrefixes(),
server.stat_unexpected_error_replies); server.stat_unexpected_error_replies,
server.stat_total_reads_processed,
server.stat_total_writes_processed,
server.stat_io_reads_processed,
server.stat_io_writes_processed);
} }
/* Replication */ /* Replication */

View File

@ -1108,6 +1108,7 @@ struct redisServer {
queries. Will still serve RESP2 queries. */ queries. Will still serve RESP2 queries. */
int io_threads_num; /* Number of IO threads to use. */ int io_threads_num; /* Number of IO threads to use. */
int io_threads_do_reads; /* Read and parse from IO threads? */ int io_threads_do_reads; /* Read and parse from IO threads? */
int io_threads_active; /* Is IO threads currently active? */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */
/* RDB / AOF loading information */ /* RDB / AOF loading information */
@ -1157,6 +1158,10 @@ struct redisServer {
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */
long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */
_Atomic long long stat_total_reads_processed; /* Total number of read events processed */
_Atomic long long stat_total_writes_processed; /* Total number of write events processed */
/* The following two are used to track instantaneous metrics, like /* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */ * number of operations per second, network traffic. */
struct { struct {