Generate RDB with Functions only via redis-cli --functions-rdb (#9968)

This is needed in order to ease the deployment of functions for ephemeral cases, where user
needs to spin up a server with functions pre-loaded.

#### Details:

* Added `--functions-rdb` option to _redis-cli_.
* Functions only rdb via `REPLCONF rdb-filter-only functions`. This is a placeholder for a space
  separated inclusion filter for the RDB. In the future can be `REPLCONF rdb-filter-only
  "functions db:3 key-patten:user*"` and a complementing `rdb-filter-exclude` `REPLCONF`
  can also be added.
* Handle "slave requirements" specification to RDB saving code so we can use the same RDB
  when different slaves express the same requirements (like functions-only) and not share the
  RDB when their requirements differ. This is currently just a flags `int`, but can be extended to
  a more complex structure with various filter fields.
* make sure to support filters only in diskless replication mode (not to override the persistence file),
  we do that by forcing diskless (even if disabled by config)

other changes:
* some refactoring in rdb.c (extract portion of a big function to a sub-function)
* rdb_key_save_delay used in AOFRW too
* sendChildInfo takes the number of updated keys (incremental, rather than absolute)

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
yoav-steinberg 2022-01-02 09:39:01 +02:00 committed by GitHub
parent 888e92eb57
commit 1bf6d6f11e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 286 additions and 131 deletions

View File

@ -1509,6 +1509,10 @@ int rewriteAppendOnlyFileRio(rio *aof) {
updated_time = now;
}
}
/* Delay before next key if required (for testing) */
if (server.rdb_key_save_delay)
debugDelay(server.rdb_key_save_delay);
}
dictReleaseIterator(di);
di = NULL;
@ -1552,7 +1556,7 @@ int rewriteAppendOnlyFile(char *filename) {
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
if (rdbSaveRio(SLAVE_REQ_NONE,&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}

View File

@ -599,7 +599,7 @@ void flushAllDataAndResetRDB(int flags) {
int saved_dirty = server.dirty;
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(server.rdb_filename,rsiptr);
rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr);
server.dirty = saved_dirty;
}

View File

@ -544,7 +544,7 @@ NULL
if (save) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) != C_OK) {
addReplyErrorObject(c,shared.err);
return;
}

View File

