diff --git a/redis.conf b/redis.conf
index 060510768..74b6c018f 100644
--- a/redis.conf
+++ b/redis.conf
@@ -377,6 +377,22 @@ repl-diskless-sync no
 # it entirely just set it to 0 seconds and the transfer will start ASAP.
 repl-diskless-sync-delay 5
 
+# Replica can load the rdb it reads from the replication link directly from the
+# socket, or store the rdb to a file and read that file after it was completely
+# received from the master.
+# In many cases the disk is slower than the network, and storing and loading
+# the rdb file may increase replication time (and even increase the master's
+# Copy on Write memory and slave buffers).
+# However, parsing the rdb file directly from the socket may mean that we have
+# to flush the contents of the current database before the full rdb is received.
+# For this reason we have the following options:
+# "disabled"    - Don't use diskless load (store the rdb file to the disk first)
+# "on-empty-db" - Use diskless load only when it is completely safe.
+# "swapdb"      - Keep a copy of the current db contents in RAM while parsing
+#                 the data directly from the socket. Note that this requires
+#                 sufficient memory; if you don't have it, you risk an OOM kill.
+repl-diskless-load disabled
+
 # Replicas send PINGs to server in a predefined interval. It's possible to change
 # this interval with the repl_ping_replica_period option. The default value is 10
 # seconds.
diff --git a/src/anet.c b/src/anet.c
index 2981fca13..2088f4fb1 100644
--- a/src/anet.c
+++ b/src/anet.c
@@ -193,6 +193,20 @@ int anetSendTimeout(char *err, int fd, long long ms) {
     return ANET_OK;
 }
 
+/* Set the socket receive timeout (SO_RCVTIMEO socket option) to the specified
+ * number of milliseconds, or disable it if the 'ms' argument is zero. */
+int anetRecvTimeout(char *err, int fd, long long ms) {
+    struct timeval tv;
+
+    tv.tv_sec = ms/1000;
+    tv.tv_usec = (ms%1000)*1000;
+    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
+        anetSetError(err, "setsockopt SO_RCVTIMEO: %s", strerror(errno));
+        return ANET_ERR;
+    }
+    return ANET_OK;
+}
+
 /* anetGenericResolve() is called by anetResolve() and anetResolveIP() to
  * do the actual work. It resolves the hostname "host" and set the string
  * representation of the IP address into the buffer pointed by "ipbuf".
diff --git a/src/anet.h b/src/anet.h
index 7142f78d2..dd735240d 100644
--- a/src/anet.h
+++ b/src/anet.h
@@ -70,6 +70,7 @@ int anetEnableTcpNoDelay(char *err, int fd);
 int anetDisableTcpNoDelay(char *err, int fd);
 int anetTcpKeepAlive(char *err, int fd);
 int anetSendTimeout(char *err, int fd, long long ms);
+int anetRecvTimeout(char *err, int fd, long long ms);
 int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);
 int anetKeepAlive(char *err, int fd, int interval);
 int anetSockName(int fd, char *ip, size_t ip_len, int *port);
diff --git a/src/aof.c b/src/aof.c
index 4744847d2..565ee8073 100644
--- a/src/aof.c
+++ b/src/aof.c
@@ -729,7 +729,7 @@ int loadAppendOnlyFile(char *filename) {
     server.aof_state = AOF_OFF;
 
     fakeClient = createFakeClient();
-    startLoading(fp);
+    startLoadingFile(fp, filename);
 
     /* Check if this AOF file has an RDB preamble. In that case we need to
      * load the RDB file and later continue loading the AOF tail.
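      * (The call above switches from startLoading(fp) to the new
      * startLoadingFile(fp,filename), so that on an RDB-format read error,
      * e.g. in such a preamble, rdbCheckThenExit() knows which file to hand
      * to redis-check-rdb instead of assuming server.rdb_filename; see the
      * rdb.c hunk below.)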
*/ diff --git a/src/config.c b/src/config.c index 2e6e9a6b7..fde00ddf5 100644 --- a/src/config.c +++ b/src/config.c @@ -91,6 +91,13 @@ configEnum aof_fsync_enum[] = { {NULL, 0} }; +configEnum repl_diskless_load_enum[] = { + {"disabled", REPL_DISKLESS_LOAD_DISABLED}, + {"on-empty-db", REPL_DISKLESS_LOAD_WHEN_DB_EMPTY}, + {"swapdb", REPL_DISKLESS_LOAD_SWAPDB}, + {NULL, 0} +}; + /* Output buffer limits presets. */ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {0, 0, 0}, /* normal */ @@ -427,6 +434,11 @@ void loadServerConfigFromString(char *config) { err = "repl-timeout must be 1 or greater"; goto loaderr; } + } else if (!strcasecmp(argv[0],"repl-diskless-load") && argc==2) { + server.repl_diskless_load = configEnumGetValue(repl_diskless_load_enum,argv[1]); + if (server.repl_diskless_load == INT_MIN) { + err = "argument must be 'disabled', 'on-empty-db', 'swapdb' or 'flushdb'"; + } } else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) { server.repl_diskless_sync_delay = atoi(argv[1]); if (server.repl_diskless_sync_delay < 0) { @@ -466,12 +478,10 @@ void loadServerConfigFromString(char *config) { if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ; if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ; } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) { - int yes; - - if ((yes = yesnotoi(argv[1])) == -1) { + if ((server.aof_enabled = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - server.aof_state = yes ? AOF_ON : AOF_OFF; + server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF; } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) { if (!pathIsBaseName(argv[1])) { err = "appendfilename can't be a path, just a filename"; @@ -497,6 +507,12 @@ void loadServerConfigFromString(char *config) { argc == 2) { server.aof_rewrite_min_size = memtoll(argv[1],NULL); + } else if (!strcasecmp(argv[0],"rdb-key-save-delay") && argc==2) { + server.rdb_key_save_delay = atoi(argv[1]); + if (server.rdb_key_save_delay < 0) { + err = "rdb-key-save-delay can't be negative"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; @@ -942,6 +958,7 @@ void configSetCommand(client *c) { int enable = yesnotoi(o->ptr); if (enable == -1) goto badfmt; + server.aof_enabled = enable; if (enable == 0 && server.aof_state != AOF_OFF) { stopAppendOnly(); } else if (enable && server.aof_state == AOF_OFF) { @@ -1132,6 +1149,8 @@ void configSetCommand(client *c) { "slave-priority",server.slave_priority,0,INT_MAX) { } config_set_numerical_field( "replica-priority",server.slave_priority,0,INT_MAX) { + } config_set_numerical_field( + "rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) { } config_set_numerical_field( "slave-announce-port",server.slave_announce_port,0,65535) { } config_set_numerical_field( @@ -1199,6 +1218,8 @@ void configSetCommand(client *c) { "maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum) { } config_set_enum_field( "appendfsync",server.aof_fsync,aof_fsync_enum) { + } config_set_enum_field( + "repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum) { /* Everyhing else is an error... 
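 * (With the additions above, both new knobs can be changed at runtime and are
 * persisted by CONFIG REWRITE through the rewriteConfig() changes below, e.g.:
 *
 *     CONFIG SET repl-diskless-load swapdb
 *     CONFIG SET rdb-key-save-delay 10000
 *
 * which is exactly how the new tests in tests/integration/replication.tcl
 * drive them.)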
*/ } config_set_else { @@ -1346,6 +1367,7 @@ void configGetCommand(client *c) { config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor); config_get_numerical_field("cluster-replica-validity-factor",server.cluster_slave_validity_factor); config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay); + config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay); config_get_numerical_field("tcp-keepalive",server.tcpkeepalive); /* Bool (yes/no) values */ @@ -1370,12 +1392,14 @@ void configGetCommand(client *c) { server.aof_fsync,aof_fsync_enum); config_get_enum_field("syslog-facility", server.syslog_facility,syslog_facility_enum); + config_get_enum_field("repl-diskless-load", + server.repl_diskless_load,repl_diskless_load_enum); /* Everything we can't handle with macros follows. */ if (stringmatch(pattern,"appendonly",1)) { addReplyBulkCString(c,"appendonly"); - addReplyBulkCString(c,server.aof_state == AOF_OFF ? "no" : "yes"); + addReplyBulkCString(c,server.aof_enabled ? "yes" : "no"); matches++; } if (stringmatch(pattern,"dir",1)) { @@ -2109,6 +2133,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"repl-timeout",server.repl_timeout,CONFIG_DEFAULT_REPL_TIMEOUT); rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,CONFIG_DEFAULT_REPL_BACKLOG_SIZE); rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT); + rewriteConfigEnumOption(state,"repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum,CONFIG_DEFAULT_REPL_DISKLESS_LOAD); rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY); rewriteConfigNumericalOption(state,"replica-priority",server.slave_priority,CONFIG_DEFAULT_SLAVE_PRIORITY); rewriteConfigNumericalOption(state,"min-replicas-to-write",server.repl_min_slaves_to_write,CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE); @@ -2128,7 +2153,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN); rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX); rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS); - rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0); + rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0); rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME); rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC); rewriteConfigNumericalOption(state,"auto-aof-rewrite-percentage",server.aof_rewrite_perc,AOF_REWRITE_PERC); @@ -2157,6 +2182,7 @@ int rewriteConfig(char *path) { rewriteConfigClientoutputbufferlimitOption(state); rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ); rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE); + rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY); /* Rewrite Sentinel config if in Sentinel mode. 
*/ if (server.sentinel_mode) rewriteConfigSentinelOption(state); diff --git a/src/db.c b/src/db.c index 07051dad4..8b7656802 100644 --- a/src/db.c +++ b/src/db.c @@ -344,7 +344,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { * On success the fuction returns the number of keys removed from the * database(s). Otherwise -1 is returned in the specific case the * DB number is out of range, and errno is set to EINVAL. */ -long long emptyDb(int dbnum, int flags, void(callback)(void*)) { +long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) { int async = (flags & EMPTYDB_ASYNC); long long removed = 0; @@ -362,12 +362,12 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { } for (int j = startdb; j <= enddb; j++) { - removed += dictSize(server.db[j].dict); + removed += dictSize(dbarray[j].dict); if (async) { - emptyDbAsync(&server.db[j]); + emptyDbAsync(&dbarray[j]); } else { - dictEmpty(server.db[j].dict,callback); - dictEmpty(server.db[j].expires,callback); + dictEmpty(dbarray[j].dict,callback); + dictEmpty(dbarray[j].expires,callback); } } if (server.cluster_enabled) { @@ -381,6 +381,10 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { return removed; } +long long emptyDb(int dbnum, int flags, void(callback)(void*)) { + return emptyDbGeneric(server.db, dbnum, flags, callback); +} + int selectDb(client *c, int id) { if (id < 0 || id >= server.dbnum) return C_ERR; @@ -388,6 +392,15 @@ int selectDb(client *c, int id) { return C_OK; } +long long dbTotalServerKeyCount() { + long long total = 0; + int j; + for (j = 0; j < server.dbnum; j++) { + total += dictSize(server.db[j].dict); + } + return total; +} + /*----------------------------------------------------------------------------- * Hooks for key space changes. * diff --git a/src/rdb.c b/src/rdb.c index 95e4766ea..c566378fb 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -44,6 +44,7 @@ #define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) +char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); @@ -61,11 +62,17 @@ void rdbCheckThenExit(int linenum, char *reason, ...) { if (!rdbCheckMode) { serverLog(LL_WARNING, "%s", msg); - char *argv[2] = {"",server.rdb_filename}; - redis_check_rdb_main(2,argv,NULL); + if (rdbFileBeingLoaded) { + char *argv[2] = {"",rdbFileBeingLoaded}; + redis_check_rdb_main(2,argv,NULL); + } else { + serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation."); + return; + } } else { rdbCheckError("%s",msg); } + serverLog(LL_WARNING, "Terminating server after rdb file reading failure."); exit(1); } @@ -1039,6 +1046,11 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1; if (rdbSaveObject(rdb,val,key) == -1) return -1; + + /* Delay return if required (for testing) */ + if (server.rdb_key_save_delay) + usleep(server.rdb_key_save_delay); + return 1; } @@ -1800,18 +1812,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. 
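 * (After this change the total size is passed in explicitly, so the
 * socket-based replica loader can report progress without an fstat(). A
 * minimal sketch of the intended call pattern, using only names that appear
 * in this diff:
 *
 *     startLoading(server.repl_transfer_size);   // 0 when streaming with an EOF mark
 *     rdbLoadRio(&rdb,&rsi,0);
 *     stopLoading();
 *
 * File-based callers keep using the startLoadingFile() wrapper below, which
 * also records the filename for redis-check-rdb.)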
*/ -void startLoading(FILE *fp) { - struct stat sb; - +void startLoading(size_t size) { /* Load the DB */ server.loading = 1; server.loading_start_time = time(NULL); server.loading_loaded_bytes = 0; - if (fstat(fileno(fp), &sb) == -1) { - server.loading_total_bytes = 0; - } else { - server.loading_total_bytes = sb.st_size; - } + server.loading_total_bytes = size; +} + +/* Mark that we are loading in the global state and setup the fields + * needed to provide loading stats. + * 'filename' is optional and used for rdb-check on error */ +void startLoadingFile(FILE *fp, char* filename) { + struct stat sb; + if (fstat(fileno(fp), &sb) == -1) + sb.st_size = 0; + rdbFileBeingLoaded = filename; + startLoading(sb.st_size); } /* Refresh the loading progress info */ @@ -1824,6 +1841,7 @@ void loadingProgress(off_t pos) { /* Loading finished */ void stopLoading(void) { server.loading = 0; + rdbFileBeingLoaded = NULL; } /* Track loading progress in order to serve client's from time to time @@ -2089,7 +2107,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi) { int retval; if ((fp = fopen(filename,"r")) == NULL) return C_ERR; - startLoading(fp); + startLoadingFile(fp, filename); rioInitWithFile(&rdb,fp); retval = rdbLoadRio(&rdb,rsi,0); fclose(fp); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index ec00ee71c..e2d71b5a5 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } expiretime = -1; - startLoading(fp); + startLoadingFile(fp, rdbfilename); while(1) { robj *key, *val; @@ -314,6 +314,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } if (closefile) fclose(fp); + stopLoading(); return 0; eoferr: /* unexpected end of file is handled here with a fatal exit */ @@ -324,6 +325,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ } err: if (closefile) fclose(fp); + stopLoading(); return 1; } diff --git a/src/replication.c b/src/replication.c index 63a67a06a..e2bac08bd 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1113,11 +1113,22 @@ void restartAOFAfterSYNC() { } } +static int useDisklessLoad() { + /* compute boolean decision to use diskless load */ + return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || + (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); +} + + /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[4096]; ssize_t nread, readlen, nwritten; + int use_diskless_load; + redisDb *diskless_load_backup = NULL; + int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; + int i; off_t left; UNUSED(el); UNUSED(privdata); @@ -1173,90 +1184,177 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * at the next call. */ server.repl_transfer_size = 0; serverLog(LL_NOTICE, - "MASTER <-> REPLICA sync: receiving streamed RDB from master"); + "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s", + useDisklessLoad()? "to parser":"to disk"); } else { usemark = 0; server.repl_transfer_size = strtol(buf+1,NULL,10); serverLog(LL_NOTICE, - "MASTER <-> REPLICA sync: receiving %lld bytes from master", - (long long) server.repl_transfer_size); + "MASTER <-> REPLICA sync: receiving %lld bytes from master %s", + (long long) server.repl_transfer_size, + useDisklessLoad()? 
"to parser":"to disk"); } return; } - /* Read bulk data */ - if (usemark) { - readlen = sizeof(buf); - } else { - left = server.repl_transfer_size - server.repl_transfer_read; - readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); - } + use_diskless_load = useDisklessLoad(); + if (!use_diskless_load) { - nread = read(fd,buf,readlen); - if (nread <= 0) { - serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", - (nread == -1) ? strerror(errno) : "connection lost"); - cancelReplicationHandshake(); - return; - } - server.stat_net_input_bytes += nread; - - /* When a mark is used, we want to detect EOF asap in order to avoid - * writing the EOF mark into the file... */ - int eof_reached = 0; - - if (usemark) { - /* Update the last bytes array, and check if it matches our delimiter.*/ - if (nread >= CONFIG_RUN_ID_SIZE) { - memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); + /* read the data from the socket, store it to a file and search for the EOF */ + if (usemark) { + readlen = sizeof(buf); } else { - int rem = CONFIG_RUN_ID_SIZE-nread; - memmove(lastbytes,lastbytes+nread,rem); - memcpy(lastbytes+rem,buf,nread); + left = server.repl_transfer_size - server.repl_transfer_read; + readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); } - if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; - } - server.repl_transfer_lastio = server.unixtime; - if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { - serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s", - (nwritten == -1) ? strerror(errno) : "short write"); - goto error; - } - server.repl_transfer_read += nread; + nread = read(fd,buf,readlen); + if (nread <= 0) { + serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", + (nread == -1) ? strerror(errno) : "connection lost"); + cancelReplicationHandshake(); + return; + } + server.stat_net_input_bytes += nread; - /* Delete the last 40 bytes from the file if we reached EOF. */ - if (usemark && eof_reached) { - if (ftruncate(server.repl_transfer_fd, - server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) - { - serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); + /* When a mark is used, we want to detect EOF asap in order to avoid + * writing the EOF mark into the file... */ + int eof_reached = 0; + + if (usemark) { + /* Update the last bytes array, and check if it matches our delimiter.*/ + if (nread >= CONFIG_RUN_ID_SIZE) { + memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); + } else { + int rem = CONFIG_RUN_ID_SIZE-nread; + memmove(lastbytes,lastbytes+nread,rem); + memcpy(lastbytes+rem,buf,nread); + } + if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; + } + + server.repl_transfer_lastio = server.unixtime; + if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { + serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s", + (nwritten == -1) ? strerror(errno) : "short write"); goto error; } + server.repl_transfer_read += nread; + + /* Delete the last 40 bytes from the file if we reached EOF. 
*/ + if (usemark && eof_reached) { + if (ftruncate(server.repl_transfer_fd, + server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) + { + serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); + goto error; + } + } + + /* Sync data on disk from time to time, otherwise at the end of the transfer + * we may suffer a big delay as the memory buffers are copied into the + * actual disk. */ + if (server.repl_transfer_read >= + server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) + { + off_t sync_size = server.repl_transfer_read - + server.repl_transfer_last_fsync_off; + rdb_fsync_range(server.repl_transfer_fd, + server.repl_transfer_last_fsync_off, sync_size); + server.repl_transfer_last_fsync_off += sync_size; + } + + /* Check if the transfer is now complete */ + if (!usemark) { + if (server.repl_transfer_read == server.repl_transfer_size) + eof_reached = 1; + } + if (!eof_reached) + return; } - /* Sync data on disk from time to time, otherwise at the end of the transfer - * we may suffer a big delay as the memory buffers are copied into the - * actual disk. */ - if (server.repl_transfer_read >= - server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) - { - off_t sync_size = server.repl_transfer_read - - server.repl_transfer_last_fsync_off; - rdb_fsync_range(server.repl_transfer_fd, - server.repl_transfer_last_fsync_off, sync_size); - server.repl_transfer_last_fsync_off += sync_size; + /* We reach here when the slave is using diskless replication, + * or when we are done reading from the socket to the rdb file. */ + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + /* We need to stop any AOFRW fork before flusing and parsing + * RDB, otherwise we'll create a copy-on-write disaster. */ + if (server.aof_state != AOF_OFF) stopAppendOnly(); + signalFlushedDb(-1); + if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* create a backup of the current db */ + diskless_load_backup = zmalloc(sizeof(redisDb)*server.dbnum); + for (i=0; i REPLICA sync: Loading DB in memory"); + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + if (use_diskless_load) { + rio rdb; + rioInitWithFd(&rdb,fd,server.repl_transfer_size); + /* Put the socket in blocking mode to simplify RDB transfer. + * We'll restore it when the RDB is received. */ + anetBlock(NULL,fd); + anetRecvTimeout(NULL,fd,server.repl_timeout*1000); - /* Check if the transfer is now complete */ - if (!usemark) { - if (server.repl_transfer_read == server.repl_transfer_size) - eof_reached = 1; - } - - if (eof_reached) { - int aof_is_enabled = server.aof_state != AOF_OFF; - + startLoading(server.repl_transfer_size); + if (rdbLoadRio(&rdb,&rsi,0) != C_OK) { + /* rdbloading failed */ + stopLoading(); + serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from socket"); + cancelReplicationHandshake(); + rioFreeFd(&rdb, NULL); + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* restore the backed up db */ + emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback); + for (i=0; i REPLICA synchronization: %s", - server.rdb_filename, strerror(errno)); + server.rdb_filename, strerror(errno)); cancelReplicationHandshake(); return; } - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); - /* We need to stop any AOFRW fork before flusing and parsing - * RDB, otherwise we'll create a copy-on-write disaster. 
*/ - if(aof_is_enabled) stopAppendOnly(); - signalFlushedDb(-1); - emptyDb( - -1, - server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, - replicationEmptyDbCallback); - /* Before loading the DB into memory we need to delete the readable - * handler, otherwise it will get called recursively since - * rdbLoad() will call the event loop to process events from time to - * time for non blocking loading. */ - aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(); - /* Re-enable the AOF if we disabled it earlier, in order to restore - * the original configuration. */ - if (aof_is_enabled) restartAOFAfterSYNC(); + /* Note that there's no point in restarting the AOF on sync failure, + it'll be restarted when sync succeeds or slave promoted. */ return; } - /* Final setup of the connected slave <- master link */ zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); - replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); - server.repl_state = REPL_STATE_CONNECTED; - server.repl_down_since = 0; - /* After a full resynchroniziation we use the replication ID and - * offset of the master. The secondary ID / offset are cleared since - * we are starting a new history. */ - memcpy(server.replid,server.master->replid,sizeof(server.replid)); - server.master_repl_offset = server.master->reploff; - clearReplicationId2(); - /* Let's create the replication backlog if needed. Slaves need to - * accumulate the backlog regardless of the fact they have sub-slaves - * or not, in order to behave correctly if they are promoted to - * masters after a failover. */ - if (server.repl_backlog == NULL) createReplicationBacklog(); - - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); - /* Restart the AOF subsystem now that we finished the sync. This - * will trigger an AOF rewrite, and when done will start appending - * to the new file. */ - if (aof_is_enabled) restartAOFAfterSYNC(); + server.repl_transfer_fd = -1; + server.repl_transfer_tmpfile = NULL; } + /* Final setup of the connected slave <- master link */ + replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); + server.repl_state = REPL_STATE_CONNECTED; + server.repl_down_since = 0; + /* After a full resynchroniziation we use the replication ID and + * offset of the master. The secondary ID / offset are cleared since + * we are starting a new history. */ + memcpy(server.replid,server.master->replid,sizeof(server.replid)); + server.master_repl_offset = server.master->reploff; + clearReplicationId2(); + /* Let's create the replication backlog if needed. Slaves need to + * accumulate the backlog regardless of the fact they have sub-slaves + * or not, in order to behave correctly if they are promoted to + * masters after a failover. */ + if (server.repl_backlog == NULL) createReplicationBacklog(); + + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); + /* Restart the AOF subsystem now that we finished the sync. This + * will trigger an AOF rewrite, and when done will start appending + * to the new file. 
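 * (The check below uses the new server.aof_enabled flag rather than
 * server.aof_state: AOF was stopped with stopAppendOnly() before the flush
 * earlier in this function, so aof_state is AOF_OFF at this point even when
 * appendonly is configured to yes.)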
*/ + if (server.aof_enabled) restartAOFAfterSYNC(); return; error: @@ -1845,16 +1928,20 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Prepare a suitable temp file for bulk transfer */ - while(maxtries--) { - snprintf(tmpfile,256, - "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); - dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); - if (dfd != -1) break; - sleep(1); - } - if (dfd == -1) { - serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno)); - goto error; + if (!useDisklessLoad()) { + while(maxtries--) { + snprintf(tmpfile,256, + "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); + dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); + if (dfd != -1) break; + sleep(1); + } + if (dfd == -1) { + serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno)); + goto error; + } + server.repl_transfer_tmpfile = zstrdup(tmpfile); + server.repl_transfer_fd = dfd; } /* Setup the non blocking download of the bulk file. */ @@ -1871,15 +1958,19 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; - server.repl_transfer_fd = dfd; server.repl_transfer_lastio = server.unixtime; - server.repl_transfer_tmpfile = zstrdup(tmpfile); return; error: aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); if (dfd != -1) close(dfd); close(fd); + if (server.repl_transfer_fd != -1) + close(server.repl_transfer_fd); + if (server.repl_transfer_tmpfile) + zfree(server.repl_transfer_tmpfile); + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; server.repl_transfer_s = -1; server.repl_state = REPL_STATE_CONNECT; return; @@ -1933,9 +2024,13 @@ void undoConnectWithMaster(void) { void replicationAbortSyncTransfer(void) { serverAssert(server.repl_state == REPL_STATE_TRANSFER); undoConnectWithMaster(); - close(server.repl_transfer_fd); - unlink(server.repl_transfer_tmpfile); - zfree(server.repl_transfer_tmpfile); + if (server.repl_transfer_fd!=-1) { + close(server.repl_transfer_fd); + unlink(server.repl_transfer_tmpfile); + zfree(server.repl_transfer_tmpfile); + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; + } } /* This function aborts a non blocking replication attempt if there is one @@ -2045,6 +2140,9 @@ void replicaofCommand(client *c) { serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", client); sdsfree(client); + /* Restart the AOF subsystem in case we shut it down during a sync when + * we were still a slave. */ + if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC(); } } else { long port; diff --git a/src/rio.c b/src/rio.c index c9c76b8f2..993768b56 100644 --- a/src/rio.c +++ b/src/rio.c @@ -157,6 +157,113 @@ void rioInitWithFile(rio *r, FILE *fp) { r->io.file.autosync = 0; } +/* ------------------- File descriptor implementation ------------------- */ + +static size_t rioFdWrite(rio *r, const void *buf, size_t len) { + UNUSED(r); + UNUSED(buf); + UNUSED(len); + return 0; /* Error, this target does not yet support writing. */ +} + +/* Returns 1 or 0 for success/failure. 
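 * The fd target implements a buffered blocking read from a socket: data is
 * accumulated into an sds buffer so that short reads from the kernel never
 * turn into short rio reads, and a non-zero read_limit caps the total number
 * of bytes that may be consumed. A hedged usage sketch, mirroring
 * readSyncBulkPayload() above:
 *
 *     rio rdb;
 *     rioInitWithFd(&rdb,fd,server.repl_transfer_size); // 0 means no limit (EOF-mark mode)
 *     if (rdbLoadRio(&rdb,&rsi,0) != C_OK) { ... }      // abort the handshake on error
 *     rioFreeFd(&rdb,NULL);                             // NULL: discard buffered leftovers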
*/ +static size_t rioFdRead(rio *r, void *buf, size_t len) { + size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos; + + /* if the buffer is too small for the entire request: realloc */ + if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len) + r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf)); + + /* if the remaining unused buffer is not large enough: memmove so that we can read the rest */ + if (len > avail && sdsavail(r->io.fd.buf) < len - avail) { + sdsrange(r->io.fd.buf, r->io.fd.pos, -1); + r->io.fd.pos = 0; + } + + /* if we don't already have all the data in the sds, read more */ + while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) { + size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos; + size_t toread = len - buffered; + /* read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two */ + if (toread < PROTO_IOBUF_LEN) + toread = PROTO_IOBUF_LEN; + if (toread > sdsavail(r->io.fd.buf)) + toread = sdsavail(r->io.fd.buf); + if (r->io.fd.read_limit != 0 && + r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) { + if (r->io.fd.read_limit >= r->io.fd.read_so_far - buffered) + toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered; + else { + errno = EOVERFLOW; + return 0; + } + } + int retval = read(r->io.fd.fd, (char*)r->io.fd.buf + sdslen(r->io.fd.buf), toread); + if (retval <= 0) { + if (errno == EWOULDBLOCK) errno = ETIMEDOUT; + return 0; + } + sdsIncrLen(r->io.fd.buf, retval); + } + + memcpy(buf, (char*)r->io.fd.buf + r->io.fd.pos, len); + r->io.fd.read_so_far += len; + r->io.fd.pos += len; + return len; +} + +/* Returns read/write position in file. */ +static off_t rioFdTell(rio *r) { + return r->io.fd.read_so_far; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdWrite(r,NULL,0); +} + +static const rio rioFdIO = { + rioFdRead, + rioFdWrite, + rioFdTell, + rioFdFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +/* create an rio that implements a buffered read from an fd + * read_limit argument stops buffering when the reaching the limit */ +void rioInitWithFd(rio *r, int fd, size_t read_limit) { + *r = rioFdIO; + r->io.fd.fd = fd; + r->io.fd.pos = 0; + r->io.fd.read_limit = read_limit; + r->io.fd.read_so_far = 0; + r->io.fd.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(r->io.fd.buf); +} + +/* release the rio stream. + * optionally returns the unread buffered data. */ +void rioFreeFd(rio *r, sds* out_remainingBufferedData) { + if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) { + if (r->io.fd.pos > 0) + sdsrange(r->io.fd.buf, r->io.fd.pos, -1); + *out_remainingBufferedData = r->io.fd.buf; + } else { + sdsfree(r->io.fd.buf); + if (out_remainingBufferedData) + *out_remainingBufferedData = NULL; + } + r->io.fd.buf = NULL; +} + /* ------------------- File descriptors set implementation ------------------- */ /* Returns 1 or 0 for success/failure. @@ -300,7 +407,7 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { * disk I/O concentrated in very little time. When we fsync in an explicit * way instead the I/O pressure is more distributed across time. 
*/ void rioSetAutoSync(rio *r, off_t bytes) { - serverAssert(r->read == rioFileIO.read); + if(r->write != rioFileIO.write) return; r->io.file.autosync = bytes; } diff --git a/src/rio.h b/src/rio.h index c996c54f6..beea06888 100644 --- a/src/rio.h +++ b/src/rio.h @@ -73,6 +73,14 @@ struct _rio { off_t buffered; /* Bytes written since last fsync. */ off_t autosync; /* fsync after 'autosync' bytes written. */ } file; + /* file descriptor */ + struct { + int fd; /* File descriptor. */ + off_t pos; /* pos in buf that was returned */ + sds buf; /* buffered data */ + size_t read_limit; /* don't allow to buffer/read more than that */ + size_t read_so_far; /* amount of data read from the rio (not buffered) */ + } fd; /* Multiple FDs target (used to write to N sockets). */ struct { int *fds; /* File descriptors. */ @@ -126,9 +134,11 @@ static inline int rioFlush(rio *r) { void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); +void rioInitWithFd(rio *r, int fd, size_t read_limit); void rioInitWithFdset(rio *r, int *fds, int numfds); void rioFreeFdset(rio *r); +void rioFreeFd(rio *r, sds* out_remainingBufferedData); size_t rioWriteBulkCount(rio *r, char prefix, long count); size_t rioWriteBulkString(rio *r, const char *buf, size_t len); diff --git a/src/server.c b/src/server.c index 78b8d8f1b..8ed5b591c 100644 --- a/src/server.c +++ b/src/server.c @@ -2265,6 +2265,7 @@ void initServerConfig(void) { server.aof_flush_postponed_start = 0; server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC; + server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.pidfile = NULL; @@ -2334,6 +2335,9 @@ void initServerConfig(void) { server.cached_master = NULL; server.master_initial_offset = -1; server.repl_state = REPL_STATE_NONE; + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; + server.repl_transfer_s = -1; server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; @@ -2342,6 +2346,7 @@ void initServerConfig(void) { server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY; server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC; + server.repl_diskless_load = CONFIG_DEFAULT_REPL_DISKLESS_LOAD; server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY; server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD; server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT; @@ -4053,7 +4058,7 @@ sds genRedisInfoString(char *section) { (server.aof_last_write_status == C_OK) ? "ok" : "err", server.stat_aof_cow_bytes); - if (server.aof_state != AOF_OFF) { + if (server.aof_enabled) { info = sdscatprintf(info, "aof_current_size:%lld\r\n" "aof_base_size:%lld\r\n" diff --git a/src/server.h b/src/server.h index 8686994f6..f81b1010e 100644 --- a/src/server.h +++ b/src/server.h @@ -132,6 +132,7 @@ typedef long long mstime_t; /* millisecond time type. 
*/ #define CONFIG_DEFAULT_RDB_FILENAME "dump.rdb" #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0 #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5 +#define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0 #define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define CONFIG_DEFAULT_SLAVE_READ_ONLY 1 #define CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY 1 @@ -394,6 +395,12 @@ typedef long long mstime_t; /* millisecond time type. */ #define AOF_FSYNC_EVERYSEC 2 #define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC +/* Replication diskless load defines */ +#define REPL_DISKLESS_LOAD_DISABLED 0 +#define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1 +#define REPL_DISKLESS_LOAD_SWAPDB 2 +#define CONFIG_DEFAULT_REPL_DISKLESS_LOAD REPL_DISKLESS_LOAD_DISABLED + /* Zipped structures related defaults */ #define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512 #define OBJ_HASH_MAX_ZIPLIST_VALUE 64 @@ -1158,6 +1165,7 @@ struct redisServer { int daemonize; /* True if running as a daemon */ clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT]; /* AOF persistence */ + int aof_enabled; /* AOF configuration */ int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy */ char *aof_filename; /* Name of the AOF file */ @@ -1214,6 +1222,8 @@ struct redisServer { int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ + int rdb_key_save_delay; /* Delay in microseconds between keys while + * writing the RDB. (for testings) */ /* Pipe and data structures for child -> parent info sharing. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ struct { @@ -1249,7 +1259,9 @@ struct redisServer { int repl_min_slaves_to_write; /* Min number of slaves to write. */ int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ - int repl_diskless_sync; /* Send RDB to slaves sockets directly. */ + int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */ + int repl_diskless_load; /* Slave parse RDB directly from the socket. + * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ /* Replication (slave) */ char *masteruser; /* AUTH with this user and masterauth with master */ @@ -1739,7 +1751,8 @@ void replicationCacheMasterUsingMyself(void); void feedReplicationBacklog(void *ptr, size_t len); /* Generic persistence functions */ -void startLoading(FILE *fp); +void startLoadingFile(FILE* fp, char* filename); +void startLoading(size_t size); void loadingProgress(off_t pos); void stopLoading(void); @@ -1996,6 +2009,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. 
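 * (Both flags are also honored by the new emptyDbGeneric() declared below,
 * which the swapdb diskless-load path uses to empty either server.db or the
 * temporary backup array it allocates before parsing the RDB from the
 * socket.)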
*/ long long emptyDb(int dbnum, int flags, void(callback)(void*)); +long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)); +long long dbTotalServerKeyCount(); int selectDb(client *c, int id); void signalModifiedKey(redisDb *db, robj *key); diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl index 3c6df52a8..54891151b 100644 --- a/tests/integration/replication-4.tcl +++ b/tests/integration/replication-4.tcl @@ -1,12 +1,3 @@ -proc start_bg_complex_data {host port db ops} { - set tclsh [info nameofexecutable] - exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops & -} - -proc stop_bg_complex_data {handle} { - catch {exec /bin/kill -9 $handle} -} - start_server {tags {"repl"}} { start_server {} { diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl index bf8682446..3c98723af 100644 --- a/tests/integration/replication-psync.tcl +++ b/tests/integration/replication-psync.tcl @@ -1,12 +1,3 @@ -proc start_bg_complex_data {host port db ops} { - set tclsh [info nameofexecutable] - exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops & -} - -proc stop_bg_complex_data {handle} { - catch {exec /bin/kill -9 $handle} -} - # Creates a master-slave pair and breaks the link continuously to force # partial resyncs attempts, all this while flooding the master with # write queries. @@ -17,7 +8,7 @@ proc stop_bg_complex_data {handle} { # If reconnect is > 0, the test actually try to break the connection and # reconnect with the master, otherwise just the initial synchronization is # checked for consistency. -proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless reconnect} { +proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { start_server {tags {"repl"}} { start_server {} { @@ -28,8 +19,9 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec $master config set repl-backlog-size $backlog_size $master config set repl-backlog-ttl $backlog_ttl - $master config set repl-diskless-sync $diskless + $master config set repl-diskless-sync $mdl $master config set repl-diskless-sync-delay 1 + $slave config set repl-diskless-load $sdl set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000] set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000] @@ -54,7 +46,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec } } - test "Test replication partial resync: $descr (diskless: $diskless, reconnect: $reconnect)" { + test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" { # Now while the clients are writing data, break the maste-slave # link multiple times. 
if ($reconnect) { @@ -132,23 +124,25 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec } } -foreach diskless {no yes} { - test_psync {no reconnection, just sync} 6 1000000 3600 0 { - } $diskless 0 +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + test_psync {no reconnection, just sync} 6 1000000 3600 0 { + } $mdl $sdl 0 - test_psync {ok psync} 6 100000000 3600 0 { + test_psync {ok psync} 6 100000000 3600 0 { assert {[s -1 sync_partial_ok] > 0} - } $diskless 1 + } $mdl $sdl 1 - test_psync {no backlog} 6 100 3600 0.5 { + test_psync {no backlog} 6 100 3600 0.5 { assert {[s -1 sync_partial_err] > 0} - } $diskless 1 + } $mdl $sdl 1 - test_psync {ok after delay} 3 100000000 3600 3 { + test_psync {ok after delay} 3 100000000 3600 3 { assert {[s -1 sync_partial_ok] > 0} - } $diskless 1 + } $mdl $sdl 1 - test_psync {backlog expired} 3 100000000 1 3 { + test_psync {backlog expired} 3 100000000 1 3 { assert {[s -1 sync_partial_err] > 0} - } $diskless 1 + } $mdl $sdl 1 + } } diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 0e50c20a9..d69a1761a 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -183,85 +183,92 @@ start_server {tags {"repl"}} { } } -foreach dl {no yes} { - start_server {tags {"repl"}} { - set master [srv 0 client] - $master config set repl-diskless-sync $dl - set master_host [srv 0 host] - set master_port [srv 0 port] - set slaves {} - set load_handle0 [start_write_load $master_host $master_port 3] - set load_handle1 [start_write_load $master_host $master_port 5] - set load_handle2 [start_write_load $master_host $master_port 20] - set load_handle3 [start_write_load $master_host $master_port 8] - set load_handle4 [start_write_load $master_host $master_port 4] - start_server {} { - lappend slaves [srv 0 client] +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + start_server {tags {"repl"}} { + set master [srv 0 client] + $master config set repl-diskless-sync $mdl + $master config set repl-diskless-sync-delay 1 + set master_host [srv 0 host] + set master_port [srv 0 port] + set slaves {} + set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000] + set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000] + set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000] + set load_handle3 [start_write_load $master_host $master_port 8] + set load_handle4 [start_write_load $master_host $master_port 4] + after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork start_server {} { lappend slaves [srv 0 client] start_server {} { lappend slaves [srv 0 client] - test "Connect multiple replicas at the same time (issue #141), diskless=$dl" { - # Send SLAVEOF commands to slaves - [lindex $slaves 0] slaveof $master_host $master_port - [lindex $slaves 1] slaveof $master_host $master_port - [lindex $slaves 2] slaveof $master_host $master_port + start_server {} { + lappend slaves [srv 0 client] + test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" { + # Send SLAVEOF commands to slaves + [lindex $slaves 0] config set repl-diskless-load $sdl + [lindex $slaves 1] config set repl-diskless-load $sdl + [lindex $slaves 2] config set repl-diskless-load $sdl + [lindex $slaves 0] slaveof $master_host $master_port + [lindex $slaves 1] slaveof $master_host $master_port + [lindex $slaves 2] slaveof $master_host $master_port - # Wait for all the 
three slaves to reach the "online" - # state from the POV of the master. - set retry 500 - while {$retry} { - set info [r -3 info] - if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { - break - } else { - incr retry -1 - after 100 + # Wait for all the three slaves to reach the "online" + # state from the POV of the master. + set retry 500 + while {$retry} { + set info [r -3 info] + if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slaves not correctly synchronized" } - } - if {$retry == 0} { - error "assertion:Replicas not correctly synchronized" - } - # Wait that slaves acknowledge they are online so - # we are sure that DBSIZE and DEBUG DIGEST will not - # fail because of timing issues. - wait_for_condition 500 100 { - [lindex [[lindex $slaves 0] role] 3] eq {connected} && - [lindex [[lindex $slaves 1] role] 3] eq {connected} && - [lindex [[lindex $slaves 2] role] 3] eq {connected} - } else { - fail "Replicas still not connected after some time" + # Wait that slaves acknowledge they are online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. + wait_for_condition 500 100 { + [lindex [[lindex $slaves 0] role] 3] eq {connected} && + [lindex [[lindex $slaves 1] role] 3] eq {connected} && + [lindex [[lindex $slaves 2] role] 3] eq {connected} + } else { + fail "Slaves still not connected after some time" + } + + # Stop the write load + stop_bg_complex_data $load_handle0 + stop_bg_complex_data $load_handle1 + stop_bg_complex_data $load_handle2 + stop_write_load $load_handle3 + stop_write_load $load_handle4 + + # Make sure that slaves and master have same + # number of keys + wait_for_condition 500 100 { + [$master dbsize] == [[lindex $slaves 0] dbsize] && + [$master dbsize] == [[lindex $slaves 1] dbsize] && + [$master dbsize] == [[lindex $slaves 2] dbsize] + } else { + fail "Different number of keys between master and replica after too long time." + } + + # Check digests + set digest [$master debug digest] + set digest0 [[lindex $slaves 0] debug digest] + set digest1 [[lindex $slaves 1] debug digest] + set digest2 [[lindex $slaves 2] debug digest] + assert {$digest ne 0000000000000000000000000000000000000000} + assert {$digest eq $digest0} + assert {$digest eq $digest1} + assert {$digest eq $digest2} } - - # Stop the write load - stop_write_load $load_handle0 - stop_write_load $load_handle1 - stop_write_load $load_handle2 - stop_write_load $load_handle3 - stop_write_load $load_handle4 - - # Make sure that slaves and master have same - # number of keys - wait_for_condition 500 100 { - [$master dbsize] == [[lindex $slaves 0] dbsize] && - [$master dbsize] == [[lindex $slaves 1] dbsize] && - [$master dbsize] == [[lindex $slaves 2] dbsize] - } else { - fail "Different number of keys between masted and replica after too long time." 
-                }
-
-                # Check digests
-                set digest [$master debug digest]
-                set digest0 [[lindex $slaves 0] debug digest]
-                set digest1 [[lindex $slaves 1] debug digest]
-                set digest2 [[lindex $slaves 2] debug digest]
-                assert {$digest ne 0000000000000000000000000000000000000000}
-                assert {$digest eq $digest0}
-                assert {$digest eq $digest1}
-                assert {$digest eq $digest2}
-            }
-        }
+                    }
+                }
         }
     }
 }
@@ -309,3 +316,70 @@ start_server {tags {"repl"}} {
         }
     }
 }
+
+test {slave fails full sync and diskless load swapdb recovers it} {
+    start_server {tags {"repl"}} {
+        set slave [srv 0 client]
+        set slave_host [srv 0 host]
+        set slave_port [srv 0 port]
+        set slave_log [srv 0 stdout]
+        start_server {} {
+            set master [srv 0 client]
+            set master_host [srv 0 host]
+            set master_port [srv 0 port]
+
+            # Put different data sets on the master and slave
+            # we need to put large keys on the master since the slave replies to info only once every 2mb
+            $slave debug populate 2000 slave 10
+            $master debug populate 200 master 100000
+            $master config set rdbcompression no
+
+            # Set master and slave to use diskless replication
+            $master config set repl-diskless-sync yes
+            $master config set repl-diskless-sync-delay 0
+            $slave config set repl-diskless-load swapdb
+
+            # Set master with a slow rdb generation, so that we can easily disconnect it mid-sync
+            # 10ms per key, with 200 keys that is 2 seconds
+            $master config set rdb-key-save-delay 10000
+
+            # Start the replication process...
+            $slave slaveof $master_host $master_port
+
+            # wait for the slave to start reading the rdb
+            wait_for_condition 50 100 {
+                [s -1 loading] eq 1
+            } else {
+                fail "Replica didn't get into loading mode"
+            }
+
+            # make sure that the next sync will not start immediately so that we can catch the slave in between syncs
+            $master config set repl-diskless-sync-delay 5
+            # for faster server shutdown, make rdb saving fast again (the fork already uses the slow one)
+            $master config set rdb-key-save-delay 0
+
+            # wait for the slave to do flushdb (key count drops)
+            wait_for_condition 50 100 {
+                2000 != [scan [regexp -inline {keys\=([\d]*)} [$slave info keyspace]] keys=%d]
+            } else {
+                fail "Replica didn't flush"
+            }
+
+            # make sure we're still loading
+            assert_equal [s -1 loading] 1
+
+            # kill the slave connection on the master
+            set killed [$master client kill type slave]
+
+            # wait for loading to stop (fail)
+            wait_for_condition 50 100 {
+                [s -1 loading] eq 0
+            } else {
+                fail "Replica didn't disconnect"
+            }
+
+            # make sure the original keys were restored
+            assert_equal [$slave dbsize] 2000
+        }
+    }
+}
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index 74f491e48..41cc5612a 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -399,3 +399,15 @@ proc lshuffle {list} {
     }
     return $slist
 }
+
+# Execute a background process writing complex data for the specified number
+# of ops to the specified Redis instance.
+proc start_bg_complex_data {host port db ops} {
+    set tclsh [info nameofexecutable]
+    exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops &
+}
+
+# Stop a process generating write load executed with start_bg_complex_data.
+proc stop_bg_complex_data {handle} {
+    catch {exec /bin/kill -9 $handle}
+}
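+
+# Typical usage of the two helpers above (mirroring replication-psync.tcl):
+#
+#   set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000]
+#   ... exercise replication while the background load is running ...
+#   stop_bg_complex_data $load_handle0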