diff --git a/src/cluster.c b/src/cluster.c index 498715782..cbcdf373d 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1625,14 +1625,135 @@ void restoreCommand(redisClient *c) { server.dirty++; } +/* MIGRATE socket cache implementation. + * + * We take a map between host:ip and a TCP socket that we used to connect + * to this instance in recent time. + * These sockets are closed when the max number we cache is reached, and also + * in serverCron() when they are around for more than a few seconds. */ +#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */ +#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */ + +typedef struct migrateCachedSocket { + int fd; + time_t last_use_time; + } migrateCachedSocket; + +/* Return a TCP socket connected with the target instance, possibly returning + * a cached one. + * + * This function is responsible for sending errors to the client if a + * connection can't be established. In this case -1 is returned. + * Otherwise on success the socket is returned, and the caller should not + * attempt to free it after usage. + * + * If the caller detects an error while using the socket, migrateCloseSocket() + * should be called so that the connection will be created from scratch + * the next time. */ +int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) { + int fd; + sds name = sdsempty(); + migrateCachedSocket *cs; + + /* Check if we have an already cached socket for this ip:port pair. */ + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (cs) { + sdsfree(name); + cs->last_use_time = server.unixtime; + return cs->fd; + } + + /* No cached socket, create one. */ + if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { + /* Too many items, drop one at random. 
*/ + dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); + cs = dictGetVal(de); + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + + /* Create the socket */ + fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, + atoi(c->argv[2]->ptr)); + if (fd == -1) { + sdsfree(name); + addReplyErrorFormat(c,"Can't connect to target node: %s", + server.neterr); + return -1; + } + anetTcpNoDelay(server.neterr,fd); + + /* Check if it connects within the specified timeout. */ + if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) { + sdsfree(name); + addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n")); + close(fd); + return -1; + } + + /* Add to the cache and return it to the caller. */ + cs = zmalloc(sizeof(*cs)); + cs->fd = fd; + cs->last_use_time = server.unixtime; + dictAdd(server.migrate_cached_sockets,name,cs); + return fd; +} + +/* Free a migrate cached connection. */ +void migrateCloseSocket(robj *host, robj *port) { + sds name = sdsempty(); + migrateCachedSocket *cs; + + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (!cs) { + sdsfree(name); + return; + } + + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,name); + sdsfree(name); +} + +void migrateCloseTimedoutSockets(void) { + dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + migrateCachedSocket *cs = dictGetVal(de); + + if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) { + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + } + dictReleaseIterator(di); +} + /* MIGRATE host port key dbid timeout [COPY | REPLACE] */ void migrateCommand(redisClient *c) { - int fd, copy = 0, replace = 0, j; + int fd, copy, replace, 
j; long timeout; long dbid; - long long ttl = 0, expireat; + long long ttl, expireat; robj *o; rio cmd, payload; + int retry_num = 0; + +try_again: + /* Initialization */ + copy = 0; + replace = 0; + ttl = 0; /* Parse additional options */ for (j = 6; j < c->argc; j++) { @@ -1651,7 +1772,7 @@ void migrateCommand(redisClient *c) { return; if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK) return; - if (timeout <= 0) timeout = 1; + if (timeout <= 0) timeout = 1000; /* Check if the key is here. If not we reply with success as there is * nothing to migrate (for instance the key expired in the meantime), but @@ -1662,17 +1783,8 @@ void migrateCommand(redisClient *c) { } /* Connect */ - fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, - atoi(c->argv[2]->ptr)); - if (fd == -1) { - addReplyErrorFormat(c,"Can't connect to target node: %s", - server.neterr); - return; - } - if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { - addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n")); - return; - } + fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); + if (fd == -1) return; /* error sent to the client by migrateGetSocket() */ /* Create RESTORE payload and generate the protocol to call the command. */ rioInitWithBuffer(&cmd,sdsempty()); @@ -1704,6 +1816,7 @@ void migrateCommand(redisClient *c) { redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); /* Tranfer the query to the other node in 64K chunks. 
*/ + errno = 0; { sds buf = cmd.io.buffer.ptr; size_t pos = 0, towrite; @@ -1749,19 +1862,22 @@ void migrateCommand(redisClient *c) { } sdsfree(cmd.io.buffer.ptr); - close(fd); return; socket_wr_err: - addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n")); sdsfree(cmd.io.buffer.ptr); - close(fd); + migrateCloseSocket(c->argv[1],c->argv[2]); + if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again; + addReplySds(c, + sdsnew("-IOERR error or timeout writing to target instance\r\n")); return; socket_rd_err: - addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n")); sdsfree(cmd.io.buffer.ptr); - close(fd); + migrateCloseSocket(c->argv[1],c->argv[2]); + if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again; + addReplySds(c, + sdsnew("-IOERR error or timeout reading from target node\r\n")); return; } diff --git a/src/redis.c b/src/redis.c index 1e90edef6..82dade155 100644 --- a/src/redis.c +++ b/src/redis.c @@ -561,6 +561,16 @@ dictType clusterNodesDictType = { NULL /* val destructor */ }; +/* Migrate cache dict type. */ +dictType migrateCacheDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL /* val destructor */ +}; + int htNeedsResize(dict *dict) { long long size, used; @@ -972,16 +982,21 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * to detect transfer failures. */ run_with_period(1000) replicationCron(); - /* Run other sub-systems specific cron jobs */ + /* Run the Redis Cluster cron. */ run_with_period(1000) { if (server.cluster_enabled) clusterCron(); } - /* Run the sentinel timer if we are in sentinel mode. */ + /* Run the Sentinel timer if we are in sentinel mode. */ run_with_period(100) { if (server.sentinel_mode) sentinelTimer(); } + /* Cleanup expired MIGRATE cached sockets. 
*/ + run_with_period(1000) { + migrateCloseTimedoutSockets(); + } + server.cronloops++; return 1000/REDIS_HZ; } @@ -1149,6 +1164,7 @@ void initServerConfig() { server.lua_time_limit = REDIS_LUA_TIME_LIMIT; server.lua_client = NULL; server.lua_timedout = 0; + server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); updateLRUClock(); resetServerSaveParams(); @@ -2066,7 +2082,8 @@ sds genRedisInfoString(char *section) { "keyspace_misses:%lld\r\n" "pubsub_channels:%ld\r\n" "pubsub_patterns:%lu\r\n" - "latest_fork_usec:%lld\r\n", + "latest_fork_usec:%lld\r\n" + "migrate_cached_sockets:%ld\r\n", server.stat_numconnections, server.stat_numcommands, getOperationsPerSecond(), @@ -2077,7 +2094,8 @@ sds genRedisInfoString(char *section) { server.stat_keyspace_misses, dictSize(server.pubsub_channels), listLength(server.pubsub_patterns), - server.stat_fork_time); + server.stat_fork_time, + dictSize(server.migrate_cached_sockets)); } /* Replication */ diff --git a/src/redis.h b/src/redis.h index 6b3d58e53..44ff03b67 100644 --- a/src/redis.h +++ b/src/redis.h @@ -646,7 +646,8 @@ struct redisServer { list *clients_to_close; /* Clients to close asynchronously */ list *slaves, *monitors; /* List of slaves and MONITORs */ redisClient *current_client; /* Current client, only used on crash report */ - char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ + char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ + dict *migrate_cached_sockets;/* MIGRATE cached sockets */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ off_t loading_total_bytes; @@ -1174,6 +1175,7 @@ int clusterAddNode(clusterNode *node); void clusterCron(void); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); void clusterPropagatePublish(robj *channel, robj *message); +void migrateCloseTimedoutSockets(void); /* Sentinel */ void initSentinelConfig(void); diff --git a/tests/test_helper.tcl 
b/tests/test_helper.tcl index 853193ccd..a376b8bb1 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -11,6 +11,7 @@ source tests/support/util.tcl set ::all_tests { unit/printver + unit/dump unit/auth unit/protocol unit/basic @@ -40,7 +41,6 @@ set ::all_tests { unit/introspection unit/limits unit/obuf-limits - unit/dump unit/bitops } # Index to the next test to run in the ::all_tests list. diff --git a/tests/unit/dump.tcl b/tests/unit/dump.tcl index 202098da2..5c7291361 100644 --- a/tests/unit/dump.tcl +++ b/tests/unit/dump.tcl @@ -43,6 +43,27 @@ start_server {tags {"dump"}} { r dump nonexisting_key } {} + test {MIGRATE is caching connections} { + # Note, we run this as first test so that the connection cache + # is empty. + set first [srv 0 client] + r set key "Some Value" + start_server {tags {"repl"}} { + set second [srv 0 client] + set second_host [srv 0 host] + set second_port [srv 0 port] + + assert_match {*migrate_cached_sockets:0*} [r -1 info] + r -1 migrate $second_host $second_port key 9 1000 + assert_match {*migrate_cached_sockets:1*} [r -1 info] + } + } + + test {MIGRATE cached connections are released after some time} { + after 15000 + assert_match {*migrate_cached_sockets:0*} [r info] + } + test {MIGRATE is able to migrate a key between two instances} { set first [srv 0 client] r set key "Some Value" @@ -180,9 +201,9 @@ start_server {tags {"dump"}} { assert {[$second exists key] == 0} set rd [redis_deferring_client] - $rd debug sleep 5.0 ; # Make second server unable to reply. + $rd debug sleep 1.0 ; # Make second server unable to reply. set e {} - catch {r -1 migrate $second_host $second_port key 9 1000} e + catch {r -1 migrate $second_host $second_port key 9 500} e assert_match {IOERR*} $e } }