Merge branch 'migrate-cache' into unstable

This commit is contained in:
antirez 2012-11-14 12:21:23 +01:00
commit d9b02a38e6
5 changed files with 184 additions and 27 deletions

View File

@ -1625,14 +1625,135 @@ void restoreCommand(redisClient *c) {
server.dirty++; server.dirty++;
} }
/* MIGRATE socket cache implementation.
 *
 * We keep a map between host:port and a TCP socket that we used to connect
 * to this instance in recent time.
 * These sockets are closed when the max number we cache is reached, and also
 * in serverCron() when they have been unused for more than a few seconds. */
#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
typedef struct migrateCachedSocket {
int fd; /* File descriptor of the cached TCP connection. */
time_t last_use_time; /* Unix time of last use, drives TTL-based expiry. */
} migrateCachedSocket;
/* Return a TCP socket connected with the target instance, possibly returning
 * a cached one.
 *
 * This function is responsible for sending errors to the client if a
 * connection can't be established. In this case -1 is returned.
 * Otherwise on success the socket is returned, and the caller should not
 * attempt to free it after usage.
 *
 * If the caller detects an error while using the socket, migrateCloseSocket()
 * should be called so that the connection will be created from scratch
 * the next time. */
int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
int fd;
sds name = sdsempty();
migrateCachedSocket *cs;
/* Check if we have an already cached socket for this ip:port pair.
 * The cache key is the string "host:port". */
name = sdscatlen(name,host->ptr,sdslen(host->ptr));
name = sdscatlen(name,":",1);
name = sdscatlen(name,port->ptr,sdslen(port->ptr));
cs = dictFetchValue(server.migrate_cached_sockets,name);
if (cs) {
/* Cache hit: refresh the last used time so that serverCron() will not
 * close it as idle, and return the cached descriptor. */
sdsfree(name);
cs->last_use_time = server.unixtime;
return cs->fd;
}
/* No cached socket, create one. */
if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
/* Too many items, drop one at random. */
dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
cs = dictGetVal(de);
close(cs->fd);
zfree(cs);
/* The dict key destructor frees the sds key on delete. */
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}
/* Create the socket.
 * NOTE(review): the connection target is taken from c->argv[1]/argv[2]
 * rather than the host/port arguments used to build the cache key —
 * presumably the caller always passes the same objects; confirm at
 * call sites. */
fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
atoi(c->argv[2]->ptr));
if (fd == -1) {
sdsfree(name);
addReplyErrorFormat(c,"Can't connect to target node: %s",
server.neterr);
return -1;
}
anetTcpNoDelay(server.neterr,fd);
/* Check if it connects within the specified timeout. */
if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
sdsfree(name);
addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
close(fd);
return -1;
}
/* Add to the cache and return it to the caller.
 * Ownership of 'name' transfers to the dict (freed by key destructor). */
cs = zmalloc(sizeof(*cs));
cs->fd = fd;
cs->last_use_time = server.unixtime;
dictAdd(server.migrate_cached_sockets,name,cs);
return fd;
}
/* Close and forget the cached MIGRATE connection for the given target.
 * If no connection for host:port is currently cached this is a no-op. */
void migrateCloseSocket(robj *host, robj *port) {
    migrateCachedSocket *cached;
    sds key;

    /* Build the "host:port" lookup key. */
    key = sdscatlen(sdsempty(),host->ptr,sdslen(host->ptr));
    key = sdscatlen(key,":",1);
    key = sdscatlen(key,port->ptr,sdslen(port->ptr));

    cached = dictFetchValue(server.migrate_cached_sockets,key);
    if (cached != NULL) {
        close(cached->fd);
        zfree(cached);
        dictDelete(server.migrate_cached_sockets,key);
    }
    sdsfree(key);
}
/* serverCron() helper: sweep the MIGRATE socket cache and close every
 * connection unused for more than MIGRATE_SOCKET_CACHE_TTL seconds.
 * A safe iterator is required since entries are deleted while walking. */
