Remove RDB files used for replication in persistence-less instances.
Former-commit-id: b323645227a3e2cc5928e649586221aba508b10d
This commit is contained in:
parent
1a078be843
commit
34f7c653d5
@ -53,6 +53,11 @@ void putSlaveOnline(client *replica);
|
|||||||
int cancelReplicationHandshake(redisMaster *mi);
|
int cancelReplicationHandshake(redisMaster *mi);
|
||||||
static void propagateMasterStaleKeys();
|
static void propagateMasterStaleKeys();
|
||||||
|
|
||||||
|
/* We take a global flag to remember if this instance generated an RDB
|
||||||
|
* because of replication, so that we can remove the RDB file in case
|
||||||
|
* the instance is configured to have no persistence. */
|
||||||
|
int RDBGeneratedByReplication = 0;
|
||||||
|
|
||||||
/* --------------------------- Utility functions ---------------------------- */
|
/* --------------------------- Utility functions ---------------------------- */
|
||||||
|
|
||||||
/* Return the pointer to a string representing the replica ip:listening_port
|
/* Return the pointer to a string representing the replica ip:listening_port
|
||||||
@ -789,6 +794,10 @@ int startBgsaveForReplication(int mincapa) {
|
|||||||
retval = C_ERR;
|
retval = C_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* If we succeeded to start a BGSAVE with disk target, let's remember
|
||||||
|
* this fact, so that we can later delete the file if needed. */
|
||||||
|
if (retval == C_OK && !socket_target) RDBGeneratedByReplication = 1;
|
||||||
|
|
||||||
/* If we failed to BGSAVE, remove the slaves waiting for a full
|
/* If we failed to BGSAVE, remove the slaves waiting for a full
|
||||||
* resynchronization from the list of slaves, inform them with
|
* resynchronization from the list of slaves, inform them with
|
||||||
* an error about what happened, close the connection ASAP. */
|
* an error about what happened, close the connection ASAP. */
|
||||||
@ -1132,6 +1141,36 @@ void putSlaveOnline(client *replica) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* We call this function periodically to remove an RDB file that was
|
||||||
|
* generated because of replication, in an instance that is otherwise
|
||||||
|
* without any persistence. We don't want instances without persistence
|
||||||
|
* to take RDB files around, this violates certain policies in certain
|
||||||
|
* environments. */
|
||||||
|
void removeRDBUsedToSyncReplicas(void) {
|
||||||
|
if (allPersistenceDisabled() && RDBGeneratedByReplication) {
|
||||||
|
client *slave;
|
||||||
|
listNode *ln;
|
||||||
|
listIter li;
|
||||||
|
|
||||||
|
int delrdb = 1;
|
||||||
|
listRewind(server.slaves,&li);
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
|
slave = ln->value;
|
||||||
|
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
|
||||||
|
slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END ||
|
||||||
|
slave->replstate == SLAVE_STATE_SEND_BULK)
|
||||||
|
{
|
||||||
|
delrdb = 0;
|
||||||
|
break; /* No need to check the other replicas. */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (delrdb) {
|
||||||
|
RDBGeneratedByReplication = 0;
|
||||||
|
unlink(server.rdb_filename);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void sendBulkToSlave(connection *conn) {
|
void sendBulkToSlave(connection *conn) {
|
||||||
client *replica = (client*)connGetPrivateData(conn);
|
client *replica = (client*)connGetPrivateData(conn);
|
||||||
serverAssert(FCorrectThread(replica));
|
serverAssert(FCorrectThread(replica));
|
||||||
@ -1144,7 +1183,8 @@ void sendBulkToSlave(connection *conn) {
|
|||||||
if (replica->replpreamble) {
|
if (replica->replpreamble) {
|
||||||
nwritten = connWrite(conn,replica->replpreamble,sdslen(replica->replpreamble));
|
nwritten = connWrite(conn,replica->replpreamble,sdslen(replica->replpreamble));
|
||||||
if (nwritten == -1) {
|
if (nwritten == -1) {
|
||||||
serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s",
|
serverLog(LL_VERBOSE,
|
||||||
|
"Write error sending RDB preamble to replica: %s",
|
||||||
connGetLastError(conn));
|
connGetLastError(conn));
|
||||||
freeClient(replica);
|
freeClient(replica);
|
||||||
return;
|
return;
|
||||||
@ -1985,12 +2025,14 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
"Failed trying to load the MASTER synchronization "
|
"Failed trying to load the MASTER synchronization "
|
||||||
"DB from disk");
|
"DB from disk");
|
||||||
cancelReplicationHandshake(mi);
|
cancelReplicationHandshake(mi);
|
||||||
|
if (allPersistenceDisabled()) unlink(g_pserver->rdb_filename);
|
||||||
/* Note that there's no point in restarting the AOF on sync failure,
|
/* Note that there's no point in restarting the AOF on sync failure,
|
||||||
it'll be restarted when sync succeeds or replica promoted. */
|
it'll be restarted when sync succeeds or replica promoted. */
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Cleanup. */
|
/* Cleanup. */
|
||||||
|
if (allPersistenceDisabled()) unlink(server.rdb_filename);
|
||||||
if (fUpdate)
|
if (fUpdate)
|
||||||
unlink(mi->repl_transfer_tmpfile);
|
unlink(mi->repl_transfer_tmpfile);
|
||||||
zfree(mi->repl_transfer_tmpfile);
|
zfree(mi->repl_transfer_tmpfile);
|
||||||
@ -3684,6 +3726,11 @@ void replicationCron(void) {
|
|||||||
|
|
||||||
propagateMasterStaleKeys();
|
propagateMasterStaleKeys();
|
||||||
|
|
||||||
|
|
||||||
|
/* Remove the RDB file used for replication if Redis is not running
|
||||||
|
* with any persistence. */
|
||||||
|
removeRDBUsedToSyncReplicas();
|
||||||
|
|
||||||
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
|
||||||
refreshGoodSlavesCount();
|
refreshGoodSlavesCount();
|
||||||
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
|
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
|
||||||
|
@ -1483,12 +1483,20 @@ void updateDictResizePolicy(void) {
|
|||||||
dictDisableResize();
|
dictDisableResize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Return true if there are no active children processes doing RDB saving,
|
||||||
|
* AOF rewriting, or some side process spawned by a loaded module. */
|
||||||
int hasActiveChildProcess() {
|
int hasActiveChildProcess() {
|
||||||
return g_pserver->rdb_child_pid != -1 ||
|
return g_pserver->rdb_child_pid != -1 ||
|
||||||
g_pserver->aof_child_pid != -1 ||
|
g_pserver->aof_child_pid != -1 ||
|
||||||
g_pserver->module_child_pid != -1;
|
g_pserver->module_child_pid != -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Return true if this instance has persistence completely turned off:
|
||||||
|
* both RDB and AOF are disabled. */
|
||||||
|
int allPersistenceDisabled(void) {
|
||||||
|
return server.saveparamslen == 0 && server.aof_state == AOF_OFF;
|
||||||
|
}
|
||||||
|
|
||||||
/* ======================= Cron: called every 100 ms ======================== */
|
/* ======================= Cron: called every 100 ms ======================== */
|
||||||
|
|
||||||
/* Add a sample to the operations per second array of samples. */
|
/* Add a sample to the operations per second array of samples. */
|
||||||
|
@ -2357,6 +2357,7 @@ void loadingProgress(off_t pos);
|
|||||||
void stopLoading(int success);
|
void stopLoading(int success);
|
||||||
void startSaving(int rdbflags);
|
void startSaving(int rdbflags);
|
||||||
void stopSaving(int success);
|
void stopSaving(int success);
|
||||||
|
int allPersistenceDisabled(void);
|
||||||
|
|
||||||
#define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */
|
#define DISK_ERROR_TYPE_AOF 1 /* Don't accept writes: AOF errors. */
|
||||||
#define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */
|
#define DISK_ERROR_TYPE_RDB 2 /* Don't accept writes: RDB errors. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user