
Implementing the WAITAOF functionality, which allows the user to block until a
specified number of Redis instances have fsynced all previous write commands
to the AOF.

Syntax: `WAITAOF <num_local> <num_replicas> <timeout>`
Response: an array containing two elements: num_local, num_replicas.
num_local is always either 0 or 1, representing the local AOF on the master.
num_replicas is the number of replicas that acknowledged fsyncing the
replication offset of the last write to their AOF.

Returns an error when called on a replica, or when called with a non-zero
num_local on a master with AOF disabled; in all other cases the response
simply contains the number of fsynced copies.

Main changes:
* Added code to keep track of replication offsets that are confirmed to have
  been fsynced to disk.
* Keep advancing master_repl_offset even when replication is disabled (and
  there's no replication backlog), but only if there's an AOF enabled. This
  way we can use this command and its mechanisms even when replication is
  disabled.
* Extend REPLCONF ACK to `REPLCONF ACK <ofs> FACK <ofs>`; the FACK argument
  is appended only if the replica has an AOF, and is ignored by old masters
  (thus backwards compatible).
* WAIT no longer waits for the replication offset after your last command,
  but rather for the replication offset after your last write (or read
  command that caused propagation, e.g. lazy expiry).

Unrelated changes:
* The WAIT command respects CLIENT_DENY_BLOCKING (not just CLIENT_MULTI).

Implementation details:
* Add an atomic var named `fsynced_reploff_pending` that's updated (usually
  by the bio thread) and later copied to the main `fsynced_reploff` variable
  (only if the AOF base file exists); see the sketch after this message.
  I.e. during the initial AOF rewrite it will not be used as the fsynced
  offset, since the AOF base is still missing.
* Replace the close+fsync bio job with a new BIO_CLOSE_AOF (AOF-specific)
  job that also updates the fsynced offset field.
* Handle all AOF jobs (BIO_CLOSE_AOF, BIO_AOF_FSYNC) in the same bio worker
  thread, to impose an ordering on their execution. This solves a race
  condition where a job could set `fsynced_reploff_pending` to a higher
  value than another pending fsync job, indicating an offset for which parts
  of the data had not actually been fsynced yet. Imposing an ordering on the
  jobs guarantees that fsync jobs are executed in increasing order of
  replication offset.
* Drain bio jobs when switching `appendfsync` to "always". This prevents a
  write race between updates to `fsynced_reploff_pending` in the main thread
  (`flushAppendOnlyFile` when set to ALWAYS fsync) and those done in the bio
  thread.
* Drain the pending fsync when starting over a new AOF, to avoid race
  conditions where the previous AOF's offsets override the new one (e.g.
  after switching to replicate from a new master).
* Make sure to update the fsynced offset at the end of the initial AOF
  rewrite. This is a must in case there are no additional writes that
  trigger a periodic fsync, specifically for a replica that does a full
  sync.

Limitations:
It is possible to write a module or a Lua script that propagates to the AOF
but doesn't propagate to the replication stream (see
REDISMODULE_ARGV_NO_REPLICAS and luaRedisSetReplCommand). These features are
incompatible with the WAITAOF command, and can result in two bad cases. The
scenario is that the user executes a command that only propagates to the AOF,
then immediately issues WAITAOF, and there are no further writes on the
replication stream after that:
1. If the last thing that happened on the replication stream is a PING
   (which increased the replication offset but won't trigger an fsync on the
   replica), then the client will hang forever (waiting for a FACK that the
   replica will never send, since it doesn't trigger any fsyncs).
2. If the last thing that happened is a write command that got propagated
   properly, then WAITAOF will be released immediately, without waiting for
   an fsync (since the offset didn't change).

Refactoring:
* Plumbing to allow a bio worker to handle multiple job types. This
  introduces the infrastructure necessary to allow BIO workers to not have a
  1-1 mapping of worker to job type. In the future this allows assigning
  multiple job types to a single worker, either as a performance/resource
  optimization, or as a way of enforcing ordering between specific classes
  of jobs.

Co-authored-by: Oran Agra <oran@redislabs.com>
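For illustration, a minimal sketch of the pending/committed offset pattern described above. This is not the actual server code: `aof_base_exists` and both function names are illustrative stand-ins, and the real server uses its own atomic wrappers rather than C11 atomics directly.

```c
#include <stdatomic.h>

/* Offset the bio thread has confirmed as fsynced; written off the main thread. */
static _Atomic long long fsynced_reploff_pending;
/* Offset exposed to WAITAOF; read and written by the main thread only. */
static long long fsynced_reploff;
/* Stand-in for the real "AOF base file exists" check. */
static int aof_base_exists;

/* Bio thread: called after successfully fsyncing everything up to 'offset'. */
void publish_fsynced_offset(long long offset) {
    atomic_store(&fsynced_reploff_pending, offset);
}

/* Main thread (e.g. once per event-loop iteration): promote the pending
 * offset, but only once the AOF base exists, so offsets observed during
 * the initial AOF rewrite are not reported as durable. */
void promote_fsynced_offset(void) {
    if (aof_base_exists)
        fsynced_reploff = atomic_load(&fsynced_reploff_pending);
}
```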
bio.c (346 lines, 13 KiB, C):
/* Background I/O service for Redis.
 *
 * This file implements operations that we need to perform in the background.
 * The original operation was a background close(2) system call: this is
 * needed because when the process is the last owner of a reference to a
 * file, closing it means unlinking it, and the deletion of the file is slow,
 * blocking the server.
 *
 * In the future we'll either continue implementing new things we need or
 * we'll switch to libeio. However there are probably long term uses for this
 * file as we may want to put here Redis specific background tasks (for instance
 * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
 * implementation).
 *
 * DESIGN
 * ------
 *
 * The design is simple: We have a structure representing a job to perform,
 * and several worker threads and job queues. Every job type is assigned to
 * a specific worker thread, and a single worker may handle several different
 * job types.
 * Every thread waits for new jobs in its queue, and processes every job
 * sequentially.
 *
 * Jobs handled by the same worker are guaranteed to be processed from the
 * least-recently-inserted to the most-recently-inserted (older jobs processed
 * first).
 *
 * Currently there is no way for the creator of the job to be notified about
 * the completion of the operation; this will only be added when/if needed.
 *
 * ----------------------------------------------------------------------------
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "server.h"
|
|
#include "bio.h"
|
|
|
|
static char* bio_worker_title[] = {
|
|
"bio_close_file",
|
|
"bio_aof",
|
|
"bio_lazy_free",
|
|
};
|
|
|
|
#define BIO_WORKER_NUM (sizeof(bio_worker_title) / sizeof(*bio_worker_title))
|
|
|
|
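/* Map each job type to the worker that handles it. BIO_AOF_FSYNC and
 * BIO_CLOSE_AOF are deliberately assigned to the same worker so that AOF
 * jobs execute in submission order, keeping the fsynced replication offset
 * monotonically increasing (see the commit message above). */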
static unsigned int bio_job_to_worker[] = {
    [BIO_CLOSE_FILE] = 0,
    [BIO_AOF_FSYNC] = 1,
    [BIO_CLOSE_AOF] = 1,
    [BIO_LAZY_FREE] = 2,
};

static pthread_t bio_threads[BIO_WORKER_NUM];
static pthread_mutex_t bio_mutex[BIO_WORKER_NUM];
static pthread_cond_t bio_newjob_cond[BIO_WORKER_NUM];
static list *bio_jobs[BIO_WORKER_NUM];
static unsigned long bio_jobs_counter[BIO_NUM_OPS] = {0};

/* This structure represents a background job. It is only used locally to this
 * file as the API does not expose the internals at all. */
typedef union bio_job {
    struct {
        int type; /* Job-type tag. This needs to appear as the first element in all union members. */
    } header;

    /* Job specific arguments. */
    struct {
        int type;
        int fd; /* Fd for file based background jobs */
        long long offset; /* A job-specific offset, if applicable */
        unsigned need_fsync:1; /* A flag to indicate that a fsync is required before
                                * the file is closed. */
        unsigned need_reclaim_cache:1; /* A flag to indicate that reclaim cache is required before
                                        * the file is closed. */
    } fd_args;

    struct {
        int type;
        lazy_free_fn *free_fn; /* Function that will free the provided arguments */
        void *free_args[]; /* List of arguments to be passed to the free function */
    } free_args;
} bio_job;

void *bioProcessBackgroundJobs(void *arg);

/* Make sure we have enough stack to perform all the things we do in the
 * main thread. */
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)

/* Initialize the background system, spawning the threads. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    unsigned long j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_WORKER_NUM; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        bio_jobs[j] = listCreate();
    }

    /* Set the stack size, as by default it may be small on some systems. */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the index of the worker the thread
     * is responsible for. */
    for (j = 0; j < BIO_WORKER_NUM; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

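/* Append the job to the queue of the worker assigned to its type, and wake
 * that worker up. */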
void bioSubmitJob(int type, bio_job *job) {
    job->header.type = type;
    unsigned long worker = bio_job_to_worker[type];
    pthread_mutex_lock(&bio_mutex[worker]);
    listAddNodeTail(bio_jobs[worker],job);
    bio_jobs_counter[type]++;
    pthread_cond_signal(&bio_newjob_cond[worker]);
    pthread_mutex_unlock(&bio_mutex[worker]);
}

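/* Queue a lazy-free job: free_fn will be invoked in the background with the
 * arg_count pointers passed here. */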
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
    va_list valist;
    /* Allocate memory for the job structure and all required
     * arguments */
    bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
    job->free_args.free_fn = free_fn;

    va_start(valist, arg_count);
    for (int i = 0; i < arg_count; i++) {
        job->free_args.free_args[i] = va_arg(valist, void *);
    }
    va_end(valist);
    bioSubmitJob(BIO_LAZY_FREE, job);
}

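/* Queue a background close(2) of fd, optionally preceded by an fsync and a
 * page cache reclaim. */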
void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache) {
    bio_job *job = zmalloc(sizeof(*job));
    job->fd_args.fd = fd;
    job->fd_args.need_fsync = need_fsync;
    job->fd_args.need_reclaim_cache = need_reclaim_cache;

    bioSubmitJob(BIO_CLOSE_FILE, job);
}

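/* Queue a close of an AOF file descriptor. Unlike BIO_CLOSE_FILE this job
 * always fsyncs first and carries the replication offset covered by the
 * file, so the worker can update fsynced_reploff_pending. */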
void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache) {
    bio_job *job = zmalloc(sizeof(*job));
    job->fd_args.fd = fd;
    job->fd_args.offset = offset;
    job->fd_args.need_fsync = 1;
    job->fd_args.need_reclaim_cache = need_reclaim_cache;

    bioSubmitJob(BIO_CLOSE_AOF, job);
}

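/* Queue an fsync of an AOF file descriptor, together with the replication
 * offset that the file covers once the fsync completes. */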
void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache) {
    bio_job *job = zmalloc(sizeof(*job));
    job->fd_args.fd = fd;
    job->fd_args.offset = offset;
    job->fd_args.need_reclaim_cache = need_reclaim_cache;

    bioSubmitJob(BIO_AOF_FSYNC, job);
}

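/* Worker thread entry point. The argument is the index of the worker this
 * thread serves; it loops forever consuming jobs from that worker's queue. */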
void *bioProcessBackgroundJobs(void *arg) {
    bio_job *job;
    unsigned long worker = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the worker is within the right interval. */
    serverAssert(worker < BIO_WORKER_NUM);

    redis_set_thread_title(bio_worker_title[worker]);

    redisSetCpuAffinity(server.bio_cpulist);

    makeThreadKillable();

    pthread_mutex_lock(&bio_mutex[worker]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock held. */
        if (listLength(bio_jobs[worker]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[worker], &bio_mutex[worker]);
            continue;
        }
        /* Get the job from the queue. */
        ln = listFirst(bio_jobs[worker]);
        job = ln->value;
        /* It is now possible to unlock the background system as we now have
         * a stand-alone job structure to process. */
        pthread_mutex_unlock(&bio_mutex[worker]);

        /* Process the job according to its type. */
        int job_type = job->header.type;

        if (job_type == BIO_CLOSE_FILE) {
            if (job->fd_args.need_fsync &&
                redis_fsync(job->fd_args.fd) == -1 &&
                errno != EBADF && errno != EINVAL)
            {
                serverLog(LL_WARNING, "Failed to fsync the AOF file: %s",strerror(errno));
            }
            if (job->fd_args.need_reclaim_cache) {
                if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) {
                    serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno));
                }
            }
            close(job->fd_args.fd);
        } else if (job_type == BIO_AOF_FSYNC || job_type == BIO_CLOSE_AOF) {
            /* The fd may be closed by the main thread and reused for another
             * socket, pipe, or file. We just ignore these errno values
             * because the AOF fsync did not really fail. */
            if (redis_fsync(job->fd_args.fd) == -1 &&
                errno != EBADF && errno != EINVAL)
            {
                int last_status;
                atomicGet(server.aof_bio_fsync_status,last_status);
                atomicSet(server.aof_bio_fsync_status,C_ERR);
                atomicSet(server.aof_bio_fsync_errno,errno);
                if (last_status == C_OK) {
                    serverLog(LL_WARNING,
                        "Failed to fsync the AOF file: %s",strerror(errno));
                }
            } else {
                atomicSet(server.aof_bio_fsync_status,C_OK);
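                /* The fsync succeeded: publish the offset now known to be on
                 * disk, so the main thread can expose it to WAITAOF clients. */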
                atomicSet(server.fsynced_reploff_pending, job->fd_args.offset);
            }

            if (job->fd_args.need_reclaim_cache) {
                if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) {
                    serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno));
                }
            }
            if (job_type == BIO_CLOSE_AOF)
                close(job->fd_args.fd);
        } else if (job_type == BIO_LAZY_FREE) {
            job->free_args.free_fn(job->free_args.free_args);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait(). */
        pthread_mutex_lock(&bio_mutex[worker]);
        listDelNode(bio_jobs[worker], ln);
        bio_jobs_counter[job_type]--;
        pthread_cond_signal(&bio_newjob_cond[worker]);
    }
}

/* Return the number of pending jobs of the specified type. */
unsigned long bioPendingJobsOfType(int type) {
    unsigned int worker = bio_job_to_worker[type];

    pthread_mutex_lock(&bio_mutex[worker]);
    unsigned long val = bio_jobs_counter[type];
    pthread_mutex_unlock(&bio_mutex[worker]);

    return val;
}

/* Wait for the job queue of the worker that handles jobs of the specified
 * type to become empty. */
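/* Per the commit message above, this is used e.g. when switching appendfsync
 * to "always" and when starting a new AOF, to prevent races on
 * fsynced_reploff_pending between the main thread and the bio thread. */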
void bioDrainWorker(int job_type) {
    unsigned long worker = bio_job_to_worker[job_type];

    pthread_mutex_lock(&bio_mutex[worker]);
    while (listLength(bio_jobs[worker]) > 0) {
        pthread_cond_wait(&bio_newjob_cond[worker], &bio_mutex[worker]);
    }
    pthread_mutex_unlock(&bio_mutex[worker]);
}

/* Kill the running bio threads in an unclean way. This function should be
 * used only when it's critical to stop the threads for some reason.
 * Currently Redis does this only on crash (for instance on SIGSEGV) in order
 * to perform a fast memory check without other threads messing with memory. */
void bioKillThreads(void) {
    int err;
    unsigned long j;

    for (j = 0; j < BIO_WORKER_NUM; j++) {
        if (bio_threads[j] == pthread_self()) continue;
        if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) {
            if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
                serverLog(LL_WARNING,
                    "Bio worker thread #%lu can not be joined: %s",
                    j, strerror(err));
            } else {
                serverLog(LL_WARNING,
                    "Bio worker thread #%lu terminated",j);
            }
        }
    }
}