Cluster refactor: Move items from cluster_legacy.c to cluster.c

Move (but do not change) some items from cluster_legacy.c
back info cluster.c. These items are shared code that all
clustering implementations will use.

Signed-off-by: Josh Hershberg <yehoshua@redis.com>
This commit is contained in:
Josh Hershberg 2023-11-01 09:44:11 +02:00
parent 040cb6a4aa
commit ac1513221b
3 changed files with 747 additions and 750 deletions

View File

@ -0,0 +1,746 @@
#include "server.h"
#include "cluster.h"
#include <ctype.h>
/* -----------------------------------------------------------------------------
* Key space handling
* -------------------------------------------------------------------------- */
/* We have 16384 hash slots. The hash slot of a given key is obtained
* as the least significant 14 bits of the crc16 of the key.
*
* However if the key contains the {...} pattern, only the part between
* { and } is hashed. This may be useful in the future to force certain
* keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
/* No '{' ? Hash the whole key. This is the base case. */
if (s == keylen) return crc16(key,keylen) & 0x3FFF;
/* '{' found? Check if we have the corresponding '}'. */
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;
/* No '}' or nothing between {} ? Hash the whole key. */
if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
/* If we are here there is both a { and a } on its right. Hash
* what is in the middle between { and }. */
return crc16(key+s+1,e-s-1) & 0x3FFF;
}
/* If it can be inferred that the given glob-style pattern, as implemented in
* stringmatchlen() in util.c, only can match keys belonging to a single slot,
* that slot is returned. Otherwise -1 is returned. */
int patternHashSlot(char *pattern, int length) {
int s = -1; /* index of the first '{' */
for (int i = 0; i < length; i++) {
if (pattern[i] == '*' || pattern[i] == '?' || pattern[i] == '[') {
/* Wildcard or character class found. Keys can be in any slot. */
return -1;
} else if (pattern[i] == '\\') {
/* Escaped character. Computing slot in this case is not
* implemented. We would need a temp buffer. */
return -1;
} else if (s == -1 && pattern[i] == '{') {
/* Opening brace '{' found. */
s = i;
} else if (s >= 0 && pattern[i] == '}' && i == s + 1) {
/* Empty tag '{}' found. The whole key is hashed. Ignore braces. */
s = -2;
} else if (s >= 0 && pattern[i] == '}') {
/* Non-empty tag '{...}' found. Hash what's between braces. */
return crc16(pattern + s + 1, i - s - 1) & 0x3FFF;
}
}
/* The pattern matches a single key. Hash the whole pattern. */
return crc16(pattern, length) & 0x3FFF;
}
ConnectionType *connTypeOfCluster(void) {
if (server.tls_cluster) {
return connectionTypeTls();
}
return connectionTypeTcp();
}
/* -----------------------------------------------------------------------------
* DUMP, RESTORE and MIGRATE commands
* -------------------------------------------------------------------------- */
/* Generates a DUMP-format representation of the object 'o', adding it to the
* io stream pointed by 'rio'. This function can't fail. */
void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
unsigned char buf[2];
uint64_t crc;
/* Serialize the object in an RDB-like format. It consist of an object type
* byte followed by the serialized object. This is understood by RESTORE. */
rioInitWithBuffer(payload,sdsempty());
serverAssert(rdbSaveObjectType(payload,o));
serverAssert(rdbSaveObject(payload,o,key,dbid));
/* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+
* ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
* ----------------+---------------------+---------------+
* RDB version and CRC are both in little endian.
*/
/* RDB version */
buf[0] = RDB_VERSION & 0xff;
buf[1] = (RDB_VERSION >> 8) & 0xff;
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
/* CRC64 */
crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
sdslen(payload->io.buffer.ptr));
memrev64ifbe(&crc);
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
}
/* Verify that the RDB version of the dump payload matches the one of this Redis
* instance and that the checksum is ok.
* If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
* is returned. If rdbver_ptr is not NULL, its populated with the value read
* from the input buffer. */
int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
unsigned char *footer;
uint16_t rdbver;
uint64_t crc;
/* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
if (len < 10) return C_ERR;
footer = p+(len-10);
/* Set and verify RDB version. */
rdbver = (footer[1] << 8) | footer[0];
if (rdbver_ptr) {
*rdbver_ptr = rdbver;
}
if (rdbver > RDB_VERSION) return C_ERR;
if (server.skip_checksum_validation)
return C_OK;
/* Verify CRC64 */
crc = crc64(0,p,len-8);
memrev64ifbe(&crc);
return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
}
/* DUMP keyname
* DUMP is actually not used by Redis Cluster but it is the obvious
* complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
robj *o;
rio payload;
/* Check if the key is here. */
if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
addReplyNull(c);
return;
}
/* Create the DUMP encoded representation. */
createDumpPayload(&payload,o,c->argv[1],c->db->id);
/* Transfer to the client */
addReplyBulkSds(c,payload.io.buffer.ptr);
return;
}
/* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */
void restoreCommand(client *c) {
long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
rio payload;
int j, type, replace = 0, absttl = 0;
robj *obj;
/* Parse additional options */
for (j = 4; j < c->argc; j++) {
int additional = c->argc-j-1;
if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
absttl = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
lfu_freq == -1)
{
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
!= C_OK) return;
if (lru_idle < 0) {
addReplyError(c,"Invalid IDLETIME value, must be >= 0");
return;
}
lru_clock = LRU_CLOCK();
j++; /* Consume additional arg. */
} else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
lru_idle == -1)
{
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
!= C_OK) return;
if (lfu_freq < 0 || lfu_freq > 255) {
addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
return;
}
j++; /* Consume additional arg. */
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
/* Make sure this key does not already exist here... */
robj *key = c->argv[1];
if (!replace && lookupKeyWrite(c->db,key) != NULL) {
addReplyErrorObject(c,shared.busykeyerr);
return;
}
/* Check if the TTL value makes sense */
if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
return;
} else if (ttl < 0) {
addReplyError(c,"Invalid TTL value, must be >= 0");
return;
}
/* Verify RDB version and data checksum. */
if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR)
{
addReplyError(c,"DUMP payload version or checksum are wrong");
return;
}
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id,NULL)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}
/* Remove the old key if needed. */
int deleted = 0;
if (replace)
deleted = dbDelete(c->db,key);
if (ttl && !absttl) ttl+=commandTimeSnapshot();
if (ttl && checkAlreadyExpired(ttl)) {
if (deleted) {
robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del;
rewriteClientCommandVector(c, 2, aux, key);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
server.dirty++;
}
decrRefCount(obj);
addReply(c, shared.ok);
return;
}
/* Create the key and set the TTL if any */
dbAdd(c->db,key,obj);
if (ttl) {
setExpire(c,c->db,key,ttl);
if (!absttl) {
/* Propagate TTL as absolute timestamp */
robj *ttl_obj = createStringObjectFromLongLong(ttl);
rewriteClientCommandArgument(c,2,ttl_obj);
decrRefCount(ttl_obj);
rewriteClientCommandArgument(c,c->argc,shared.absttl);
}
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
addReply(c,shared.ok);
server.dirty++;
}
/* MIGRATE socket cache implementation.
*
* We take a map between host:ip and a TCP socket that we used to connect
* to this instance in recent time.
* This sockets are closed when the max number we cache is reached, and also
* in serverCron() when they are around for more than a few seconds. */
#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
typedef struct migrateCachedSocket {
connection *conn;
long last_dbid;
time_t last_use_time;
} migrateCachedSocket;
/* Return a migrateCachedSocket containing a TCP socket connected with the
* target instance, possibly returning a cached one.
*
* This function is responsible of sending errors to the client if a
* connection can't be established. In this case -1 is returned.
* Otherwise on success the socket is returned, and the caller should not
* attempt to free it after usage.
*
* If the caller detects an error while using the socket, migrateCloseSocket()
* should be called so that the connection will be created from scratch
* the next time. */
migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
connection *conn;
sds name = sdsempty();
migrateCachedSocket *cs;
/* Check if we have an already cached socket for this ip:port pair. */
name = sdscatlen(name,host->ptr,sdslen(host->ptr));
name = sdscatlen(name,":",1);
name = sdscatlen(name,port->ptr,sdslen(port->ptr));
cs = dictFetchValue(server.migrate_cached_sockets,name);
if (cs) {
sdsfree(name);
cs->last_use_time = server.unixtime;
return cs;
}
/* No cached socket, create one. */
if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
/* Too many items, drop one at random. */
dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
cs = dictGetVal(de);
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}
/* Create the connection */
conn = connCreate(connTypeOfCluster());
if (connBlockingConnect(conn, host->ptr, atoi(port->ptr), timeout)
!= C_OK) {
addReplyError(c,"-IOERR error or timeout connecting to the client");
connClose(conn);
sdsfree(name);
return NULL;
}
connEnableTcpNoDelay(conn);
/* Add to the cache and return it to the caller. */
cs = zmalloc(sizeof(*cs));
cs->conn = conn;
cs->last_dbid = -1;
cs->last_use_time = server.unixtime;
dictAdd(server.migrate_cached_sockets,name,cs);
return cs;
}
/* Free a migrate cached connection. */
void migrateCloseSocket(robj *host, robj *port) {
sds name = sdsempty();
migrateCachedSocket *cs;
name = sdscatlen(name,host->ptr,sdslen(host->ptr));
name = sdscatlen(name,":",1);
name = sdscatlen(name,port->ptr,sdslen(port->ptr));
cs = dictFetchValue(server.migrate_cached_sockets,name);
if (!cs) {
sdsfree(name);
return;
}
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets,name);
sdsfree(name);
}
void migrateCloseTimedoutSockets(void) {
dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
dictEntry *de;
while((de = dictNext(di)) != NULL) {
migrateCachedSocket *cs = dictGetVal(de);
if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}
}
dictReleaseIterator(di);
}
/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
* AUTH2 username password]
*
* On in the multiple keys form:
*
* MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
* AUTH2 username password] KEYS key1 key2 ... keyN */
void migrateCommand(client *c) {
migrateCachedSocket *cs;
int copy = 0, replace = 0, j;
char *username = NULL;
char *password = NULL;
long timeout;
long dbid;
robj **ov = NULL; /* Objects to migrate. */
robj **kv = NULL; /* Key names. */
robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
rio cmd, payload;
int may_retry = 1;
int write_error = 0;
int argv_rewritten = 0;
/* To support the KEYS option we need the following additional state. */
int first_key = 3; /* Argument index of the first key. */
int num_keys = 1; /* By default only migrate the 'key' argument. */
/* Parse additional options */
for (j = 6; j < c->argc; j++) {
int moreargs = (c->argc-1) - j;
if (!strcasecmp(c->argv[j]->ptr,"copy")) {
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
if (!moreargs) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
j++;
password = c->argv[j]->ptr;
redactClientCommandArgument(c,j);
} else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
if (moreargs < 2) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
username = c->argv[++j]->ptr;
redactClientCommandArgument(c,j);
password = c->argv[++j]->ptr;
redactClientCommandArgument(c,j);
} else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
if (sdslen(c->argv[3]->ptr) != 0) {
addReplyError(c,
"When using MIGRATE KEYS option, the key argument"
" must be set to the empty string");
return;
}
first_key = j+1;
num_keys = c->argc - j - 1;
break; /* All the remaining args are keys. */
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
/* Sanity check */
if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
{
return;
}
if (timeout <= 0) timeout = 1000;
/* Check if the keys are here. If at least one key is to migrate, do it
* otherwise if all the keys are missing reply with "NOKEY" to signal
* the caller there was nothing to migrate. We don't return an error in
* this case, since often this is due to a normal condition like the key
* expiring in the meantime. */
ov = zrealloc(ov,sizeof(robj*)*num_keys);
kv = zrealloc(kv,sizeof(robj*)*num_keys);
int oi = 0;
for (j = 0; j < num_keys; j++) {
if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
kv[oi] = c->argv[first_key+j];
oi++;
}
}
num_keys = oi;
if (num_keys == 0) {
zfree(ov); zfree(kv);
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
try_again:
write_error = 0;
/* Connect */
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (cs == NULL) {
zfree(ov); zfree(kv);
return; /* error sent to the client by migrateGetSocket() */
}
rioInitWithBuffer(&cmd,sdsempty());
/* Authentication */
if (password) {
int arity = username ? 3 : 2;
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
if (username) {
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
sdslen(username)));
}
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
sdslen(password)));
}
/* Send the SELECT command if the current DB is not already selected. */
int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
if (select) {
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
}
int non_expired = 0; /* Number of keys that we'll find non expired.
Note that serializing large keys may take some time
so certain keys that were found non expired by the
lookupKey() function, may be expired later. */
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
long long expireat = getExpire(c->db,kv[j]);
if (expireat != -1) {
ttl = expireat-commandTimeSnapshot();
if (ttl < 0) {
continue;
}
if (ttl < 1) ttl = 1;
}
/* Relocate valid (non expired) keys and values into the array in successive
* positions to remove holes created by the keys that were present
* in the first lookup but are now expired after the second lookup. */
ov[non_expired] = ov[j];
kv[non_expired++] = kv[j];
serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
if (server.cluster_enabled)
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(kv[j]->ptr)));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
/* Emit the payload argument, that is the serialized object using
* the DUMP format. */
createDumpPayload(&payload,ov[j],kv[j],dbid);
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr);
/* Add the REPLACE option to the RESTORE command if it was specified
* as a MIGRATE option. */
if (replace)
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
}
/* Fix the actual number of keys we are migrating. */
num_keys = non_expired;
/* Transfer the query to the other node in 64K chunks. */
errno = 0;
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;
while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite);
nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
if (nwritten != (signed)towrite) {
write_error = 1;
goto socket_err;
}
pos += nwritten;
}
}
char buf0[1024]; /* Auth reply. */
char buf1[1024]; /* Select reply. */
char buf2[1024]; /* Restore reply. */
/* Read the AUTH reply if needed. */
if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
goto socket_err;
/* Read the SELECT reply if needed. */
if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
goto socket_err;
/* Read the RESTORE replies. */
int error_from_target = 0;
int socket_error = 0;
int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
/* Allocate the new argument vector that will replace the current command,
* to propagate the MIGRATE as a DEL command (if no COPY option was given).
* We allocate num_keys+1 because the additional argument is for "DEL"
* command name itself. */
if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
for (j = 0; j < num_keys; j++) {
if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
socket_error = 1;
break;
}
if ((password && buf0[0] == '-') ||
(select && buf1[0] == '-') ||
buf2[0] == '-')
{
/* On error assume that last_dbid is no longer valid. */
if (!error_from_target) {
cs->last_dbid = -1;
char *errbuf;
if (password && buf0[0] == '-') errbuf = buf0;
else if (select && buf1[0] == '-') errbuf = buf1;
else errbuf = buf2;
error_from_target = 1;
addReplyErrorFormat(c,"Target instance replied with error: %s",
errbuf+1);
}
} else {
if (!copy) {
/* No COPY option: remove the local key, signal the change. */
dbDelete(c->db,kv[j]);
signalModifiedKey(c,c->db,kv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
server.dirty++;
/* Populate the argument vector to replace the old one. */
newargv[del_idx++] = kv[j];
incrRefCount(kv[j]);
}
}
}
/* On socket error, if we want to retry, do it now before rewriting the
* command vector. We only retry if we are sure nothing was processed
* and we failed to read the first reply (j == 0 test). */
if (!error_from_target && socket_error && j == 0 && may_retry &&
errno != ETIMEDOUT)
{
goto socket_err; /* A retry is guaranteed because of tested conditions.*/
}
/* On socket errors, close the migration socket now that we still have
* the original host/port in the ARGV. Later the original command may be
* rewritten to DEL and will be too later. */
if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
if (!copy) {
/* Translate MIGRATE as DEL for replication/AOF. Note that we do
* this only for the keys for which we received an acknowledgement
* from the receiving Redis server, by using the del_idx index. */
if (del_idx > 1) {
newargv[0] = createStringObject("DEL",3);
/* Note that the following call takes ownership of newargv. */
replaceClientCommandVector(c,del_idx,newargv);
argv_rewritten = 1;
} else {
/* No key transfer acknowledged, no need to rewrite as DEL. */
zfree(newargv);
}
newargv = NULL; /* Make it safe to call zfree() on it in the future. */
}
/* If we are here and a socket error happened, we don't want to retry.
* Just signal the problem to the client, but only do it if we did not
* already queue a different error reported by the destination server. */
if (!error_from_target && socket_error) {
may_retry = 0;
goto socket_err;
}
if (!error_from_target) {
/* Success! Update the last_dbid in migrateCachedSocket, so that we can
* avoid SELECT the next time if the target DB is the same. Reply +OK.
*
* Note: If we reached this point, even if socket_error is true
* still the SELECT command succeeded (otherwise the code jumps to
* socket_err label. */
cs->last_dbid = dbid;
addReply(c,shared.ok);
} else {
/* On error we already sent it in the for loop above, and set
* the currently selected socket to -1 to force SELECT the next time. */
}
sdsfree(cmd.io.buffer.ptr);
zfree(ov); zfree(kv); zfree(newargv);
return;
/* On socket errors we try to close the cached socket and try again.
* It is very common for the cached socket to get closed, if just reopening
* it works it's a shame to notify the error to the caller. */
socket_err:
/* Cleanup we want to perform in both the retry and no retry case.
* Note: Closing the migrate socket will also force SELECT next time. */
sdsfree(cmd.io.buffer.ptr);
/* If the command was rewritten as DEL and there was a socket error,
* we already closed the socket earlier. While migrateCloseSocket()
* is idempotent, the host/port arguments are now gone, so don't do it
* again. */
if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
zfree(newargv);
newargv = NULL; /* This will get reallocated on retry. */
/* Retry only if it's not a timeout and we never attempted a retry
* (or the code jumping here did not set may_retry to zero). */
if (errno != ETIMEDOUT && may_retry) {
may_retry = 0;
goto try_again;
}
/* Cleanup we want to do if no retry is attempted. */
zfree(ov); zfree(kv);
addReplyErrorSds(c, sdscatprintf(sdsempty(),
"-IOERR error or timeout %s to target instance",
write_error ? "writing" : "reading"));
return;
}
/* Cluster node sanity check. Returns C_OK if the node id
* is valid an C_ERR otherwise. */
int verifyClusterNodeId(const char *name, int length) {
if (length != CLUSTER_NAMELEN) return C_ERR;
for (int i = 0; i < length; i++) {
if (name[i] >= 'a' && name[i] <= 'z') continue;
if (name[i] >= '0' && name[i] <= '9') continue;
return C_ERR;
}
return C_OK;
}
int isValidAuxChar(int c) {
return isalnum(c) || (strchr("!#$%&()*+:;<>?@[]^{|}~", c) == NULL);
}
int isValidAuxString(char *s, unsigned int length) {
for (unsigned i = 0; i < length; i++) {
if (!isValidAuxChar(s[i])) return 0;
}
return 1;
}

