Merge branch 'flash-psync' into 'fastsync_collab'
Flash Partial Sync See merge request external-collab/keydb-pro-6!6 Former-commit-id: 2ebf2a8abd8df5c4cf3a5d759491962d050e8cc5
This commit is contained in:
commit
414277b098
@ -2,6 +2,8 @@
|
||||
#include <functional>
|
||||
#include "sds.h"
|
||||
|
||||
#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f"
|
||||
|
||||
class IStorageFactory
|
||||
{
|
||||
public:
|
||||
@ -9,6 +11,7 @@ public:
|
||||
|
||||
virtual ~IStorageFactory() {}
|
||||
virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0;
|
||||
virtual class IStorage *createMetadataDb() = 0;
|
||||
virtual const char *name() const = 0;
|
||||
virtual size_t totalDiskspaceUsed() const = 0;
|
||||
virtual bool FSlow() const = 0;
|
||||
|
@ -2093,6 +2093,64 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
|
||||
}
|
||||
}
|
||||
|
||||
/* Save the replid of yourself and any connected masters to storage.
|
||||
* Returns if no storage provider is used. */
|
||||
void saveMasterStatusToStorage()
|
||||
{
|
||||
if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return;
|
||||
|
||||
g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true);
|
||||
g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true);
|
||||
if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) {
|
||||
g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb,
|
||||
g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true);
|
||||
}
|
||||
|
||||
struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL);
|
||||
|
||||
if (miFirst && miFirst->master) {
|
||||
g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true);
|
||||
}
|
||||
else if (miFirst && miFirst->cached_master) {
|
||||
g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true);
|
||||
}
|
||||
|
||||
if (listLength(g_pserver->masters) == 0) {
|
||||
g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true);
|
||||
return;
|
||||
}
|
||||
sds val = sds(sdsempty());
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
redisMaster *mi;
|
||||
listRewind(g_pserver->masters,&li);
|
||||
while((ln = listNext(&li)) != NULL) {
|
||||
mi = (redisMaster*)listNodeValue(ln);
|
||||
if (!mi->master) {
|
||||
// If master client is not available, use info from master struct - better than nothing
|
||||
if (mi->master_replid[0] == 0) {
|
||||
// if replid is null, there's no reason to save it
|
||||
continue;
|
||||
}
|
||||
val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid,
|
||||
mi->master_initial_offset,
|
||||
mi->masterhost,
|
||||
mi->masterport);
|
||||
}
|
||||
else {
|
||||
if (mi->master->replid[0] == 0) {
|
||||
// if replid is null, there's no reason to save it
|
||||
continue;
|
||||
}
|
||||
val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid,
|
||||
mi->master->reploff,
|
||||
mi->masterhost,
|
||||
mi->masterport);
|
||||
}
|
||||
}
|
||||
g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true);
|
||||
}
|
||||
|
||||
/* Change the current instance replication ID with a new, random one.
|
||||
* This will prevent successful PSYNCs between this master and other
|
||||
* slaves, so the command should be called when something happens that
|
||||
@ -2100,6 +2158,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
|
||||
void changeReplicationId(void) {
|
||||
getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE);
|
||||
g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0';
|
||||
saveMasterStatusToStorage();
|
||||
}
|
||||
|
||||
|
||||
@ -2891,6 +2950,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
g_pserver->master_repl_offset = mi->master->reploff;
|
||||
if (g_pserver->repl_batch_offStart >= 0)
|
||||
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
|
||||
saveMasterStatusToStorage();
|
||||
}
|
||||
clearReplicationId2();
|
||||
|
||||
@ -3852,6 +3912,7 @@ struct redisMaster *replicationAddMaster(char *ip, int port) {
|
||||
mi->masterhost, mi->masterport);
|
||||
connectWithMaster(mi);
|
||||
}
|
||||
saveMasterStatusToStorage();
|
||||
return mi;
|
||||
}
|
||||
|
||||
@ -3935,6 +3996,8 @@ void replicationUnsetMaster(redisMaster *mi) {
|
||||
/* Restart the AOF subsystem in case we shut it down during a sync when
|
||||
* we were still a slave. */
|
||||
if (g_pserver->aof_enabled && g_pserver->aof_state == AOF_OFF) restartAOFAfterSYNC();
|
||||
|
||||
saveMasterStatusToStorage();
|
||||
}
|
||||
|
||||
/* This function is called when the replica lose the connection with the
|
||||
@ -3966,6 +4029,8 @@ void replicationHandleMasterDisconnection(redisMaster *mi) {
|
||||
mi->masterhost, mi->masterport);
|
||||
connectWithMaster(mi);
|
||||
}
|
||||
|
||||
saveMasterStatusToStorage();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3984,6 +3984,38 @@ void initServer(void) {
|
||||
slowlogInit();
|
||||
latencyMonitorInit();
|
||||
|
||||
if (g_pserver->m_pstorageFactory) {
|
||||
g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb();
|
||||
if (g_pserver->metadataDb) {
|
||||
g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){
|
||||
if (cb == sizeof(g_pserver->replid)) {
|
||||
memcpy(g_pserver->replid, data, cb);
|
||||
}
|
||||
});
|
||||
g_pserver->metadataDb->retrieve("repl-offset", 11, [&](const char *, size_t, const void *data, size_t cb){
|
||||
if (cb == sizeof(g_pserver->replid)) {
|
||||
g_pserver->master_repl_offset = *(long long*)data;
|
||||
}
|
||||
});
|
||||
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
||||
listRewind(g_pserver->masters, &li);
|
||||
while ((ln = listNext(&li)))
|
||||
{
|
||||
redisMaster *mi = (redisMaster*)listNodeValue(ln);
|
||||
/* If we are a replica, create a cached master from this
|
||||
* information, in order to allow partial resynchronizations
|
||||
* with masters. */
|
||||
replicationCacheMasterUsingMyself(mi);
|
||||
g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t){
|
||||
selectDb(mi->cached_master, *(int*)data);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* We have to initialize storage providers after the cluster has been initialized */
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||
{
|
||||
|
@ -2170,6 +2170,7 @@ struct redisServer {
|
||||
mode_t umask; /* The umask value of the process on startup */
|
||||
std::atomic<int> hz; /* serverCron() calls frequency in hertz */
|
||||
int in_fork_child; /* indication that this is a fork child */
|
||||
IStorage *metadataDb = nullptr;
|
||||
redisDb **db = nullptr;
|
||||
dict *commands; /* Command table */
|
||||
dict *orig_commands; /* Command table before command renaming. */
|
||||
|
@ -8,6 +8,7 @@
|
||||
#define INTERNAL_KEY_PREFIX "\x00\x04\x03\x00\x05\x02\x04"
|
||||
static const char count_key[] = INTERNAL_KEY_PREFIX "__keydb__count\1";
|
||||
static const char version_key[] = INTERNAL_KEY_PREFIX "__keydb__version\1";
|
||||
static const char meta_key[] = INTERNAL_KEY_PREFIX "__keydb__metadata\1";
|
||||
class RocksDBStorageFactory;
|
||||
|
||||
class RocksDBStorageProvider : public IStorage
|
||||
|
@ -14,6 +14,7 @@ public:
|
||||
~RocksDBStorageFactory();
|
||||
|
||||
virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override;
|
||||
virtual IStorage *createMetadataDb() override;
|
||||
virtual const char *name() const override;
|
||||
|
||||
virtual size_t totalDiskspaceUsed() const override;
|
||||
|
@ -32,6 +32,7 @@ IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const
|
||||
RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig)
|
||||
: m_path(dbfile)
|
||||
{
|
||||
dbnum++; // create an extra db for metadata
|
||||
// Get the count of column families in the actual database
|
||||
std::vector<std::string> vecT;
|
||||
auto status = rocksdb::DB::ListColumnFamilies(rocksdb::Options(), dbfile, &vecT);
|
||||
@ -134,6 +135,13 @@ std::string RocksDBStorageFactory::getTempFolder()
|
||||
return path;
|
||||
}
|
||||
|
||||
IStorage *RocksDBStorageFactory::createMetadataDb()
|
||||
{
|
||||
IStorage *metadataDb = this->create(-1, nullptr, nullptr);
|
||||
metadataDb->insert(meta_key, sizeof(meta_key), (void*)METADATA_DB_IDENTIFIER, strlen(METADATA_DB_IDENTIFIER), false);
|
||||
return metadataDb;
|
||||
}
|
||||
|
||||
IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *privdata)
|
||||
{
|
||||
++db; // skip default col family
|
||||
|
@ -6,6 +6,13 @@ IStorage *TestStorageFactory::create(int, key_load_iterator, void *)
|
||||
return new (MALLOC_LOCAL) TestStorageProvider();
|
||||
}
|
||||
|
||||
IStorage *TestStorageFactory::createMetadataDb()
|
||||
{
|
||||
IStorage *metadataDb = new (MALLOC_LOCAL) TestStorageProvider();
|
||||
metadataDb->insert("KEYDB_METADATA_ID", strlen("KEYDB_METADATA_ID"), (void*)METADATA_DB_IDENTIFIER, strlen(METADATA_DB_IDENTIFIER), false);
|
||||
return metadataDb;
|
||||
}
|
||||
|
||||
const char *TestStorageFactory::name() const
|
||||
{
|
||||
return "TEST Storage Provider";
|
||||
|
@ -5,6 +5,7 @@
|
||||
class TestStorageFactory : public IStorageFactory
|
||||
{
|
||||
virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override;
|
||||
virtual class IStorage *createMetadataDb() override;
|
||||
virtual const char *name() const override;
|
||||
virtual size_t totalDiskspaceUsed() const override { return 0; }
|
||||
virtual bool FSlow() const { return false; }
|
||||
|
135
tests/integration/replication-psync-flash.tcl
Normal file
135
tests/integration/replication-psync-flash.tcl
Normal file
@ -0,0 +1,135 @@
|
||||
# Creates a master-slave pair and breaks the link continuously to force
|
||||
# partial resyncs attempts, all this while flooding the master with
|
||||
# write queries.
|
||||
#
|
||||
# You can specify backlog size, ttl, delay before reconnection, test duration
|
||||
# in seconds, and an additional condition to verify at the end.
|
||||
#
|
||||
# If reconnect is > 0, the test actually try to break the connection and
|
||||
# reconnect with the master, otherwise just the initial synchronization is
|
||||
# checked for consistency.
|
||||
proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} {
|
||||
start_server {tags {"repl"}} {
|
||||
start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] {
|
||||
|
||||
set master [srv -1 client]
|
||||
set master_host [srv -1 host]
|
||||
set master_port [srv -1 port]
|
||||
set slave [srv 0 client]
|
||||
|
||||
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000]
|
||||
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000]
|
||||
set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000]
|
||||
|
||||
test {Slave should be able to synchronize with the master} {
|
||||
$slave slaveof $master_host $master_port
|
||||
wait_for_condition 50 100 {
|
||||
[lindex [r role] 0] eq {slave} &&
|
||||
[lindex [r role] 3] eq {connected}
|
||||
} else {
|
||||
fail "Replication not started."
|
||||
}
|
||||
}
|
||||
|
||||
# Check that the background clients are actually writing.
|
||||
test {Detect write load to master} {
|
||||
wait_for_condition 50 1000 {
|
||||
[$master dbsize] > 100
|
||||
} else {
|
||||
fail "Can't detect write load from background clients."
|
||||
}
|
||||
}
|
||||
|
||||
test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" {
|
||||
# Now while the clients are writing data, break the maste-slave
|
||||
# link multiple times.
|
||||
if ($reconnect) {
|
||||
for {set j 0} {$j < $duration*10} {incr j} {
|
||||
after 100
|
||||
# catch {puts "MASTER [$master dbsize] keys, REPLICA [$slave dbsize] keys"}
|
||||
|
||||
if {($j % 20) == 0} {
|
||||
catch {
|
||||
$slave debug restart
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
stop_bg_complex_data $load_handle0
|
||||
stop_bg_complex_data $load_handle1
|
||||
stop_bg_complex_data $load_handle2
|
||||
|
||||
# Wait for the slave to reach the "online"
|
||||
# state from the POV of the master.
|
||||
set retry 5000
|
||||
while {$retry} {
|
||||
set info [$master info]
|
||||
if {[string match {*slave0:*state=online*} $info]} {
|
||||
break
|
||||
} else {
|
||||
incr retry -1
|
||||
after 100
|
||||
}
|
||||
}
|
||||
if {$retry == 0} {
|
||||
error "assertion:Slave not correctly synchronized"
|
||||
}
|
||||
|
||||
# Wait that slave acknowledge it is online so
|
||||
# we are sure that DBSIZE and DEBUG DIGEST will not
|
||||
# fail because of timing issues. (-LOADING error)
|
||||
wait_for_condition 5000 100 {
|
||||
[lindex [$slave role] 3] eq {connected}
|
||||
} else {
|
||||
fail "Slave still not connected after some time"
|
||||
}
|
||||
|
||||
set retry 10
|
||||
while {$retry && ([$master debug digest] ne [$slave debug digest])}\
|
||||
{
|
||||
after 1000
|
||||
incr retry -1
|
||||
}
|
||||
assert {[$master dbsize] > 0}
|
||||
|
||||
if {[$master debug digest] ne [$slave debug digest]} {
|
||||
set csv1 [csvdump r]
|
||||
set csv2 [csvdump {r -1}]
|
||||
set fd [open /tmp/repldump1.txt w]
|
||||
puts -nonewline $fd $csv1
|
||||
close $fd
|
||||
set fd [open /tmp/repldump2.txt w]
|
||||
puts -nonewline $fd $csv2
|
||||
close $fd
|
||||
puts "Master - Replica inconsistency"
|
||||
puts "Run diff -u against /tmp/repldump*.txt for more info"
|
||||
}
|
||||
assert_equal [r debug digest] [r -1 debug digest]
|
||||
eval $cond
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
foreach mdl {no yes} {
|
||||
foreach sdl {disabled swapdb} {
|
||||
test_psync {no reconnection, just sync} 6 1000000 3600 0 {
|
||||
} $mdl $sdl 0
|
||||
|
||||
test_psync {ok psync} 6 100000000 3600 0 {
|
||||
assert {[s -1 sync_partial_ok] > 0}
|
||||
} $mdl $sdl 1
|
||||
|
||||
test_psync {no backlog} 6 100 3600 0.5 {
|
||||
assert {[s -1 sync_partial_err] > 0}
|
||||
} $mdl $sdl 1
|
||||
|
||||
test_psync {ok after delay} 3 100000000 3600 3 {
|
||||
assert {[s -1 sync_partial_ok] > 0}
|
||||
} $mdl $sdl 1
|
||||
|
||||
test_psync {backlog expired} 3 100000000 1 3 {
|
||||
assert {[s -1 sync_partial_err] > 0}
|
||||
} $mdl $sdl 1
|
||||
}
|
||||
}
|
@ -47,6 +47,7 @@ set ::all_tests {
|
||||
integration/replication-3
|
||||
integration/replication-4
|
||||
integration/replication-psync
|
||||
integration/replication-psync-flash
|
||||
integration/replication-active
|
||||
integration/replication-multimaster
|
||||
integration/replication-multimaster-connect
|
||||
|
Loading…
x
Reference in New Issue
Block a user