From c2b5a8802e00ac07471a78b9d475402b747b1b88 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 16 Oct 2019 14:23:28 -0400 Subject: [PATCH] Support serializing all types Former-commit-id: dfdd41f15c0567925f02fd5df779f597ab16894d --- src/object.cpp | 37 ++++++++++++++++++++++++++++++++++++- src/rio.cpp | 28 ++++++++++++++++++++++++++++ src/rio.h | 2 ++ src/server.h | 1 + 4 files changed, 67 insertions(+), 1 deletion(-) diff --git a/src/object.cpp b/src/object.cpp index 141cc7256..9c296a99c 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1498,7 +1498,24 @@ sds serializeStoredStringObject(robj_roptr o) { sds str = sdsempty(); sdscatlen(str, &(*o), sizeof(robj)); - sdscat(str, szFromObj(o)); + switch (o->encoding) + { + case OBJ_ENCODING_RAW: + sdscat(str, szFromObj(o)); + break; + + case OBJ_ENCODING_INT: + break; //nop + + case OBJ_ENCODING_EMBSTR: + size_t cch = sdslen(szFromObj(o)); + if (cch > sizeof(redisObject::m_ptr)) + { + sdscatlen(str, szFromObj(o) + sizeof(redisObject::m_ptr), cch - sizeof(redisObject::m_ptr)); + } + break; + } + return str; } @@ -1509,6 +1526,7 @@ robj *deserializeStoredStringObject(const char *data, size_t cb) switch (oT->encoding) { case OBJ_ENCODING_EMBSTR: + case OBJ_ENCODING_INT: newObject = (robj*)zmalloc(cb, MALLOC_LOCAL); memcpy(newObject, data, cb); return newObject; @@ -1531,6 +1549,18 @@ robj *deserializeStoredObject(const void *data, size_t cb) { case OBJ_STRING: return deserializeStoredStringObject((char*)data, cb); + + default: + rio payload; + int type; + robj *obj; + rioInitWithConstBuffer(&payload,data,cb); + if (((type = rdbLoadObjectType(&payload)) == -1) || + ((obj = rdbLoadObject(type,&payload,nullptr, OBJ_MVCC_INVALID)) == nullptr)) + { + serverPanic("Bad data format"); + } + return obj; } serverPanic("Unknown object type loading from storage"); } @@ -1541,6 +1571,11 @@ sds serializeStoredObject(robj_roptr o) { case OBJ_STRING: return serializeStoredStringObject(o); + + default: + rio rdb; + createDumpPayload(&rdb,o,nullptr); + return (sds)rdb.io.buffer.ptr; } serverPanic("Attempting to store unknown object type"); } \ No newline at end of file diff --git a/src/rio.cpp b/src/rio.cpp index d6d1937eb..59642da5e 100644 --- a/src/rio.cpp +++ b/src/rio.cpp @@ -73,6 +73,14 @@ static size_t rioBufferRead(rio *r, void *buf, size_t len) { return 1; } +static size_t rioConstBufferRead(rio *r, void *buf, size_t len) { + if (r->io.buffer.len-r->io.buffer.pos < (off_t)len) + return 0; /* not enough buffer to return len bytes. */ + memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len); + r->io.buffer.pos += len; + return 1; +} + /* Returns read/write position in buffer. */ static off_t rioBufferTell(rio *r) { return r->io.buffer.pos; @@ -97,12 +105,32 @@ static const rio rioBufferIO = { { { NULL, 0 } } /* union for io-specific vars */ }; +static const rio rioConstBufferIO = { + rioConstBufferRead, + nullptr, + rioBufferTell, + rioBufferFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + void rioInitWithBuffer(rio *r, sds s) { *r = rioBufferIO; r->io.buffer.ptr = s; r->io.buffer.pos = 0; } +void rioInitWithConstBuffer(rio *r, const void *buf, size_t cb) +{ + *r = rioConstBufferIO; + r->io.buffer.ptr = (sds)buf; + r->io.buffer.pos = 0; + r->io.buffer.len = cb; +} + /* --------------------- Stdio file pointer implementation ------------------- */ /* Returns 1 or 0 for success/failure. */ diff --git a/src/rio.h b/src/rio.h index 3ec32263b..1b6c10915 100644 --- a/src/rio.h +++ b/src/rio.h @@ -70,6 +70,7 @@ struct _rio { struct { sds ptr; off_t pos; + off_t len; // For const buffers only } buffer; /* Stdio file pointer target. */ struct { @@ -130,6 +131,7 @@ static inline int rioFlush(rio *r) { void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); +void rioInitWithConstBuffer(rio *r, const void *rgch, size_t cch); void rioInitWithFdset(rio *r, int *fds, int numfds); void rioFreeFdset(rio *r); diff --git a/src/server.h b/src/server.h index 3bb2c7584..8e99af815 100644 --- a/src/server.h +++ b/src/server.h @@ -2691,6 +2691,7 @@ void clusterPropagatePublish(robj *channel, robj *message); void migrateCloseTimedoutSockets(void); void clusterBeforeSleep(void); int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len); +void createDumpPayload(rio *payload, robj_roptr o, robj *key); /* Sentinel */ void initSentinelConfig(void);