diff --git a/src/networking.c b/src/networking.c index 8f06c6ba6..495be0ece 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1313,6 +1313,9 @@ client *lookupClientByID(uint64_t id) { * set to 0. So when handler_installed is set to 0 the function must be * thread safe. */ int writeToClient(client *c, int handler_installed) { + /* Update total number of writes on server */ + server.stat_total_writes_processed++; + ssize_t nwritten = 0, totwritten = 0; size_t objlen; clientReplyBlock *o; @@ -1929,6 +1932,9 @@ void readQueryFromClient(connection *conn) { * the event loop. This is the case if threaded I/O is enabled. */ if (postponeClientRead(c)) return; + /* Update total number of reads on server */ + server.stat_total_reads_processed++; + readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query @@ -2926,7 +2932,6 @@ int tio_debug = 0; pthread_t io_threads[IO_THREADS_MAX_NUM]; pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; _Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; -int io_threads_active; /* Are the threads currently spinning waiting I/O? */ int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ /* This is the list of clients each thread will serve when threaded I/O is @@ -2985,7 +2990,7 @@ void *IOThreadMain(void *myid) { /* Initialize the data structures needed for threaded I/O. */ void initThreadedIO(void) { - io_threads_active = 0; /* We start with threads not active. */ + server.io_threads_active = 0; /* We start with threads not active. */ /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ @@ -3019,10 +3024,10 @@ void initThreadedIO(void) { void startThreadedIO(void) { if (tio_debug) { printf("S"); fflush(stdout); } if (tio_debug) printf("--- STARTING THREADED IO ---\n"); - serverAssert(io_threads_active == 0); + serverAssert(server.io_threads_active == 0); for (int j = 1; j < server.io_threads_num; j++) pthread_mutex_unlock(&io_threads_mutex[j]); - io_threads_active = 1; + server.io_threads_active = 1; } void stopThreadedIO(void) { @@ -3033,10 +3038,10 @@ void stopThreadedIO(void) { if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n", (int) listLength(server.clients_pending_read), (int) listLength(server.clients_pending_write)); - serverAssert(io_threads_active == 1); + serverAssert(server.io_threads_active == 1); for (int j = 1; j < server.io_threads_num; j++) pthread_mutex_lock(&io_threads_mutex[j]); - io_threads_active = 0; + server.io_threads_active = 0; } /* This function checks if there are not enough pending clients to justify @@ -3055,7 +3060,7 @@ int stopThreadedIOIfNeeded(void) { if (server.io_threads_num == 1) return 1; if (pending < (server.io_threads_num*2)) { - if (io_threads_active) stopThreadedIO(); + if (server.io_threads_active) stopThreadedIO(); return 1; } else { return 0; @@ -3073,7 +3078,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { } /* Start threads if needed. */ - if (!io_threads_active) startThreadedIO(); + if (!server.io_threads_active) startThreadedIO(); if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed); @@ -3130,6 +3135,10 @@ int handleClientsWithPendingWritesUsingThreads(void) { } } listEmpty(server.clients_pending_write); + + /* Update processed count on server */ + server.stat_io_writes_processed += processed; + return processed; } @@ -3138,7 +3147,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { * As a side effect of calling this function the client is put in the * pending read clients and flagged as such. */ int postponeClientRead(client *c) { - if (io_threads_active && + if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) @@ -3158,7 +3167,7 @@ int postponeClientRead(client *c) { * the reads in the buffers, and also parse the first command available * rendering it in the client structures. */ int handleClientsWithPendingReadsUsingThreads(void) { - if (!io_threads_active || !server.io_threads_do_reads) return 0; + if (!server.io_threads_active || !server.io_threads_do_reads) return 0; int processed = listLength(server.clients_pending_read); if (processed == 0) return 0; @@ -3219,5 +3228,9 @@ int handleClientsWithPendingReadsUsingThreads(void) { } processInputBuffer(c); } + + /* Update processed count on server */ + server.stat_io_reads_processed += processed; + return processed; } diff --git a/src/server.c b/src/server.c index 9c3d71d6b..3381356ea 100644 --- a/src/server.c +++ b/src/server.c @@ -2726,6 +2726,10 @@ void resetServerStats(void) { server.stat_sync_full = 0; server.stat_sync_partial_ok = 0; server.stat_sync_partial_err = 0; + server.stat_io_reads_processed = 0; + server.stat_total_reads_processed = 0; + server.stat_io_writes_processed = 0; + server.stat_total_writes_processed = 0; for (j = 0; j < STATS_METRIC_COUNT; j++) { server.inst_metric[j].idx = 0; server.inst_metric[j].last_sample_time = mstime(); @@ -4075,7 +4079,8 @@ sds genRedisInfoString(const char *section) { "configured_hz:%i\r\n" "lru_clock:%u\r\n" "executable:%s\r\n" - "config_file:%s\r\n", + "config_file:%s\r\n" + "io_threads_active:%d\r\n", REDIS_VERSION, redisGitSHA1(), strtol(redisGitDirty(),NULL,10) > 0, @@ -4099,7 +4104,8 @@ sds genRedisInfoString(const char *section) { server.config_hz, server.lruclock, server.executable ? server.executable : "", - server.configfile ? server.configfile : ""); + server.configfile ? server.configfile : "", + server.io_threads_active); } /* Clients */ @@ -4371,7 +4377,11 @@ sds genRedisInfoString(const char *section) { "tracking_total_keys:%lld\r\n" "tracking_total_items:%lld\r\n" "tracking_total_prefixes:%lld\r\n" - "unexpected_error_replies:%lld\r\n", + "unexpected_error_replies:%lld\r\n" + "total_reads_processed:%lld\r\n" + "total_writes_processed:%lld\r\n" + "io_threaded_reads_processed:%lld\r\n" + "io_threaded_writes_processed:%lld\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), @@ -4402,7 +4412,11 @@ sds genRedisInfoString(const char *section) { (unsigned long long) trackingGetTotalKeys(), (unsigned long long) trackingGetTotalItems(), (unsigned long long) trackingGetTotalPrefixes(), - server.stat_unexpected_error_replies); + server.stat_unexpected_error_replies, + server.stat_total_reads_processed, + server.stat_total_writes_processed, + server.stat_io_reads_processed, + server.stat_io_writes_processed); } /* Replication */ diff --git a/src/server.h b/src/server.h index e9b4777ef..2d8279264 100644 --- a/src/server.h +++ b/src/server.h @@ -1108,6 +1108,7 @@ struct redisServer { queries. Will still serve RESP2 queries. */ int io_threads_num; /* Number of IO threads to use. */ int io_threads_do_reads; /* Read and parse from IO threads? */ + int io_threads_active; /* Is IO threads currently active? */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ /* RDB / AOF loading information */ @@ -1157,6 +1158,10 @@ struct redisServer { size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ + long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */ + long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */ + _Atomic long long stat_total_reads_processed; /* Total number of read events processed */ + _Atomic long long stat_total_writes_processed; /* Total number of write events processed */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ struct {