Merge remote-tracking branch 'internal/main'
This commit is contained in:
commit
486a6bb7b1
@ -21,13 +21,13 @@ machamp:
|
|||||||
parent: make-build
|
parent: make-build
|
||||||
# https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04
|
# https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04
|
||||||
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
||||||
command: ./runtest --clients 4 --verbose
|
command: ./runtest --clients 4 --verbose --tls
|
||||||
cluster-test:
|
cluster-test:
|
||||||
type: cmd
|
type: cmd
|
||||||
parent: make-build
|
parent: make-build
|
||||||
# https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04
|
# https://github.sc-corp.net/Snapchat/img/tree/master/keydb/ubuntu-20-04
|
||||||
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
builder_image: us.gcr.io/snapchat-build-artifacts/prod/snapchat/img/keydb/keydb-ubuntu-20-04@sha256:cf869a3f5d1de1e1d976bb906689c37b7031938eb68661b844a38c532f27248c
|
||||||
command: ./runtest-cluster
|
command: ./runtest-cluster --tls
|
||||||
sentinel-test:
|
sentinel-test:
|
||||||
type: cmd
|
type: cmd
|
||||||
parent: make-build
|
parent: make-build
|
||||||
|
@ -136,14 +136,33 @@ void SnapshotPayloadParseState::flushQueuedKeys() {
|
|||||||
int idb = current_database;
|
int idb = current_database;
|
||||||
serverAssert(vecqueuedKeys.size() == vecqueuedVals.size());
|
serverAssert(vecqueuedKeys.size() == vecqueuedVals.size());
|
||||||
auto sizePrev = vecqueuedKeys.size();
|
auto sizePrev = vecqueuedKeys.size();
|
||||||
(*insertsInFlight)++;
|
|
||||||
std::weak_ptr<std::atomic<int>> insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous
|
|
||||||
if (current_database < cserver.dbnum) {
|
if (current_database < cserver.dbnum) {
|
||||||
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
|
if (g_pserver->m_pstorageFactory) {
|
||||||
g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
|
(*insertsInFlight)++;
|
||||||
(*(insertsInFlightTmp.lock()))--;
|
std::weak_ptr<std::atomic<int>> insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous
|
||||||
delete pallocator;
|
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
|
||||||
});
|
g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
|
||||||
|
(*(insertsInFlightTmp.lock()))--;
|
||||||
|
delete pallocator;
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
for (size_t ival = 0; ival < vecqueuedKeys.size(); ++ival) {
|
||||||
|
size_t offset = 0;
|
||||||
|
auto spexpire = deserializeExpire(vecqueuedVals[ival], vecqueuedValsCb[ival], &offset);
|
||||||
|
auto o = deserializeStoredObject(vecqueuedVals[ival] + offset, vecqueuedValsCb[ival] - offset);
|
||||||
|
sds sdsKey = sdsnewlen(vecqueuedKeys[ival], -static_cast<ssize_t>(vecqueuedKeysCb[ival]));
|
||||||
|
if (dbMerge(g_pserver->db[idb], sdsKey, o, false)) {
|
||||||
|
if (spexpire != nullptr)
|
||||||
|
g_pserver->db[idb]->setExpire(sdsKey, std::move(*spexpire));
|
||||||
|
} else {
|
||||||
|
sdsfree(sdsKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
vecqueuedKeys.clear();
|
||||||
|
vecqueuedKeysCb.clear();
|
||||||
|
vecqueuedVals.clear();
|
||||||
|
vecqueuedValsCb.clear();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// else drop the data
|
// else drop the data
|
||||||
vecqueuedKeys.clear();
|
vecqueuedKeys.clear();
|
||||||
|
@ -2983,6 +2983,7 @@ standardConfig configs[] = {
|
|||||||
createIntConfig("overload-protect-percent", NULL, MODIFIABLE_CONFIG, 0, 200, g_pserver->overload_protect_threshold, 0, INTEGER_CONFIG, NULL, NULL),
|
createIntConfig("overload-protect-percent", NULL, MODIFIABLE_CONFIG, 0, 200, g_pserver->overload_protect_threshold, 0, INTEGER_CONFIG, NULL, NULL),
|
||||||
createIntConfig("force-eviction-percent", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->force_eviction_percent, 0, INTEGER_CONFIG, NULL, NULL),
|
createIntConfig("force-eviction-percent", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->force_eviction_percent, 0, INTEGER_CONFIG, NULL, NULL),
|
||||||
createBoolConfig("enable-async-rehash", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_rehash, 1, NULL, NULL),
|
createBoolConfig("enable-async-rehash", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_rehash, 1, NULL, NULL),
|
||||||
|
createBoolConfig("enable-keydb-fastsync", NULL, MODIFIABLE_CONFIG, g_pserver->fEnableFastSync, 0, NULL, NULL),
|
||||||
|
|
||||||
#ifdef USE_OPENSSL
|
#ifdef USE_OPENSSL
|
||||||
createIntConfig("tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, updateTLSPort), /* TCP port. */
|
createIntConfig("tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, updateTLSPort), /* TCP port. */
|
||||||
|
@ -56,7 +56,6 @@ int expireIfNeeded(redisDb *db, robj *key, robj *o);
|
|||||||
void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add);
|
void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add);
|
||||||
|
|
||||||
std::unique_ptr<expireEntry> deserializeExpire(const char *str, size_t cch, size_t *poffset);
|
std::unique_ptr<expireEntry> deserializeExpire(const char *str, size_t cch, size_t *poffset);
|
||||||
sds serializeStoredObjectAndExpire(robj_roptr o);
|
|
||||||
|
|
||||||
dictType dictChangeDescType {
|
dictType dictChangeDescType {
|
||||||
dictSdsHash, /* hash function */
|
dictSdsHash, /* hash function */
|
||||||
|
@ -163,7 +163,6 @@ client *createClient(connection *conn, int iel) {
|
|||||||
c->multibulklen = 0;
|
c->multibulklen = 0;
|
||||||
c->bulklen = -1;
|
c->bulklen = -1;
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
c->sentlenAsync = 0;
|
|
||||||
c->flags = 0;
|
c->flags = 0;
|
||||||
c->fPendingAsyncWrite = FALSE;
|
c->fPendingAsyncWrite = FALSE;
|
||||||
c->fPendingAsyncWriteHandler = FALSE;
|
c->fPendingAsyncWriteHandler = FALSE;
|
||||||
|
@ -2524,7 +2524,10 @@ cleanup:
|
|||||||
|
|
||||||
static int clusterManagerNodeConnect(clusterManagerNode *node) {
|
static int clusterManagerNodeConnect(clusterManagerNode *node) {
|
||||||
if (node->context) redisFree(node->context);
|
if (node->context) redisFree(node->context);
|
||||||
node->context = redisConnect(node->ip, node->port);
|
struct timeval tv;
|
||||||
|
tv.tv_sec = config.cluster_manager_command.timeout / 1000;
|
||||||
|
tv.tv_usec = (config.cluster_manager_command.timeout % 1000) * 1000;
|
||||||
|
node->context = redisConnectWithTimeout(node->ip, node->port, tv);
|
||||||
if (!node->context->err && config.tls) {
|
if (!node->context->err && config.tls) {
|
||||||
const char *err = NULL;
|
const char *err = NULL;
|
||||||
if (cliSecureConnection(node->context, config.sslconfig, &err) == REDIS_ERR && err) {
|
if (cliSecureConnection(node->context, config.sslconfig, &err) == REDIS_ERR && err) {
|
||||||
@ -5626,7 +5629,10 @@ static int clusterManagerCommandImport(int argc, char **argv) {
|
|||||||
char *reply_err = NULL;
|
char *reply_err = NULL;
|
||||||
redisReply *src_reply = NULL;
|
redisReply *src_reply = NULL;
|
||||||
// Connect to the source node.
|
// Connect to the source node.
|
||||||
redisContext *src_ctx = redisConnect(src_ip, src_port);
|
struct timeval tv;
|
||||||
|
tv.tv_sec = config.cluster_manager_command.timeout / 1000;
|
||||||
|
tv.tv_usec = (config.cluster_manager_command.timeout % 1000) * 1000;
|
||||||
|
redisContext *src_ctx = redisConnectWithTimeout(src_ip, src_port, tv);
|
||||||
if (src_ctx->err) {
|
if (src_ctx->err) {
|
||||||
success = 0;
|
success = 0;
|
||||||
fprintf(stderr,"Could not connect to KeyDB at %s:%d: %s.\n", src_ip,
|
fprintf(stderr,"Could not connect to KeyDB at %s:%d: %s.\n", src_ip,
|
||||||
|
@ -1019,8 +1019,6 @@ public:
|
|||||||
void addReplica(client *replica) {
|
void addReplica(client *replica) {
|
||||||
replicas.push_back(replica);
|
replicas.push_back(replica);
|
||||||
replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset());
|
replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset());
|
||||||
// Optimize the socket for bulk transfer
|
|
||||||
//connDisableTcpNoDelay(replica->conn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isActive() const { return !replicas.empty(); }
|
bool isActive() const { return !replicas.empty(); }
|
||||||
@ -1093,7 +1091,7 @@ public:
|
|||||||
reply->used += size;
|
reply->used += size;
|
||||||
}
|
}
|
||||||
|
|
||||||
void addLongWithPrefix(long val, char prefix) {
|
void addLongLongWithPrefix(long long val, char prefix) {
|
||||||
char buf[128];
|
char buf[128];
|
||||||
int len;
|
int len;
|
||||||
|
|
||||||
@ -1105,15 +1103,19 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void addArrayLen(int len) {
|
void addArrayLen(int len) {
|
||||||
addLongWithPrefix(len, '*');
|
addLongLongWithPrefix(len, '*');
|
||||||
}
|
}
|
||||||
|
|
||||||
void addLong(long val) {
|
void addLong(long val) {
|
||||||
addLongWithPrefix(val, ':');
|
addLongLongWithPrefix(val, ':');
|
||||||
|
}
|
||||||
|
|
||||||
|
void addLongLong(long long val) {
|
||||||
|
addLongLongWithPrefix(val, ':');
|
||||||
}
|
}
|
||||||
|
|
||||||
void addString(const char *s, unsigned long len) {
|
void addString(const char *s, unsigned long len) {
|
||||||
addLongWithPrefix(len, '$');
|
addLongLongWithPrefix(len, '$');
|
||||||
addData(s, len);
|
addData(s, len);
|
||||||
addData("\r\n", 2);
|
addData("\r\n", 2);
|
||||||
}
|
}
|
||||||
@ -1149,22 +1151,19 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) {
|
|||||||
// TODO: This needs to be on a background thread
|
// TODO: This needs to be on a background thread
|
||||||
int retval = C_OK;
|
int retval = C_OK;
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
serverLog(LL_NOTICE, "Starting storage provider fast full sync with target: %s", "disk");
|
serverLog(LL_NOTICE, "Starting fast full sync with target: %s", "disk");
|
||||||
|
|
||||||
std::shared_ptr<replicationBuffer> spreplBuf = std::make_shared<replicationBuffer>();
|
std::shared_ptr<replicationBuffer> spreplBuf = std::make_shared<replicationBuffer>();
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listIter li;
|
listIter li;
|
||||||
client *replica = nullptr;
|
|
||||||
listRewind(g_pserver->slaves, &li);
|
listRewind(g_pserver->slaves, &li);
|
||||||
while (replica == nullptr && (ln = listNext(&li))) {
|
while ((ln = listNext(&li))) {
|
||||||
client *replicaCur = (client*)listNodeValue(ln);
|
client *replicaCur = (client*)listNodeValue(ln);
|
||||||
if ((replicaCur->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && (replicaCur->replstate == SLAVE_STATE_WAIT_BGSAVE_START)) {
|
if ((replicaCur->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC) && (replicaCur->replstate == SLAVE_STATE_WAIT_BGSAVE_START)) {
|
||||||
replica = replicaCur;
|
spreplBuf->addReplica(replicaCur);
|
||||||
spreplBuf->addReplica(replica);
|
|
||||||
replicaCur->replstate = SLAVE_STATE_FASTSYNC_TX;
|
replicaCur->replstate = SLAVE_STATE_FASTSYNC_TX;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
serverAssert(replica != nullptr);
|
|
||||||
|
|
||||||
spreplBuf->addArrayLen(2); // Two sections: Metadata and databases
|
spreplBuf->addArrayLen(2); // Two sections: Metadata and databases
|
||||||
|
|
||||||
@ -1179,7 +1178,7 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) {
|
|||||||
spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE);
|
spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE);
|
||||||
spreplBuf->addArrayLen(2);
|
spreplBuf->addArrayLen(2);
|
||||||
spreplBuf->addString("repl-offset", 11);
|
spreplBuf->addString("repl-offset", 11);
|
||||||
spreplBuf->addLong(rsi->master_repl_offset);
|
spreplBuf->addLongLong(rsi->master_repl_offset);
|
||||||
|
|
||||||
if (dictSize(g_pserver->lua_scripts)) {
|
if (dictSize(g_pserver->lua_scripts)) {
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
@ -1195,13 +1194,13 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) {
|
|||||||
di = NULL; /* So that we don't release it again on error. */
|
di = NULL; /* So that we don't release it again on error. */
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<std::vector<std::unique_ptr<const StorageCache>>> spvecspsnapshot = std::make_shared<std::vector<std::unique_ptr<const StorageCache>>>();
|
std::shared_ptr<std::vector<const redisDbPersistentDataSnapshot*>> spvecsnapshot = std::make_shared<std::vector<const redisDbPersistentDataSnapshot*>>();
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||||
spvecspsnapshot->emplace_back(g_pserver->db[idb]->CloneStorageCache());
|
spvecsnapshot->emplace_back(g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false));
|
||||||
}
|
}
|
||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
|
|
||||||
g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecspsnapshot = std::move(spvecspsnapshot)]{
|
g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecsnapshot = std::move(spvecsnapshot)]{
|
||||||
int retval = C_OK;
|
int retval = C_OK;
|
||||||
auto timeStart = ustime();
|
auto timeStart = ustime();
|
||||||
auto lastLogTime = timeStart;
|
auto lastLogTime = timeStart;
|
||||||
@ -1211,15 +1210,16 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) {
|
|||||||
// Databases
|
// Databases
|
||||||
replBuf.addArrayLen(cserver.dbnum);
|
replBuf.addArrayLen(cserver.dbnum);
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
||||||
auto &spsnapshot = (*spvecspsnapshot)[idb];
|
auto &spsnapshot = (*spvecsnapshot)[idb];
|
||||||
size_t snapshotDeclaredCount = spsnapshot->count();
|
size_t snapshotDeclaredCount = spsnapshot->size();
|
||||||
replBuf.addArrayLen(snapshotDeclaredCount);
|
replBuf.addArrayLen(snapshotDeclaredCount);
|
||||||
size_t count = 0;
|
size_t count = 0;
|
||||||
bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{
|
bool result = spsnapshot->iterate_threadsafe([&replBuf, &count, &cbData, &lastLogTime, &cbLastUpdate](const char *strKey, robj_roptr o) -> bool{
|
||||||
replBuf.addArrayLen(2);
|
replBuf.addArrayLen(2);
|
||||||
|
|
||||||
replBuf.addString(rgchKey, cchKey);
|
replBuf.addString(strKey, sdslen(strKey));
|
||||||
replBuf.addString((const char *)rgchVal, cchVal);
|
sds strT = serializeStoredObjectAndExpire(o);
|
||||||
|
replBuf.addString(strT, sdslen(strT));
|
||||||
++count;
|
++count;
|
||||||
if ((count % 8092) == 0) {
|
if ((count % 8092) == 0) {
|
||||||
auto curTime = ustime();
|
auto curTime = ustime();
|
||||||
@ -1230,7 +1230,8 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) {
|
|||||||
lastLogTime = ustime();
|
lastLogTime = ustime();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cbData += cchKey + cchVal;
|
cbData += sdslen(strKey) + sdslen(strT);
|
||||||
|
sdsfree(strT);
|
||||||
return replBuf.isActive();
|
return replBuf.isActive();
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -1302,7 +1303,7 @@ int startBgsaveForReplication(int mincapa) {
|
|||||||
/* Only do rdbSave* when rsiptr is not NULL,
|
/* Only do rdbSave* when rsiptr is not NULL,
|
||||||
* otherwise replica will miss repl-stream-db. */
|
* otherwise replica will miss repl-stream-db. */
|
||||||
if (rsiptr) {
|
if (rsiptr) {
|
||||||
if (mincapa & SLAVE_CAPA_ROCKSDB_SNAPSHOT && g_pserver->m_pstorageFactory)
|
if (mincapa & SLAVE_CAPA_KEYDB_FASTSYNC && FFastSyncEnabled())
|
||||||
retval = rdbSaveSnapshotForReplication(rsiptr);
|
retval = rdbSaveSnapshotForReplication(rsiptr);
|
||||||
else if (socket_target)
|
else if (socket_target)
|
||||||
retval = rdbSaveToSlavesSockets(rsiptr);
|
retval = rdbSaveToSlavesSockets(rsiptr);
|
||||||
@ -1481,7 +1482,7 @@ void syncCommand(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* CASE 0: Fast Sync */
|
/* CASE 0: Fast Sync */
|
||||||
if ((c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && g_pserver->m_pstorageFactory) {
|
if (c->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC && FFastSyncEnabled()) {
|
||||||
serverLog(LL_NOTICE,"Fast SYNC on next replication cycle");
|
serverLog(LL_NOTICE,"Fast SYNC on next replication cycle");
|
||||||
/* CASE 1: BGSAVE is in progress, with disk target. */
|
/* CASE 1: BGSAVE is in progress, with disk target. */
|
||||||
} else if (g_pserver->FRdbSaveInProgress() &&
|
} else if (g_pserver->FRdbSaveInProgress() &&
|
||||||
@ -1672,8 +1673,8 @@ void replconfCommand(client *c) {
|
|||||||
c->slave_capa |= SLAVE_CAPA_PSYNC2;
|
c->slave_capa |= SLAVE_CAPA_PSYNC2;
|
||||||
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
|
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
|
||||||
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
|
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
|
||||||
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "rocksdb-snapshot-load"))
|
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "keydb-fastsync"))
|
||||||
c->slave_capa |= SLAVE_CAPA_ROCKSDB_SNAPSHOT;
|
c->slave_capa |= SLAVE_CAPA_KEYDB_FASTSYNC;
|
||||||
|
|
||||||
fCapaCommand = true;
|
fCapaCommand = true;
|
||||||
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
|
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
|
||||||
@ -1738,10 +1739,14 @@ void replconfCommand(client *c) {
|
|||||||
|
|
||||||
if (fCapaCommand) {
|
if (fCapaCommand) {
|
||||||
sds reply = sdsnew("+OK");
|
sds reply = sdsnew("+OK");
|
||||||
if (g_pserver->fActiveReplica)
|
if (g_pserver->fActiveReplica) {
|
||||||
reply = sdscat(reply, " active-replica");
|
reply = sdscat(reply, " active-replica");
|
||||||
if (g_pserver->m_pstorageFactory && (c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && !g_pserver->fActiveReplica)
|
}
|
||||||
reply = sdscat(reply, " rocksdb-snapshot-save");
|
if ((c->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC) && FFastSyncEnabled()) {
|
||||||
|
reply = sdscat(reply, " keydb-fastsync-save");
|
||||||
|
} else {
|
||||||
|
c->slave_capa = (c->slave_capa & (~SLAVE_CAPA_KEYDB_FASTSYNC)); // never try to fast sync for this as they won't expect it
|
||||||
|
}
|
||||||
reply = sdscat(reply, "\r\n");
|
reply = sdscat(reply, "\r\n");
|
||||||
addReplySds(c, reply);
|
addReplySds(c, reply);
|
||||||
} else {
|
} else {
|
||||||
@ -2515,6 +2520,7 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
serverAssert(mi->parseState != nullptr);
|
||||||
for (int iter = 0; iter < 10; ++iter) {
|
for (int iter = 0; iter < 10; ++iter) {
|
||||||
if (mi->parseState->shouldThrottle())
|
if (mi->parseState->shouldThrottle())
|
||||||
return false;
|
return false;
|
||||||
@ -2525,11 +2531,16 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
|
|||||||
|
|
||||||
auto nread = connRead(conn, mi->bulkreadBuffer+qblen, readlen);
|
auto nread = connRead(conn, mi->bulkreadBuffer+qblen, readlen);
|
||||||
if (nread <= 0) {
|
if (nread <= 0) {
|
||||||
if (connGetState(conn) != CONN_STATE_CONNECTED)
|
if (connGetState(conn) != CONN_STATE_CONNECTED) {
|
||||||
|
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
|
||||||
|
(nread == -1) ? strerror(errno) : "connection lost");
|
||||||
cancelReplicationHandshake(mi, true);
|
cancelReplicationHandshake(mi, true);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
g_pserver->stat_net_input_bytes += nread;
|
||||||
mi->repl_transfer_lastio = g_pserver->unixtime;
|
mi->repl_transfer_lastio = g_pserver->unixtime;
|
||||||
|
mi->repl_transfer_read += nread;
|
||||||
sdsIncrLen(mi->bulkreadBuffer,nread);
|
sdsIncrLen(mi->bulkreadBuffer,nread);
|
||||||
|
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
@ -2601,8 +2612,9 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
|
|||||||
rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db");
|
rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db");
|
||||||
rsi.repl_offset = mi->parseState->getMetaDataLongLong("repl-offset");
|
rsi.repl_offset = mi->parseState->getMetaDataLongLong("repl-offset");
|
||||||
sds str = mi->parseState->getMetaDataStr("repl-id");
|
sds str = mi->parseState->getMetaDataStr("repl-id");
|
||||||
if (sdslen(str) == CONFIG_RUN_ID_SIZE+1) {
|
if (sdslen(str) == CONFIG_RUN_ID_SIZE) {
|
||||||
memcpy(rsi.repl_id, str, CONFIG_RUN_ID_SIZE+1);
|
memcpy(rsi.repl_id, str, CONFIG_RUN_ID_SIZE+1);
|
||||||
|
rsi.repl_id_is_set = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
fFinished = true;
|
fFinished = true;
|
||||||
@ -3001,7 +3013,7 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mi->isRocksdbSnapshotRepl) {
|
if (mi->isKeydbFastsync) {
|
||||||
if (!readSnapshotBulkPayload(conn, mi, rsi))
|
if (!readSnapshotBulkPayload(conn, mi, rsi))
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
@ -3014,17 +3026,15 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
|
|
||||||
/* Final setup of the connected slave <- master link */
|
/* Final setup of the connected slave <- master link */
|
||||||
replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db);
|
replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db);
|
||||||
if (mi->isRocksdbSnapshotRepl) {
|
if (mi->isKeydbFastsync) {
|
||||||
/* We need to handle the case where the initial querybuf data was read by fast sync */
|
/* We need to handle the case where the initial querybuf data was read by fast sync */
|
||||||
/* This should match the work readQueryFromClient would do for a master client */
|
/* This should match the work readQueryFromClient would do for a master client */
|
||||||
mi->master->querybuf = sdscatsds(mi->master->querybuf, mi->bulkreadBuffer);
|
mi->master->querybuf = sdscatsds(mi->master->querybuf, mi->bulkreadBuffer);
|
||||||
|
mi->master->pending_querybuf = sdscatsds(mi->master->pending_querybuf, mi->bulkreadBuffer);
|
||||||
|
mi->master->read_reploff += sdslen(mi->bulkreadBuffer);
|
||||||
|
|
||||||
sdsfree(mi->bulkreadBuffer);
|
sdsfree(mi->bulkreadBuffer);
|
||||||
mi->bulkreadBuffer = nullptr;
|
mi->bulkreadBuffer = nullptr;
|
||||||
|
|
||||||
mi->master->pending_querybuf = sdscatlen(mi->master->pending_querybuf,
|
|
||||||
mi->master->querybuf,sdslen(mi->master->querybuf));
|
|
||||||
|
|
||||||
mi->master->read_reploff += sdslen(mi->master->querybuf);
|
|
||||||
}
|
}
|
||||||
mi->repl_transfer_s = nullptr;
|
mi->repl_transfer_s = nullptr;
|
||||||
mi->repl_state = REPL_STATE_CONNECTED;
|
mi->repl_state = REPL_STATE_CONNECTED;
|
||||||
@ -3063,13 +3073,13 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Send the initial ACK immediately to put this replica in online state. */
|
/* Send the initial ACK immediately to put this replica in online state. */
|
||||||
if (usemark || mi->isRocksdbSnapshotRepl) replicationSendAck(mi);
|
if (usemark || mi->isKeydbFastsync) replicationSendAck(mi);
|
||||||
|
|
||||||
/* Restart the AOF subsystem now that we finished the sync. This
|
/* Restart the AOF subsystem now that we finished the sync. This
|
||||||
* will trigger an AOF rewrite, and when done will start appending
|
* will trigger an AOF rewrite, and when done will start appending
|
||||||
* to the new file. */
|
* to the new file. */
|
||||||
if (g_pserver->aof_enabled) restartAOFAfterSYNC();
|
if (g_pserver->aof_enabled) restartAOFAfterSYNC();
|
||||||
if (mi->isRocksdbSnapshotRepl)
|
if (mi->isKeydbFastsync)
|
||||||
readQueryFromClient(conn); // There may be querybuf data we just appeneded
|
readQueryFromClient(conn); // There may be querybuf data we just appeneded
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -3395,15 +3405,15 @@ void parseMasterCapa(redisMaster *mi, sds strcapa)
|
|||||||
char *pchEnd = szStart;
|
char *pchEnd = szStart;
|
||||||
|
|
||||||
mi->isActive = false;
|
mi->isActive = false;
|
||||||
mi->isRocksdbSnapshotRepl = false;
|
mi->isKeydbFastsync = false;
|
||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
if (*pchEnd == ' ' || *pchEnd == '\0') {
|
if (*pchEnd == ' ' || *pchEnd == '\0') {
|
||||||
// Parse the word
|
// Parse the word
|
||||||
if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) {
|
if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) {
|
||||||
mi->isActive = true;
|
mi->isActive = true;
|
||||||
} else if (strncmp(szStart, "rocksdb-snapshot-save", pchEnd - szStart) == 0) {
|
} else if (strncmp(szStart, "keydb-fastsync-save", pchEnd - szStart) == 0) {
|
||||||
mi->isRocksdbSnapshotRepl = true;
|
mi->isKeydbFastsync = true;
|
||||||
}
|
}
|
||||||
szStart = pchEnd + 1;
|
szStart = pchEnd + 1;
|
||||||
}
|
}
|
||||||
@ -3550,9 +3560,9 @@ retry_connect:
|
|||||||
"capa","psync2",
|
"capa","psync2",
|
||||||
"capa","activeExpire",
|
"capa","activeExpire",
|
||||||
};
|
};
|
||||||
if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) {
|
if (FFastSyncEnabled() && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) {
|
||||||
veccapabilities.push_back("capa");
|
veccapabilities.push_back("capa");
|
||||||
veccapabilities.push_back("rocksdb-snapshot-load");
|
veccapabilities.push_back("keydb-fastsync");
|
||||||
}
|
}
|
||||||
|
|
||||||
err = sendCommandArgv(conn, veccapabilities.size(), veccapabilities.data(), nullptr);
|
err = sendCommandArgv(conn, veccapabilities.size(), veccapabilities.data(), nullptr);
|
||||||
@ -3864,6 +3874,10 @@ int cancelReplicationHandshake(redisMaster *mi, int reconnect) {
|
|||||||
delete mi->parseState;
|
delete mi->parseState;
|
||||||
mi->parseState = nullptr;
|
mi->parseState = nullptr;
|
||||||
}
|
}
|
||||||
|
if (mi->bulkreadBuffer) {
|
||||||
|
sdsfree(mi->bulkreadBuffer);
|
||||||
|
mi->bulkreadBuffer = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
if (mi->repl_state == REPL_STATE_TRANSFER) {
|
if (mi->repl_state == REPL_STATE_TRANSFER) {
|
||||||
replicationAbortSyncTransfer(mi);
|
replicationAbortSyncTransfer(mi);
|
||||||
@ -4329,7 +4343,6 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
|
|||||||
if (c->flags & CLIENT_MULTI) discardTransaction(c);
|
if (c->flags & CLIENT_MULTI) discardTransaction(c);
|
||||||
listEmpty(c->reply);
|
listEmpty(c->reply);
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
c->sentlenAsync = 0;
|
|
||||||
c->reply_bytes = 0;
|
c->reply_bytes = 0;
|
||||||
c->bufpos = 0;
|
c->bufpos = 0;
|
||||||
resetClient(c);
|
resetClient(c);
|
||||||
|
@ -418,7 +418,10 @@ public:
|
|||||||
sdsstring &operator=(const sdsstring &other)
|
sdsstring &operator=(const sdsstring &other)
|
||||||
{
|
{
|
||||||
sdsfree(m_str);
|
sdsfree(m_str);
|
||||||
m_str = sdsdup(other.m_str);
|
if (other.m_str != nullptr)
|
||||||
|
m_str = sdsdup(other.m_str);
|
||||||
|
else
|
||||||
|
m_str = nullptr;
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5102,8 +5102,8 @@ int processCommand(client *c, int callFlags) {
|
|||||||
|
|
||||||
sds str = sdsempty();
|
sds str = sdsempty();
|
||||||
for (int j = 0; j < numkeys; j++) {
|
for (int j = 0; j < numkeys; j++) {
|
||||||
sdscatsds(str, (sds)ptrFromObj(c->argv[keyindex[j]]));
|
str = sdscatsds(str, (sds)ptrFromObj(c->argv[keyindex[j]]));
|
||||||
sdscat(str, " ");
|
str = sdscat(str, " ");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numkeys > 0)
|
if (numkeys > 0)
|
||||||
|
11
src/server.h
11
src/server.h
@ -620,7 +620,7 @@ typedef enum {
|
|||||||
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
|
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
|
||||||
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
|
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
|
||||||
#define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */
|
#define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */
|
||||||
#define SLAVE_CAPA_ROCKSDB_SNAPSHOT (1<<3)
|
#define SLAVE_CAPA_KEYDB_FASTSYNC (1<<3)
|
||||||
|
|
||||||
/* Synchronous read timeout - replica side */
|
/* Synchronous read timeout - replica side */
|
||||||
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
|
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
|
||||||
@ -1610,7 +1610,6 @@ struct client {
|
|||||||
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
|
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
|
||||||
size_t sentlen; /* Amount of bytes already sent in the current
|
size_t sentlen; /* Amount of bytes already sent in the current
|
||||||
buffer or object being sent. */
|
buffer or object being sent. */
|
||||||
size_t sentlenAsync; /* same as sentlen buf for async buffers (which are a different stream) */
|
|
||||||
time_t ctime; /* Client creation time. */
|
time_t ctime; /* Client creation time. */
|
||||||
long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
|
long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
|
||||||
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
||||||
@ -1871,7 +1870,7 @@ struct redisMaster {
|
|||||||
long long master_initial_offset; /* Master PSYNC offset. */
|
long long master_initial_offset; /* Master PSYNC offset. */
|
||||||
|
|
||||||
bool isActive = false;
|
bool isActive = false;
|
||||||
bool isRocksdbSnapshotRepl = false;
|
bool isKeydbFastsync = false;
|
||||||
int repl_state; /* Replication status if the instance is a replica */
|
int repl_state; /* Replication status if the instance is a replica */
|
||||||
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
|
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
|
||||||
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
|
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
|
||||||
@ -2688,6 +2687,7 @@ struct redisServer {
|
|||||||
|
|
||||||
int fActiveReplica; /* Can this replica also be a master? */
|
int fActiveReplica; /* Can this replica also be a master? */
|
||||||
int fWriteDuringActiveLoad; /* Can this active-replica write during an RDB load? */
|
int fWriteDuringActiveLoad; /* Can this active-replica write during an RDB load? */
|
||||||
|
int fEnableFastSync = false;
|
||||||
|
|
||||||
// Format:
|
// Format:
|
||||||
// Lower 20 bits: a counter incrementing for each command executed in the same millisecond
|
// Lower 20 bits: a counter incrementing for each command executed in the same millisecond
|
||||||
@ -3182,6 +3182,7 @@ void trimStringObjectIfNeeded(robj *o);
|
|||||||
robj *deserializeStoredObject(const void *data, size_t cb);
|
robj *deserializeStoredObject(const void *data, size_t cb);
|
||||||
std::unique_ptr<expireEntry> deserializeExpire(const char *str, size_t cch, size_t *poffset);
|
std::unique_ptr<expireEntry> deserializeExpire(const char *str, size_t cch, size_t *poffset);
|
||||||
sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr);
|
sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr);
|
||||||
|
sds serializeStoredObjectAndExpire(robj_roptr o);
|
||||||
|
|
||||||
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
|
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
|
||||||
|
|
||||||
@ -3971,6 +3972,10 @@ inline int ielFromEventLoop(const aeEventLoop *eventLoop)
|
|||||||
return iel;
|
return iel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline bool FFastSyncEnabled() {
|
||||||
|
return g_pserver->fEnableFastSync && !g_pserver->fActiveReplica;
|
||||||
|
}
|
||||||
|
|
||||||
inline int FCorrectThread(client *c)
|
inline int FCorrectThread(client *c)
|
||||||
{
|
{
|
||||||
return (c->conn == nullptr)
|
return (c->conn == nullptr)
|
||||||
|
@ -584,6 +584,7 @@ start_server {tags {"repl"}} {
|
|||||||
set master [srv 0 client]
|
set master [srv 0 client]
|
||||||
$master config set repl-diskless-sync yes
|
$master config set repl-diskless-sync yes
|
||||||
$master config set repl-diskless-sync-delay 1
|
$master config set repl-diskless-sync-delay 1
|
||||||
|
$master config set enable-keydb-fastsync no
|
||||||
set master_host [srv 0 host]
|
set master_host [srv 0 host]
|
||||||
set master_port [srv 0 port]
|
set master_port [srv 0 port]
|
||||||
set master_pid [srv 0 pid]
|
set master_pid [srv 0 pid]
|
||||||
@ -727,6 +728,8 @@ start_server {tags {"repl"}} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
$master config set enable-keydb-fastsync yes
|
||||||
}
|
}
|
||||||
|
|
||||||
if 0 {
|
if 0 {
|
||||||
@ -867,8 +870,10 @@ test {replicaof right after disconnection} {
|
|||||||
test {Kill rdb child process if its dumping RDB is not useful} {
|
test {Kill rdb child process if its dumping RDB is not useful} {
|
||||||
start_server {tags {"repl"}} {
|
start_server {tags {"repl"}} {
|
||||||
set slave1 [srv 0 client]
|
set slave1 [srv 0 client]
|
||||||
|
$slave1 config set enable-keydb-fastsync no
|
||||||
start_server {} {
|
start_server {} {
|
||||||
set slave2 [srv 0 client]
|
set slave2 [srv 0 client]
|
||||||
|
$slave2 config set enable-keydb-fastsync no
|
||||||
start_server {} {
|
start_server {} {
|
||||||
set master [srv 0 client]
|
set master [srv 0 client]
|
||||||
set master_host [srv 0 host]
|
set master_host [srv 0 host]
|
||||||
|
@ -182,6 +182,8 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
|
|||||||
fail "Replication not started."
|
fail "Replication not started."
|
||||||
}
|
}
|
||||||
|
|
||||||
|
after 100
|
||||||
|
|
||||||
# measure used memory after the slave connected and set maxmemory
|
# measure used memory after the slave connected and set maxmemory
|
||||||
set orig_used [s -1 used_memory]
|
set orig_used [s -1 used_memory]
|
||||||
set orig_client_buf [s -1 mem_clients_normal]
|
set orig_client_buf [s -1 mem_clients_normal]
|
||||||
|
@ -51,7 +51,7 @@ if {$::tls} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test {TLS: Able to connect while with SAN having a comprehensive list} {
|
test {TLS Audit Log: Able to connect while with SAN having a comprehensive list} {
|
||||||
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {dummy.keydb.dev san2.keydb.dev other.keydb.dev}}} {
|
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {dummy.keydb.dev san2.keydb.dev other.keydb.dev}}} {
|
||||||
catch {r PING} e
|
catch {r PING} e
|
||||||
assert_match {PONG} $e
|
assert_match {PONG} $e
|
||||||
@ -65,7 +65,7 @@ if {$::tls} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test {TLS Audit LogTLS: Able to connect while with SAN having a comprehensive list with wildcards} {
|
test {TLS Audit Log: Able to connect while with SAN having a comprehensive list with wildcards} {
|
||||||
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {dummy.* san*.dev other.*}}} {
|
start_server {tags {"tls"} overrides {tls-auditlog-blocklist {dummy.* san*.dev other.*}}} {
|
||||||
catch {r PING} e
|
catch {r PING} e
|
||||||
assert_match {PONG} $e
|
assert_match {PONG} $e
|
||||||
|
Loading…
x
Reference in New Issue
Block a user