void migrateCloseTimedoutSockets(void) {
    dictEntry *entry;
    dictIterator *it = dictGetSafeIterator(server.migrate_cached_sockets);

    while ((entry = dictNext(it)) != NULL) {
        migrateCachedSocket *cached = dictGetVal(entry);

        /* Keep entries that are still within the TTL. */
        if ((server.unixtime - cached->last_use_time) <= MIGRATE_SOCKET_CACHE_TTL)
            continue;
        close(cached->fd);
        zfree(cached);
        dictDelete(server.migrate_cached_sockets,dictGetKey(entry));
    }
    dictReleaseIterator(it);
}
/* MIGRATE host port key dbid timeout [COPY | REPLACE] */ /* MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) { void migrateCommand(redisClient *c) {
int fd, copy = 0, replace = 0, j; int fd, copy, replace, j;
long timeout; long timeout;
long dbid; long dbid;
long long ttl = 0, expireat; long long ttl, expireat;
robj *o; robj *o;
rio cmd, payload; rio cmd, payload;
int retry_num = 0;
try_again:
/* Initialization */
copy = 0;
replace = 0;
ttl = 0;
/* Parse additional options */ /* Parse additional options */
for (j = 6; j < c->argc; j++) { for (j = 6; j < c->argc; j++) {
@ -1651,7 +1772,7 @@ void migrateCommand(redisClient *c) {
return; return;
if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK) if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
return; return;
if (timeout <= 0) timeout = 1; if (timeout <= 0) timeout = 1000;
/* Check if the key is here. If not we reply with success as there is /* Check if the key is here. If not we reply with success as there is
* nothing to migrate (for instance the key expired in the meantime), but * nothing to migrate (for instance the key expired in the meantime), but
@ -1662,17 +1783,8 @@ void migrateCommand(redisClient *c) {
} }
/* Connect */ /* Connect */
fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
atoi(c->argv[2]->ptr)); if (fd == -1) return; /* error sent to the client by migrateGetSocket() */
if (fd == -1) {
addReplyErrorFormat(c,"Can't connect to target node: %s",
server.neterr);
return;
}
if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
return;
}
/* Create RESTORE payload and generate the protocol to call the command. */ /* Create RESTORE payload and generate the protocol to call the command. */
rioInitWithBuffer(&cmd,sdsempty()); rioInitWithBuffer(&cmd,sdsempty());
@ -1704,6 +1816,7 @@ void migrateCommand(redisClient *c) {
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
/* Transfer the query to the other node in 64K chunks. */ /* Transfer the query to the other node in 64K chunks. */
errno = 0;
{ {
sds buf = cmd.io.buffer.ptr; sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite; size_t pos = 0, towrite;
@ -1749,19 +1862,22 @@ void migrateCommand(redisClient *c) {
} }
sdsfree(cmd.io.buffer.ptr); sdsfree(cmd.io.buffer.ptr);
close(fd);
return; return;
socket_wr_err: socket_wr_err:
addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
sdsfree(cmd.io.buffer.ptr); sdsfree(cmd.io.buffer.ptr);
close(fd); migrateCloseSocket(c->argv[1],c->argv[2]);
if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
addReplySds(c,
sdsnew("-IOERR error or timeout writing to target instance\r\n"));
return; return;
socket_rd_err: socket_rd_err:
addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
sdsfree(cmd.io.buffer.ptr); sdsfree(cmd.io.buffer.ptr);
close(fd); migrateCloseSocket(c->argv[1],c->argv[2]);
if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
addReplySds(c,
sdsnew("-IOERR error or timeout reading from target node\r\n"));
return; return;
} }

View File

