From e23d281e489c3cda134803b2e6302d1fc27e5948 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 12 Nov 2012 00:45:10 +0100 Subject: [PATCH 1/7] MIGRATE TCP connections caching. By caching TCP connections used by MIGRATE to chat with other Redis instances a 5x performance improvement was measured with redis-benchmark against small keys. This can dramatically speedup cluster resharding and other processes where an high load of MIGRATE commands are used. --- src/cluster.c | 132 ++++++++++++++++++++++++++++++++++++++++++++------ src/redis.c | 20 +++++++- src/redis.h | 4 +- 3 files changed, 139 insertions(+), 17 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 498715782..3b560cb0d 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1625,6 +1625,120 @@ void restoreCommand(redisClient *c) { server.dirty++; } +/* MIGRATE socket cache implementation. + * + * We take a map between host:ip and a TCP socket that we used to connect + * to this instance in recent time. + * This sockets are closed when the max number we cache is reached, and also + * in serverCron() when they are around for more than a few seconds. */ +#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */ +#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached socekts after 10 sec. */ + +typedef struct migrateCachedSocket { + int fd; + time_t last_use_time; +} migrateCachedSocket; + +/* Return a TCP scoket connected with the target instance, possibly returning + * a cached one. + * + * This function is responsible of sending errors to the client if a + * connection can't be established. In this case -1 is returned. + * Otherwise on success the socket is returned, and the caller should not + * attempt to free it after usage. + * + * If the caller detects an error while using the socket, migrateCloseSocket() + * should be called so that the connection will be craeted from scratch + * the next time. */ +int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) { + int fd; + sds name = sdsempty(); + migrateCachedSocket *cs; + + /* Check if we have an already cached socket for this ip:port pair. */ + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (cs) { + sdsfree(name); + cs->last_use_time = server.unixtime; + return cs->fd; + } + + /* No cached socket, create one. */ + if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { + /* Too many items, drop one at random. */ + dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); + cs = dictGetVal(de); + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + + /* Create the socket */ + fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, + atoi(c->argv[2]->ptr)); + if (fd == -1) { + sdsfree(name); + addReplyErrorFormat(c,"Can't connect to target node: %s", + server.neterr); + return -1; + } + anetTcpNoDelay(server.neterr,fd); + + /* Check if it connects within the specified timeout. */ + if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { + sdsfree(name); + addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n")); + close(fd); + return -1; + } + + /* Add to the cache and return it to the caller. */ + cs = zmalloc(sizeof(*cs)); + cs->fd = fd; + cs->last_use_time = server.unixtime; + dictAdd(server.migrate_cached_sockets,name,cs); + return fd; +} + +/* Free a migrate cached connection. */ +void migrateCloseSocket(robj *host, robj *port) { + sds name = sdsempty(); + migrateCachedSocket *cs; + + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (!cs) { + sdsfree(name); + return; + } + + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,name); + sdsfree(name); +} + +void migrateCloseTimedoutSockets(void) { + dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + migrateCachedSocket *cs = dictGetVal(de); + + if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) { + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + } + dictReleaseIterator(di); +} + /* MIGRATE host port key dbid timeout [COPY | REPLACE] */ void migrateCommand(redisClient *c) { int fd, copy = 0, replace = 0, j; @@ -1662,17 +1776,8 @@ void migrateCommand(redisClient *c) { } /* Connect */ - fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, - atoi(c->argv[2]->ptr)); - if (fd == -1) { - addReplyErrorFormat(c,"Can't connect to target node: %s", - server.neterr); - return; - } - if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { - addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n")); - return; - } + fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); + if (fd == -1) return; /* error sent to the client by migrateGetSocket() */ /* Create RESTORE payload and generate the protocol to call the command. */ rioInitWithBuffer(&cmd,sdsempty()); @@ -1749,19 +1854,18 @@ void migrateCommand(redisClient *c) { } sdsfree(cmd.io.buffer.ptr); - close(fd); return; socket_wr_err: addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n")); sdsfree(cmd.io.buffer.ptr); - close(fd); + migrateCloseSocket(c->argv[1],c->argv[2]); return; socket_rd_err: addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n")); sdsfree(cmd.io.buffer.ptr); - close(fd); + migrateCloseSocket(c->argv[1],c->argv[2]); return; } diff --git a/src/redis.c b/src/redis.c index 1e90edef6..f0f3b6c0a 100644 --- a/src/redis.c +++ b/src/redis.c @@ -561,6 +561,16 @@ dictType clusterNodesDictType = { NULL /* val destructor */ }; +/* Migrate cache dict type. */ +dictType migrateCacheDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL /* val destructor */ +}; + int htNeedsResize(dict *dict) { long long size, used; @@ -972,16 +982,21 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * to detect transfer failures. */ run_with_period(1000) replicationCron(); - /* Run other sub-systems specific cron jobs */ + /* Run the Redis Cluster cron. */ run_with_period(1000) { if (server.cluster_enabled) clusterCron(); } - /* Run the sentinel timer if we are in sentinel mode. */ + /* Run the Sentinel timer if we are in sentinel mode. */ run_with_period(100) { if (server.sentinel_mode) sentinelTimer(); } + /* Cleanup expired MIGRATE cached sockets. */ + run_with_period(1000) { + migrateCloseTimedoutSockets(); + } + server.cronloops++; return 1000/REDIS_HZ; } @@ -1149,6 +1164,7 @@ void initServerConfig() { server.lua_time_limit = REDIS_LUA_TIME_LIMIT; server.lua_client = NULL; server.lua_timedout = 0; + server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); updateLRUClock(); resetServerSaveParams(); diff --git a/src/redis.h b/src/redis.h index 6b3d58e53..44ff03b67 100644 --- a/src/redis.h +++ b/src/redis.h @@ -646,7 +646,8 @@ struct redisServer { list *clients_to_close; /* Clients to close asynchronously */ list *slaves, *monitors; /* List of slaves and MONITORs */ redisClient *current_client; /* Current client, only used on crash report */ - char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ + char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ + dict *migrate_cached_sockets;/* MIGRATE cached sockets */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ off_t loading_total_bytes; @@ -1174,6 +1175,7 @@ int clusterAddNode(clusterNode *node); void clusterCron(void); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); void clusterPropagatePublish(robj *channel, robj *message); +void migrateCloseTimedoutSockets(void); /* Sentinel */ void initSentinelConfig(void); From 149b527a74a8336e8b0a287472c9ae25e3286fba Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 12 Nov 2012 14:00:59 +0100 Subject: [PATCH 2/7] MIGRATE timeout should be in milliseconds. While it is documented that the MIGRATE timeout is in milliseconds, it was in seconds instead. This commit fixes the problem. --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 3b560cb0d..3e23c73fb 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1688,7 +1688,7 @@ int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) { anetTcpNoDelay(server.neterr,fd); /* Check if it connects within the specified timeout. */ - if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { + if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) { sdsfree(name); addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n")); close(fd); From c8852ebf191387779275a70b5540f30976b9ef1f Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 12 Nov 2012 14:01:56 +0100 Subject: [PATCH 3/7] MIGRATE count of cached sockets in INFO output. --- src/redis.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/redis.c b/src/redis.c index f0f3b6c0a..82dade155 100644 --- a/src/redis.c +++ b/src/redis.c @@ -2082,7 +2082,8 @@ sds genRedisInfoString(char *section) { "keyspace_misses:%lld\r\n" "pubsub_channels:%ld\r\n" "pubsub_patterns:%lu\r\n" - "latest_fork_usec:%lld\r\n", + "latest_fork_usec:%lld\r\n" + "migrate_cached_sockets:%ld\r\n", server.stat_numconnections, server.stat_numcommands, getOperationsPerSecond(), @@ -2093,7 +2094,8 @@ sds genRedisInfoString(char *section) { server.stat_keyspace_misses, dictSize(server.pubsub_channels), listLength(server.pubsub_patterns), - server.stat_fork_time); + server.stat_fork_time, + dictSize(server.migrate_cached_sockets)); } /* Replication */ From 05705bc8bb8685827609813fe48a928260c6a21b Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 12 Nov 2012 15:04:54 +0100 Subject: [PATCH 4/7] MIGRATE: fix default timeout to 1000 milliseconds. When a timeout <= 0 is provided we set a default timeout of 1 second. It was set to 1 millisecond for an error resulting from a recent change. --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 3e23c73fb..a1e576bd0 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1765,7 +1765,7 @@ void migrateCommand(redisClient *c) { return; if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK) return; - if (timeout <= 0) timeout = 1; + if (timeout <= 0) timeout = 1000; /* Check if the key is here. If not we reply with success as there is * nothing to migrate (for instance the key expired in the meantime), but From 17411f7afd30f76962652e28c408dcd07e213fd9 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 13 Nov 2012 18:11:48 +0100 Subject: [PATCH 5/7] Test: check if MIGRATE is caching connections. --- tests/unit/dump.tcl | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/unit/dump.tcl b/tests/unit/dump.tcl index 202098da2..4ebce0ca2 100644 --- a/tests/unit/dump.tcl +++ b/tests/unit/dump.tcl @@ -186,4 +186,23 @@ start_server {tags {"dump"}} { assert_match {IOERR*} $e } } + + test {MIGRATE is caching connections} { + set first [srv 0 client] + r set key "Some Value" + start_server {tags {"repl"}} { + set second [srv 0 client] + set second_host [srv 0 host] + set second_port [srv 0 port] + + assert {[$first exists key] == 1} + assert {[$second exists key] == 0} + + set rd [redis_deferring_client] + $rd debug sleep 5.0 ; # Make second server unable to reply. + set e {} + catch {r -1 migrate $second_host $second_port key 9 1000} e + assert_match {IOERR*} $e + } + } } From 2feef47aa1642df8637522c3d0b8efa16ec4bbc0 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 14 Nov 2012 11:30:24 +0100 Subject: [PATCH 6/7] MIGRATE: retry one time on I/O error. Now that we cache connections, a retry attempt makes sure that the operation don't fail just because there is an existing connection error on the socket, like the other end closing the connection. Unfortunately this condition is not detectable using getsockopt(SO_ERROR), so the only option left is to retry. We don't retry on timeouts. --- src/cluster.c | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index a1e576bd0..cbcdf373d 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1741,12 +1741,19 @@ void migrateCloseTimedoutSockets(void) { /* MIGRATE host port key dbid timeout [COPY | REPLACE] */ void migrateCommand(redisClient *c) { - int fd, copy = 0, replace = 0, j; + int fd, copy, replace, j; long timeout; long dbid; - long long ttl = 0, expireat; + long long ttl, expireat; robj *o; rio cmd, payload; + int retry_num = 0; + +try_again: + /* Initialization */ + copy = 0; + replace = 0; + ttl = 0; /* Parse additional options */ for (j = 6; j < c->argc; j++) { @@ -1809,6 +1816,7 @@ void migrateCommand(redisClient *c) { redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); /* Tranfer the query to the other node in 64K chunks. */ + errno = 0; { sds buf = cmd.io.buffer.ptr; size_t pos = 0, towrite; @@ -1857,15 +1865,19 @@ void migrateCommand(redisClient *c) { return; socket_wr_err: - addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n")); sdsfree(cmd.io.buffer.ptr); migrateCloseSocket(c->argv[1],c->argv[2]); + if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again; + addReplySds(c, + sdsnew("-IOERR error or timeout writing to target instance\r\n")); return; socket_rd_err: - addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n")); sdsfree(cmd.io.buffer.ptr); migrateCloseSocket(c->argv[1],c->argv[2]); + if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again; + addReplySds(c, + sdsnew("-IOERR error or timeout reading from target node\r\n")); return; } From 989a7820ca0cb1b88493797fdecd2e7168558859 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 14 Nov 2012 12:12:52 +0100 Subject: [PATCH 7/7] Test: more MIGRATE tests. --- tests/test_helper.tcl | 2 +- tests/unit/dump.tcl | 44 ++++++++++++++++++++++--------------------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 853193ccd..a376b8bb1 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -11,6 +11,7 @@ source tests/support/util.tcl set ::all_tests { unit/printver + unit/dump unit/auth unit/protocol unit/basic @@ -40,7 +41,6 @@ set ::all_tests { unit/introspection unit/limits unit/obuf-limits - unit/dump unit/bitops } # Index to the next test to run in the ::all_tests list. diff --git a/tests/unit/dump.tcl b/tests/unit/dump.tcl index 4ebce0ca2..5c7291361 100644 --- a/tests/unit/dump.tcl +++ b/tests/unit/dump.tcl @@ -43,6 +43,27 @@ start_server {tags {"dump"}} { r dump nonexisting_key } {} + test {MIGRATE is caching connections} { + # Note, we run this as first test so that the connection cache + # is empty. + set first [srv 0 client] + r set key "Some Value" + start_server {tags {"repl"}} { + set second [srv 0 client] + set second_host [srv 0 host] + set second_port [srv 0 port] + + assert_match {*migrate_cached_sockets:0*} [r -1 info] + r -1 migrate $second_host $second_port key 9 1000 + assert_match {*migrate_cached_sockets:1*} [r -1 info] + } + } + + test {MIGRATE cached connections are released after some time} { + after 15000 + assert_match {*migrate_cached_sockets:0*} [r info] + } + test {MIGRATE is able to migrate a key between two instances} { set first [srv 0 client] r set key "Some Value" @@ -180,28 +201,9 @@ start_server {tags {"dump"}} { assert {[$second exists key] == 0} set rd [redis_deferring_client] - $rd debug sleep 5.0 ; # Make second server unable to reply. + $rd debug sleep 1.0 ; # Make second server unable to reply. set e {} - catch {r -1 migrate $second_host $second_port key 9 1000} e - assert_match {IOERR*} $e - } - } - - test {MIGRATE is caching connections} { - set first [srv 0 client] - r set key "Some Value" - start_server {tags {"repl"}} { - set second [srv 0 client] - set second_host [srv 0 host] - set second_port [srv 0 port] - - assert {[$first exists key] == 1} - assert {[$second exists key] == 0} - - set rd [redis_deferring_client] - $rd debug sleep 5.0 ; # Make second server unable to reply. - set e {} - catch {r -1 migrate $second_host $second_port key 9 1000} e + catch {r -1 migrate $second_host $second_port key 9 500} e assert_match {IOERR*} $e } }