From ad0cb8da40680972e2f04654b7ae09aef6f9d6cf Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 1 Jan 2020 11:52:00 -0500 Subject: [PATCH 1/9] Fix issue #130 due to fastlock timeout reduction Former-commit-id: dbef17c2e16f115733242721e9b5a43f01e7a554 --- src/fastlock.cpp | 2 +- src/fastlock_x64.asm | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index d566bb267..19375cd0e 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -293,7 +293,7 @@ extern "C" void fastlock_lock(struct fastlock *lock) #elif defined(__arm__) __asm__ __volatile__ ("yield"); #endif - if ((++cloops % 0x10000) == 0) + if ((++cloops % 0x100000) == 0) { fastlock_sleep(lock, tid, ticketT.u, mask); } diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index f7ab6316e..7c9990a6d 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -45,7 +45,7 @@ fastlock_lock: cmp dx, ax # is our ticket up? je .LLocked # leave the loop pause - add ecx, 0x10000 # Have we been waiting a long time? (oflow if we have) + add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have) # 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) jnc .LLoop # If so, give up our timeslice to someone who's doing real work # Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop" From 2e50d383a645d748842dc21bc89627f1942b4dda Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 1 Jan 2020 19:13:48 -0500 Subject: [PATCH 2/9] C++ wrapper classes for SDS Former-commit-id: 45817db8c3a86815945359113dcbccfde4257ce5 --- src/sds.c | 43 +++++++++++++++-- src/sds.h | 138 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 176 insertions(+), 5 deletions(-) diff --git a/src/sds.c b/src/sds.c index 637db9663..e1678a95b 100644 --- a/src/sds.c +++ b/src/sds.c @@ -53,11 +53,18 @@ static inline int sdsHdrSize(char type) { return sizeof(struct sdshdr32); case SDS_TYPE_64: return sizeof(struct sdshdr64); + case SDS_TYPE_REFCOUNTED: + return sizeof(struct sdshdrrefcount); } return 0; } -static inline char sdsReqType(size_t string_size) { +static inline char sdsReqType(ssize_t string_size) { + if (string_size < 0){ + string_size = -string_size; + if (string_size < 1<<16) + return SDS_TYPE_REFCOUNTED; + } if (string_size < 1<<5) return SDS_TYPE_5; if (string_size < 1<<8) @@ -86,10 +93,12 @@ static inline char sdsReqType(size_t string_size) { * You can print the string with printf() as there is an implicit \0 at the * end of the string. However the string is binary safe and can contain * \0 characters in the middle, as the length is stored in the sds header. */ -sds sdsnewlen(const void *init, size_t initlen) { +sds sdsnewlen(const void *init, ssize_t initlen) { void *sh; sds s; char type = sdsReqType(initlen); + if (initlen < 0) + initlen = -initlen; /* Empty strings are usually created in order to append. Use type 8 * since type 5 is not good at this. */ if (type == SDS_TYPE_5 && initlen == 0) type = SDS_TYPE_8; @@ -137,6 +146,13 @@ sds sdsnewlen(const void *init, size_t initlen) { *fp = type; break; } + case SDS_TYPE_REFCOUNTED: { + SDS_HDR_VAR_REFCOUNTED(s); + sh->len = initlen; + sh->refcount = 1; + *fp = type; + break; + } } if (initlen && init) memcpy(s, init, initlen); @@ -161,9 +177,25 @@ sds sdsdup(const char *s) { return sdsnewlen(s, sdslen(s)); } +sds sdsdupshared(const char *s) { + unsigned char flags = s[-1]; + if ((flags & SDS_TYPE_MASK) != SDS_TYPE_REFCOUNTED) + return sdsnewlen(s, -sdslen(s)); + SDS_HDR_VAR_REFCOUNTED(s); + __atomic_fetch_add(&sh->refcount, 1, __ATOMIC_RELAXED); + return (sds)s; +} + /* Free an sds string. No operation is performed if 's' is NULL. */ void sdsfree(const char *s) { if (s == NULL) return; + unsigned char flags = s[-1]; + if ((flags & SDS_TYPE_MASK) == SDS_TYPE_REFCOUNTED) + { + SDS_HDR_VAR_REFCOUNTED(s); + if (__atomic_fetch_sub(&sh->refcount, 1, __ATOMIC_RELAXED) > 1) + return; + } s_free((char*)s-sdsHdrSize(s[-1])); } @@ -368,6 +400,11 @@ void sdsIncrLen(sds s, ssize_t incr) { len = (sh->len += incr); break; } + case SDS_TYPE_REFCOUNTED: { + SDS_HDR_VAR_REFCOUNTED(s); + len = (sh->len += incr); + break; + } default: len = 0; /* Just to avoid compilation warnings. */ } s[len] = '\0'; @@ -787,7 +824,7 @@ void sdstoupper(sds s) { * If two strings share exactly the same prefix, but one of the two has * additional characters, the longer string is considered to be greater than * the smaller one. */ -int sdscmp(const sds s1, const sds s2) { +int sdscmp(const char *s1, const char *s2) { size_t l1, l2, minlen; int cmp; diff --git a/src/sds.h b/src/sds.h index 7f6f141e0..23a11afa4 100644 --- a/src/sds.h +++ b/src/sds.h @@ -91,15 +91,27 @@ struct __attribute__ ((__packed__)) sdshdr64 { #endif }; +struct __attribute__ ((__packed__)) sdshdrrefcount { + uint64_t len; /* used */ + uint16_t refcount; + unsigned char flags; /* 3 lsb of type, 5 unused bits */ +#ifndef __cplusplus + char buf[]; +#endif +}; + #define SDS_TYPE_5 0 #define SDS_TYPE_8 1 #define SDS_TYPE_16 2 #define SDS_TYPE_32 3 #define SDS_TYPE_64 4 +#define SDS_TYPE_REFCOUNTED 5 #define SDS_TYPE_MASK 7 #define SDS_TYPE_BITS 3 #define SDS_HDR_VAR(T,s) struct sdshdr##T *sh = (struct sdshdr##T *)(((void*)((s)-(sizeof(struct sdshdr##T))))); +#define SDS_HDR_VAR_REFCOUNTED(s) struct sdshdrrefcount *sh = (struct sdshdrrefcount *)(((void*)((s)-(sizeof(struct sdshdrrefcount))))); #define SDS_HDR(T,s) ((struct sdshdr##T *)((s)-(sizeof(struct sdshdr##T)))) +#define SDS_HDR_REFCOUNTED(s) ((struct sdshdrrefcount *)((s)-(sizeof(struct sdshdrrefcount)))) #define SDS_TYPE_5_LEN(f) ((f)>>SDS_TYPE_BITS) static inline size_t sdslen(const char *s) { @@ -121,6 +133,8 @@ static inline size_t sdslen(const char *s) { return SDS_HDR(32,s)->len; case SDS_TYPE_64: return SDS_HDR(64,s)->len; + case SDS_TYPE_REFCOUNTED: + return SDS_HDR_REFCOUNTED(s)->len; } } return 0; @@ -148,6 +162,9 @@ static inline size_t sdsavail(const char * s) { SDS_HDR_VAR(64,s); return sh->alloc - sh->len; } + case SDS_TYPE_REFCOUNTED: { + return 0; // immutable + } } return 0; } @@ -173,6 +190,9 @@ static inline void sdssetlen(sds s, size_t newlen) { case SDS_TYPE_64: SDS_HDR(64,s)->len = newlen; break; + case SDS_TYPE_REFCOUNTED: + SDS_HDR_REFCOUNTED(s)->len = newlen; + break; } } @@ -198,6 +218,9 @@ static inline void sdsinclen(sds s, size_t inc) { case SDS_TYPE_64: SDS_HDR(64,s)->len += inc; break; + case SDS_TYPE_REFCOUNTED: + SDS_HDR_REFCOUNTED(s)->len += inc; + break; } } @@ -215,6 +238,8 @@ static inline size_t sdsalloc(const sds s) { return SDS_HDR(32,s)->alloc; case SDS_TYPE_64: return SDS_HDR(64,s)->alloc; + case SDS_TYPE_REFCOUNTED: + return SDS_HDR_REFCOUNTED(s)->len; } return 0; } @@ -237,13 +262,22 @@ static inline void sdssetalloc(sds s, size_t newlen) { case SDS_TYPE_64: SDS_HDR(64,s)->alloc = newlen; break; + case SDS_TYPE_REFCOUNTED: + break; } } -sds sdsnewlen(const void *init, size_t initlen); +static inline int sdsisshared(const char *s) +{ + unsigned char flags = s[-1]; + return ((flags & SDS_TYPE_MASK) == SDS_TYPE_REFCOUNTED); +} + +sds sdsnewlen(const void *init, ssize_t initlen); sds sdsnew(const char *init); sds sdsempty(void); sds sdsdup(const char *s); +sds sdsdupshared(const char *s); void sdsfree(const char *s); sds sdsgrowzero(sds s, size_t len); sds sdscatlen(sds s, const void *t, size_t len); @@ -265,7 +299,7 @@ sds sdstrim(sds s, const char *cset); void sdsrange(sds s, ssize_t start, ssize_t end); void sdsupdatelen(sds s); void sdsclear(sds s); -int sdscmp(const sds s1, const sds s2); +int sdscmp(const char *s1, const char *s2); sds *sdssplitlen(const char *s, ssize_t len, const char *sep, int seplen, int *count); void sdsfreesplitres(sds *tokens, int count); void sdstolower(sds s); @@ -298,6 +332,106 @@ int sdsTest(int argc, char *argv[]); #ifdef __cplusplus } + +class sdsview +{ +protected: + sds m_str = nullptr; + + sdsview() = default; // Not allowed to create a sdsview directly with a nullptr +public: + sdsview(sds str) + : m_str(str) + {} + + sdsview(const char *str) + : m_str((sds)str) + {} + + bool operator<(const sdsview &other) const + { + return sdscmp(m_str, other.m_str) < 0; + } + + bool operator==(const sdsview &other) const + { + return sdscmp(m_str, other.m_str) == 0; + } + + bool operator==(const char *other) const + { + return sdscmp(m_str, other) == 0; + } + + char operator[](size_t idx) const + { + return m_str[idx]; + } + + size_t size() const + { + return sdslen(m_str); + } + + const char *get() const { return m_str; } + + explicit operator const char*() const { return m_str; } +}; + +class sdsstring : public sdsview +{ +public: + sdsstring() = default; + explicit sdsstring(sds str) + : sdsview(str) + {} + + sdsstring(const sdsstring &other) + : sdsview(sdsdup(other.m_str)) + {} + + sdsstring(sdsstring &&other) + : sdsview(other.m_str) + { + other.m_str = nullptr; + } + + ~sdsstring() + { + sdsfree(m_str); + } +}; + +class sdsimmutablestring : public sdsstring +{ +public: + sdsimmutablestring() = default; + explicit sdsimmutablestring(sds str) + : sdsstring(str) + {} + + explicit sdsimmutablestring(const char *str) + : sdsstring((sds)str) + {} + + sdsimmutablestring(const sdsimmutablestring &other) + : sdsstring(sdsdupshared(other.m_str)) + {} + + sdsimmutablestring(sdsimmutablestring &&other) + : sdsstring(other.m_str) + { + other.m_str = nullptr; + } + + auto &operator=(const sdsimmutablestring &other) + { + sdsfree(m_str); + m_str = sdsdupshared(other.m_str); + return *this; + } +}; + #endif #endif From 85c8fc72b7cda2b3da44712dce383cd58bdc2095 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 1 Jan 2020 20:41:17 -0500 Subject: [PATCH 3/9] Fix issue where expire is lost when performing a defrag Former-commit-id: aea333bb78fafabbddb340dfd4c232c2e207cfba --- src/defrag.cpp | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/defrag.cpp b/src/defrag.cpp index c49cd2665..1b5596609 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -48,7 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); -void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); +bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); /* Defrag helper for generic allocations. * @@ -407,7 +407,7 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } -void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { +bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { auto itr = set.find(oldkey); if (itr != set.end()) { @@ -415,7 +415,10 @@ void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { eNew.setKeyUnsafe(newkey); set.erase(itr); set.insert(eNew); + serverAssert(set.find(newkey) != set.end()); + return true; } + return false; } long activeDefragQuickListNodes(quicklist *ql) { @@ -777,16 +780,22 @@ long defragKey(redisDb *db, dictEntry *de) { long defragged = 0; sds newsds; + ob = (robj*)dictGetVal(de); + /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); if (newsds) + { defragged++, de->key = newsds; - if (!db->setexpire->empty()) { - replaceSateliteOSetKeyPtr(*db->setexpire, keysds, newsds); + if (!db->setexpire->empty()) { + bool fReplaced = replaceSateliteOSetKeyPtr(*db->setexpire, keysds, newsds); + serverAssert(fReplaced == ob->FExpires()); + } else { + serverAssert(!ob->FExpires()); + } } /* Try to defrag robj and / or string value. */ - ob = (robj*)dictGetVal(de); if ((newob = activeDefragStringOb(ob, &defragged))) { de->v.val = newob; ob = newob; @@ -839,6 +848,7 @@ long defragKey(redisDb *db, dictEntry *de) { } else { serverPanic("Unknown object type"); } + return defragged; } From 6ab3e82e450afa3b8c66bdb311b8a9ec2ef66c5f Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 2 Jan 2020 15:36:02 -0500 Subject: [PATCH 4/9] Drop severity of master disconnect log when multimaster is enabled Former-commit-id: edb993d52b25c30392c6eb1e60896498f991a223 --- src/replication.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/replication.cpp b/src/replication.cpp index ec2bbea63..b8236420d 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2298,7 +2298,8 @@ int connectWithMaster(redisMaster *mi) { fd = anetTcpNonBlockBestEffortBindConnect(NULL, mi->masterhost,mi->masterport,NET_FIRST_BIND_ADDR); if (fd == -1) { - serverLog(LL_WARNING,"Unable to connect to MASTER: %s", + int sev = g_pserver->enable_multimaster ? LL_NOTICE : LL_WARNING; // with multimaster its not unheard of to intentiallionall have downed masters + serverLog(sev,"Unable to connect to MASTER: %s", strerror(errno)); return C_ERR; } From ce0fde973a03c9bbbfa06232a93484c20578299d Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 3 Jan 2020 15:53:36 -0500 Subject: [PATCH 5/9] Add support for storing expirations in FLASH Former-commit-id: 1dca07bd564042fce1b01d275641f35b918ae557 --- src/db.cpp | 118 +++++++++++++++++++++++++++++++++++++++++++++-- src/object.cpp | 16 +++++-- src/sds.c | 3 ++ src/server.h | 10 +++- src/snapshot.cpp | 8 +++- 5 files changed, 146 insertions(+), 9 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 65cec482f..4d14d35a1 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -42,6 +42,9 @@ int keyIsExpired(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key, robj *o); +std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); +sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o); + /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. * Then logarithmically increment the counter, and update the access time. */ @@ -565,6 +568,11 @@ void flushdbCommand(client *c) { { if (!strcasecmp(szFromObj(c->argv[1]), "cache")) { + if (g_pserver->m_pstorageFactory == nullptr) + { + addReplyError(c, "Cannot flush cache without a storage provider set"); + return; + } c->db->removeAllCachedValues(); addReply(c,shared.ok); return; @@ -587,6 +595,11 @@ void flushallCommand(client *c) { { if (!strcasecmp(szFromObj(c->argv[1]), "cache")) { + if (g_pserver->m_pstorageFactory == nullptr) + { + addReplyError(c, "Cannot flush cache without a storage provider set"); + return; + } for (int idb = 0; idb < cserver.dbnum; ++idb) g_pserver->db[idb]->removeAllCachedValues(); addReply(c,shared.ok); @@ -2100,9 +2113,24 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database { m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){ - robj *o = deserializeStoredObject(this, sdsKey, data, cb); + size_t offset = 0; + sds sdsNewKey = sdsdupshared(sdsKey); + auto spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset); + robj *o = deserializeStoredObject(this, sdsKey, reinterpret_cast(data) + offset, cb - offset); serverAssert(o != nullptr); - dictAdd(m_pdict, sdsdupshared(sdsKey), o); + dictAdd(m_pdict, sdsNewKey, o); + + o->SetFExpires(spexpire != nullptr); + + if (spexpire != nullptr) + { + auto itr = m_setexpire->find(sdsKey); + if (itr != m_setexpire->end()) + m_setexpire->erase(itr); + m_setexpire->insert(std::move(*spexpire)); + serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end()); + } + serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); }); *pde = dictFind(m_pdict, sdsKey); } @@ -2117,7 +2145,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite) { - sds temp = serializeStoredObject(o); + sds temp = serializeStoredObjectAndExpire(this, szKey, o); m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite); sdsfree(temp); } @@ -2162,7 +2190,7 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() if (de == nullptr) continue; robj *o = (robj*)dictGetVal(de); - sds temp = serializeStoredObject(o); + sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); vecRet.emplace_back(std::move(change), unique_sds_ptr(temp)); } } @@ -2273,4 +2301,86 @@ void redisDbPersistentData::trackkey(const char *key, bool fUpdate) ++m_cnewKeysPending; } } +} + +sds serializeExpire(const expireEntry *pexpire) +{ + sds str = sdsnewlen(nullptr, sizeof(unsigned)); + + if (pexpire == nullptr) + { + unsigned zero = 0; + memcpy(str, &zero, sizeof(unsigned)); + return str; + } + + auto &e = *pexpire; + unsigned celem = (unsigned)e.size(); + memcpy(str, &celem, sizeof(unsigned)); + + for (auto itr = e.begin(); itr != e.end(); ++itr) + { + unsigned subkeylen = itr.subkey() ? (unsigned)sdslen(itr.subkey()) : 0; + size_t strOffset = sdslen(str); + str = sdsgrowzero(str, sdslen(str) + sizeof(unsigned) + subkeylen + sizeof(long long)); + memcpy(str + strOffset, &subkeylen, sizeof(unsigned)); + if (itr.subkey()) + memcpy(str + strOffset + sizeof(unsigned), itr.subkey(), subkeylen); + long long when = itr.when(); + memcpy(str + strOffset + sizeof(unsigned) + subkeylen, &when, sizeof(when)); + } + return str; +} + +std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset) +{ + unsigned celem; + if (cch < sizeof(unsigned)) + throw "Corrupt expire entry"; + memcpy(&celem, str, sizeof(unsigned)); + std::unique_ptr spexpire; + + size_t offset = sizeof(unsigned); + for (; celem > 0; --celem) + { + serverAssert(cch > (offset+sizeof(unsigned))); + + unsigned subkeylen; + memcpy(&subkeylen, str + offset, sizeof(unsigned)); + offset += sizeof(unsigned); + + sds subkey = nullptr; + if (subkeylen != 0) + { + serverAssert(cch > (offset + subkeylen)); + subkey = sdsnewlen(nullptr, subkeylen); + memcpy(subkey, str + offset, subkeylen); + offset += subkeylen; + } + + long long when; + serverAssert(cch >= (offset + sizeof(long long))); + memcpy(&when, str + offset, sizeof(long long)); + offset += sizeof(long long); + + if (spexpire == nullptr) + spexpire = std::make_unique(key, subkey, when); + else + spexpire->update(subkey, when); + } + + *poffset = offset; + return spexpire; +} + +sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o) +{ + auto itrExpire = db->setexpire()->find(key); + const expireEntry *pexpire = nullptr; + if (itrExpire != db->setexpire()->end()) + pexpire = &(*itrExpire); + + sds str = serializeExpire(pexpire); + str = serializeStoredObject(o, str); + return str; } \ No newline at end of file diff --git a/src/object.cpp b/src/object.cpp index bfbd1524f..450e202c9 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1619,20 +1619,30 @@ robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, return o; } -sds serializeStoredObject(robj_roptr o) +sds serializeStoredObject(robj_roptr o, sds sdsPrefix) { switch (o->type) { case OBJ_STRING: { - sds sdsT = sdsnewlen(nullptr, 1); - sdsT[0] = RDB_TYPE_STRING; + sds sdsT = nullptr; + if (sdsPrefix) + sdsT = sdsgrowzero(sdsPrefix, sdslen(sdsPrefix)+1); + else + sdsT = sdsnewlen(nullptr, 1); + sdsT[sdslen(sdsT)-1] = RDB_TYPE_STRING; return serializeStoredStringObject(sdsT, o); } default: rio rdb; createDumpPayload(&rdb,o,nullptr); + if (sdsPrefix) + { + sds rval = sdscatsds(sdsPrefix, (sds)rdb.io.buffer.ptr); + sdsfree((sds)rdb.io.buffer.ptr); + return rval; + } return (sds)rdb.io.buffer.ptr; } serverPanic("Attempting to store unknown object type"); diff --git a/src/sds.c b/src/sds.c index e1678a95b..d6027725f 100644 --- a/src/sds.c +++ b/src/sds.c @@ -234,6 +234,9 @@ void sdsclear(sds s) { * Note: this does not change the *length* of the sds string as returned * by sdslen(), but only the free buffer space we have. */ sds sdsMakeRoomFor(sds s, size_t addlen) { + if (s == NULL) + return sdsnewlen(NULL, addlen); + void *sh, *newsh; size_t avail = sdsavail(s); size_t len, newlen; diff --git a/src/server.h b/src/server.h index b9289f46d..8bf1d344c 100644 --- a/src/server.h +++ b/src/server.h @@ -1163,6 +1163,13 @@ public: pfatentry()->m_vecexpireEntries.begin() + itr.m_idx); } + size_t size() const + { + if (FFat()) + return u.m_pfatentry->size(); + return 1; + } + bool FGetPrimaryExpire(long long *pwhen) const { *pwhen = -1; @@ -2633,7 +2640,8 @@ unsigned long long estimateObjectIdleTime(robj_roptr o); void trimStringObjectIfNeeded(robj *o); robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb); -sds serializeStoredObject(robj_roptr o); +std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); +sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr); #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 5b2503b82..d0f81e7ef 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -283,7 +283,13 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function(data)+offset, cbData-offset); + } fContinue = fn(sdsKey, o); if (o != nullptr) decrRefCount(o); From e49ec97f98aa8e07d4a66968faf502bd42cec1d5 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 3 Jan 2020 16:50:13 -0500 Subject: [PATCH 6/9] subkey expire testes Former-commit-id: 0cf3af6857c192bd03656c28b5a0a2bb11416b8c --- tests/unit/expire.tcl | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index de24eabed..477df0242 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -219,4 +219,37 @@ start_server {tags {"expire"}} { set ttl [r ttl foo] assert {$ttl <= 98 && $ttl > 90} } + + test { EXPIREMEMBER works (set) } { + r flushall + r sadd testkey foo bar baz + r expiremember testkey foo 1 + after 1500 + assert_equal {2} [r scard testkey] + } + + test { EXPIREMEMBER works (hash) } { + r flushall + r hset testkey foo bar + r expiremember testkey foo 1 + after 1500 + r exists testkey + } {0} + + test { EXPIREMEMBER works (zset) } { + r flushall + r zadd testkey 1 foo + r zadd testkey 2 bar + assert_equal {2} [r zcard testkey] + r expiremember testkey foo 1 + after 1500 + assert_equal {1} [r zcard testkey] + } + + test { TTL for subkey expires works } { + r flushall + r sadd testkey foo bar baz + r expiremember testkey foo 10000 + assert [expr [r ttl testkey foo] > 0] + } } From efdf09be36e16daccd349fda96584be7d6f8c292 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 3 Jan 2020 17:06:07 -0500 Subject: [PATCH 7/9] Fix crash with subkey expire Former-commit-id: 8e1d416714484ff6ff4242c5d9a24b1458bbfb7b --- src/sds.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sds.c b/src/sds.c index d6027725f..10bab6089 100644 --- a/src/sds.c +++ b/src/sds.c @@ -178,6 +178,8 @@ sds sdsdup(const char *s) { } sds sdsdupshared(const char *s) { + if (s == NULL) + return NULL; unsigned char flags = s[-1]; if ((flags & SDS_TYPE_MASK) != SDS_TYPE_REFCOUNTED) return sdsnewlen(s, -sdslen(s)); From 2109d8972eb29f62bbcc24a90b87e3c0389facde Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 3 Jan 2020 17:11:23 -0500 Subject: [PATCH 8/9] Additional flash tests Former-commit-id: 3f9b1a35821cb3a3bf82aabb180c13a9eddf4e93 --- tests/unit/flash.tcl | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/unit/flash.tcl b/tests/unit/flash.tcl index 75be4d58f..d342baecf 100644 --- a/tests/unit/flash.tcl +++ b/tests/unit/flash.tcl @@ -67,6 +67,38 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks. assert_equal {0} [r dbsize] } + test { SUBKEY EXPIRE persists after cache flush } { + r flushall + r sadd testkey foo bar baz + r expiremember testkey foo 10000 + r flushall cache + assert [expr [r ttl testkey foo] > 0] + } + + test { LIST pop works after flushing cache } { + r flushall + r lpush testkey foo + r flushall cache + assert_equal {foo} [r lpop testkey] + } + + test { DIGEST string the same after flushing cache } { + r flushall + r set testkey foo + r set testkey1 foo ex 10000 + set expectedDigest [r debug digest] + r flushall cache + assert_equal $expectedDigest [r debug digest] + } + + test { DIGEST list the same after flushing cache } { + r flushall + r lpush testkey foo bar + set expectedDigest [r debug digest] + r flushall cache + assert_equal $expectedDigest [r debug digest] + } + r flushall foreach policy { allkeys-random allkeys-lru allkeys-lfu From 78924a295e81413a1e91ef31e56a04bd896a9291 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 4 Jan 2020 17:15:06 -0500 Subject: [PATCH 9/9] Enforce seperate license keys for connected replicas Former-commit-id: bc005cb50b1010a2bc9170e261cd93dba849c35f --- src/replication.cpp | 46 +++++++++++++++++++++++++++++++++++++++++++++ src/server.h | 22 ++++++++++++---------- 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 730746081..44fa91452 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -985,6 +985,19 @@ LError: return; } +void processReplconfLicense(client *c, robj *arg) +{ + if (cserver.license_key != nullptr) + { + if (strcmp(cserver.license_key, szFromObj(arg)) == 0) { + addReplyError(c, "Each replica must have a unique license key"); + c->flags |= CLIENT_CLOSE_AFTER_REPLY; + return; + } + } + addReply(c, shared.ok); +} + /* REPLCONF