View File

@ -78,4 +78,5 @@ int clusterNodeIsFailing(clusterNode *node);
int clusterNodeIsNoFailover(clusterNode *node);
char **clusterDebugCommandHelp(void);
ConnectionType *connTypeOfCluster(void);
#endif /* __CLUSTER_H */

View File

@ -40,9 +40,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <math.h>
#include <ctype.h>
/* A global reference to myself is handy to make code more clear.
* Myself always points to server.cluster->myself, that is, the clusterNode
@ -136,13 +134,6 @@ static int shouldReturnTlsInfo(void) {
}
}
/* Links to the next and previous entries for keys in the same slot are stored
* in the dict entry metadata. See Slot to Key API below. */
#define dictEntryNextInSlot(de) \
(((clusterDictEntryMetadata *)dictEntryMetadata(de))->next)
#define dictEntryPrevInSlot(de) \
(((clusterDictEntryMetadata *)dictEntryMetadata(de))->prev)
#define isSlotUnclaimed(slot) \
(server.cluster->slots[slot] == NULL || \
bitmapTestBit(server.cluster->owner_not_claiming_slot, slot))
@ -175,13 +166,6 @@ dictType clusterNodesBlackListDictType = {
NULL /* allow to expand */
};
static ConnectionType *connTypeOfCluster(void) {
if (server.tls_cluster) {
return connectionTypeTls();
}
return connectionTypeTcp();
}
/* Cluster shards hash table, mapping shard id to list of nodes */
dictType clusterSdsToListType = {
dictSdsHash, /* hash function */
@ -238,17 +222,6 @@ auxFieldHandler auxFieldHandlers[] = {
{"tls-port", auxTlsPortSetter, auxTlsPortGetter, auxTlsPortPresent},
};
int isValidAuxChar(int c) {
return isalnum(c) || (strchr("!#$%&()*+:;<>?@[]^{|}~", c) == NULL);
}
int isValidAuxString(char *s, unsigned int length) {
for (unsigned i = 0; i < length; i++) {
if (!isValidAuxChar(s[i])) return 0;
}
return 1;
}
int auxShardIdSetter(clusterNode *n, void *value, int length) {
if (verifyClusterNodeId(value, length) == C_ERR) {
return C_ERR;
@ -1339,67 +1312,6 @@ unsigned long getClusterConnectionsCount(void) {
((dictSize(server.cluster->nodes)-1)*2) : 0;
}
/* -----------------------------------------------------------------------------
* Key space handling
* -------------------------------------------------------------------------- */
/* We have 16384 hash slots. The hash slot of a given key is obtained
* as the least significant 14 bits of the crc16 of the key.
*
* However if the key contains the {...} pattern, only the part between
* { and } is hashed. This may be useful in the future to force certain
* keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
/* No '{' ? Hash the whole key. This is the base case. */
if (s == keylen) return crc16(key,keylen) & 0x3FFF;
/* '{' found? Check if we have the corresponding '}'. */
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;
/* No '}' or nothing between {} ? Hash the whole key. */
if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
/* If we are here there is both a { and a } on its right. Hash
* what is in the middle between { and }. */
return crc16(key+s+1,e-s-1) & 0x3FFF;
}
/* If it can be inferred that the given glob-style pattern, as implemented in
* stringmatchlen() in util.c, only can match keys belonging to a single slot,
* that slot is returned. Otherwise -1 is returned. */
int patternHashSlot(char *pattern, int length) {
int s = -1; /* index of the first '{' */
for (int i = 0; i < length; i++) {
if (pattern[i] == '*' || pattern[i] == '?' || pattern[i] == '[') {
/* Wildcard or character class found. Keys can be in any slot. */
return -1;
} else if (pattern[i] == '\\') {
/* Escaped character. Computing slot in this case is not
* implemented. We would need a temp buffer. */
return -1;
} else if (s == -1 && pattern[i] == '{') {
/* Opening brace '{' found. */
s = i;
} else if (s >= 0 && pattern[i] == '}' && i == s + 1) {
/* Empty tag '{}' found. The whole key is hashed. Ignore braces. */
s = -2;
} else if (s >= 0 && pattern[i] == '}') {
/* Non-empty tag '{...}' found. Hash what's between braces. */
return crc16(pattern + s + 1, i - s - 1) & 0x3FFF;
}
}
/* The pattern matches a single key. Hash the whole pattern. */
return crc16(pattern, length) & 0x3FFF;
}
/* -----------------------------------------------------------------------------
* CLUSTER node API
* -------------------------------------------------------------------------- */
@ -1667,18 +1579,6 @@ void clusterDelNode(clusterNode *delnode) {
freeClusterNode(delnode);
}
/* Cluster node sanity check. Returns C_OK if the node id
* is valid an C_ERR otherwise. */
int verifyClusterNodeId(const char *name, int length) {
if (length != CLUSTER_NAMELEN) return C_ERR;
for (int i = 0; i < length; i++) {
if (name[i] >= 'a' && name[i] <= 'z') continue;
if (name[i] >= '0' && name[i] <= '9') continue;
return C_ERR;
}
return C_OK;
}
/* Node lookup by name */
clusterNode *clusterLookupNode(const char *name, int length) {
if (verifyClusterNodeId(name, length) != C_OK) return NULL;
@ -6537,656 +6437,6 @@ void removeChannelsInSlot(unsigned int slot) {
zfree(channels);
}
/* -----------------------------------------------------------------------------
* DUMP, RESTORE and MIGRATE commands
* -------------------------------------------------------------------------- */
/* Generates a DUMP-format representation of the object 'o', adding it to the
* io stream pointed by 'rio'. This function can't fail. */
void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
unsigned char buf[2];
uint64_t crc;
/* Serialize the object in an RDB-like format. It consist of an object type
* byte followed by the serialized object. This is understood by RESTORE. */
rioInitWithBuffer(payload,sdsempty());
serverAssert(rdbSaveObjectType(payload,o));
serverAssert(rdbSaveObject(payload,o,key,dbid));
/* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+
* ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
* ----------------+---------------------+---------------+
* RDB version and CRC are both in little endian.
*/
/* RDB version */
buf[0] = RDB_VERSION & 0xff;
buf[1] = (RDB_VERSION >> 8) & 0xff;
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
/* CRC64 */
crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
sdslen(payload->io.buffer.ptr));
memrev64ifbe(&crc);
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
}
/* Verify that the RDB version of the dump payload matches the one of this Redis
* instance and that the checksum is ok.
* If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
* is returned. If rdbver_ptr is not NULL, its populated with the value read
* from the input buffer. */
int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
unsigned char *footer;
uint16_t rdbver;
uint64_t crc;
/* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
if (len < 10) return C_ERR;
footer = p+(len-10);
/* Set and verify RDB version. */
rdbver = (footer[1] << 8) | footer[0];
if (rdbver_ptr) {
*rdbver_ptr = rdbver;
}
if (rdbver > RDB_VERSION) return C_ERR;
if (server.skip_checksum_validation)
return C_OK;
/* Verify CRC64 */
crc = crc64(0,p,len-8);
memrev64ifbe(&crc);
return (memcmp(&crc,footer+2,8) == 0) ? C_OK : C_ERR;
}
/* DUMP keyname
* DUMP is actually not used by Redis Cluster but it is the obvious
* complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
robj *o;
rio payload;
/* Check if the key is here. */
if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
addReplyNull(c);
return;
}
/* Create the DUMP encoded representation. */
createDumpPayload(&payload,o,c->argv[1],c->db->id);
/* Transfer to the client */
addReplyBulkSds(c,payload.io.buffer.ptr);
return;
}
/* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */
void restoreCommand(client *c) {
long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
rio payload;
int j, type, replace = 0, absttl = 0;
robj *obj;
/* Parse additional options */
for (j = 4; j < c->argc; j++) {
int additional = c->argc-j-1;
if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
absttl = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
lfu_freq == -1)
{
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
!= C_OK) return;
if (lru_idle < 0) {
addReplyError(c,"Invalid IDLETIME value, must be >= 0");
return;
}
lru_clock = LRU_CLOCK();
j++; /* Consume additional arg. */
} else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
lru_idle == -1)
{
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
!= C_OK) return;
if (lfu_freq < 0 || lfu_freq > 255) {
addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
return;
}
j++; /* Consume additional arg. */
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
/* Make sure this key does not already exist here... */
robj *key = c->argv[1];
if (!replace && lookupKeyWrite(c->db,key) != NULL) {
addReplyErrorObject(c,shared.busykeyerr);
return;
}
/* Check if the TTL value makes sense */
if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
return;
} else if (ttl < 0) {
addReplyError(c,"Invalid TTL value, must be >= 0");
return;
}
/* Verify RDB version and data checksum. */
if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR)
{
addReplyError(c,"DUMP payload version or checksum are wrong");
return;
}
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id,NULL)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}
/* Remove the old key if needed. */
int deleted = 0;
if (replace)
deleted = dbDelete(c->db,key);
if (ttl && !absttl) ttl+=commandTimeSnapshot();
if (ttl && checkAlreadyExpired(ttl)) {
if (deleted) {
robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del;
rewriteClientCommandVector(c, 2, aux, key);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
server.dirty++;
}
decrRefCount(obj);
addReply(c, shared.ok);
return;
}
/* Create the key and set the TTL if any */
dbAdd(c->db,key,obj);
if (ttl) {
setExpire(c,c->db,key,ttl);
if (!absttl) {
/* Propagate TTL as absolute timestamp */
robj *ttl_obj = createStringObjectFromLongLong(ttl);
rewriteClientCommandArgument(c,2,ttl_obj);
decrRefCount(ttl_obj);
rewriteClientCommandArgument(c,c->argc,shared.absttl);
}
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
addReply(c,shared.ok);
server.dirty++;
}
/* MIGRATE socket cache implementation.
*
* We take a map between host:ip and a TCP socket that we used to connect
* to this instance in recent time.
* This sockets are closed when the max number we cache is reached, and also
* in serverCron() when they are around for more than a few seconds. */
#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
typedef struct migrateCachedSocket {
connection *conn;
long last_dbid;
time_t last_use_time;
} migrateCachedSocket;
/* Return a migrateCachedSocket containing a TCP socket connected with the
* target instance, possibly returning a cached one.
*
* This function is responsible of sending errors to the client if a
* connection can't be established. In this case -1 is returned.
* Otherwise on success the socket is returned, and the caller should not
* attempt to free it after usage.
*
* If the caller detects an error while using the socket, migrateCloseSocket()
* should be called so that the connection will be created from scratch
* the next time. */
migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
connection *conn;
sds name = sdsempty();
migrateCachedSocket *cs;
/* Check if we have an already cached socket for this ip:port pair. */
name = sdscatlen(name,host->ptr,sdslen(host->ptr));
name = sdscatlen(name,":",1);
name = sdscatlen(name,port->ptr,sdslen(port->ptr));
cs = dictFetchValue(server.migrate_cached_sockets,name);
if (cs) {
sdsfree(name);
cs->last_use_time = server.unixtime;
return cs;
}
/* No cached socket, create one. */
if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
/* Too many items, drop one at random. */
dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
cs = dictGetVal(de);
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}
/* Create the connection */
conn = connCreate(connTypeOfCluster());
if (connBlockingConnect(conn, host->ptr, atoi(port->ptr), timeout)
!= C_OK) {
addReplyError(c,"-IOERR error or timeout connecting to the client");
connClose(conn);
sdsfree(name);
return NULL;
}
connEnableTcpNoDelay(conn);
/* Add to the cache and return it to the caller. */
cs = zmalloc(sizeof(*cs));
cs->conn = conn;
cs->last_dbid = -1;
cs->last_use_time = server.unixtime;
dictAdd(server.migrate_cached_sockets,name,cs);
return cs;
}
/* Free a migrate cached connection. */
void migrateCloseSocket(robj *host, robj *port) {
sds name = sdsempty();
migrateCachedSocket *cs;
name = sdscatlen(name,host->ptr,sdslen(host->ptr));
name = sdscatlen(name,":",1);
name = sdscatlen(name,port->ptr,sdslen(port->ptr));
cs = dictFetchValue(server.migrate_cached_sockets,name);
if (!cs) {
sdsfree(name);
return;
}
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets,name);
sdsfree(name);
}
void migrateCloseTimedoutSockets(void) {
dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
dictEntry *de;
while((de = dictNext(di)) != NULL) {
migrateCachedSocket *cs = dictGetVal(de);
if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
connClose(cs->conn);
zfree(cs);
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}
}
dictReleaseIterator(di);
}
/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
* AUTH2 username password]
*
* On in the multiple keys form:
*
* MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
* AUTH2 username password] KEYS key1 key2 ... keyN */
void migrateCommand(client *c) {
migrateCachedSocket *cs;
int copy = 0, replace = 0, j;
char *username = NULL;
char *password = NULL;
long timeout;
long dbid;
robj **ov = NULL; /* Objects to migrate. */
robj **kv = NULL; /* Key names. */
robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
rio cmd, payload;
int may_retry = 1;
int write_error = 0;
int argv_rewritten = 0;
/* To support the KEYS option we need the following additional state. */
int first_key = 3; /* Argument index of the first key. */
int num_keys = 1; /* By default only migrate the 'key' argument. */
/* Parse additional options */
for (j = 6; j < c->argc; j++) {
int moreargs = (c->argc-1) - j;
if (!strcasecmp(c->argv[j]->ptr,"copy")) {
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
if (!moreargs) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
j++;
password = c->argv[j]->ptr;
redactClientCommandArgument(c,j);
} else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
if (moreargs < 2) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
username = c->argv[++j]->ptr;
redactClientCommandArgument(c,j);
password = c->argv[++j]->ptr;
redactClientCommandArgument(c,j);
} else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
if (sdslen(c->argv[3]->ptr) != 0) {
addReplyError(c,
"When using MIGRATE KEYS option, the key argument"
" must be set to the empty string");
return;
}
first_key = j+1;
num_keys = c->argc - j - 1;
break; /* All the remaining args are keys. */
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
/* Sanity check */
if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
{
return;
}
if (timeout <= 0) timeout = 1000;
/* Check if the keys are here. If at least one key is to migrate, do it
* otherwise if all the keys are missing reply with "NOKEY" to signal
* the caller there was nothing to migrate. We don't return an error in
* this case, since often this is due to a normal condition like the key
* expiring in the meantime. */
ov = zrealloc(ov,sizeof(robj*)*num_keys);
kv = zrealloc(kv,sizeof(robj*)*num_keys);
int oi = 0;
for (j = 0; j < num_keys; j++) {
if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
kv[oi] = c->argv[first_key+j];
oi++;
}
}
num_keys = oi;
if (num_keys == 0) {
zfree(ov); zfree(kv);
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
try_again:
write_error = 0;
/* Connect */
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (cs == NULL) {
zfree(ov); zfree(kv);
return; /* error sent to the client by migrateGetSocket() */
}
rioInitWithBuffer(&cmd,sdsempty());
/* Authentication */
if (password) {
int arity = username ? 3 : 2;
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
if (username) {
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
sdslen(username)));
}
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
sdslen(password)));
}
/* Send the SELECT command if the current DB is not already selected. */
int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
if (select) {
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
}
int non_expired = 0; /* Number of keys that we'll find non expired.
Note that serializing large keys may take some time
so certain keys that were found non expired by the
lookupKey() function, may be expired later. */
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
long long expireat = getExpire(c->db,kv[j]);
if (expireat != -1) {
ttl = expireat-commandTimeSnapshot();
if (ttl < 0) {
continue;
}
if (ttl < 1) ttl = 1;
}
/* Relocate valid (non expired) keys and values into the array in successive
* positions to remove holes created by the keys that were present
* in the first lookup but are now expired after the second lookup. */
ov[non_expired] = ov[j];
kv[non_expired++] = kv[j];
serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
if (server.cluster_enabled)
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(kv[j]->ptr)));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
/* Emit the payload argument, that is the serialized object using
* the DUMP format. */
createDumpPayload(&payload,ov[j],kv[j],dbid);
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr);
/* Add the REPLACE option to the RESTORE command if it was specified
* as a MIGRATE option. */
if (replace)
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
}
/* Fix the actual number of keys we are migrating. */
num_keys = non_expired;
/* Transfer the query to the other node in 64K chunks. */
errno = 0;
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;
while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite);
nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
if (nwritten != (signed)towrite) {
write_error = 1;
goto socket_err;
}
pos += nwritten;
}
}
char buf0[1024]; /* Auth reply. */
char buf1[1024]; /* Select reply. */
char buf2[1024]; /* Restore reply. */
/* Read the AUTH reply if needed. */
if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
goto socket_err;
/* Read the SELECT reply if needed. */
if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
goto socket_err;
/* Read the RESTORE replies. */
int error_from_target = 0;
int socket_error = 0;
int del_idx = 1; /* Index of the key argument for the replicated DEL op. */
/* Allocate the new argument vector that will replace the current command,
* to propagate the MIGRATE as a DEL command (if no COPY option was given).
* We allocate num_keys+1 because the additional argument is for "DEL"
* command name itself. */
if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
for (j = 0; j < num_keys; j++) {
if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
socket_error = 1;
break;
}
if ((password && buf0[0] == '-') ||
(select && buf1[0] == '-') ||
buf2[0] == '-')
{
/* On error assume that last_dbid is no longer valid. */
if (!error_from_target) {
cs->last_dbid = -1;
char *errbuf;
if (password && buf0[0] == '-') errbuf = buf0;
else if (select && buf1[0] == '-') errbuf = buf1;
else errbuf = buf2;
error_from_target = 1;
addReplyErrorFormat(c,"Target instance replied with error: %s",
errbuf+1);
}
} else {
if (!copy) {
/* No COPY option: remove the local key, signal the change. */
dbDelete(c->db,kv[j]);
signalModifiedKey(c,c->db,kv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
server.dirty++;
/* Populate the argument vector to replace the old one. */
newargv[del_idx++] = kv[j];
incrRefCount(kv[j]);
}
}
}
/* On socket error, if we want to retry, do it now before rewriting the
* command vector. We only retry if we are sure nothing was processed
* and we failed to read the first reply (j == 0 test). */
if (!error_from_target && socket_error && j == 0 && may_retry &&
errno != ETIMEDOUT)
{
goto socket_err; /* A retry is guaranteed because of tested conditions.*/
}
/* On socket errors, close the migration socket now that we still have
* the original host/port in the ARGV. Later the original command may be
* rewritten to DEL and will be too later. */
if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
if (!copy) {
/* Translate MIGRATE as DEL for replication/AOF. Note that we do
* this only for the keys for which we received an acknowledgement
* from the receiving Redis server, by using the del_idx index. */
if (del_idx > 1) {
newargv[0] = createStringObject("DEL",3);
/* Note that the following call takes ownership of newargv. */
replaceClientCommandVector(c,del_idx,newargv);
argv_rewritten = 1;
} else {
/* No key transfer acknowledged, no need to rewrite as DEL. */
zfree(newargv);
}
newargv = NULL; /* Make it safe to call zfree() on it in the future. */
}
/* If we are here and a socket error happened, we don't want to retry.
* Just signal the problem to the client, but only do it if we did not
* already queue a different error reported by the destination server. */
if (!error_from_target && socket_error) {
may_retry = 0;
goto socket_err;
}
if (!error_from_target) {
/* Success! Update the last_dbid in migrateCachedSocket, so that we can
* avoid SELECT the next time if the target DB is the same. Reply +OK.
*
* Note: If we reached this point, even if socket_error is true
* still the SELECT command succeeded (otherwise the code jumps to
* socket_err label. */
cs->last_dbid = dbid;
addReply(c,shared.ok);
} else {
/* On error we already sent it in the for loop above, and set
* the currently selected socket to -1 to force SELECT the next time. */
}
sdsfree(cmd.io.buffer.ptr);
zfree(ov); zfree(kv); zfree(newargv);
return;
/* On socket errors we try to close the cached socket and try again.
* It is very common for the cached socket to get closed, if just reopening
* it works it's a shame to notify the error to the caller. */
socket_err:
/* Cleanup we want to perform in both the retry and no retry case.
* Note: Closing the migrate socket will also force SELECT next time. */
sdsfree(cmd.io.buffer.ptr);
/* If the command was rewritten as DEL and there was a socket error,
* we already closed the socket earlier. While migrateCloseSocket()
* is idempotent, the host/port arguments are now gone, so don't do it
* again. */
if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
zfree(newargv);
newargv = NULL; /* This will get reallocated on retry. */
/* Retry only if it's not a timeout and we never attempted a retry
* (or the code jumping here did not set may_retry to zero). */
if (errno != ETIMEDOUT && may_retry) {
may_retry = 0;
goto try_again;
}
/* Cleanup we want to do if no retry is attempted. */
zfree(ov); zfree(kv);
addReplyErrorSds(c, sdscatprintf(sdsempty(),
"-IOERR error or timeout %s to target instance",
write_error ? "writing" : "reading"));
return;
}
/* -----------------------------------------------------------------------------
* Cluster functions related to serving / redirecting clients