diff --git a/src/aof.c b/src/aof.c index e7537742d..12bd74376 100644 --- a/src/aof.c +++ b/src/aof.c @@ -920,12 +920,12 @@ int aofFsyncInProgress(void) { /* Starts a background task that performs fsync() against the specified * file descriptor (the one of the AOF file) in another thread. */ void aof_background_fsync(int fd) { - bioCreateFsyncJob(fd); + bioCreateFsyncJob(fd, server.master_repl_offset, 1); } /* Close the fd on the basis of aof_background_fsync. */ void aof_background_fsync_and_close(int fd) { - bioCreateCloseJob(fd, 1, 1); + bioCreateCloseAofJob(fd, server.master_repl_offset, 1); } /* Kills an AOFRW child process if exists */ @@ -962,6 +962,8 @@ void stopAppendOnly(void) { server.aof_state = AOF_OFF; server.aof_rewrite_scheduled = 0; server.aof_last_incr_size = 0; + server.fsynced_reploff = -1; + atomicSet(server.fsynced_reploff_pending, 0); killAppendOnlyChild(); sdsfree(server.aof_buf); server.aof_buf = sdsempty(); @@ -972,6 +974,18 @@ void stopAppendOnly(void) { int startAppendOnly(void) { serverAssert(server.aof_state == AOF_OFF); + /* Wait for all bio jobs related to AOF to drain. This prevents a race + * between updates to `fsynced_reploff_pending` of the worker thread, belonging + * to the previous AOF, and the new one. This concern is specific for a full + * sync scenario where we don't wanna risk the ACKed replication offset + * jumping backwards or forward when switching to a different master. 
*/ + bioDrainWorker(BIO_AOF_FSYNC); + + /* Set the initial repl_offset, which will be applied to fsynced_reploff + * when AOFRW finishes (after possibly being updated by a bio thread) */ + atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); + server.fsynced_reploff = 0; + server.aof_state = AOF_WAIT_REWRITE; if (hasActiveChildProcess() && server.child_type != CHILD_TYPE_AOF) { server.aof_rewrite_scheduled = 1; @@ -1241,6 +1255,7 @@ try_fsync: latencyAddSampleIfNeeded("aof-fsync-always",latency); server.aof_fsync_offset = server.aof_current_size; server.aof_last_fsync = server.unixtime; + atomicSet(server.fsynced_reploff_pending, server.master_repl_offset); } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { if (!sync_in_progress) { @@ -2669,9 +2684,17 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { serverLog(LL_NOTICE, "Background AOF rewrite finished successfully"); /* Change state from WAIT_REWRITE to ON if needed */ - if (server.aof_state == AOF_WAIT_REWRITE) + if (server.aof_state == AOF_WAIT_REWRITE) { server.aof_state = AOF_ON; + /* Update the fsynced replication offset that just now become valid. + * This could either be the one we took in startAppendOnly, or a + * newer one set by the bio thread. */ + long long fsynced_reploff_pending; + atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending); + server.fsynced_reploff = fsynced_reploff_pending; + } + serverLog(LL_VERBOSE, "Background AOF rewrite signal handler took %lldus", ustime()-now); } else if (!bysignal && exitcode != 0) { diff --git a/src/bio.c b/src/bio.c index 7eb43a3a3..640fbb024 100644 --- a/src/bio.c +++ b/src/bio.c @@ -15,13 +15,15 @@ * DESIGN * ------ * - * The design is trivial, we have a structure representing a job to perform - * and a different thread and job queue for every job type. 
- * Every thread waits for new jobs in its queue, and process every job + * The design is simple: We have a structure representing a job to perform, + * and several worker threads and job queues. Every job type is assigned to + * a specific worker thread, and a single worker may handle several different + * job types. + * Every thread waits for new jobs in its queue, and processes every job * sequentially. * - * Jobs of the same type are guaranteed to be processed from the least - * recently inserted to the most recently inserted (older jobs processed + * Jobs handled by the same worker are guaranteed to be processed from the + * least-recently-inserted to the most-recently-inserted (older jobs processed * first). * * Currently there is no way for the creator of the job to be notified about @@ -61,17 +63,39 @@ #include "server.h" #include "bio.h" -static pthread_t bio_threads[BIO_NUM_OPS]; -static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; -static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; -static list *bio_jobs[BIO_NUM_OPS]; +static char* bio_worker_title[] = { + "bio_close_file", + "bio_aof", + "bio_lazy_free", +}; + +#define BIO_WORKER_NUM (sizeof(bio_worker_title) / sizeof(*bio_worker_title)) + +static unsigned int bio_job_to_worker[] = { + [BIO_CLOSE_FILE] = 0, + [BIO_AOF_FSYNC] = 1, + [BIO_CLOSE_AOF] = 1, + [BIO_LAZY_FREE] = 2, +}; + +static pthread_t bio_threads[BIO_WORKER_NUM]; +static pthread_mutex_t bio_mutex[BIO_WORKER_NUM]; +static pthread_cond_t bio_newjob_cond[BIO_WORKER_NUM]; +static list *bio_jobs[BIO_WORKER_NUM]; +static unsigned long bio_jobs_counter[BIO_NUM_OPS] = {0}; /* This structure represents a background Job. It is only used locally to this * file as the API does not expose the internals at all. */ typedef union bio_job { + struct { + int type; /* Job-type tag. This needs to appear as the first element in all union members. 
*/ + } header; + /* Job specific arguments.*/ struct { + int type; int fd; /* Fd for file based background jobs */ + long long offset; /* A job-specific offset, if applicable */ unsigned need_fsync:1; /* A flag to indicate that a fsync is required before * the file is closed. */ unsigned need_reclaim_cache:1; /* A flag to indicate that reclaim cache is required before @@ -79,6 +103,7 @@ typedef union bio_job { } fd_args; struct { + int type; lazy_free_fn *free_fn; /* Function that will free the provided arguments */ void *free_args[]; /* List of arguments to be passed to the free function */ } free_args; @@ -95,10 +120,10 @@ void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; - int j; + unsigned long j; /* Initialization of state vars and objects */ - for (j = 0; j < BIO_NUM_OPS; j++) { + for (j = 0; j < BIO_WORKER_NUM; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_newjob_cond[j],NULL); bio_jobs[j] = listCreate(); @@ -113,8 +138,8 @@ void bioInit(void) { /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is - * responsible of. */ - for (j = 0; j < BIO_NUM_OPS; j++) { + * responsible for. 
*/ + for (j = 0; j < BIO_WORKER_NUM; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); @@ -125,10 +150,13 @@ void bioInit(void) { } void bioSubmitJob(int type, bio_job *job) { - pthread_mutex_lock(&bio_mutex[type]); - listAddNodeTail(bio_jobs[type],job); - pthread_cond_signal(&bio_newjob_cond[type]); - pthread_mutex_unlock(&bio_mutex[type]); + job->header.type = type; + unsigned long worker = bio_job_to_worker[type]; + pthread_mutex_lock(&bio_mutex[worker]); + listAddNodeTail(bio_jobs[worker],job); + bio_jobs_counter[type]++; + pthread_cond_signal(&bio_newjob_cond[worker]); + pthread_mutex_unlock(&bio_mutex[worker]); } void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) { @@ -155,42 +183,40 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache) { bioSubmitJob(BIO_CLOSE_FILE, job); } -void bioCreateFsyncJob(int fd) { +void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache) { bio_job *job = zmalloc(sizeof(*job)); job->fd_args.fd = fd; + job->fd_args.offset = offset; + job->fd_args.need_fsync = 1; + job->fd_args.need_reclaim_cache = need_reclaim_cache; + + bioSubmitJob(BIO_CLOSE_AOF, job); +} + +void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache) { + bio_job *job = zmalloc(sizeof(*job)); + job->fd_args.fd = fd; + job->fd_args.offset = offset; + job->fd_args.need_reclaim_cache = need_reclaim_cache; bioSubmitJob(BIO_AOF_FSYNC, job); } void *bioProcessBackgroundJobs(void *arg) { bio_job *job; - unsigned long type = (unsigned long) arg; + unsigned long worker = (unsigned long) arg; sigset_t sigset; - /* Check that the type is within the right interval. */ - if (type >= BIO_NUM_OPS) { - serverLog(LL_WARNING, - "Warning: bio thread started with wrong type %lu",type); - return NULL; - } + /* Check that the worker is within the right interval. 
*/ + serverAssert(worker < BIO_WORKER_NUM); - switch (type) { - case BIO_CLOSE_FILE: - redis_set_thread_title("bio_close_file"); - break; - case BIO_AOF_FSYNC: - redis_set_thread_title("bio_aof_fsync"); - break; - case BIO_LAZY_FREE: - redis_set_thread_title("bio_lazy_free"); - break; - } + redis_set_thread_title(bio_worker_title[worker]); redisSetCpuAffinity(server.bio_cpulist); makeThreadKillable(); - pthread_mutex_lock(&bio_mutex[type]); + pthread_mutex_lock(&bio_mutex[worker]); /* Block SIGALRM so we are sure that only the main thread will * receive the watchdog signal. */ sigemptyset(&sigset); @@ -203,21 +229,26 @@ void *bioProcessBackgroundJobs(void *arg) { listNode *ln; /* The loop always starts with the lock hold. */ - if (listLength(bio_jobs[type]) == 0) { - pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); + if (listLength(bio_jobs[worker]) == 0) { + pthread_cond_wait(&bio_newjob_cond[worker], &bio_mutex[worker]); continue; } - /* Pop the job from the queue. */ - ln = listFirst(bio_jobs[type]); + /* Get the job from the queue. */ + ln = listFirst(bio_jobs[worker]); job = ln->value; /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ - pthread_mutex_unlock(&bio_mutex[type]); + pthread_mutex_unlock(&bio_mutex[worker]); /* Process the job accordingly to its type. 
*/ - if (type == BIO_CLOSE_FILE) { - if (job->fd_args.need_fsync) { - redis_fsync(job->fd_args.fd); + int job_type = job->header.type; + + if (job_type == BIO_CLOSE_FILE) { + if (job->fd_args.need_fsync && + redis_fsync(job->fd_args.fd) == -1 && + errno != EBADF && errno != EINVAL) + { + serverLog(LL_WARNING, "Fail to fsync the AOF file: %s",strerror(errno)); } if (job->fd_args.need_reclaim_cache) { if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) { @@ -225,7 +256,7 @@ void *bioProcessBackgroundJobs(void *arg) { } } close(job->fd_args.fd); - } else if (type == BIO_AOF_FSYNC) { + } else if (job_type == BIO_AOF_FSYNC || job_type == BIO_CLOSE_AOF) { /* The fd may be closed by main thread and reused for another * socket, pipe, or file. We just ignore these errno because * aof fsync did not really fail. */ @@ -242,8 +273,17 @@ void *bioProcessBackgroundJobs(void *arg) { } } else { atomicSet(server.aof_bio_fsync_status,C_OK); + atomicSet(server.fsynced_reploff_pending, job->fd_args.offset); } - } else if (type == BIO_LAZY_FREE) { + + if (job->fd_args.need_reclaim_cache) { + if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) { + serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno)); + } + } + if (job_type == BIO_CLOSE_AOF) + close(job->fd_args.fd); + } else if (job_type == BIO_LAZY_FREE) { job->free_args.free_fn(job->free_args.free_args); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); @@ -252,37 +292,53 @@ void *bioProcessBackgroundJobs(void *arg) { /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ - pthread_mutex_lock(&bio_mutex[type]); - listDelNode(bio_jobs[type],ln); + pthread_mutex_lock(&bio_mutex[worker]); + listDelNode(bio_jobs[worker], ln); + bio_jobs_counter[job_type]--; + pthread_cond_signal(&bio_newjob_cond[worker]); } } /* Return the number of pending jobs of the specified type. 
*/ unsigned long bioPendingJobsOfType(int type) { - unsigned long long val; - pthread_mutex_lock(&bio_mutex[type]); - val = listLength(bio_jobs[type]); - pthread_mutex_unlock(&bio_mutex[type]); + unsigned int worker = bio_job_to_worker[type]; + + pthread_mutex_lock(&bio_mutex[worker]); + unsigned long val = bio_jobs_counter[type]; + pthread_mutex_unlock(&bio_mutex[worker]); + return val; } +/* Wait for the job queue of the worker for jobs of specified type to become empty. */ +void bioDrainWorker(int job_type) { + unsigned long worker = bio_job_to_worker[job_type]; + + pthread_mutex_lock(&bio_mutex[worker]); + while (listLength(bio_jobs[worker]) > 0) { + pthread_cond_wait(&bio_newjob_cond[worker], &bio_mutex[worker]); + } + pthread_mutex_unlock(&bio_mutex[worker]); +} + /* Kill the running bio threads in an unclean way. This function should be * used only when it's critical to stop the threads for some reason. * Currently Redis does this only on crash (for instance on SIGSEGV) in order * to perform a fast memory check without other threads messing with memory. 
*/ void bioKillThreads(void) { - int err, j; + int err; + unsigned long j; - for (j = 0; j < BIO_NUM_OPS; j++) { + for (j = 0; j < BIO_WORKER_NUM; j++) { if (bio_threads[j] == pthread_self()) continue; if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) { if ((err = pthread_join(bio_threads[j],NULL)) != 0) { serverLog(LL_WARNING, - "Bio thread for job type #%d can not be joined: %s", + "Bio worker thread #%lu can not be joined: %s", j, strerror(err)); } else { serverLog(LL_WARNING, - "Bio thread for job type #%d terminated",j); + "Bio worker thread #%lu terminated",j); } } } diff --git a/src/bio.h b/src/bio.h index 52c80fdb6..0d1fe9b4b 100644 --- a/src/bio.h +++ b/src/bio.h @@ -35,15 +35,20 @@ typedef void lazy_free_fn(void *args[]); /* Exported API */ void bioInit(void); unsigned long bioPendingJobsOfType(int type); +void bioDrainWorker(int job_type); void bioKillThreads(void); void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache); -void bioCreateFsyncJob(int fd); +void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache); +void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache); void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); /* Background job opcodes */ -#define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */ -#define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */ -#define BIO_LAZY_FREE 2 /* Deferred objects freeing. */ -#define BIO_NUM_OPS 3 +enum { + BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */ + BIO_AOF_FSYNC, /* Deferred AOF fsync. */ + BIO_LAZY_FREE, /* Deferred objects freeing. */ + BIO_CLOSE_AOF, /* Deferred close for AOF files. 
*/ + BIO_NUM_OPS +}; #endif diff --git a/src/blocked.c b/src/blocked.c index 753442c2b..1f5fba01b 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -177,7 +177,7 @@ void unblockClient(client *c) { c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) { unblockClientWaitingData(c); - } else if (c->bstate.btype == BLOCKED_WAIT) { + } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF) { unblockClientWaitingReplicas(c); } else if (c->bstate.btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); @@ -225,6 +225,10 @@ void replyToBlockedClientTimedOut(client *c) { updateStatsOnUnblock(c, 0, 0, 0); } else if (c->bstate.btype == BLOCKED_WAIT) { addReplyLongLong(c,replicationCountAcksByOffset(c->bstate.reploffset)); + } else if (c->bstate.btype == BLOCKED_WAITAOF) { + addReplyArrayLen(c,2); + addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset); + addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset)); } else if (c->bstate.btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c); } else { @@ -583,6 +587,16 @@ void blockForReplication(client *c, mstime_t timeout, long long offset, long num blockClient(c,BLOCKED_WAIT); } +/* block a client due to waitaof command */ +void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) { + c->bstate.timeout = timeout; + c->bstate.reploffset = offset; + c->bstate.numreplicas = numreplicas; + c->bstate.numlocal = numlocal; + listAddNodeHead(server.clients_waiting_acks,c); + blockClient(c,BLOCKED_WAITAOF); +} + /* Postpone client from executing a command. For example the server might be busy * requesting to avoid processing clients commands which will be processed later * when the it is ready to accept them. 
*/ diff --git a/src/commands.c b/src/commands.c index 5eb422d35..7e2fe6f4e 100644 --- a/src/commands.c +++ b/src/commands.c @@ -1837,6 +1837,26 @@ struct redisCommandArg WAIT_Args[] = { {0} }; +/********** WAITAOF ********************/ + +/* WAITAOF history */ +#define WAITAOF_History NULL + +/* WAITAOF tips */ +const char *WAITAOF_tips[] = { +"request_policy:all_shards", +"response_policy:agg_min", +NULL +}; + +/* WAITAOF argument table */ +struct redisCommandArg WAITAOF_Args[] = { +{"numlocal",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE}, +{"numreplicas",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE}, +{"timeout",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE}, +{0} +}; + /********** GEOADD ********************/ /* GEOADD history */ @@ -7244,6 +7264,7 @@ struct redisCommand redisCommandTable[] = { {"type","Determine the type stored at key","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,TYPE_History,TYPE_tips,typeCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_KEYSPACE,{{NULL,CMD_KEY_RO,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=TYPE_Args}, {"unlink","Delete a key asynchronously in another thread. Otherwise it is just as DEL, but non blocking.","O(1) for each key removed regardless of its size. 
Then the command does O(N) work in a different thread in order to reclaim memory, where N is the number of allocations the deleted objects where composed of.","4.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,UNLINK_History,UNLINK_tips,unlinkCommand,-2,CMD_WRITE|CMD_FAST,ACL_CATEGORY_KEYSPACE,{{NULL,CMD_KEY_RM|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=UNLINK_Args}, {"wait","Wait for the synchronous replication of all the write commands sent in the context of the current connection","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,WAIT_History,WAIT_tips,waitCommand,3,CMD_NOSCRIPT,ACL_CATEGORY_CONNECTION,.args=WAIT_Args}, +{"waitaof","Wait for all write commands sent in the context of the current connection to be synced to AOF of local host and/or replicas","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,WAITAOF_History,WAITAOF_tips,waitaofCommand,4,CMD_NOSCRIPT,ACL_CATEGORY_CONNECTION,.args=WAITAOF_Args}, /* geo */ {"geoadd","Add one or more geospatial items in the geospatial index represented using a sorted set","O(log(N)) for each item added, where N is the number of elements in the sorted set.","3.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GEO,GEOADD_History,GEOADD_tips,geoaddCommand,-5,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_GEO,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=GEOADD_Args}, {"geodist","Returns the distance between two members of a geospatial index","O(log(N))","3.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GEO,GEODIST_History,GEODIST_tips,geodistCommand,-4,CMD_READONLY,ACL_CATEGORY_GEO,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=GEODIST_Args}, diff --git a/src/commands/waitaof.json b/src/commands/waitaof.json new file mode 100644 index 000000000..bea18cae9 --- /dev/null +++ b/src/commands/waitaof.json @@ -0,0 +1,52 @@ +{ + "WAITAOF": { + "summary": "Wait for all write 
commands sent in the context of the current connection to be synced to AOF of local host and/or replicas", + "complexity": "O(1)", + "group": "generic", + "since": "7.2.0", + "arity": 4, + "function": "waitaofCommand", + "command_flags": [ + "NOSCRIPT" + ], + "acl_categories": [ + "CONNECTION" + ], + "command_tips": [ + "REQUEST_POLICY:ALL_SHARDS", + "RESPONSE_POLICY:AGG_MIN" + ], + "reply_schema": { + "type": "array", + "description": "Number of local and remote AOF files in sync.", + "minItems": 2, + "maxItems": 2, + "items": [ + { + "description": "Number of local AOF files.", + "type": "integer", + "minimum": 0 + }, + { + "description": "Number of replica AOF files.", + "type": "number", + "minimum": 0 + } + ] + }, + "arguments": [ + { + "name": "numlocal", + "type": "integer" + }, + { + "name": "numreplicas", + "type": "integer" + }, + { + "name": "timeout", + "type": "integer" + } + ] + } +} diff --git a/src/config.c b/src/config.c index bd62ca3db..94fc039e2 100644 --- a/src/config.c +++ b/src/config.c @@ -31,6 +31,7 @@ #include "server.h" #include "cluster.h" #include "connection.h" +#include "bio.h" #include #include @@ -2558,6 +2559,17 @@ int updateRequirePass(const char **err) { return 1; } +int updateAppendFsync(const char **err) { + UNUSED(err); + if (server.aof_fsync == AOF_FSYNC_ALWAYS) { + /* Wait for all bio jobs related to AOF to drain before proceeding. This prevents a race + * between updates to `fsynced_reploff_pending` done in the main thread and those done on the + * worker thread. 
*/ + bioDrainWorker(BIO_AOF_FSYNC); + } + return 1; +} + /* applyBind affects both TCP and TLS (if enabled) together */ static int applyBind(const char **err) { connListener *tcp_listener = listenerByType(CONN_TYPE_SOCKET); @@ -3098,7 +3110,7 @@ standardConfig static_configs[] = { createEnumConfig("repl-diskless-load", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG | DENY_LOADING_CONFIG, repl_diskless_load_enum, server.repl_diskless_load, REPL_DISKLESS_LOAD_DISABLED, NULL, NULL), createEnumConfig("loglevel", NULL, MODIFIABLE_CONFIG, loglevel_enum, server.verbosity, LL_NOTICE, NULL, NULL), createEnumConfig("maxmemory-policy", NULL, MODIFIABLE_CONFIG, maxmemory_policy_enum, server.maxmemory_policy, MAXMEMORY_NO_EVICTION, NULL, NULL), - createEnumConfig("appendfsync", NULL, MODIFIABLE_CONFIG, aof_fsync_enum, server.aof_fsync, AOF_FSYNC_EVERYSEC, NULL, NULL), + createEnumConfig("appendfsync", NULL, MODIFIABLE_CONFIG, aof_fsync_enum, server.aof_fsync, AOF_FSYNC_EVERYSEC, NULL, updateAppendFsync), createEnumConfig("oom-score-adj", NULL, MODIFIABLE_CONFIG, oom_score_adj_enum, server.oom_score_adj, OOM_SCORE_ADJ_NO, NULL, updateOOMScoreAdj), createEnumConfig("acl-pubsub-default", NULL, MODIFIABLE_CONFIG, acl_pubsub_default_enum, server.acl_pubsub_default, 0, NULL, NULL), createEnumConfig("sanitize-dump-payload", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, sanitize_dump_payload_enum, server.sanitize_dump_payload, SANITIZE_DUMP_NO, NULL, NULL), diff --git a/src/networking.c b/src/networking.c index 634c1fa89..bc5d8128f 100644 --- a/src/networking.c +++ b/src/networking.c @@ -180,6 +180,7 @@ client *createClient(connection *conn) { c->repl_applied = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; + c->repl_aof_off = 0; c->repl_last_partial_write = 0; c->slave_listening_port = 0; c->slave_addr = NULL; diff --git a/src/replication.c b/src/replication.c index 5aef23698..14f32a869 100644 --- a/src/replication.c +++ b/src/replication.c @@ -451,7 +451,13 @@ void replicationFeedSlaves(list 
*slaves, int dictid, robj **argv, int argc) { /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ - if (server.repl_backlog == NULL && listLength(slaves) == 0) return; + if (server.repl_backlog == NULL && listLength(slaves) == 0) { + /* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs + * even when there's no replication active. This code will not be reached if AOF + * is also disabled. */ + server.master_repl_offset += 1; + return; + } /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); @@ -1140,11 +1146,12 @@ void syncCommand(client *c) { * eof: supports EOF-style RDB transfer for diskless replication. * psync2: supports PSYNC v2, so understands +CONTINUE . * - * - ack + * - ack [fack ] * Replica informs the master the amount of replication stream that it - * processed so far. + * processed so far, and optionally the replication offset fsynced to the AOF file. + * This special pattern doesn't reply to the caller. * - * - getack + * - getack * Unlike other subcommands, this is used by master to get the replication * offset from a replica. * @@ -1201,6 +1208,12 @@ void replconfCommand(client *c) { return; if (offset > c->repl_ack_off) c->repl_ack_off = offset; + if (c->argc > j+3 && !strcasecmp(c->argv[j+2]->ptr,"fack")) { + if ((getLongLongFromObject(c->argv[j+3], &offset) != C_OK)) + return; + if (offset > c->repl_aof_off) + c->repl_aof_off = offset; + } c->repl_ack_time = server.unixtime; /* If this was a diskless replication, we need to really put * the slave online when the first ACK is received (which @@ -3248,11 +3261,16 @@ void replicationSendAck(void) { client *c = server.master; if (c != NULL) { + int send_fack = server.fsynced_reploff != -1; c->flags |= CLIENT_MASTER_FORCE_REPLY; - addReplyArrayLen(c,3); + addReplyArrayLen(c,send_fack ? 
5 : 3); addReplyBulkCString(c,"REPLCONF"); addReplyBulkCString(c,"ACK"); addReplyBulkLongLong(c,c->reploff); + if (send_fack) { + addReplyBulkCString(c,"FACK"); + addReplyBulkLongLong(c,server.fsynced_reploff); + } c->flags &= ~CLIENT_MASTER_FORCE_REPLY; } } @@ -3487,6 +3505,23 @@ int replicationCountAcksByOffset(long long offset) { return count; } +/* Return the number of replicas that already acknowledged the specified + * replication offset being AOF fsynced. */ +int replicationCountAOFAcksByOffset(long long offset) { + listIter li; + listNode *ln; + int count = 0; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + + if (slave->replstate != SLAVE_STATE_ONLINE) continue; + if (slave->repl_aof_off >= offset) count++; + } + return count; +} + /* WAIT for N replicas to acknowledge the processing of our latest * write command (and all the previous commands). */ void waitCommand(client *c) { @@ -3507,7 +3542,7 @@ void waitCommand(client *c) { /* First try without blocking at all. */ ackreplicas = replicationCountAcksByOffset(c->woff); - if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) { + if (ackreplicas >= numreplicas || c->flags & CLIENT_DENY_BLOCKING) { addReplyLongLong(c,ackreplicas); return; } @@ -3521,6 +3556,48 @@ void waitCommand(client *c) { replicationRequestAckFromSlaves(); } +/* WAIT for N replicas and / or local master to acknowledge our latest + * write command got synced to the disk. */ +void waitaofCommand(client *c) { + mstime_t timeout; + long numreplicas, numlocal, ackreplicas, acklocal; + + /* Argument parsing. */ + if (getRangeLongFromObjectOrReply(c,c->argv[1],0,1,&numlocal,NULL) != C_OK) + return; + if (getPositiveLongFromObjectOrReply(c,c->argv[2],&numreplicas,NULL) != C_OK) + return; + if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_MILLISECONDS) != C_OK) + return; + + if (server.masterhost) { + addReplyError(c,"WAITAOF cannot be used with replica instances. 
Please also note that writes to replicas are just local and are not propagated."); + return; + } + if (numlocal && !server.aof_enabled) { + addReplyError(c,"WAITAOF cannot be used when appendonly is disabled"); + return; + } + + /* First try without blocking at all. */ + ackreplicas = replicationCountAOFAcksByOffset(c->woff); + acklocal = server.fsynced_reploff >= c->woff; + if ((ackreplicas >= numreplicas && acklocal >= numlocal) || c->flags & CLIENT_DENY_BLOCKING) { + addReplyArrayLen(c,2); + addReplyLongLong(c,acklocal); + addReplyLongLong(c,ackreplicas); + return; + } + + /* Otherwise block the client and put it into our list of clients + * waiting for ack from slaves. */ + blockForAofFsync(c,timeout,c->woff,numlocal,numreplicas); + + /* Make sure that the server will send an ACK request to all the slaves + * before returning to the event loop. */ + replicationRequestAckFromSlaves(); +} + /* This is called by unblockClient() to perform the blocking op type * specific cleanup. We just remove the client from the list of clients * waiting for replica acks. Never call it directly, call unblockClient() @@ -3531,8 +3608,8 @@ void unblockClientWaitingReplicas(client *c) { listDelNode(server.clients_waiting_acks,ln); } -/* Check if there are clients blocked in WAIT that can be unblocked since - * we received enough ACKs from slaves. */ +/* Check if there are clients blocked in WAIT or WAITAOF that can be unblocked + * since we received enough ACKs from slaves. 
*/ void processClientsWaitingReplicas(void) { long long last_offset = 0; int last_numreplicas = 0; @@ -3543,6 +3620,16 @@ void processClientsWaitingReplicas(void) { listRewind(server.clients_waiting_acks,&li); while((ln = listNext(&li))) { client *c = ln->value; + int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF; + + /* appendonly was disabled while this client was blocked: reply with + * an error (before unblocking, see below) and keep scanning so the + * remaining waiters are still processed in this pass. */ + if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) { + addReplyError(c,"WAITAOF cannot be used when appendonly is disabled"); + unblockClient(c); + continue; + } /* Every time we find a client that is satisfied for a given * offset and number of replicas, we remember it so the next client @@ -3555,13 +3642,28 @@ void processClientsWaitingReplicas(void) { addReplyLongLong(c,last_numreplicas); unblockClient(c); } else { - int numreplicas = replicationCountAcksByOffset(c->bstate.reploffset); + int numreplicas = is_wait_aof ? + replicationCountAOFAcksByOffset(c->bstate.reploffset) : + replicationCountAcksByOffset(c->bstate.reploffset); if (numreplicas >= c->bstate.numreplicas) { last_offset = c->bstate.reploffset; last_numreplicas = numreplicas; + + /* Check if the local constraint of WAITAOF is served */ + int numlocal = server.fsynced_reploff >= c->bstate.reploffset; + if (is_wait_aof && numlocal < c->bstate.numlocal) + continue; + + if (is_wait_aof) { + /* WAITAOF has an array reply */ + addReplyArrayLen(c,2); + addReplyLongLong(c,numlocal); + addReplyLongLong(c,numreplicas); + } else { + addReplyLongLong(c,numreplicas); + } /* Reply before unblocking, because unblock client calls reqresAppendResponse */ - addReplyLongLong(c,numreplicas); unblockClient(c); } } diff --git a/src/server.c b/src/server.c index b42f22616..47988dec9 100644 --- a/src/server.c +++ b/src/server.c @@ -1642,7 +1642,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* Unblock all the clients blocked for synchronous replication - * in WAIT. */ + * in WAIT or WAITAOF. 
*/ if (listLength(server.clients_waiting_acks)) processClientsWaitingReplicas(); @@ -1702,6 +1702,15 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) flushAppendOnlyFile(0); + /* Update the fsynced replication offset. + * If an initial rewrite is in progress then not all data is guaranteed to have actually been + * persisted to disk yet, so we cannot update the field. We will wait for the rewrite to complete. */ + if (server.aof_state == AOF_ON && server.fsynced_reploff != -1) { + long long fsynced_reploff_pending; + atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending); + server.fsynced_reploff = fsynced_reploff_pending; + } + /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); @@ -2030,6 +2039,7 @@ void initServerConfig(void) { server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.master_repl_offset = 0; + server.fsynced_reploff_pending = 0; /* Replication partial resync backlog */ server.repl_backlog = NULL; @@ -2508,6 +2518,7 @@ void initServer(void) { /* Initialization after setting defaults from the config system. */ server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF; + server.fsynced_reploff = server.aof_enabled ? 0 : -1; server.hz = server.config_hz; server.pid = getpid(); server.in_fork_child = CHILD_TYPE_NONE; @@ -3468,6 +3479,7 @@ void call(client *c, int flags) { /* Call the command. */ dirty = server.dirty; + long long old_master_repl_offset = server.master_repl_offset; incrCommandStatsOnError(NULL, 0); const long long call_timer = ustime(); @@ -3636,6 +3648,11 @@ void call(client *c, int flags) { /* Do some maintenance job and cleanup */ afterCommand(c); + /* Remember the replication offset of the client, right after its last + * command that resulted in propagation. 
*/ + if (old_master_repl_offset != server.master_repl_offset) + c->woff = server.master_repl_offset; + /* Client pause takes effect after a transaction has finished. This needs * to be located after everything is propagated. */ if (!server.in_exec && server.client_pause_in_transaction) { @@ -4099,7 +4116,6 @@ int processCommand(client *c) { addReply(c,shared.queued); } else { call(c,CMD_CALL_FULL); - c->woff = server.master_repl_offset; if (listLength(server.ready_keys)) handleClientsBlockedOnKeys(); } diff --git a/src/server.h b/src/server.h index a3d9aa088..ed39b1b96 100644 --- a/src/server.h +++ b/src/server.h @@ -399,6 +399,7 @@ typedef enum blocking_type { BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */ BLOCKED_LIST, /* BLPOP & co. */ BLOCKED_WAIT, /* WAIT for synchronous replication. */ + BLOCKED_WAITAOF, /* WAITAOF for AOF file fsync. */ BLOCKED_MODULE, /* Blocked by a loadable module. */ BLOCKED_STREAM, /* XREAD. */ BLOCKED_ZSET, /* BZPOP et al. */ @@ -1010,6 +1011,7 @@ typedef struct blockingState { /* BLOCKED_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ + int numlocal; /* Indication if WAITAOF is waiting for local fsync. */ long long reploffset; /* Replication offset to reach. */ /* BLOCKED_MODULE */ @@ -1175,6 +1177,7 @@ typedef struct client { long long reploff; /* Applied replication offset if this is a master. */ long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ + long long repl_aof_off; /* Replication AOF fsync ack offset, if this is a slave. */ long long repl_ack_time;/* Replication ack time, if this is a slave. 
*/ long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */ long long psync_initial_offset; /* FULLRESYNC reply offset other slaves @@ -1802,6 +1805,11 @@ struct redisServer { char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ long long master_repl_offset; /* My current replication offset */ long long second_replid_offset; /* Accept offsets up to this for replid2. */ + redisAtomic long long fsynced_reploff_pending;/* Largest replication offset to + * potentially have been fsynced, applied to + fsynced_reploff only when AOF state is AOF_ON + (not during the initial rewrite) */ + long long fsynced_reploff; /* Largest replication offset that has been confirmed to be fsynced */ int slaveseldb; /* Last SELECTed DB in replication output */ int repl_ping_slave_period; /* Master pings the slave every N seconds */ replBacklog *repl_backlog; /* Replication backlog for partial syncs */ @@ -1859,7 +1867,7 @@ struct redisServer { long long master_initial_offset; /* Master PSYNC offset. */ int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */ /* Synchronous replication. */ - list *clients_waiting_acks; /* Clients waiting in WAIT command. */ + list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ int get_ack_from_slaves; /* If true we send REPLCONF GETACK. 
*/ /* Limits */ unsigned int maxclients; /* Max number of simultaneous clients */ @@ -2789,6 +2797,7 @@ int checkGoodReplicasStatus(void); void processClientsWaitingReplicas(void); void unblockClientWaitingReplicas(client *c); int replicationCountAcksByOffset(long long offset); +int replicationCountAOFAcksByOffset(long long offset); void replicationSendNewlineToMaster(void); long long replicationGetSlaveOffset(void); char *replicationGetSlaveName(client *c); @@ -3352,6 +3361,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo void blockClientShutdown(client *c); void blockPostponeClient(client *c); void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas); +void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas); void signalDeletedKeyAsReady(redisDb *db, robj *key, int type); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors); void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with); @@ -3616,6 +3626,7 @@ void bitcountCommand(client *c); void bitposCommand(client *c); void replconfCommand(client *c); void waitCommand(client *c); +void waitaofCommand(client *c); void georadiusbymemberCommand(client *c); void georadiusbymemberroCommand(client *c); void georadiusCommand(client *c); diff --git a/tests/support/util.tcl b/tests/support/util.tcl index b2c9bdf7a..062f33a14 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -878,9 +878,9 @@ proc debug_digest {{level 0}} { r $level debug digest } -proc wait_for_blocked_client {} { +proc wait_for_blocked_client {{idx 0}} { wait_for_condition 50 100 { - [s blocked_clients] ne 0 + [s $idx blocked_clients] ne 0 } else { fail "no blocked clients" } diff --git a/tests/unit/wait.tcl b/tests/unit/wait.tcl index 0ab36a0e6..7e040fbc0 100644 --- a/tests/unit/wait.tcl +++ b/tests/unit/wait.tcl @@ -69,3 +69,234 @@ start_server {} { assert {[$master wait 1 1000] 
== 1} } }} + + +tags {"wait aof network external:skip"} { + start_server {overrides {appendonly {yes} auto-aof-rewrite-percentage {0}}} { + set master [srv 0 client] + + test {WAITAOF local copy before fsync} { + r config set appendfsync no + $master incr foo + assert_equal [$master waitaof 1 0 50] {0 0} ;# exits on timeout + r config set appendfsync everysec + } + + test {WAITAOF local copy everysec} { + $master incr foo + assert_equal [$master waitaof 1 0 0] {1 0} + } + + test {WAITAOF local copy with appendfsync always} { + r config set appendfsync always + $master incr foo + assert_equal [$master waitaof 1 0 0] {1 0} + r config set appendfsync everysec + } + + test {WAITAOF local wait and then stop aof} { + set rd [redis_deferring_client] + $rd incr foo + $rd read + $rd waitaof 1 0 0 + wait_for_blocked_client + r config set appendonly no ;# this should release the blocked client as an error + assert_error {ERR WAITAOF cannot be used when appendonly is disabled} {$rd read} + $rd close + } + + test {WAITAOF local on server with aof disabled} { + $master incr foo + assert_error {ERR WAITAOF cannot be used when appendonly is disabled} {$master waitaof 1 0 0} + } + + $master config set appendonly yes + waitForBgrewriteaof $master + + start_server {overrides {appendonly {yes} auto-aof-rewrite-percentage {0}}} { + set master_host [srv -1 host] + set master_port [srv -1 port] + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_pid [srv 0 pid] + + # make sure the master always fsyncs first (easier to test) + $master config set appendfsync always + $replica config set appendfsync no + + test {WAITAOF on demoted master gets unblocked with an error} { + set rd [redis_deferring_client] + $rd incr foo + $rd read + $rd waitaof 0 1 0 + wait_for_blocked_client + $replica replicaof $master_host $master_port + assert_error {UNBLOCKED force unblock from blocking operation,*} {$rd read} + $rd close + } + + wait_for_ofs_sync 
$master $replica + + test {WAITAOF replica copy before fsync} { + $master incr foo + assert_equal [$master waitaof 0 1 50] {1 0} ;# exits on timeout + } + $replica config set appendfsync everysec + + test {WAITAOF replica copy everysec} { + $master incr foo + assert_equal [$master waitaof 0 1 0] {1 1} + } + + test {WAITAOF replica copy appendfsync always} { + $replica config set appendfsync always + $master incr foo + assert_equal [$master waitaof 0 1 0] {1 1} + $replica config set appendfsync everysec + } + + test {WAITAOF replica copy if replica is blocked} { + exec kill -SIGSTOP $replica_pid + $master incr foo + assert_equal [$master waitaof 0 1 50] {1 0} ;# exits on timeout + exec kill -SIGCONT $replica_pid + assert_equal [$master waitaof 0 1 0] {1 1} + } + + test {WAITAOF on promoted replica} { + $replica replicaof no one + $replica incr foo + assert_equal [$replica waitaof 1 0 0] {1 0} + } + + test {WAITAOF master that loses a replica and backlog is dropped} { + $master config set repl-backlog-ttl 1 + after 2000 ;# wait for backlog to expire + $master incr foo + assert_equal [$master waitaof 1 0 0] {1 0} + } + + test {WAITAOF master without backlog, wait is released when the replica finishes full-sync} { + set rd [redis_deferring_client -1] + $rd incr foo + $rd read + $rd waitaof 0 1 0 + wait_for_blocked_client -1 + $replica replicaof $master_host $master_port + assert_equal [$rd read] {1 1} + $rd close + } + + test {WAITAOF master isn't configured to do AOF} { + $master config set appendonly no + $master incr foo + assert_equal [$master waitaof 0 1 0] {0 1} + } + + test {WAITAOF replica isn't configured to do AOF} { + $master config set appendonly yes + waitForBgrewriteaof $master + $replica config set appendonly no + $master incr foo + assert_equal [$master waitaof 1 0 0] {1 0} + } + + test {WAITAOF both local and replica got AOF enabled at runtime} { + $replica config set appendonly yes + waitForBgrewriteaof $replica + $master incr foo + assert_equal 
[$master waitaof 1 1 0] {1 1} + } + + test {WAITAOF master sends PING after last write} { + $master config set repl-ping-replica-period 1 + $master incr foo + after 1200 ;# wait for PING + $master get foo + assert_equal [$master waitaof 1 1 0] {1 1} + $master config set repl-ping-replica-period 10 + } + + test {WAITAOF master client didn't send any write command} { + $master config set repl-ping-replica-period 1 + set client [redis_client -1] + after 1200 ;# wait for PING + assert_equal [$master waitaof 1 1 0] {1 1} + $client close + $master config set repl-ping-replica-period 10 + } + + test {WAITAOF master client didn't send any command} { + $master config set repl-ping-replica-period 1 + set client [redis [srv -1 "host"] [srv -1 "port"] 0 $::tls] + after 1200 ;# wait for PING + assert_equal [$master waitaof 1 1 0] {1 1} + $client close + $master config set repl-ping-replica-period 10 + } + + foreach fsync {no everysec always} { + test "WAITAOF when replica switches between masters, fsync: $fsync" { + # test a case where a replica is moved from one master to the other + # between two replication streams with different offsets that should + # not be mixed. done to smoke-test race conditions with bio thread. 
+ start_server {overrides {appendonly {yes} auto-aof-rewrite-percentage {0}}} { + start_server {overrides {appendonly {yes} auto-aof-rewrite-percentage {0}}} { + set master2 [srv -1 client] + set master2_host [srv -1 host] + set master2_port [srv -1 port] + set replica2 [srv 0 client] + set replica2_host [srv 0 host] + set replica2_port [srv 0 port] + set replica2_pid [srv 0 pid] + + $replica2 replicaof $master2_host $master2_port + wait_for_ofs_sync $master2 $replica2 + + $master config set appendfsync $fsync + $master2 config set appendfsync $fsync + $replica config set appendfsync $fsync + $replica2 config set appendfsync $fsync + if {$fsync eq "no"} { + after 2000 ;# wait for any previous fsync to finish + # can't afford "no" on the masters + $master config set appendfsync always + $master2 config set appendfsync always + } elseif {$fsync eq "everysec"} { + after 990 ;# hoping to hit a race + } + + # add some writes and block a client on each master + set rd [redis_deferring_client -3] + set rd2 [redis_deferring_client -1] + $rd set boo 11 + $rd2 set boo 22 + $rd read + $rd2 read + $rd waitaof 1 1 0 + $rd2 waitaof 1 1 0 + + if {$fsync eq "no"} { + # since appendfsync is disabled in the replicas, the client + # will get released only with full sync + wait_for_blocked_client -1 + wait_for_blocked_client -3 + } + # switch between the two replicas + $replica2 replicaof $master_host $master_port + $replica replicaof $master2_host $master2_port + assert_equal [$rd read] {1 1} + assert_equal [$rd2 read] {1 1} + $rd close + $rd2 close + + assert_equal [$replica get boo] 22 + assert_equal [$replica2 get boo] 11 + } + } + } + } + } + } +}