@ -408,7 +408,7 @@ void functionDumpCommand(client *c) {
rio payload;
rioInitWithBuffer(&payload, sdsempty());
functionsSaveRio(&payload);
rdbSaveFunctions(&payload);
/* RDB version */
buf[0] = RDB_VERSION & 0xff;

View File

@ -171,6 +171,7 @@ client *createClient(connection *conn) {
c->slave_listening_port = 0;
c->slave_addr = NULL;
c->slave_capa = SLAVE_CAPA_NONE;
c->slave_req = SLAVE_REQ_NONE;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;

216
src/rdb.c
View File

@ -1214,28 +1214,119 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
return io.bytes;
}
int functionsSaveRio(rio *rdb) {
int ret = C_ERR;
ssize_t rdbSaveFunctions(rio *rdb) {
dict *functions = functionsGet();
dictIterator *iter = dictGetIterator(functions);
dictEntry *entry = NULL;
ssize_t written = 0;
ssize_t ret;
while ((entry = dictNext(iter))) {
rdbSaveType(rdb, RDB_OPCODE_FUNCTION);
if ((ret = rdbSaveType(rdb, RDB_OPCODE_FUNCTION)) < 0) goto werr;
written += ret;
functionInfo *fi = dictGetVal(entry);
if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto done;
if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto done;
if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name))) < 0) goto werr;
written += ret;
if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name))) < 0) goto werr;
written += ret;
if (fi->desc) {
if (rdbSaveLen(rdb, 1) == -1) goto done; /* desc exists */
if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto done;
/* desc exists */
if ((ret = rdbSaveLen(rdb, 1)) < 0) goto werr;
written += ret;
if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc))) < 0) goto werr;
written += ret;
} else {
if (rdbSaveLen(rdb, 0) == -1) goto done; /* desc not exists */
/* desc not exists */
if ((ret = rdbSaveLen(rdb, 0)) < 0) goto werr;
written += ret;
}
if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto done;
if ((ret = rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code))) < 0) goto werr;
written += ret;
}
ret = C_OK;
done:
dictReleaseIterator(iter);
return ret;
return written;
werr:
dictReleaseIterator(iter);
return -1;
}
ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
dictIterator *di;
dictEntry *de;
ssize_t written = 0;
ssize_t res;
static long long info_updated_time = 0;
size_t processed = 0;
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
redisDb *db = server.db + dbid;
dict *d = db->dict;
if (dictSize(d) == 0) return 0;
di = dictGetSafeIterator(d);
/* Write the SELECT DB opcode */
if ((res = rdbSaveType(rdb,RDB_OPCODE_SELECTDB)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, dbid)) < 0) goto werr;
written += res;
/* Write the RESIZE DB opcode. */
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr;
written += res;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
size_t rdb_bytes_before_key = rdb->processed_bytes;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr;
written += res;
/* In fork child process, we can try to release memory back to the
* OS and possibly avoid or decrease COW. We give the dismiss
* mechanism a hint about an estimated size of the object we stored. */
size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key;
if (server.in_fork_child) dismissObject(o, dump_size);
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
/* Update child info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
* check the diff every 1024 keys */
if (((*key_counter)++ & 1023) == 0) {
long long now = mstime();
if (now - info_updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, *key_counter, pname);
info_updated_time = now;
}
}
}
dictReleaseIterator(di);
return written;
werr:
dictReleaseIterator(di);
return -1;
}
/* Produces a dump of the database in RDB format sending it to the specified
@ -1246,87 +1337,30 @@ done:
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
char magic[10];
uint64_t cksum;
size_t processed = 0;
long key_counter = 0;
int j;
long key_count = 0;
long long info_updated_time = 0;
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
if (!(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
if (functionsSaveRio(rdb) != C_OK) goto werr;
/* save functions */
if (rdbSaveFunctions(rdb) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
/* Write the SELECT DB opcode */
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;
/* Write the RESIZE DB opcode. */
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
size_t rdb_bytes_before_key = rdb->processed_bytes;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire,j) == -1) goto werr;
/* In fork child process, we can try to release memory back to the
* OS and possibly avoid or decrease COW. We give the dismiss
* mechanism a hint about an estimated size of the object we stored. */
size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key;
if (server.in_fork_child) dismissObject(o, dump_size);
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
/* Update child info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
* check the diff every 1024 keys */
if ((key_count++ & 1023) == 0) {
long long now = mstime();
if (now - info_updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, pname);
info_updated_time = now;
}
}
/* save all databases, skip this if we're in functions-only mode */
if (!(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY)) {
for (j = 0; j < server.dbnum; j++) {
if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
if (!(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY) && rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
@ -1340,7 +1374,6 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}
@ -1352,7 +1385,7 @@ werr:
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
startSaving(RDBFLAGS_REPLICATION);
@ -1361,7 +1394,7 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
if (rdbSaveRio(rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
if (rdbSaveRio(req,rdb,error,RDBFLAGS_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
stopSaving(1);
return C_OK;
@ -1374,7 +1407,7 @@ werr: /* Write error. */
}
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
int rdbSave(int req, char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp = NULL;
@ -1401,7 +1434,7 @@ int rdbSave(char *filename, rdbSaveInfo *rsi) {
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
if (rdbSaveRio(req,&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
@ -1444,7 +1477,7 @@ werr:
return C_ERR;
}
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
if (hasActiveChildProcess()) return C_ERR;
@ -1458,7 +1491,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
/* Child */
redisSetProcTitle("redis-rdb-bgsave");
redisSetCpuAffinity(server.bgsave_cpulist);
retval = rdbSave(filename,rsi);
retval = rdbSave(req, filename,rsi);
if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}
@ -3249,7 +3282,7 @@ void killRDBChild(void) {
/* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
listNode *ln;
listIter li;
pid_t childpid;
@ -3288,6 +3321,9 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
/* Check slave has the exact requirements */
if (slave->slave_req != req)
continue;
server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
}
@ -3304,7 +3340,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
redisSetProcTitle("redis-rdb-to-slaves");
redisSetCpuAffinity(server.bgsave_cpulist);
retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
retval = rdbSaveRioWithEOFMark(req,&rdb,NULL,rsi);
if (retval == C_OK && rioFlush(&rdb) == 0)
retval = C_ERR;
@ -3366,7 +3402,7 @@ void saveCommand(client *c) {
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) == C_OK) {
if (rdbSave(SLAVE_REQ_NONE, server.rdb_filename,rsiptr) == C_OK) {
addReply(c,shared.ok);
} else {
addReplyErrorObject(c,shared.err);
@ -3403,7 +3439,7 @@ void bgsaveCommand(client *c) {
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
"possible.");
}
} else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {
} else if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReplyErrorObject(c,shared.err);

View File

@ -148,10 +148,10 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid, int from_signal);
int rdbSave(char *filename, rdbSaveInfo *rsi);
int rdbSave(int req, char *filename, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
robj *rdbLoadObject(int type, rio *rdb, sds key, int dbid, int *error);
@ -170,8 +170,8 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags, sds *err);
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
int functionsSaveRio(rio *rdb);
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
ssize_t rdbSaveFunctions(rio *rdb);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
#endif

View File

@ -226,6 +226,7 @@ static struct config {
int pipe_mode;
int pipe_timeout;
int getrdb_mode;
int get_functions_rdb_mode;
int stat_mode;
int scan_mode;
int intrinsic_latency_mode;
@ -1643,6 +1644,9 @@ static int parseOptions(int argc, char **argv) {
} else if (!strcmp(argv[i],"--rdb") && !lastarg) {
config.getrdb_mode = 1;
config.rdb_filename = argv[++i];
} else if (!strcmp(argv[i],"--functions-rdb") && !lastarg) {
config.get_functions_rdb_mode = 1;
config.rdb_filename = argv[++i];
} else if (!strcmp(argv[i],"--pipe")) {
config.pipe_mode = 1;
} else if (!strcmp(argv[i],"--pipe-timeout") && !lastarg) {
@ -1848,6 +1852,11 @@ static int parseOptions(int argc, char **argv) {
" line interface may not be safe.\n", stderr);
}
if (config.get_functions_rdb_mode && config.getrdb_mode) {
fprintf(stderr,"Option --functions-rdb and --rdb are mutually exclusive.\n");
exit(1);
}
if (config.stdin_lastarg && config.stdin_tag_arg) {
fprintf(stderr, "Options -x and -X are mutually exclusive.\n");
exit(1);
@ -1949,6 +1958,8 @@ static void usage(int err) {
" --replica Simulate a replica showing commands received from the master.\n"
" --rdb <filename> Transfer an RDB dump from remote server to local file.\n"
" Use filename of \"-\" to write to stdout.\n"
" --functions-rdb <filename> Like --rdb but only get the functions (not the keys)\n"
" when getting the RDB dump file.\n"
" --pipe Transfer raw Redis protocol from stdin to server.\n"
" --pipe-timeout <n> In --pipe mode, abort with error if after sending all data.\n"
" no reply is received within <n> seconds.\n"
@ -7158,7 +7169,8 @@ static void latencyDistMode(void) {
#define RDB_EOF_MARK_SIZE 40
void sendReplconf(const char* arg1, const char* arg2) {
int sendReplconf(const char* arg1, const char* arg2) {
int res = 1;
fprintf(stderr, "sending REPLCONF %s %s\n", arg1, arg2);
redisReply *reply = redisCommand(context, "REPLCONF %s %s", arg1, arg2);
@ -7167,10 +7179,12 @@ void sendReplconf(const char* arg1, const char* arg2) {
fprintf(stderr, "\nI/O error\n");
exit(1);
} else if(reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str);
/* non fatal, old versions may not support it */
fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str);
res = 0;
}
freeReplyObject(reply);
return res;
}
void sendCapa() {
@ -8411,6 +8425,7 @@ int main(int argc, char **argv) {
config.cluster_send_asking = 0;
config.slave_mode = 0;
config.getrdb_mode = 0;
config.get_functions_rdb_mode = 0;
config.stat_mode = 0;
config.scan_mode = 0;
config.intrinsic_latency_mode = 0;
@ -8522,11 +8537,15 @@ int main(int argc, char **argv) {
slaveMode();
}
/* Get RDB mode. */
if (config.getrdb_mode) {
/* Get RDB/functions mode. */
if (config.getrdb_mode || config.get_functions_rdb_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
sendRdbOnly();
if (config.get_functions_rdb_mode && !sendReplconf("rdb-filter-only", "functions")) {
fprintf(stderr, "Failed requesting functions only RDB from server, aborting\n");
exit(1);
}
getRDB(NULL);
}

View File

@ -826,12 +826,19 @@ need_full_resync:
* started.
*
* Returns C_OK on success or C_ERR otherwise. */
int startBgsaveForReplication(int mincapa) {
int startBgsaveForReplication(int mincapa, int req) {
int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
int socket_target = 0;
listIter li;
listNode *ln;
/* We use a socket target if slave can handle the EOF marker and we're configured to do diskless syncs.
* Note that in case we're creating a "filtered" RDB (functions-only) we also force socket replication
* to avoid overwriting the snapshot RDB file with filtered data. */
socket_target = (server.repl_diskless_sync || (req & SLAVE_REQ_RDB_FUNCTIONS_ONLY)) && (mincapa & SLAVE_CAPA_EOF);
/* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */
serverAssert(socket_target || !(req & SLAVE_REQ_RDB_FUNCTIONS_ONLY));
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "replicas sockets" : "disk");
@ -841,9 +848,9 @@ int startBgsaveForReplication(int mincapa) {
* otherwise slave will miss repl-stream-db. */
if (rsiptr) {
if (socket_target)
retval = rdbSaveToSlavesSockets(rsiptr);
retval = rdbSaveToSlavesSockets(req,rsiptr);
else
retval = rdbSaveBackground(server.rdb_filename,rsiptr);
retval = rdbSaveBackground(req,server.rdb_filename,rsiptr);
} else {
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
retval = C_ERR;
@ -886,8 +893,10 @@ int startBgsaveForReplication(int mincapa) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
replicationSetupSlaveForFullResync(slave,
getPsyncInitialOffset());
/* Check slave has the exact requirements */
if (slave->slave_req != req)
continue;
replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
}
}
}
@ -946,6 +955,14 @@ void syncCommand(client *c) {
return;
}
/* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered
* RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing
* use of a socket is handled, if needed, in `startBgsaveForReplication`. */
if ((c->slave_req & SLAVE_REQ_RDB_FUNCTIONS_ONLY) && !(c->slave_capa & SLAVE_CAPA_EOF)) {
addReplyError(c,"Filtered replica requires EOF capability");
return;
}
serverLog(LL_NOTICE,"Replica %s asks for synchronization",
replicationGetSlaveName(c));
@ -1025,8 +1042,10 @@ void syncCommand(client *c) {
break;
}
/* To attach this slave, we check that it has at least all the
* capabilities of the slave that triggered the current BGSAVE. */
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
* capabilities of the slave that triggered the current BGSAVE
* and its exact requirements. */
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) &&
c->slave_req == slave->slave_req) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer.
* We don't copy buffer if clients don't want. */
@ -1062,7 +1081,7 @@ void syncCommand(client *c) {
/* We don't have a BGSAVE in progress, let's start one. Diskless
* or disk-based mode is determined by replica's capacity. */
if (!hasActiveChildProcess()) {
startBgsaveForReplication(c->slave_capa);
startBgsaveForReplication(c->slave_capa, c->slave_req);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but another BG operation is active. "
@ -1100,8 +1119,12 @@ void syncCommand(client *c) {
* Unlike other subcommands, this is used by master to get the replication
* offset from a replica.
*
* - rdb-only
* Only wants RDB snapshot without replication buffer. */
* - rdb-only <0|1>
* Only wants RDB snapshot without replication buffer.
*
* - rdb-filter-only <include-filters>
* Define "include" filters for the RDB snapshot. Currently we only support
* a single include filter: "functions". */
void replconfCommand(client *c) {
int j;
@ -1177,6 +1200,30 @@ void replconfCommand(client *c) {
return;
if (rdb_only == 1) c->flags |= CLIENT_REPL_RDBONLY;
else c->flags &= ~CLIENT_REPL_RDBONLY;
} else if (!strcasecmp(c->argv[j]->ptr,"rdb-filter-only")) {
/* REPLCONFG RDB-FILTER-ONLY is used to define "include" filters
* for the RDB snapshot. Currently we only support a single
* include filter: "functions". In the future we may want to add
* other filters like key patterns, key types, non-volatile, module
* aux fields, ...
* We might want to add the complementing "RDB-FILTER-EXCLUDE" to
* filter out certain data. */
int filter_count, i;
sds *filters;
if (!(filters = sdssplitargs(c->argv[j+1]->ptr, &filter_count))) {
addReplyErrorFormat(c, "Missing rdb-filter-only values");
return;
}
for (i = 0; i < filter_count; i++) {
if (!strcasecmp(filters[i], "functions"))
c->slave_req |= SLAVE_REQ_RDB_FUNCTIONS_ONLY;
else {
addReplyErrorFormat(c, "Unsupported rdb-filter-only option: %s", (char*)filters[i]);
sdsfreesplitres(filters, filter_count);
return;
}
}
sdsfreesplitres(filters, filter_count);
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
@ -3633,8 +3680,8 @@ void replicationCron(void) {
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
void replicationStartPendingFork(void) {
/* Start a BGSAVE good for replication if we have slaves in
int shouldStartChildReplication(int *mincapa_out, int *req_out) {
/* We should start a BGSAVE good for replication if we have slaves in
* WAIT_BGSAVE_START state.
*
* In case of diskless replication, we make sure to wait the specified
@ -3643,7 +3690,9 @@ void replicationStartPendingFork(void) {
if (!hasActiveChildProcess()) {
time_t idle, max_idle = 0;
int slaves_waiting = 0;
int mincapa = -1;
int mincapa;
int req;
int first = 1;
listNode *ln;
listIter li;
@ -3651,11 +3700,18 @@ void replicationStartPendingFork(void) {
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
if (first) {
/* Get first slave's requirements */
req = slave->slave_req;
} else if (req != slave->slave_req) {
/* Skip slaves that don't match */
continue;
}
idle = server.unixtime - slave->lastinteraction;
if (idle > max_idle) max_idle = idle;
slaves_waiting++;
mincapa = (mincapa == -1) ? slave->slave_capa :
(mincapa & slave->slave_capa);
mincapa = first ? slave->slave_capa : (mincapa & slave->slave_capa);
first = 0;
}
}
@ -3663,12 +3719,27 @@ void replicationStartPendingFork(void) {
(!server.repl_diskless_sync ||
max_idle >= server.repl_diskless_sync_delay))
{
/* Start the BGSAVE. The called function may start a
* BGSAVE with socket target or disk target depending on the
* configuration and slaves capabilities. */
startBgsaveForReplication(mincapa);
if (mincapa_out)
*mincapa_out = mincapa;
if (req_out)
*req_out = req;
return 1;
}
}
return 0;
}
void replicationStartPendingFork(void) {
int mincapa = -1;
int req = -1;
if (shouldStartChildReplication(&mincapa, &req)) {
/* Start the BGSAVE. The called function may start a
* BGSAVE with socket target or disk target depending on the
* configuration and slaves capabilities and requirements. */
startBgsaveForReplication(mincapa, req);
}
}
/* Find replica at IP:PORT from replica list */

View File

@ -1209,7 +1209,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
rdbSaveBackground(SLAVE_REQ_NONE, server.rdb_filename,rsiptr);
break;
}
}
@ -1298,7 +1298,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
{
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
if (rdbSaveBackground(SLAVE_REQ_NONE, server.rdb_filename,rsiptr) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
@ -3692,7 +3692,7 @@ int prepareForShutdown(int flags) {
/* Snapshotting. Perform a SYNC SAVE and exit */
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
if (rdbSave(SLAVE_REQ_NONE, server.rdb_filename,rsiptr) != C_OK) {
/* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background

View File

@ -380,6 +380,10 @@ typedef enum {
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
/* Slave requirements */
#define SLAVE_REQ_NONE 0
#define SLAVE_REQ_RDB_FUNCTIONS_ONLY (1 << 0)
/* Synchronous read timeout - slave side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
@ -1058,6 +1062,7 @@ typedef struct client {
int slave_listening_port; /* As configured with: REPLCONF listening-port */
char *slave_addr; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
int slave_req; /* Slave requirements: SLAVE_REQ_* */
multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
blockingState bpop; /* blocking state */

View File

@ -315,34 +315,53 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS
file delete $tmpfile
}
proc test_redis_cli_rdb_dump {} {
proc test_redis_cli_rdb_dump {functions_only} {
r flushdb
r function flush
set dir [lindex [r config get dir] 1]
assert_equal "OK" [r debug populate 100000 key 1000]
catch {run_cli --rdb "$dir/cli.rdb"} output
assert_equal "OK" [r function create lua func1 "return 123"]
if {$functions_only} {
set args "--functions-rdb $dir/cli.rdb"
} else {
set args "--rdb $dir/cli.rdb"
}
catch {run_cli {*}$args} output
assert_match {*Transfer finished with success*} $output
file delete "$dir/dump.rdb"
file rename "$dir/cli.rdb" "$dir/dump.rdb"
assert_equal "OK" [r set should-not-exist 1]
assert_equal "OK" [r function create lua should_not_exist_func "return 456"]
assert_equal "OK" [r debug reload nosave]
assert_equal {} [r get should-not-exist]
assert_error "ERR Function does not exists" {r function info should_not_exist_func}
assert_equal "func1" [dict get [r function info func1] name]
if {$functions_only} {
assert_equal 0 [r dbsize]
} else {
assert_equal 100000 [r dbsize]
}
}
test "Dumping an RDB" {
foreach {functions_only} {no yes} {
test "Dumping an RDB - functions only: $functions_only" {
# Disk-based master
assert_match "OK" [r config set repl-diskless-sync no]
test_redis_cli_rdb_dump
test_redis_cli_rdb_dump $functions_only
# Disk-less master
assert_match "OK" [r config set repl-diskless-sync yes]
assert_match "OK" [r config set repl-diskless-sync-delay 0]
test_redis_cli_rdb_dump
test_redis_cli_rdb_dump $functions_only
} {} {needs:repl needs:debug}
} ;# foreach functions_only
test "Scan mode" {
r flushdb
populate 1000 key: 1