RDB modules values serialization format version 2.
The original RDB serialization format was not parsable without the module loaded, because the structure was managed only by the module itself. Moreover RDB is a streaming protocol in the sense that it is both produced in an append-only fashion, and is also sometimes directly sent to the socket (in the case of diskless replication). The fact that module values cannot be parsed without the relevant module loaded is a problem in many ways: RDB checking tools must have loaded modules even for doing things not involving the value at all, like splitting an RDB into N RDBs by key or alike, or just checking the RDB for sanity. In theory module values could be just a blob of data with a prefixed length in order for us to be able to skip it. However prefixing the values with a length would mean one of the following: 1. To be able to write some data at a previous offset. This breaks streaming. 2. To buffer values before outputting them. This breaks performance. 3. To have some chunked RDB output format. This breaks simplicity. Moreover, the above solution still makes module values a totally opaque matter, with the following problems: 1. The RDB check tool can just skip the value without being able to at least check the general structure. For datasets composed mostly of module values this means to just check the outer level of the RDB, not actually doing any check on most of the data itself. 2. It is not possible to do any recovering or processing of data for which a module no longer exists in the future, or is unknown. So this commit implements a different solution. The modules RDB serialization API is composed of well-defined calls to store integers, floats, doubles or strings. After this commit, the parts generated by the module API have a one-byte prefix for each of the above emitted parts, and there is a final EOF byte as well. 
So even if we don't know exactly how to interpret a module value, we can always parse it at a high level, check the overall structure, understand the types used to store the information, and easily skip the whole value. The change is backward compatible: older RDB files can still be loaded since the new encoding has a new RDB type: MODULE_2 (of value 7). The commit also implements the ability to check RDB files for sanity taking advantage of the new feature.
This commit is contained in:
parent
c3998728a2
commit
365dd037dc
75
src/module.c
75
src/module.c
@ -2705,11 +2705,13 @@ moduleType *moduleTypeLookupModuleByID(uint64_t id) {
|
||||
}
|
||||
|
||||
/* Turn an (unresolved) module ID into a type name, to show the user an
|
||||
* error when RDB files contain module data we can't load. */
|
||||
* error when RDB files contain module data we can't load.
|
||||
* The buffer pointed by 'name' must be 10 bytes at least. The function will
|
||||
* fill it with a null terminated module name. */
|
||||
void moduleTypeNameByID(char *name, uint64_t moduleid) {
|
||||
const char *cset = ModuleTypeNameCharSet;
|
||||
|
||||
name[0] = '\0';
|
||||
name[9] = '\0';
|
||||
char *p = name+8;
|
||||
moduleid >>= 10;
|
||||
for (int j = 0; j < 9; j++) {
|
||||
@ -2877,7 +2879,8 @@ void moduleRDBLoadError(RedisModuleIO *io) {
|
||||
* data types. */
|
||||
void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) {
|
||||
if (io->error) return;
|
||||
int retval = rdbSaveLen(io->rio, value);
|
||||
int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_UINT);
|
||||
if (retval != -1) rdbSaveLen(io->rio, value);
|
||||
if (retval == -1) {
|
||||
io->error = 1;
|
||||
} else {
|
||||
@ -2889,13 +2892,18 @@ void RM_SaveUnsigned(RedisModuleIO *io, uint64_t value) {
|
||||
* be called in the context of the rdb_load method of modules implementing
|
||||
* new data types. */
|
||||
uint64_t RM_LoadUnsigned(RedisModuleIO *io) {
|
||||
if (io->ver == 2) {
|
||||
uint64_t opcode = rdbLoadLen(io->rio,NULL);
|
||||
if (opcode != RDB_MODULE_OPCODE_UINT) goto loaderr;
|
||||
}
|
||||
uint64_t value;
|
||||
int retval = rdbLoadLenByRef(io->rio, NULL, &value);
|
||||
if (retval == -1) {
|
||||
moduleRDBLoadError(io);
|
||||
return 0; /* Never reached. */
|
||||
}
|
||||
if (retval == -1) goto loaderr;
|
||||
return value;
|
||||
|
||||
loaderr:
|
||||
moduleRDBLoadError(io);
|
||||
return 0; /* Never reached. */
|
||||
}
|
||||
|
||||
/* Like RedisModule_SaveUnsigned() but for signed 64 bit values. */
|
||||
@ -2920,7 +2928,8 @@ int64_t RM_LoadSigned(RedisModuleIO *io) {
|
||||
* the RDB file. */
|
||||
void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) {
|
||||
if (io->error) return;
|
||||
int retval = rdbSaveStringObject(io->rio,s);
|
||||
int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING);
|
||||
if (retval != -1) retval = rdbSaveStringObject(io->rio,s);
|
||||
if (retval == -1) {
|
||||
io->error = 1;
|
||||
} else {
|
||||
@ -2932,7 +2941,8 @@ void RM_SaveString(RedisModuleIO *io, RedisModuleString *s) {
|
||||
* as input. */
|
||||
void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) {
|
||||
if (io->error) return;
|
||||
int retval = rdbSaveRawString(io->rio,(unsigned char*)str,len);
|
||||
int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_STRING);
|
||||
if (retval != -1) retval = rdbSaveRawString(io->rio,(unsigned char*)str,len);
|
||||
if (retval == -1) {
|
||||
io->error = 1;
|
||||
} else {
|
||||
@ -2942,13 +2952,18 @@ void RM_SaveStringBuffer(RedisModuleIO *io, const char *str, size_t len) {
|
||||
|
||||
/* Implements RM_LoadString() and RM_LoadStringBuffer() */
|
||||
void *moduleLoadString(RedisModuleIO *io, int plain, size_t *lenptr) {
|
||||
if (io->ver == 2) {
|
||||
uint64_t opcode = rdbLoadLen(io->rio,NULL);
|
||||
if (opcode != RDB_MODULE_OPCODE_STRING) goto loaderr;
|
||||
}
|
||||
void *s = rdbGenericLoadStringObject(io->rio,
|
||||
plain ? RDB_LOAD_PLAIN : RDB_LOAD_NONE, lenptr);
|
||||
if (s == NULL) {
|
||||
moduleRDBLoadError(io);
|
||||
return NULL; /* Never reached. */
|
||||
}
|
||||
if (s == NULL) goto loaderr;
|
||||
return s;
|
||||
|
||||
loaderr:
|
||||
moduleRDBLoadError(io);
|
||||
return NULL; /* Never reached. */
|
||||
}
|
||||
|
||||
/* In the context of the rdb_load method of a module data type, loads a string
|
||||
@ -2980,7 +2995,8 @@ char *RM_LoadStringBuffer(RedisModuleIO *io, size_t *lenptr) {
|
||||
* It is possible to load back the value with RedisModule_LoadDouble(). */
|
||||
void RM_SaveDouble(RedisModuleIO *io, double value) {
|
||||
if (io->error) return;
|
||||
int retval = rdbSaveBinaryDoubleValue(io->rio, value);
|
||||
int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_DOUBLE);
|
||||
if (retval != -1) retval = rdbSaveBinaryDoubleValue(io->rio, value);
|
||||
if (retval == -1) {
|
||||
io->error = 1;
|
||||
} else {
|
||||
@ -2991,21 +3007,27 @@ void RM_SaveDouble(RedisModuleIO *io, double value) {
|
||||
/* In the context of the rdb_save method of a module data type, loads back the
|
||||
* double value saved by RedisModule_SaveDouble(). */
|
||||
double RM_LoadDouble(RedisModuleIO *io) {
|
||||
if (io->ver == 2) {
|
||||
uint64_t opcode = rdbLoadLen(io->rio,NULL);
|
||||
if (opcode != RDB_MODULE_OPCODE_DOUBLE) goto loaderr;
|
||||
}
|
||||
double value;
|
||||
int retval = rdbLoadBinaryDoubleValue(io->rio, &value);
|
||||
if (retval == -1) {
|
||||
moduleRDBLoadError(io);
|
||||
return 0; /* Never reached. */
|
||||
}
|
||||
if (retval == -1) goto loaderr;
|
||||
return value;
|
||||
|
||||
loaderr:
|
||||
moduleRDBLoadError(io);
|
||||
return 0; /* Never reached. */
|
||||
}
|
||||
|
||||
/* In the context of the rdb_save method of a module data type, saves a float
|
||||
/* In the context of the rdb_save method of a module data type, saves a float
|
||||
* value to the RDB file. The float can be a valid number, a NaN or infinity.
|
||||
* It is possible to load back the value with RedisModule_LoadFloat(). */
|
||||
void RM_SaveFloat(RedisModuleIO *io, float value) {
|
||||
if (io->error) return;
|
||||
int retval = rdbSaveBinaryFloatValue(io->rio, value);
|
||||
int retval = rdbSaveLen(io->rio, RDB_MODULE_OPCODE_FLOAT);
|
||||
if (retval != -1) retval = rdbSaveBinaryFloatValue(io->rio, value);
|
||||
if (retval == -1) {
|
||||
io->error = 1;
|
||||
} else {
|
||||
@ -3016,13 +3038,18 @@ void RM_SaveFloat(RedisModuleIO *io, float value) {
|
||||
/* In the context of the rdb_save method of a module data type, loads back the
|
||||
* float value saved by RedisModule_SaveFloat(). */
|
||||
float RM_LoadFloat(RedisModuleIO *io) {
|
||||
if (io->ver == 2) {
|
||||
uint64_t opcode = rdbLoadLen(io->rio,NULL);
|
||||
if (opcode != RDB_MODULE_OPCODE_FLOAT) goto loaderr;
|
||||
}
|
||||
float value;
|
||||
int retval = rdbLoadBinaryFloatValue(io->rio, &value);
|
||||
if (retval == -1) {
|
||||
moduleRDBLoadError(io);
|
||||
return 0; /* Never reached. */
|
||||
}
|
||||
if (retval == -1) goto loaderr;
|
||||
return value;
|
||||
|
||||
loaderr:
|
||||
moduleRDBLoadError(io);
|
||||
return 0; /* Never reached. */
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
|
63
src/rdb.c
63
src/rdb.c
@ -623,7 +623,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) {
|
||||
else
|
||||
serverPanic("Unknown hash encoding");
|
||||
case OBJ_MODULE:
|
||||
return rdbSaveType(rdb,RDB_TYPE_MODULE);
|
||||
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
|
||||
default:
|
||||
serverPanic("Unknown object type");
|
||||
}
|
||||
@ -775,8 +775,12 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) {
|
||||
if (retval == -1) return -1;
|
||||
io.bytes += retval;
|
||||
|
||||
/* Then write the module-specific representation. */
|
||||
/* Then write the module-specific representation + EOF marker. */
|
||||
mt->rdb_save(&io,mv->value);
|
||||
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
|
||||
if (retval == -1) return -1;
|
||||
io.bytes += retval;
|
||||
|
||||
if (io.ctx) {
|
||||
moduleFreeContext(io.ctx);
|
||||
zfree(io.ctx);
|
||||
@ -1102,6 +1106,45 @@ void rdbRemoveTempFile(pid_t childpid) {
|
||||
unlink(tmpfile);
|
||||
}
|
||||
|
||||
/* This function is called by rdbLoadObject() when the code is in RDB-check
|
||||
* mode and we find a module value of type 2 that can be parsed without
|
||||
* the need of the actual module. The value is parsed for errors, finally
|
||||
* a dummy redis object is returned just to conform to the API. */
|
||||
robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
|
||||
uint64_t opcode;
|
||||
while((opcode = rdbLoadLen(rdb,NULL)) != RDB_MODULE_OPCODE_EOF) {
|
||||
if (opcode == RDB_MODULE_OPCODE_SINT ||
|
||||
opcode == RDB_MODULE_OPCODE_UINT)
|
||||
{
|
||||
uint64_t len;
|
||||
if (rdbLoadLenByRef(rdb,NULL,&len) == -1) {
|
||||
rdbExitReportCorruptRDB(
|
||||
"Error reading integer from module %s value", modulename);
|
||||
}
|
||||
} else if (opcode == RDB_MODULE_OPCODE_STRING) {
|
||||
robj *o = rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
|
||||
if (o == NULL) {
|
||||
rdbExitReportCorruptRDB(
|
||||
"Error reading string from module %s value", modulename);
|
||||
}
|
||||
decrRefCount(o);
|
||||
} else if (opcode == RDB_MODULE_OPCODE_FLOAT) {
|
||||
float val;
|
||||
if (rdbLoadBinaryFloatValue(rdb,&val) == -1) {
|
||||
rdbExitReportCorruptRDB(
|
||||
"Error reading float from module %s value", modulename);
|
||||
}
|
||||
} else if (opcode == RDB_MODULE_OPCODE_DOUBLE) {
|
||||
double val;
|
||||
if (rdbLoadBinaryDoubleValue(rdb,&val) == -1) {
|
||||
rdbExitReportCorruptRDB(
|
||||
"Error reading double from module %s value", modulename);
|
||||
}
|
||||
}
|
||||
}
|
||||
return createStringObject("module-dummy-value",18);
|
||||
}
|
||||
|
||||
/* Load a Redis object of the specified type from the specified file.
|
||||
* On success a newly allocated object is returned, otherwise NULL. */
|
||||
robj *rdbLoadObject(int rdbtype, rio *rdb) {
|
||||
@ -1353,11 +1396,14 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
|
||||
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
|
||||
break;
|
||||
}
|
||||
} else if (rdbtype == RDB_TYPE_MODULE) {
|
||||
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
|
||||
uint64_t moduleid = rdbLoadLen(rdb,NULL);
|
||||
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
|
||||
char name[10];
|
||||
|
||||
if (rdbCheckMode && rdbtype == RDB_TYPE_MODULE_2)
|
||||
return rdbLoadCheckModuleValue(rdb,name);
|
||||
|
||||
if (mt == NULL) {
|
||||
moduleTypeNameByID(name,moduleid);
|
||||
serverLog(LL_WARNING,"The RDB file contains module data I can't load: no matching module '%s'", name);
|
||||
@ -1365,9 +1411,20 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
|
||||
}
|
||||
RedisModuleIO io;
|
||||
moduleInitIOContext(io,mt,rdb);
|
||||
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
|
||||
/* Call the rdb_load method of the module providing the 10 bit
|
||||
* encoding version in the lower 10 bits of the module ID. */
|
||||
void *ptr = mt->rdb_load(&io,moduleid&1023);
|
||||
|
||||
/* Module v2 serialization has an EOF mark at the end. */
|
||||
if (io.ver == 2) {
|
||||
uint64_t eof = rdbLoadLen(rdb,NULL);
|
||||
if (eof != RDB_MODULE_OPCODE_EOF) {
|
||||
serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
if (ptr == NULL) {
|
||||
moduleTypeNameByID(name,moduleid);
|
||||
serverLog(LL_WARNING,"The RDB file contains module data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
|
||||
|
12
src/rdb.h
12
src/rdb.h
@ -78,6 +78,8 @@
|
||||
#define RDB_TYPE_HASH 4
|
||||
#define RDB_TYPE_ZSET_2 5 /* ZSET version 2 with doubles stored in binary. */
|
||||
#define RDB_TYPE_MODULE 6
|
||||
#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without
|
||||
the generating module being loaded. */
|
||||
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
|
||||
|
||||
/* Object types for encoded objects. */
|
||||
@ -90,7 +92,7 @@
|
||||
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
|
||||
|
||||
/* Test if a type is an object type. */
|
||||
#define rdbIsObjectType(t) ((t >= 0 && t <= 6) || (t >= 9 && t <= 14))
|
||||
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 14))
|
||||
|
||||
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
|
||||
#define RDB_OPCODE_AUX 250
|
||||
@ -100,6 +102,14 @@
|
||||
#define RDB_OPCODE_SELECTDB 254
|
||||
#define RDB_OPCODE_EOF 255
|
||||
|
||||
/* Module serialized values sub opcodes */
|
||||
#define RDB_MODULE_OPCODE_EOF 0 /* End of module value. */
|
||||
#define RDB_MODULE_OPCODE_SINT 1 /* Signed integer. */
|
||||
#define RDB_MODULE_OPCODE_UINT 2 /* Unsigned integer. */
|
||||
#define RDB_MODULE_OPCODE_FLOAT 3 /* Float. */
|
||||
#define RDB_MODULE_OPCODE_DOUBLE 4 /* Double. */
|
||||
#define RDB_MODULE_OPCODE_STRING 5 /* String. */
|
||||
|
||||
/* rdbLoad...() functions flags. */
|
||||
#define RDB_LOAD_NONE 0
|
||||
#define RDB_LOAD_ENC (1<<0)
|
||||
|
@ -530,14 +530,19 @@ typedef struct RedisModuleIO {
|
||||
rio *rio; /* Rio stream. */
|
||||
moduleType *type; /* Module type doing the operation. */
|
||||
int error; /* True if error condition happened. */
|
||||
int ver; /* Module serialization version: 1 (old),
|
||||
* 2 (current version with opcodes annotation). */
|
||||
struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/
|
||||
} RedisModuleIO;
|
||||
|
||||
/* Macro to initialize an IO context. Note that the 'ver' field is populated
|
||||
* inside rdb.c according to the version of the value to load. */
|
||||
#define moduleInitIOContext(iovar,mtype,rioptr) do { \
|
||||
iovar.rio = rioptr; \
|
||||
iovar.type = mtype; \
|
||||
iovar.bytes = 0; \
|
||||
iovar.error = 0; \
|
||||
iovar.ver = 0; \
|
||||
iovar.ctx = NULL; \
|
||||
} while(0);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user