Support serializing all types
Former-commit-id: dfdd41f15c0567925f02fd5df779f597ab16894d
This commit is contained in:
parent
60c8887aae
commit
e6bf931f2c
@ -1498,7 +1498,24 @@ sds serializeStoredStringObject(robj_roptr o)
|
||||
{
|
||||
sds str = sdsempty();
|
||||
sdscatlen(str, &(*o), sizeof(robj));
|
||||
sdscat(str, szFromObj(o));
|
||||
switch (o->encoding)
|
||||
{
|
||||
case OBJ_ENCODING_RAW:
|
||||
sdscat(str, szFromObj(o));
|
||||
break;
|
||||
|
||||
case OBJ_ENCODING_INT:
|
||||
break; //nop
|
||||
|
||||
case OBJ_ENCODING_EMBSTR:
|
||||
size_t cch = sdslen(szFromObj(o));
|
||||
if (cch > sizeof(redisObject::m_ptr))
|
||||
{
|
||||
sdscatlen(str, szFromObj(o) + sizeof(redisObject::m_ptr), cch - sizeof(redisObject::m_ptr));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return str;
|
||||
}
|
||||
|
||||
@ -1509,6 +1526,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb)
|
||||
switch (oT->encoding)
|
||||
{
|
||||
case OBJ_ENCODING_EMBSTR:
|
||||
case OBJ_ENCODING_INT:
|
||||
newObject = (robj*)zmalloc(cb, MALLOC_LOCAL);
|
||||
memcpy(newObject, data, cb);
|
||||
return newObject;
|
||||
@ -1531,6 +1549,18 @@ robj *deserializeStoredObject(const void *data, size_t cb)
|
||||
{
|
||||
case OBJ_STRING:
|
||||
return deserializeStoredStringObject((char*)data, cb);
|
||||
|
||||
default:
|
||||
rio payload;
|
||||
int type;
|
||||
robj *obj;
|
||||
rioInitWithConstBuffer(&payload,data,cb);
|
||||
if (((type = rdbLoadObjectType(&payload)) == -1) ||
|
||||
((obj = rdbLoadObject(type,&payload,nullptr, OBJ_MVCC_INVALID)) == nullptr))
|
||||
{
|
||||
serverPanic("Bad data format");
|
||||
}
|
||||
return obj;
|
||||
}
|
||||
serverPanic("Unknown object type loading from storage");
|
||||
}
|
||||
@ -1541,6 +1571,11 @@ sds serializeStoredObject(robj_roptr o)
|
||||
{
|
||||
case OBJ_STRING:
|
||||
return serializeStoredStringObject(o);
|
||||
|
||||
default:
|
||||
rio rdb;
|
||||
createDumpPayload(&rdb,o,nullptr);
|
||||
return (sds)rdb.io.buffer.ptr;
|
||||
}
|
||||
serverPanic("Attempting to store unknown object type");
|
||||
}
|
28
src/rio.cpp
28
src/rio.cpp
@ -73,6 +73,14 @@ static size_t rioBufferRead(rio *r, void *buf, size_t len) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
static size_t rioConstBufferRead(rio *r, void *buf, size_t len) {
|
||||
if (r->io.buffer.len-r->io.buffer.pos < (off_t)len)
|
||||
return 0; /* not enough buffer to return len bytes. */
|
||||
memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
|
||||
r->io.buffer.pos += len;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Returns read/write position in buffer. */
|
||||
static off_t rioBufferTell(rio *r) {
|
||||
return r->io.buffer.pos;
|
||||
@ -97,12 +105,32 @@ static const rio rioBufferIO = {
|
||||
{ { NULL, 0 } } /* union for io-specific vars */
|
||||
};
|
||||
|
||||
static const rio rioConstBufferIO = {
|
||||
rioConstBufferRead,
|
||||
nullptr,
|
||||
rioBufferTell,
|
||||
rioBufferFlush,
|
||||
NULL, /* update_checksum */
|
||||
0, /* current checksum */
|
||||
0, /* bytes read or written */
|
||||
0, /* read/write chunk size */
|
||||
{ { NULL, 0 } } /* union for io-specific vars */
|
||||
};
|
||||
|
||||
void rioInitWithBuffer(rio *r, sds s) {
|
||||
*r = rioBufferIO;
|
||||
r->io.buffer.ptr = s;
|
||||
r->io.buffer.pos = 0;
|
||||
}
|
||||
|
||||
void rioInitWithConstBuffer(rio *r, const void *buf, size_t cb)
|
||||
{
|
||||
*r = rioConstBufferIO;
|
||||
r->io.buffer.ptr = (sds)buf;
|
||||
r->io.buffer.pos = 0;
|
||||
r->io.buffer.len = cb;
|
||||
}
|
||||
|
||||
/* --------------------- Stdio file pointer implementation ------------------- */
|
||||
|
||||
/* Returns 1 or 0 for success/failure. */
|
||||
|
@ -70,6 +70,7 @@ struct _rio {
|
||||
struct {
|
||||
sds ptr;
|
||||
off_t pos;
|
||||
off_t len; // For const buffers only
|
||||
} buffer;
|
||||
/* Stdio file pointer target. */
|
||||
struct {
|
||||
@ -130,6 +131,7 @@ static inline int rioFlush(rio *r) {
|
||||
|
||||
void rioInitWithFile(rio *r, FILE *fp);
|
||||
void rioInitWithBuffer(rio *r, sds s);
|
||||
void rioInitWithConstBuffer(rio *r, const void *rgch, size_t cch);
|
||||
void rioInitWithFdset(rio *r, int *fds, int numfds);
|
||||
|
||||
void rioFreeFdset(rio *r);
|
||||
|
@ -2691,6 +2691,7 @@ void clusterPropagatePublish(robj *channel, robj *message);
|
||||
void migrateCloseTimedoutSockets(void);
|
||||
void clusterBeforeSleep(void);
|
||||
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len);
|
||||
void createDumpPayload(rio *payload, robj_roptr o, robj *key);
|
||||
|
||||
/* Sentinel */
|
||||
void initSentinelConfig(void);
|
||||
|
Loading…
x
Reference in New Issue
Block a user