From ac1513221bbb370d49e6dfc9904ab05db306a828 Mon Sep 17 00:00:00 2001 From: Josh Hershberg Date: Wed, 1 Nov 2023 09:44:11 +0200 Subject: [PATCH] Cluster refactor: Move items from cluster_legacy.c to cluster.c Move (but do not change) some items from cluster_legacy.c back info cluster.c. These items are shared code that all clustering implementations will use. Signed-off-by: Josh Hershberg --- src/cluster.c | 746 ++++++++++++++++++++++++++++++++++++++++++ src/cluster.h | 1 + src/cluster_legacy.c | 750 ------------------------------------------- 3 files changed, 747 insertions(+), 750 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e69de29bb..dd053de70 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -0,0 +1,746 @@ +#include "server.h" +#include "cluster.h" + +#include + +/* ----------------------------------------------------------------------------- + * Key space handling + * -------------------------------------------------------------------------- */ + +/* We have 16384 hash slots. The hash slot of a given key is obtained + * as the least significant 14 bits of the crc16 of the key. + * + * However if the key contains the {...} pattern, only the part between + * { and } is hashed. This may be useful in the future to force certain + * keys to be in the same node (assuming no resharding is in progress). */ +unsigned int keyHashSlot(char *key, int keylen) { + int s, e; /* start-end indexes of { and } */ + + for (s = 0; s < keylen; s++) + if (key[s] == '{') break; + + /* No '{' ? Hash the whole key. This is the base case. */ + if (s == keylen) return crc16(key,keylen) & 0x3FFF; + + /* '{' found? Check if we have the corresponding '}'. */ + for (e = s+1; e < keylen; e++) + if (key[e] == '}') break; + + /* No '}' or nothing between {} ? Hash the whole key. */ + if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF; + + /* If we are here there is both a { and a } on its right. Hash + * what is in the middle between { and }. */ + return crc16(key+s+1,e-s-1) & 0x3FFF; +} + +/* If it can be inferred that the given glob-style pattern, as implemented in + * stringmatchlen() in util.c, only can match keys belonging to a single slot, + * that slot is returned. Otherwise -1 is returned. */ +int patternHashSlot(char *pattern, int length) { + int s = -1; /* index of the first '{' */ + + for (int i = 0; i < length; i++) { + if (pattern[i] == '*' || pattern[i] == '?' || pattern[i] == '[') { + /* Wildcard or character class found. Keys can be in any slot. */ + return -1; + } else if (pattern[i] == '\\') { + /* Escaped character. Computing slot in this case is not + * implemented. We would need a temp buffer. */ + return -1; + } else if (s == -1 && pattern[i] == '{') { + /* Opening brace '{' found. */ + s = i; + } else if (s >= 0 && pattern[i] == '}' && i == s + 1) { + /* Empty tag '{}' found. The whole key is hashed. Ignore braces. */ + s = -2; + } else if (s >= 0 && pattern[i] == '}') { + /* Non-empty tag '{...}' found. Hash what's between braces. */ + return crc16(pattern + s + 1, i - s - 1) & 0x3FFF; + } + } + + /* The pattern matches a single key. Hash the whole pattern. */ + return crc16(pattern, length) & 0x3FFF; +} + +ConnectionType *connTypeOfCluster(void) { + if (server.tls_cluster) { + return connectionTypeTls(); + } + + return connectionTypeTcp(); +} + +/* ----------------------------------------------------------------------------- + * DUMP, RESTORE and MIGRATE commands + * -------------------------------------------------------------------------- */ + +/* Generates a DUMP-format representation of the object 'o', adding it to the + * io stream pointed by 'rio'. This function can't fail. */ +void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) { + unsigned char buf[2]; + uint64_t crc; + + /* Serialize the object in an RDB-like format. It consist of an object type + * byte followed by the serialized object. This is understood by RESTORE. */ + rioInitWithBuffer(payload,sdsempty()); + serverAssert(rdbSaveObjectType(payload,o)); + serverAssert(rdbSaveObject(payload,o,key,dbid)); + + /* Write the footer, this is how it looks like: + * ----------------+---------------------+---------------+ + * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | + * ----------------+---------------------+---------------+ + * RDB version and CRC are both in little endian. + */ + + /* RDB version */ + buf[0] = RDB_VERSION & 0xff; + buf[1] = (RDB_VERSION >> 8) & 0xff; + payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2); + + /* CRC64 */ + crc = crc64(0,(unsigned char*)payload->io.buffer.ptr, + sdslen(payload->io.buffer.ptr)); + memrev64ifbe(&crc); + payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8); +} + +/* Verify that the RDB version of the dump payload matches the one of this Redis + * instance and that the checksum is ok. + * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR + * is returned. If rdbver_ptr is not NULL, its populated with the value read + * from the input buffer. */ +int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) { + unsigned char *footer; + uint16_t rdbver; + uint64_t crc; + + /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */ + if (len < 10) return C_ERR; + footer = p+(len-10); + + /* Set and verify RDB version. */ + rdbver = (footer[1] << 8) | footer[0]; + if (rdbver_ptr) { + *rdbver_ptr = rdbver; + } + if (rdbver > RDB_VERSION) return C_ERR; + + if (server.skip_checksum_validation) + return C_OK; + + /* Verify CRC64 */ + crc = crc64(0,p,len-8); + memrev64ifbe(&crc); + return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR; +} + +/* DUMP keyname + * DUMP is actually not used by Redis Cluster but it is the obvious + * complement of RESTORE and can be useful for different applications. */ +void dumpCommand(client *c) { + robj *o; + rio payload; + + /* Check if the key is here. */ + if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { + addReplyNull(c); + return; + } + + /* Create the DUMP encoded representation. */ + createDumpPayload(&payload,o,c->argv[1],c->db->id); + + /* Transfer to the client */ + addReplyBulkSds(c,payload.io.buffer.ptr); + return; +} + +/* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */ +void restoreCommand(client *c) { + long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1; + rio payload; + int j, type, replace = 0, absttl = 0; + robj *obj; + + /* Parse additional options */ + for (j = 4; j < c->argc; j++) { + int additional = c->argc-j-1; + if (!strcasecmp(c->argv[j]->ptr,"replace")) { + replace = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) { + absttl = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 && + lfu_freq == -1) + { + if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL) + != C_OK) return; + if (lru_idle < 0) { + addReplyError(c,"Invalid IDLETIME value, must be >= 0"); + return; + } + lru_clock = LRU_CLOCK(); + j++; /* Consume additional arg. */ + } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 && + lru_idle == -1) + { + if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL) + != C_OK) return; + if (lfu_freq < 0 || lfu_freq > 255) { + addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255"); + return; + } + j++; /* Consume additional arg. */ + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + + /* Make sure this key does not already exist here... */ + robj *key = c->argv[1]; + if (!replace && lookupKeyWrite(c->db,key) != NULL) { + addReplyErrorObject(c,shared.busykeyerr); + return; + } + + /* Check if the TTL value makes sense */ + if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) { + return; + } else if (ttl < 0) { + addReplyError(c,"Invalid TTL value, must be >= 0"); + return; + } + + /* Verify RDB version and data checksum. */ + if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR) + { + addReplyError(c,"DUMP payload version or checksum are wrong"); + return; + } + + rioInitWithBuffer(&payload,c->argv[3]->ptr); + if (((type = rdbLoadObjectType(&payload)) == -1) || + ((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id,NULL)) == NULL)) + { + addReplyError(c,"Bad data format"); + return; + } + + /* Remove the old key if needed. */ + int deleted = 0; + if (replace) + deleted = dbDelete(c->db,key); + + if (ttl && !absttl) ttl+=commandTimeSnapshot(); + if (ttl && checkAlreadyExpired(ttl)) { + if (deleted) { + robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del; + rewriteClientCommandVector(c, 2, aux, key); + signalModifiedKey(c,c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); + server.dirty++; + } + decrRefCount(obj); + addReply(c, shared.ok); + return; + } + + /* Create the key and set the TTL if any */ + dbAdd(c->db,key,obj); + if (ttl) { + setExpire(c,c->db,key,ttl); + if (!absttl) { + /* Propagate TTL as absolute timestamp */ + robj *ttl_obj = createStringObjectFromLongLong(ttl); + rewriteClientCommandArgument(c,2,ttl_obj); + decrRefCount(ttl_obj); + rewriteClientCommandArgument(c,c->argc,shared.absttl); + } + } + objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000); + signalModifiedKey(c,c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); + addReply(c,shared.ok); + server.dirty++; +} +/* MIGRATE socket cache implementation. + * + * We take a map between host:ip and a TCP socket that we used to connect + * to this instance in recent time. + * This sockets are closed when the max number we cache is reached, and also + * in serverCron() when they are around for more than a few seconds. */ +#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */ +#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */ + +typedef struct migrateCachedSocket { + connection *conn; + long last_dbid; + time_t last_use_time; +} migrateCachedSocket; + +/* Return a migrateCachedSocket containing a TCP socket connected with the + * target instance, possibly returning a cached one. + * + * This function is responsible of sending errors to the client if a + * connection can't be established. In this case -1 is returned. + * Otherwise on success the socket is returned, and the caller should not + * attempt to free it after usage. + * + * If the caller detects an error while using the socket, migrateCloseSocket() + * should be called so that the connection will be created from scratch + * the next time. */ +migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) { + connection *conn; + sds name = sdsempty(); + migrateCachedSocket *cs; + + /* Check if we have an already cached socket for this ip:port pair. */ + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (cs) { + sdsfree(name); + cs->last_use_time = server.unixtime; + return cs; + } + + /* No cached socket, create one. */ + if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { + /* Too many items, drop one at random. */ + dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); + cs = dictGetVal(de); + connClose(cs->conn); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + + /* Create the connection */ + conn = connCreate(connTypeOfCluster()); + if (connBlockingConnect(conn, host->ptr, atoi(port->ptr), timeout) + != C_OK) { + addReplyError(c,"-IOERR error or timeout connecting to the client"); + connClose(conn); + sdsfree(name); + return NULL; + } + connEnableTcpNoDelay(conn); + + /* Add to the cache and return it to the caller. */ + cs = zmalloc(sizeof(*cs)); + cs->conn = conn; + + cs->last_dbid = -1; + cs->last_use_time = server.unixtime; + dictAdd(server.migrate_cached_sockets,name,cs); + return cs; +} + +/* Free a migrate cached connection. */ +void migrateCloseSocket(robj *host, robj *port) { + sds name = sdsempty(); + migrateCachedSocket *cs; + + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (!cs) { + sdsfree(name); + return; + } + + connClose(cs->conn); + zfree(cs); + dictDelete(server.migrate_cached_sockets,name); + sdsfree(name); +} + +void migrateCloseTimedoutSockets(void) { + dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + migrateCachedSocket *cs = dictGetVal(de); + + if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) { + connClose(cs->conn); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + } + dictReleaseIterator(di); +} + +/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password | + * AUTH2 username password] + * + * On in the multiple keys form: + * + * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password | + * AUTH2 username password] KEYS key1 key2 ... keyN */ +void migrateCommand(client *c) { + migrateCachedSocket *cs; + int copy = 0, replace = 0, j; + char *username = NULL; + char *password = NULL; + long timeout; + long dbid; + robj **ov = NULL; /* Objects to migrate. */ + robj **kv = NULL; /* Key names. */ + robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */ + rio cmd, payload; + int may_retry = 1; + int write_error = 0; + int argv_rewritten = 0; + + /* To support the KEYS option we need the following additional state. */ + int first_key = 3; /* Argument index of the first key. */ + int num_keys = 1; /* By default only migrate the 'key' argument. */ + + /* Parse additional options */ + for (j = 6; j < c->argc; j++) { + int moreargs = (c->argc-1) - j; + if (!strcasecmp(c->argv[j]->ptr,"copy")) { + copy = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { + replace = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"auth")) { + if (!moreargs) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + j++; + password = c->argv[j]->ptr; + redactClientCommandArgument(c,j); + } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) { + if (moreargs < 2) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + username = c->argv[++j]->ptr; + redactClientCommandArgument(c,j); + password = c->argv[++j]->ptr; + redactClientCommandArgument(c,j); + } else if (!strcasecmp(c->argv[j]->ptr,"keys")) { + if (sdslen(c->argv[3]->ptr) != 0) { + addReplyError(c, + "When using MIGRATE KEYS option, the key argument" + " must be set to the empty string"); + return; + } + first_key = j+1; + num_keys = c->argc - j - 1; + break; /* All the remaining args are keys. */ + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + + /* Sanity check */ + if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK || + getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK) + { + return; + } + if (timeout <= 0) timeout = 1000; + + /* Check if the keys are here. If at least one key is to migrate, do it + * otherwise if all the keys are missing reply with "NOKEY" to signal + * the caller there was nothing to migrate. We don't return an error in + * this case, since often this is due to a normal condition like the key + * expiring in the meantime. */ + ov = zrealloc(ov,sizeof(robj*)*num_keys); + kv = zrealloc(kv,sizeof(robj*)*num_keys); + int oi = 0; + + for (j = 0; j < num_keys; j++) { + if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { + kv[oi] = c->argv[first_key+j]; + oi++; + } + } + num_keys = oi; + if (num_keys == 0) { + zfree(ov); zfree(kv); + addReplySds(c,sdsnew("+NOKEY\r\n")); + return; + } + + try_again: + write_error = 0; + + /* Connect */ + cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); + if (cs == NULL) { + zfree(ov); zfree(kv); + return; /* error sent to the client by migrateGetSocket() */ + } + + rioInitWithBuffer(&cmd,sdsempty()); + + /* Authentication */ + if (password) { + int arity = username ? 3 : 2; + serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity)); + serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4)); + if (username) { + serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username, + sdslen(username))); + } + serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password, + sdslen(password))); + } + + /* Send the SELECT command if the current DB is not already selected. */ + int select = cs->last_dbid != dbid; /* Should we emit SELECT? */ + if (select) { + serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); + serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); + serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); + } + + int non_expired = 0; /* Number of keys that we'll find non expired. + Note that serializing large keys may take some time + so certain keys that were found non expired by the + lookupKey() function, may be expired later. */ + + /* Create RESTORE payload and generate the protocol to call the command. */ + for (j = 0; j < num_keys; j++) { + long long ttl = 0; + long long expireat = getExpire(c->db,kv[j]); + + if (expireat != -1) { + ttl = expireat-commandTimeSnapshot(); + if (ttl < 0) { + continue; + } + if (ttl < 1) ttl = 1; + } + + /* Relocate valid (non expired) keys and values into the array in successive + * positions to remove holes created by the keys that were present + * in the first lookup but are now expired after the second lookup. */ + ov[non_expired] = ov[j]; + kv[non_expired++] = kv[j]; + + serverAssertWithInfo(c,NULL, + rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); + + if (server.cluster_enabled) + serverAssertWithInfo(c,NULL, + rioWriteBulkString(&cmd,"RESTORE-ASKING",14)); + else + serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); + serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j])); + serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr, + sdslen(kv[j]->ptr))); + serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); + + /* Emit the payload argument, that is the serialized object using + * the DUMP format. */ + createDumpPayload(&payload,ov[j],kv[j],dbid); + serverAssertWithInfo(c,NULL, + rioWriteBulkString(&cmd,payload.io.buffer.ptr, + sdslen(payload.io.buffer.ptr))); + sdsfree(payload.io.buffer.ptr); + + /* Add the REPLACE option to the RESTORE command if it was specified + * as a MIGRATE option. */ + if (replace) + serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); + } + + /* Fix the actual number of keys we are migrating. */ + num_keys = non_expired; + + /* Transfer the query to the other node in 64K chunks. */ + errno = 0; + { + sds buf = cmd.io.buffer.ptr; + size_t pos = 0, towrite; + int nwritten = 0; + + while ((towrite = sdslen(buf)-pos) > 0) { + towrite = (towrite > (64*1024) ? (64*1024) : towrite); + nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout); + if (nwritten != (signed)towrite) { + write_error = 1; + goto socket_err; + } + pos += nwritten; + } + } + + char buf0[1024]; /* Auth reply. */ + char buf1[1024]; /* Select reply. */ + char buf2[1024]; /* Restore reply. */ + + /* Read the AUTH reply if needed. */ + if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0) + goto socket_err; + + /* Read the SELECT reply if needed. */ + if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0) + goto socket_err; + + /* Read the RESTORE replies. */ + int error_from_target = 0; + int socket_error = 0; + int del_idx = 1; /* Index of the key argument for the replicated DEL op. */ + + /* Allocate the new argument vector that will replace the current command, + * to propagate the MIGRATE as a DEL command (if no COPY option was given). + * We allocate num_keys+1 because the additional argument is for "DEL" + * command name itself. */ + if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1)); + + for (j = 0; j < num_keys; j++) { + if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) { + socket_error = 1; + break; + } + if ((password && buf0[0] == '-') || + (select && buf1[0] == '-') || + buf2[0] == '-') + { + /* On error assume that last_dbid is no longer valid. */ + if (!error_from_target) { + cs->last_dbid = -1; + char *errbuf; + if (password && buf0[0] == '-') errbuf = buf0; + else if (select && buf1[0] == '-') errbuf = buf1; + else errbuf = buf2; + + error_from_target = 1; + addReplyErrorFormat(c,"Target instance replied with error: %s", + errbuf+1); + } + } else { + if (!copy) { + /* No COPY option: remove the local key, signal the change. */ + dbDelete(c->db,kv[j]); + signalModifiedKey(c,c->db,kv[j]); + notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id); + server.dirty++; + + /* Populate the argument vector to replace the old one. */ + newargv[del_idx++] = kv[j]; + incrRefCount(kv[j]); + } + } + } + + /* On socket error, if we want to retry, do it now before rewriting the + * command vector. We only retry if we are sure nothing was processed + * and we failed to read the first reply (j == 0 test). */ + if (!error_from_target && socket_error && j == 0 && may_retry && + errno != ETIMEDOUT) + { + goto socket_err; /* A retry is guaranteed because of tested conditions.*/ + } + + /* On socket errors, close the migration socket now that we still have + * the original host/port in the ARGV. Later the original command may be + * rewritten to DEL and will be too later. */ + if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]); + + if (!copy) { + /* Translate MIGRATE as DEL for replication/AOF. Note that we do + * this only for the keys for which we received an acknowledgement + * from the receiving Redis server, by using the del_idx index. */ + if (del_idx > 1) { + newargv[0] = createStringObject("DEL",3); + /* Note that the following call takes ownership of newargv. */ + replaceClientCommandVector(c,del_idx,newargv); + argv_rewritten = 1; + } else { + /* No key transfer acknowledged, no need to rewrite as DEL. */ + zfree(newargv); + } + newargv = NULL; /* Make it safe to call zfree() on it in the future. */ + } + + /* If we are here and a socket error happened, we don't want to retry. + * Just signal the problem to the client, but only do it if we did not + * already queue a different error reported by the destination server. */ + if (!error_from_target && socket_error) { + may_retry = 0; + goto socket_err; + } + + if (!error_from_target) { + /* Success! Update the last_dbid in migrateCachedSocket, so that we can + * avoid SELECT the next time if the target DB is the same. Reply +OK. + * + * Note: If we reached this point, even if socket_error is true + * still the SELECT command succeeded (otherwise the code jumps to + * socket_err label. */ + cs->last_dbid = dbid; + addReply(c,shared.ok); + } else { + /* On error we already sent it in the for loop above, and set + * the currently selected socket to -1 to force SELECT the next time. */ + } + + sdsfree(cmd.io.buffer.ptr); + zfree(ov); zfree(kv); zfree(newargv); + return; + +/* On socket errors we try to close the cached socket and try again. + * It is very common for the cached socket to get closed, if just reopening + * it works it's a shame to notify the error to the caller. */ + socket_err: + /* Cleanup we want to perform in both the retry and no retry case. + * Note: Closing the migrate socket will also force SELECT next time. */ + sdsfree(cmd.io.buffer.ptr); + + /* If the command was rewritten as DEL and there was a socket error, + * we already closed the socket earlier. While migrateCloseSocket() + * is idempotent, the host/port arguments are now gone, so don't do it + * again. */ + if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]); + zfree(newargv); + newargv = NULL; /* This will get reallocated on retry. */ + + /* Retry only if it's not a timeout and we never attempted a retry + * (or the code jumping here did not set may_retry to zero). */ + if (errno != ETIMEDOUT && may_retry) { + may_retry = 0; + goto try_again; + } + + /* Cleanup we want to do if no retry is attempted. */ + zfree(ov); zfree(kv); + addReplyErrorSds(c, sdscatprintf(sdsempty(), + "-IOERR error or timeout %s to target instance", + write_error ? "writing" : "reading")); + return; +} + +/* Cluster node sanity check. Returns C_OK if the node id + * is valid an C_ERR otherwise. */ +int verifyClusterNodeId(const char *name, int length) { + if (length != CLUSTER_NAMELEN) return C_ERR; + for (int i = 0; i < length; i++) { + if (name[i] >= 'a' && name[i] <= 'z') continue; + if (name[i] >= '0' && name[i] <= '9') continue; + return C_ERR; + } + return C_OK; +} + +int isValidAuxChar(int c) { + return isalnum(c) || (strchr("!#$%&()*+:;<>?@[]^{|}~", c) == NULL); +} + +int isValidAuxString(char *s, unsigned int length) { + for (unsigned i = 0; i < length; i++) { + if (!isValidAuxChar(s[i])) return 0; + } + return 1; +} diff --git a/src/cluster.h b/src/cluster.h index 6a8b0b423..d9c6d9413 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -78,4 +78,5 @@ int clusterNodeIsFailing(clusterNode *node); int clusterNodeIsNoFailover(clusterNode *node); char **clusterDebugCommandHelp(void); +ConnectionType *connTypeOfCluster(void); #endif /* __CLUSTER_H */ diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 88c30f095..aa6625fd1 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -40,9 +40,7 @@ #include #include #include -#include #include -#include /* A global reference to myself is handy to make code more clear. * Myself always points to server.cluster->myself, that is, the clusterNode @@ -136,13 +134,6 @@ static int shouldReturnTlsInfo(void) { } } -/* Links to the next and previous entries for keys in the same slot are stored - * in the dict entry metadata. See Slot to Key API below. */ -#define dictEntryNextInSlot(de) \ - (((clusterDictEntryMetadata *)dictEntryMetadata(de))->next) -#define dictEntryPrevInSlot(de) \ - (((clusterDictEntryMetadata *)dictEntryMetadata(de))->prev) - #define isSlotUnclaimed(slot) \ (server.cluster->slots[slot] == NULL || \ bitmapTestBit(server.cluster->owner_not_claiming_slot, slot)) @@ -175,13 +166,6 @@ dictType clusterNodesBlackListDictType = { NULL /* allow to expand */ }; -static ConnectionType *connTypeOfCluster(void) { - if (server.tls_cluster) { - return connectionTypeTls(); - } - - return connectionTypeTcp(); -} /* Cluster shards hash table, mapping shard id to list of nodes */ dictType clusterSdsToListType = { dictSdsHash, /* hash function */ @@ -238,17 +222,6 @@ auxFieldHandler auxFieldHandlers[] = { {"tls-port", auxTlsPortSetter, auxTlsPortGetter, auxTlsPortPresent}, }; -int isValidAuxChar(int c) { - return isalnum(c) || (strchr("!#$%&()*+:;<>?@[]^{|}~", c) == NULL); -} - -int isValidAuxString(char *s, unsigned int length) { - for (unsigned i = 0; i < length; i++) { - if (!isValidAuxChar(s[i])) return 0; - } - return 1; -} - int auxShardIdSetter(clusterNode *n, void *value, int length) { if (verifyClusterNodeId(value, length) == C_ERR) { return C_ERR; @@ -1339,67 +1312,6 @@ unsigned long getClusterConnectionsCount(void) { ((dictSize(server.cluster->nodes)-1)*2) : 0; } -/* ----------------------------------------------------------------------------- - * Key space handling - * -------------------------------------------------------------------------- */ - -/* We have 16384 hash slots. The hash slot of a given key is obtained - * as the least significant 14 bits of the crc16 of the key. - * - * However if the key contains the {...} pattern, only the part between - * { and } is hashed. This may be useful in the future to force certain - * keys to be in the same node (assuming no resharding is in progress). */ -unsigned int keyHashSlot(char *key, int keylen) { - int s, e; /* start-end indexes of { and } */ - - for (s = 0; s < keylen; s++) - if (key[s] == '{') break; - - /* No '{' ? Hash the whole key. This is the base case. */ - if (s == keylen) return crc16(key,keylen) & 0x3FFF; - - /* '{' found? Check if we have the corresponding '}'. */ - for (e = s+1; e < keylen; e++) - if (key[e] == '}') break; - - /* No '}' or nothing between {} ? Hash the whole key. */ - if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF; - - /* If we are here there is both a { and a } on its right. Hash - * what is in the middle between { and }. */ - return crc16(key+s+1,e-s-1) & 0x3FFF; -} - -/* If it can be inferred that the given glob-style pattern, as implemented in - * stringmatchlen() in util.c, only can match keys belonging to a single slot, - * that slot is returned. Otherwise -1 is returned. */ -int patternHashSlot(char *pattern, int length) { - int s = -1; /* index of the first '{' */ - - for (int i = 0; i < length; i++) { - if (pattern[i] == '*' || pattern[i] == '?' || pattern[i] == '[') { - /* Wildcard or character class found. Keys can be in any slot. */ - return -1; - } else if (pattern[i] == '\\') { - /* Escaped character. Computing slot in this case is not - * implemented. We would need a temp buffer. */ - return -1; - } else if (s == -1 && pattern[i] == '{') { - /* Opening brace '{' found. */ - s = i; - } else if (s >= 0 && pattern[i] == '}' && i == s + 1) { - /* Empty tag '{}' found. The whole key is hashed. Ignore braces. */ - s = -2; - } else if (s >= 0 && pattern[i] == '}') { - /* Non-empty tag '{...}' found. Hash what's between braces. */ - return crc16(pattern + s + 1, i - s - 1) & 0x3FFF; - } - } - - /* The pattern matches a single key. Hash the whole pattern. */ - return crc16(pattern, length) & 0x3FFF; -} - /* ----------------------------------------------------------------------------- * CLUSTER node API * -------------------------------------------------------------------------- */ @@ -1667,18 +1579,6 @@ void clusterDelNode(clusterNode *delnode) { freeClusterNode(delnode); } -/* Cluster node sanity check. Returns C_OK if the node id - * is valid an C_ERR otherwise. */ -int verifyClusterNodeId(const char *name, int length) { - if (length != CLUSTER_NAMELEN) return C_ERR; - for (int i = 0; i < length; i++) { - if (name[i] >= 'a' && name[i] <= 'z') continue; - if (name[i] >= '0' && name[i] <= '9') continue; - return C_ERR; - } - return C_OK; -} - /* Node lookup by name */ clusterNode *clusterLookupNode(const char *name, int length) { if (verifyClusterNodeId(name, length) != C_OK) return NULL; @@ -6537,656 +6437,6 @@ void removeChannelsInSlot(unsigned int slot) { zfree(channels); } -/* ----------------------------------------------------------------------------- - * DUMP, RESTORE and MIGRATE commands - * -------------------------------------------------------------------------- */ - -/* Generates a DUMP-format representation of the object 'o', adding it to the - * io stream pointed by 'rio'. This function can't fail. */ -void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) { - unsigned char buf[2]; - uint64_t crc; - - /* Serialize the object in an RDB-like format. It consist of an object type - * byte followed by the serialized object. This is understood by RESTORE. */ - rioInitWithBuffer(payload,sdsempty()); - serverAssert(rdbSaveObjectType(payload,o)); - serverAssert(rdbSaveObject(payload,o,key,dbid)); - - /* Write the footer, this is how it looks like: - * ----------------+---------------------+---------------+ - * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | - * ----------------+---------------------+---------------+ - * RDB version and CRC are both in little endian. - */ - - /* RDB version */ - buf[0] = RDB_VERSION & 0xff; - buf[1] = (RDB_VERSION >> 8) & 0xff; - payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2); - - /* CRC64 */ - crc = crc64(0,(unsigned char*)payload->io.buffer.ptr, - sdslen(payload->io.buffer.ptr)); - memrev64ifbe(&crc); - payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8); -} - -/* Verify that the RDB version of the dump payload matches the one of this Redis - * instance and that the checksum is ok. - * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR - * is returned. If rdbver_ptr is not NULL, its populated with the value read - * from the input buffer. */ -int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) { - unsigned char *footer; - uint16_t rdbver; - uint64_t crc; - - /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */ - if (len < 10) return C_ERR; - footer = p+(len-10); - - /* Set and verify RDB version. */ - rdbver = (footer[1] << 8) | footer[0]; - if (rdbver_ptr) { - *rdbver_ptr = rdbver; - } - if (rdbver > RDB_VERSION) return C_ERR; - - if (server.skip_checksum_validation) - return C_OK; - - /* Verify CRC64 */ - crc = crc64(0,p,len-8); - memrev64ifbe(&crc); - return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR; -} - -/* DUMP keyname - * DUMP is actually not used by Redis Cluster but it is the obvious - * complement of RESTORE and can be useful for different applications. */ -void dumpCommand(client *c) { - robj *o; - rio payload; - - /* Check if the key is here. */ - if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { - addReplyNull(c); - return; - } - - /* Create the DUMP encoded representation. */ - createDumpPayload(&payload,o,c->argv[1],c->db->id); - - /* Transfer to the client */ - addReplyBulkSds(c,payload.io.buffer.ptr); - return; -} - -/* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */ -void restoreCommand(client *c) { - long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1; - rio payload; - int j, type, replace = 0, absttl = 0; - robj *obj; - - /* Parse additional options */ - for (j = 4; j < c->argc; j++) { - int additional = c->argc-j-1; - if (!strcasecmp(c->argv[j]->ptr,"replace")) { - replace = 1; - } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) { - absttl = 1; - } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 && - lfu_freq == -1) - { - if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL) - != C_OK) return; - if (lru_idle < 0) { - addReplyError(c,"Invalid IDLETIME value, must be >= 0"); - return; - } - lru_clock = LRU_CLOCK(); - j++; /* Consume additional arg. */ - } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 && - lru_idle == -1) - { - if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL) - != C_OK) return; - if (lfu_freq < 0 || lfu_freq > 255) { - addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255"); - return; - } - j++; /* Consume additional arg. */ - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - - /* Make sure this key does not already exist here... */ - robj *key = c->argv[1]; - if (!replace && lookupKeyWrite(c->db,key) != NULL) { - addReplyErrorObject(c,shared.busykeyerr); - return; - } - - /* Check if the TTL value makes sense */ - if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) { - return; - } else if (ttl < 0) { - addReplyError(c,"Invalid TTL value, must be >= 0"); - return; - } - - /* Verify RDB version and data checksum. */ - if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR) - { - addReplyError(c,"DUMP payload version or checksum are wrong"); - return; - } - - rioInitWithBuffer(&payload,c->argv[3]->ptr); - if (((type = rdbLoadObjectType(&payload)) == -1) || - ((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id,NULL)) == NULL)) - { - addReplyError(c,"Bad data format"); - return; - } - - /* Remove the old key if needed. */ - int deleted = 0; - if (replace) - deleted = dbDelete(c->db,key); - - if (ttl && !absttl) ttl+=commandTimeSnapshot(); - if (ttl && checkAlreadyExpired(ttl)) { - if (deleted) { - robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del; - rewriteClientCommandVector(c, 2, aux, key); - signalModifiedKey(c,c->db,key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); - server.dirty++; - } - decrRefCount(obj); - addReply(c, shared.ok); - return; - } - - /* Create the key and set the TTL if any */ - dbAdd(c->db,key,obj); - if (ttl) { - setExpire(c,c->db,key,ttl); - if (!absttl) { - /* Propagate TTL as absolute timestamp */ - robj *ttl_obj = createStringObjectFromLongLong(ttl); - rewriteClientCommandArgument(c,2,ttl_obj); - decrRefCount(ttl_obj); - rewriteClientCommandArgument(c,c->argc,shared.absttl); - } - } - objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000); - signalModifiedKey(c,c->db,key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); - addReply(c,shared.ok); - server.dirty++; -} - -/* MIGRATE socket cache implementation. - * - * We take a map between host:ip and a TCP socket that we used to connect - * to this instance in recent time. - * This sockets are closed when the max number we cache is reached, and also - * in serverCron() when they are around for more than a few seconds. */ -#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */ -#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */ - -typedef struct migrateCachedSocket { - connection *conn; - long last_dbid; - time_t last_use_time; -} migrateCachedSocket; - -/* Return a migrateCachedSocket containing a TCP socket connected with the - * target instance, possibly returning a cached one. - * - * This function is responsible of sending errors to the client if a - * connection can't be established. In this case -1 is returned. - * Otherwise on success the socket is returned, and the caller should not - * attempt to free it after usage. - * - * If the caller detects an error while using the socket, migrateCloseSocket() - * should be called so that the connection will be created from scratch - * the next time. */ -migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) { - connection *conn; - sds name = sdsempty(); - migrateCachedSocket *cs; - - /* Check if we have an already cached socket for this ip:port pair. */ - name = sdscatlen(name,host->ptr,sdslen(host->ptr)); - name = sdscatlen(name,":",1); - name = sdscatlen(name,port->ptr,sdslen(port->ptr)); - cs = dictFetchValue(server.migrate_cached_sockets,name); - if (cs) { - sdsfree(name); - cs->last_use_time = server.unixtime; - return cs; - } - - /* No cached socket, create one. */ - if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { - /* Too many items, drop one at random. */ - dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); - cs = dictGetVal(de); - connClose(cs->conn); - zfree(cs); - dictDelete(server.migrate_cached_sockets,dictGetKey(de)); - } - - /* Create the connection */ - conn = connCreate(connTypeOfCluster()); - if (connBlockingConnect(conn, host->ptr, atoi(port->ptr), timeout) - != C_OK) { - addReplyError(c,"-IOERR error or timeout connecting to the client"); - connClose(conn); - sdsfree(name); - return NULL; - } - connEnableTcpNoDelay(conn); - - /* Add to the cache and return it to the caller. */ - cs = zmalloc(sizeof(*cs)); - cs->conn = conn; - - cs->last_dbid = -1; - cs->last_use_time = server.unixtime; - dictAdd(server.migrate_cached_sockets,name,cs); - return cs; -} - -/* Free a migrate cached connection. */ -void migrateCloseSocket(robj *host, robj *port) { - sds name = sdsempty(); - migrateCachedSocket *cs; - - name = sdscatlen(name,host->ptr,sdslen(host->ptr)); - name = sdscatlen(name,":",1); - name = sdscatlen(name,port->ptr,sdslen(port->ptr)); - cs = dictFetchValue(server.migrate_cached_sockets,name); - if (!cs) { - sdsfree(name); - return; - } - - connClose(cs->conn); - zfree(cs); - dictDelete(server.migrate_cached_sockets,name); - sdsfree(name); -} - -void migrateCloseTimedoutSockets(void) { - dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets); - dictEntry *de; - - while((de = dictNext(di)) != NULL) { - migrateCachedSocket *cs = dictGetVal(de); - - if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) { - connClose(cs->conn); - zfree(cs); - dictDelete(server.migrate_cached_sockets,dictGetKey(de)); - } - } - dictReleaseIterator(di); -} - -/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password | - * AUTH2 username password] - * - * On in the multiple keys form: - * - * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password | - * AUTH2 username password] KEYS key1 key2 ... keyN */ -void migrateCommand(client *c) { - migrateCachedSocket *cs; - int copy = 0, replace = 0, j; - char *username = NULL; - char *password = NULL; - long timeout; - long dbid; - robj **ov = NULL; /* Objects to migrate. */ - robj **kv = NULL; /* Key names. */ - robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */ - rio cmd, payload; - int may_retry = 1; - int write_error = 0; - int argv_rewritten = 0; - - /* To support the KEYS option we need the following additional state. */ - int first_key = 3; /* Argument index of the first key. */ - int num_keys = 1; /* By default only migrate the 'key' argument. */ - - /* Parse additional options */ - for (j = 6; j < c->argc; j++) { - int moreargs = (c->argc-1) - j; - if (!strcasecmp(c->argv[j]->ptr,"copy")) { - copy = 1; - } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { - replace = 1; - } else if (!strcasecmp(c->argv[j]->ptr,"auth")) { - if (!moreargs) { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - j++; - password = c->argv[j]->ptr; - redactClientCommandArgument(c,j); - } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) { - if (moreargs < 2) { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - username = c->argv[++j]->ptr; - redactClientCommandArgument(c,j); - password = c->argv[++j]->ptr; - redactClientCommandArgument(c,j); - } else if (!strcasecmp(c->argv[j]->ptr,"keys")) { - if (sdslen(c->argv[3]->ptr) != 0) { - addReplyError(c, - "When using MIGRATE KEYS option, the key argument" - " must be set to the empty string"); - return; - } - first_key = j+1; - num_keys = c->argc - j - 1; - break; /* All the remaining args are keys. */ - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - - /* Sanity check */ - if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK || - getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK) - { - return; - } - if (timeout <= 0) timeout = 1000; - - /* Check if the keys are here. If at least one key is to migrate, do it - * otherwise if all the keys are missing reply with "NOKEY" to signal - * the caller there was nothing to migrate. We don't return an error in - * this case, since often this is due to a normal condition like the key - * expiring in the meantime. */ - ov = zrealloc(ov,sizeof(robj*)*num_keys); - kv = zrealloc(kv,sizeof(robj*)*num_keys); - int oi = 0; - - for (j = 0; j < num_keys; j++) { - if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { - kv[oi] = c->argv[first_key+j]; - oi++; - } - } - num_keys = oi; - if (num_keys == 0) { - zfree(ov); zfree(kv); - addReplySds(c,sdsnew("+NOKEY\r\n")); - return; - } - -try_again: - write_error = 0; - - /* Connect */ - cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); - if (cs == NULL) { - zfree(ov); zfree(kv); - return; /* error sent to the client by migrateGetSocket() */ - } - - rioInitWithBuffer(&cmd,sdsempty()); - - /* Authentication */ - if (password) { - int arity = username ? 3 : 2; - serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity)); - serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4)); - if (username) { - serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username, - sdslen(username))); - } - serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password, - sdslen(password))); - } - - /* Send the SELECT command if the current DB is not already selected. */ - int select = cs->last_dbid != dbid; /* Should we emit SELECT? */ - if (select) { - serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); - serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); - serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); - } - - int non_expired = 0; /* Number of keys that we'll find non expired. - Note that serializing large keys may take some time - so certain keys that were found non expired by the - lookupKey() function, may be expired later. */ - - /* Create RESTORE payload and generate the protocol to call the command. */ - for (j = 0; j < num_keys; j++) { - long long ttl = 0; - long long expireat = getExpire(c->db,kv[j]); - - if (expireat != -1) { - ttl = expireat-commandTimeSnapshot(); - if (ttl < 0) { - continue; - } - if (ttl < 1) ttl = 1; - } - - /* Relocate valid (non expired) keys and values into the array in successive - * positions to remove holes created by the keys that were present - * in the first lookup but are now expired after the second lookup. */ - ov[non_expired] = ov[j]; - kv[non_expired++] = kv[j]; - - serverAssertWithInfo(c,NULL, - rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); - - if (server.cluster_enabled) - serverAssertWithInfo(c,NULL, - rioWriteBulkString(&cmd,"RESTORE-ASKING",14)); - else - serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); - serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j])); - serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr, - sdslen(kv[j]->ptr))); - serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); - - /* Emit the payload argument, that is the serialized object using - * the DUMP format. */ - createDumpPayload(&payload,ov[j],kv[j],dbid); - serverAssertWithInfo(c,NULL, - rioWriteBulkString(&cmd,payload.io.buffer.ptr, - sdslen(payload.io.buffer.ptr))); - sdsfree(payload.io.buffer.ptr); - - /* Add the REPLACE option to the RESTORE command if it was specified - * as a MIGRATE option. */ - if (replace) - serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); - } - - /* Fix the actual number of keys we are migrating. */ - num_keys = non_expired; - - /* Transfer the query to the other node in 64K chunks. */ - errno = 0; - { - sds buf = cmd.io.buffer.ptr; - size_t pos = 0, towrite; - int nwritten = 0; - - while ((towrite = sdslen(buf)-pos) > 0) { - towrite = (towrite > (64*1024) ? (64*1024) : towrite); - nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout); - if (nwritten != (signed)towrite) { - write_error = 1; - goto socket_err; - } - pos += nwritten; - } - } - - char buf0[1024]; /* Auth reply. */ - char buf1[1024]; /* Select reply. */ - char buf2[1024]; /* Restore reply. */ - - /* Read the AUTH reply if needed. */ - if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0) - goto socket_err; - - /* Read the SELECT reply if needed. */ - if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0) - goto socket_err; - - /* Read the RESTORE replies. */ - int error_from_target = 0; - int socket_error = 0; - int del_idx = 1; /* Index of the key argument for the replicated DEL op. */ - - /* Allocate the new argument vector that will replace the current command, - * to propagate the MIGRATE as a DEL command (if no COPY option was given). - * We allocate num_keys+1 because the additional argument is for "DEL" - * command name itself. */ - if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1)); - - for (j = 0; j < num_keys; j++) { - if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) { - socket_error = 1; - break; - } - if ((password && buf0[0] == '-') || - (select && buf1[0] == '-') || - buf2[0] == '-') - { - /* On error assume that last_dbid is no longer valid. */ - if (!error_from_target) { - cs->last_dbid = -1; - char *errbuf; - if (password && buf0[0] == '-') errbuf = buf0; - else if (select && buf1[0] == '-') errbuf = buf1; - else errbuf = buf2; - - error_from_target = 1; - addReplyErrorFormat(c,"Target instance replied with error: %s", - errbuf+1); - } - } else { - if (!copy) { - /* No COPY option: remove the local key, signal the change. */ - dbDelete(c->db,kv[j]); - signalModifiedKey(c,c->db,kv[j]); - notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id); - server.dirty++; - - /* Populate the argument vector to replace the old one. */ - newargv[del_idx++] = kv[j]; - incrRefCount(kv[j]); - } - } - } - - /* On socket error, if we want to retry, do it now before rewriting the - * command vector. We only retry if we are sure nothing was processed - * and we failed to read the first reply (j == 0 test). */ - if (!error_from_target && socket_error && j == 0 && may_retry && - errno != ETIMEDOUT) - { - goto socket_err; /* A retry is guaranteed because of tested conditions.*/ - } - - /* On socket errors, close the migration socket now that we still have - * the original host/port in the ARGV. Later the original command may be - * rewritten to DEL and will be too later. */ - if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]); - - if (!copy) { - /* Translate MIGRATE as DEL for replication/AOF. Note that we do - * this only for the keys for which we received an acknowledgement - * from the receiving Redis server, by using the del_idx index. */ - if (del_idx > 1) { - newargv[0] = createStringObject("DEL",3); - /* Note that the following call takes ownership of newargv. */ - replaceClientCommandVector(c,del_idx,newargv); - argv_rewritten = 1; - } else { - /* No key transfer acknowledged, no need to rewrite as DEL. */ - zfree(newargv); - } - newargv = NULL; /* Make it safe to call zfree() on it in the future. */ - } - - /* If we are here and a socket error happened, we don't want to retry. - * Just signal the problem to the client, but only do it if we did not - * already queue a different error reported by the destination server. */ - if (!error_from_target && socket_error) { - may_retry = 0; - goto socket_err; - } - - if (!error_from_target) { - /* Success! Update the last_dbid in migrateCachedSocket, so that we can - * avoid SELECT the next time if the target DB is the same. Reply +OK. - * - * Note: If we reached this point, even if socket_error is true - * still the SELECT command succeeded (otherwise the code jumps to - * socket_err label. */ - cs->last_dbid = dbid; - addReply(c,shared.ok); - } else { - /* On error we already sent it in the for loop above, and set - * the currently selected socket to -1 to force SELECT the next time. */ - } - - sdsfree(cmd.io.buffer.ptr); - zfree(ov); zfree(kv); zfree(newargv); - return; - -/* On socket errors we try to close the cached socket and try again. - * It is very common for the cached socket to get closed, if just reopening - * it works it's a shame to notify the error to the caller. */ -socket_err: - /* Cleanup we want to perform in both the retry and no retry case. - * Note: Closing the migrate socket will also force SELECT next time. */ - sdsfree(cmd.io.buffer.ptr); - - /* If the command was rewritten as DEL and there was a socket error, - * we already closed the socket earlier. While migrateCloseSocket() - * is idempotent, the host/port arguments are now gone, so don't do it - * again. */ - if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]); - zfree(newargv); - newargv = NULL; /* This will get reallocated on retry. */ - - /* Retry only if it's not a timeout and we never attempted a retry - * (or the code jumping here did not set may_retry to zero). */ - if (errno != ETIMEDOUT && may_retry) { - may_retry = 0; - goto try_again; - } - - /* Cleanup we want to do if no retry is attempted. */ - zfree(ov); zfree(kv); - addReplyErrorSds(c, sdscatprintf(sdsempty(), - "-IOERR error or timeout %s to target instance", - write_error ? "writing" : "reading")); - return; -} /* ----------------------------------------------------------------------------- * Cluster functions related to serving / redirecting clients