@ -561,6 +561,16 @@ dictType clusterNodesDictType = {
NULL /* val destructor */ NULL /* val destructor */
}; };
/* Migrate cache dict type. */
dictType migrateCacheDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
int htNeedsResize(dict *dict) { int htNeedsResize(dict *dict) {
long long size, used; long long size, used;
@ -972,16 +982,21 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* to detect transfer failures. */ * to detect transfer failures. */
run_with_period(1000) replicationCron(); run_with_period(1000) replicationCron();
/* Run other sub-systems specific cron jobs */ /* Run the Redis Cluster cron. */
run_with_period(1000) { run_with_period(1000) {
if (server.cluster_enabled) clusterCron(); if (server.cluster_enabled) clusterCron();
} }
/* Run the sentinel timer if we are in sentinel mode. */ /* Run the Sentinel timer if we are in sentinel mode. */
run_with_period(100) { run_with_period(100) {
if (server.sentinel_mode) sentinelTimer(); if (server.sentinel_mode) sentinelTimer();
} }
/* Cleanup expired MIGRATE cached sockets. */
run_with_period(1000) {
migrateCloseTimedoutSockets();
}
server.cronloops++; server.cronloops++;
return 1000/REDIS_HZ; return 1000/REDIS_HZ;
} }
@ -1149,6 +1164,7 @@ void initServerConfig() {
server.lua_time_limit = REDIS_LUA_TIME_LIMIT; server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
server.lua_client = NULL; server.lua_client = NULL;
server.lua_timedout = 0; server.lua_timedout = 0;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
updateLRUClock(); updateLRUClock();
resetServerSaveParams(); resetServerSaveParams();
@ -2066,7 +2082,8 @@ sds genRedisInfoString(char *section) {
"keyspace_misses:%lld\r\n" "keyspace_misses:%lld\r\n"
"pubsub_channels:%ld\r\n" "pubsub_channels:%ld\r\n"
"pubsub_patterns:%lu\r\n" "pubsub_patterns:%lu\r\n"
"latest_fork_usec:%lld\r\n", "latest_fork_usec:%lld\r\n"
"migrate_cached_sockets:%ld\r\n",
server.stat_numconnections, server.stat_numconnections,
server.stat_numcommands, server.stat_numcommands,
getOperationsPerSecond(), getOperationsPerSecond(),
@ -2077,7 +2094,8 @@ sds genRedisInfoString(char *section) {
server.stat_keyspace_misses, server.stat_keyspace_misses,
dictSize(server.pubsub_channels), dictSize(server.pubsub_channels),
listLength(server.pubsub_patterns), listLength(server.pubsub_patterns),
server.stat_fork_time); server.stat_fork_time,
dictSize(server.migrate_cached_sockets));
} }
/* Replication */ /* Replication */

View File

@ -646,7 +646,8 @@ struct redisServer {
list *clients_to_close; /* Clients to close asynchronously */ list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */ list *slaves, *monitors; /* List of slaves and MONITORs */
redisClient *current_client; /* Current client, only used on crash report */ redisClient *current_client; /* Current client, only used on crash report */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
/* RDB / AOF loading information */ /* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */ int loading; /* We are loading data from disk if true */
off_t loading_total_bytes; off_t loading_total_bytes;
@ -1174,6 +1175,7 @@ int clusterAddNode(clusterNode *node);
void clusterCron(void); void clusterCron(void);
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
void clusterPropagatePublish(robj *channel, robj *message); void clusterPropagatePublish(robj *channel, robj *message);
void migrateCloseTimedoutSockets(void);
/* Sentinel */ /* Sentinel */
void initSentinelConfig(void); void initSentinelConfig(void);

View File

@ -11,6 +11,7 @@ source tests/support/util.tcl
set ::all_tests { set ::all_tests {
unit/printver unit/printver
unit/dump
unit/auth unit/auth
unit/protocol unit/protocol
unit/basic unit/basic
@ -40,7 +41,6 @@ set ::all_tests {
unit/introspection unit/introspection
unit/limits unit/limits
unit/obuf-limits unit/obuf-limits
unit/dump
unit/bitops unit/bitops
} }
# Index to the next test to run in the ::all_tests list. # Index to the next test to run in the ::all_tests list.

View File

@ -43,6 +43,27 @@ start_server {tags {"dump"}} {
r dump nonexisting_key r dump nonexisting_key
} {} } {}
test {MIGRATE is caching connections} {
# Note, we run this as first test so that the connection cache
# is empty.
set first [srv 0 client]
r set key "Some Value"
start_server {tags {"repl"}} {
set second [srv 0 client]
set second_host [srv 0 host]
set second_port [srv 0 port]
assert_match {*migrate_cached_sockets:0*} [r -1 info]
r -1 migrate $second_host $second_port key 9 1000
assert_match {*migrate_cached_sockets:1*} [r -1 info]
}
}
test {MIGRATE cached connections are released after some time} {
after 15000
assert_match {*migrate_cached_sockets:0*} [r info]
}
test {MIGRATE is able to migrate a key between two instances} { test {MIGRATE is able to migrate a key between two instances} {
set first [srv 0 client] set first [srv 0 client]
r set key "Some Value" r set key "Some Value"
@ -180,9 +201,9 @@ start_server {tags {"dump"}} {
assert {[$second exists key] == 0} assert {[$second exists key] == 0}
set rd [redis_deferring_client] set rd [redis_deferring_client]
$rd debug sleep 5.0 ; # Make second server unable to reply. $rd debug sleep 1.0 ; # Make second server unable to reply.
set e {} set e {}
catch {r -1 migrate $second_host $second_port key 9 1000} e catch {r -1 migrate $second_host $second_port key 9 500} e
assert_match {IOERR*} $e assert_match {IOERR*} $e
} }
} }