Fix PSYNC test crashes

This commit is contained in:
John Sully 2022-05-04 01:37:40 +00:00
parent 43886c1d61
commit ec6378767d
6 changed files with 28 additions and 20 deletions

View File

@ -97,6 +97,7 @@ robj *createRawStringObject(const char *ptr, size_t len) {
* allocated in the same chunk as the object itself. */
robj *createEmbeddedStringObject(const char *ptr, size_t len) {
serverAssert(len <= UINT8_MAX);
// Note: If the size changes update serializeStoredStringObject
size_t allocsize = sizeof(struct sdshdr8)+len+1;
if (allocsize < sizeof(void*))
allocsize = sizeof(void*);
@ -1585,20 +1586,13 @@ sds serializeStoredStringObject(sds str, robj_roptr o)
static_assert((sizeof(robj) + sizeof(mvcc)) == sizeof(redisObjectStack), "");
switch (o->encoding)
{
case OBJ_ENCODING_EMBSTR:
case OBJ_ENCODING_RAW:
str = sdscatsds(str, (sds)szFromObj(o));
break;
case OBJ_ENCODING_INT:
break; //nop
case OBJ_ENCODING_EMBSTR:
size_t cb = zmalloc_size(o.unsafe_robjcast());
if (cb > sizeof(robj))
{
str = sdscatlen(str, o.unsafe_robjcast() + 1, cb - sizeof(robj));
}
break;
}
return str;
@ -1618,7 +1612,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb)
break;
case OBJ_ENCODING_EMBSTR:
newObject = createEmbeddedStringObject(szFromObj(oT), sdslen(szFromObj(oT)));
newObject = createEmbeddedStringObject(data+sizeof(robj)+sizeof(mvcc), cb-sizeof(robj)-sizeof(uint64_t));
break;
case OBJ_ENCODING_RAW:

View File

@ -1580,6 +1580,8 @@ void *rdbSaveThread(void *vargs)
ssize_t cbStart = zmalloc_used_memory();
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]);
args->~rdbSaveThreadArgs();
zfree(args);
ssize_t cbDiff = (cbStart - (ssize_t)zmalloc_used_memory());
g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
@ -1640,7 +1642,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
} else
{
rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zcalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL);
// Placement new
args = new (args) rdbSaveThreadArgs();
rdbSaveInfo rsiT;
if (rsi == nullptr)
rsi = &rsiT;
@ -1656,6 +1658,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
if (pthread_create(&child, NULL, rdbSaveThread, args)) {
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
args->~rdbSaveThreadArgs();
zfree(args);
return C_ERR;
}
@ -3201,7 +3204,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
* are required to skip AUX fields they don't understand.
*
* An AUX field is composed of two strings: key and value. */
robj *auxkey, *auxval;
robj *auxkey = nullptr, *auxval = nullptr;
if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
@ -3221,12 +3224,16 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
} else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) {
if (rsi) {
MasterSaveInfo msi;
char *masters = szFromObj(auxval);
char *saveptr;
char *entry = strtok_r(masters, ":", &saveptr);
while (entry != NULL) {
memcpy(msi.master_replid, entry, sizeof(msi.master_replid));
MasterSaveInfo msi;
bool fSet = true;
if (strlen(entry) == sizeof(msi.master_replid)-1)
memcpy(msi.master_replid, entry, sizeof(msi.master_replid));
else
fSet = false;
entry = strtok_r(NULL, ":", &saveptr);
if (entry == nullptr) break;
msi.master_initial_offset = atoll(entry);
@ -3240,7 +3247,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
if (entry == nullptr) break;
msi.selected_db = atoi(entry);
entry = strtok_r(NULL, ":", &saveptr);
rsi->addMaster(msi);
if (fSet)
rsi->addMaster(msi);
}
}
} else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) {
@ -3752,6 +3760,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs)
aeThreadOffline();
close(args->safe_to_exit_pipe);
args->rsi.~rdbSaveInfo();
zfree(args);
g_pserver->rdbThreadVars.fDone = true;
return (retval == C_OK) ? (void*)0 : (void*)1;
@ -3784,7 +3793,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
args->rdb_pipe_write = pipefds[1]; /* write end */
anetNonBlock(NULL, g_pserver->rdb_pipe_read);
args->rsi = *(new (args) rdbSaveInfo(*rsi));
args->rsi = *(new (&args->rsi) rdbSaveInfo(*rsi));
memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid));
args->rsi.master_repl_offset = g_pserver->master_repl_offset;
@ -3842,6 +3851,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
g_pserver->rdb_pipe_conns = NULL;
g_pserver->rdb_pipe_numconns = 0;
g_pserver->rdb_pipe_numconns_writing = 0;
args->rsi.~rdbSaveInfo();
zfree(args);
closeChildInfoPipe();
return C_ERR;

View File

@ -6694,10 +6694,10 @@ static void sigShutdownHandler(int sig) {
serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum, 1);
g_pserver->garbageCollector.shutdown();
exit(1); /* Exit with an error since this was not a clean shutdown. */
_Exit(1); /* Exit with an error since this was not a clean shutdown. */
} else if (g_pserver->loading) {
serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now.");
exit(0);
_Exit(0); // calling dtors is undesirable, exit immediately
}
serverLogFromHandler(LL_WARNING, msg);

View File

@ -1888,7 +1888,9 @@ struct redisMaster {
};
struct MasterSaveInfo {
MasterSaveInfo() = default;
MasterSaveInfo() {
memcpy(master_replid, "0000000000000000000000000000000000000000", sizeof(master_replid));
}
MasterSaveInfo(const redisMaster &mi) {
memcpy(master_replid, mi.master_replid, sizeof(mi.master_replid));
if (mi.master) {
@ -1935,6 +1937,7 @@ public:
repl_stream_db = -1;
repl_id_is_set = 0;
memcpy(repl_id, "0000000000000000000000000000000000000000", sizeof(repl_id));
repl_id[CONFIG_RUN_ID_SIZE] = '\0';
repl_offset = -1;
fForceSetKey = TRUE;
mvccMinThreshold = 0;

View File

@ -61,6 +61,7 @@ set ::all_tests {
integration/keydb-cli
integration/keydb-benchmark
integration/replication-fast
integration/replication-psync-multimaster
unit/pubsub
unit/slowlog
unit/scripting