Merge branch 'keydbpro' of https://github.com/JohnSully/KeyDB-Pro into keydbpro

Former-commit-id: 3d63d38aae762c60a22154109827ebfd46aae2a5
This commit is contained in:
John Sully 2020-01-05 20:19:55 -05:00
commit 64c07168d7
11 changed files with 290 additions and 30 deletions

View File

@ -42,6 +42,9 @@
int keyIsExpired(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key, robj *o);
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset);
sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o);
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */
@ -565,6 +568,11 @@ void flushdbCommand(client *c) {
{
if (!strcasecmp(szFromObj(c->argv[1]), "cache"))
{
if (g_pserver->m_pstorageFactory == nullptr)
{
addReplyError(c, "Cannot flush cache without a storage provider set");
return;
}
c->db->removeAllCachedValues();
addReply(c,shared.ok);
return;
@ -587,6 +595,11 @@ void flushallCommand(client *c) {
{
if (!strcasecmp(szFromObj(c->argv[1]), "cache"))
{
if (g_pserver->m_pstorageFactory == nullptr)
{
addReplyError(c, "Cannot flush cache without a storage provider set");
return;
}
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->removeAllCachedValues();
addReply(c,shared.ok);
@ -2100,9 +2113,24 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database
{
m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){
robj *o = deserializeStoredObject(this, sdsKey, data, cb);
size_t offset = 0;
sds sdsNewKey = sdsdupshared(sdsKey);
auto spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset);
robj *o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr);
dictAdd(m_pdict, sdsdupshared(sdsKey), o);
dictAdd(m_pdict, sdsNewKey, o);
o->SetFExpires(spexpire != nullptr);
if (spexpire != nullptr)
{
auto itr = m_setexpire->find(sdsKey);
if (itr != m_setexpire->end())
m_setexpire->erase(itr);
m_setexpire->insert(std::move(*spexpire));
serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end());
}
serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end()));
});
*pde = dictFind(m_pdict, sdsKey);
}
@ -2117,7 +2145,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite)
{
sds temp = serializeStoredObject(o);
sds temp = serializeStoredObjectAndExpire(this, szKey, o);
m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite);
sdsfree(temp);
}
@ -2162,7 +2190,7 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges()
if (de == nullptr)
continue;
robj *o = (robj*)dictGetVal(de);
sds temp = serializeStoredObject(o);
sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o);
vecRet.emplace_back(std::move(change), unique_sds_ptr(temp));
}
}
@ -2273,4 +2301,86 @@ void redisDbPersistentData::trackkey(const char *key, bool fUpdate)
++m_cnewKeysPending;
}
}
}
sds serializeExpire(const expireEntry *pexpire)
{
sds str = sdsnewlen(nullptr, sizeof(unsigned));
if (pexpire == nullptr)
{
unsigned zero = 0;
memcpy(str, &zero, sizeof(unsigned));
return str;
}
auto &e = *pexpire;
unsigned celem = (unsigned)e.size();
memcpy(str, &celem, sizeof(unsigned));
for (auto itr = e.begin(); itr != e.end(); ++itr)
{
unsigned subkeylen = itr.subkey() ? (unsigned)sdslen(itr.subkey()) : 0;
size_t strOffset = sdslen(str);
str = sdsgrowzero(str, sdslen(str) + sizeof(unsigned) + subkeylen + sizeof(long long));
memcpy(str + strOffset, &subkeylen, sizeof(unsigned));
if (itr.subkey())
memcpy(str + strOffset + sizeof(unsigned), itr.subkey(), subkeylen);
long long when = itr.when();
memcpy(str + strOffset + sizeof(unsigned) + subkeylen, &when, sizeof(when));
}
return str;
}
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset)
{
unsigned celem;
if (cch < sizeof(unsigned))
throw "Corrupt expire entry";
memcpy(&celem, str, sizeof(unsigned));
std::unique_ptr<expireEntry> spexpire;
size_t offset = sizeof(unsigned);
for (; celem > 0; --celem)
{
serverAssert(cch > (offset+sizeof(unsigned)));
unsigned subkeylen;
memcpy(&subkeylen, str + offset, sizeof(unsigned));
offset += sizeof(unsigned);
sds subkey = nullptr;
if (subkeylen != 0)
{
serverAssert(cch > (offset + subkeylen));
subkey = sdsnewlen(nullptr, subkeylen);
memcpy(subkey, str + offset, subkeylen);
offset += subkeylen;
}
long long when;
serverAssert(cch >= (offset + sizeof(long long)));
memcpy(&when, str + offset, sizeof(long long));
offset += sizeof(long long);
if (spexpire == nullptr)
spexpire = std::make_unique<expireEntry>(key, subkey, when);
else
spexpire->update(subkey, when);
}
*poffset = offset;
return spexpire;
}
sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o)
{
auto itrExpire = db->setexpire()->find(key);
const expireEntry *pexpire = nullptr;
if (itrExpire != db->setexpire()->end())
pexpire = &(*itrExpire);
sds str = serializeExpire(pexpire);
str = serializeStoredObject(o, str);
return str;
}

View File

@ -48,7 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
/* forward declarations*/
void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
/* Defrag helper for generic allocations.
*
@ -407,7 +407,7 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
return NULL;
}
void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) {
bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) {
auto itr = set.find(oldkey);
if (itr != set.end())
{
@ -415,7 +415,10 @@ void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) {
eNew.setKeyUnsafe(newkey);
set.erase(itr);
set.insert(eNew);
serverAssert(set.find(newkey) != set.end());
return true;
}
return false;
}
long activeDefragQuickListNodes(quicklist *ql) {
@ -777,18 +780,21 @@ long defragKey(redisDb *db, dictEntry *de) {
long defragged = 0;
sds newsds;
ob = (robj*)dictGetVal(de);
/* Try to defrag the key name. */
newsds = activeDefragSds(keysds);
if (newsds)
{
defragged++, de->key = newsds;
if (!db->setexpire()->empty()) {
replaceSateliteOSetKeyPtr(*const_cast<expireset*>(db->setexpire()), keysds, newsds);
if (!db->setexpire()->empty()) {
bool fReplaced = replaceSateliteOSetKeyPtr(*const_cast<expireset*>(db->setexpire()), keysds, newsds);
serverAssert(fReplaced == ob->FExpires());
} else {
serverAssert(!ob->FExpires());
}
}
/* Try to defrag robj and / or string value. */
ob = (robj*)dictGetVal(de);
if (ob == nullptr)
return defragged;
if ((newob = activeDefragStringOb(ob, &defragged))) {
de->v.val = newob;
ob = newob;
@ -841,6 +847,7 @@ long defragKey(redisDb *db, dictEntry *de) {
} else {
serverPanic("Unknown object type");
}
return defragged;
}

View File

@ -293,7 +293,7 @@ extern "C" void fastlock_lock(struct fastlock *lock)
#elif defined(__arm__)
__asm__ __volatile__ ("yield");
#endif
if ((++cloops % 0x10000) == 0)
if ((++cloops % 0x100000) == 0)
{
fastlock_sleep(lock, tid, ticketT.u, mask);
}

View File

@ -45,7 +45,7 @@ fastlock_lock:
cmp dx, ax # is our ticket up?
je .LLocked # leave the loop
pause
add ecx, 0x10000 # Have we been waiting a long time? (oflow if we have)
add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have)
# 1000h is set so we overflow on the 1024*1024'th iteration (like the C code)
jnc .LLoop # If so, give up our timeslice to someone who's doing real work
# Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop"

View File

@ -1619,20 +1619,30 @@ robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key,
return o;
}
sds serializeStoredObject(robj_roptr o)
sds serializeStoredObject(robj_roptr o, sds sdsPrefix)
{
switch (o->type)
{
case OBJ_STRING:
{
sds sdsT = sdsnewlen(nullptr, 1);
sdsT[0] = RDB_TYPE_STRING;
sds sdsT = nullptr;
if (sdsPrefix)
sdsT = sdsgrowzero(sdsPrefix, sdslen(sdsPrefix)+1);
else
sdsT = sdsnewlen(nullptr, 1);
sdsT[sdslen(sdsT)-1] = RDB_TYPE_STRING;
return serializeStoredStringObject(sdsT, o);
}
default:
rio rdb;
createDumpPayload(&rdb,o,nullptr);
if (sdsPrefix)
{
sds rval = sdscatsds(sdsPrefix, (sds)rdb.io.buffer.ptr);
sdsfree((sds)rdb.io.buffer.ptr);
return rval;
}
return (sds)rdb.io.buffer.ptr;
}
serverPanic("Attempting to store unknown object type");

View File

@ -985,6 +985,19 @@ LError:
return;
}
void processReplconfLicense(client *c, robj *arg)
{
if (cserver.license_key != nullptr)
{
if (strcmp(cserver.license_key, szFromObj(arg)) == 0) {
addReplyError(c, "Each replica must have a unique license key");
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return;
}
}
addReply(c, shared.ok);
}
/* REPLCONF <option> <value> <option> <value> ...
* This command is used by a replica in order to configure the replication
* process before starting it with the SYNC command.
@ -1067,6 +1080,9 @@ void replconfCommand(client *c) {
/* REPLCONF uuid is used to set and send the UUID of each host */
processReplconfUuid(c, c->argv[j+1]);
return; // the process function replies to the client for both error and success
} else if (!strcasecmp(szFromObj(c->argv[j]),"license")) {
processReplconfLicense(c, c->argv[j+1]);
return;
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)ptrFromObj(c->argv[j]));
@ -2072,6 +2088,36 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
}
}
sdsfree(err);
mi->repl_state = REPL_STATE_SEND_KEY;
// fallthrough
}
/* Send LICENSE Key */
if (mi->repl_state == REPL_STATE_SEND_KEY)
{
if (cserver.license_key == nullptr)
{
mi->repl_state = REPL_STATE_SEND_PORT;
}
else
{
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF","license",cserver.license_key,NULL);
if (err) goto write_error;
mi->repl_state = REPL_STATE_KEY_ACK;
return;
}
}
/* LICENSE Key Ack */
if (mi->repl_state == REPL_STATE_KEY_ACK)
{
err = sendSynchronousCommand(mi, SYNC_CMD_READ,fd,NULL);
if (err[0] == '-') {
serverLog(LL_WARNING, "Recieved error from client: %s", err);
sdsfree(err);
goto error;
}
sdsfree(err);
mi->repl_state = REPL_STATE_SEND_PORT;
// fallthrough
}
@ -2297,7 +2343,8 @@ int connectWithMaster(redisMaster *mi) {
fd = anetTcpNonBlockBestEffortBindConnect(NULL,
mi->masterhost,mi->masterport,NET_FIRST_BIND_ADDR);
if (fd == -1) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
int sev = g_pserver->enable_multimaster ? LL_NOTICE : LL_WARNING; // with multimaster its not unheard of to intentiallionall have downed masters
serverLog(sev,"Unable to connect to MASTER: %s",
strerror(errno));
return C_ERR;
}

View File

@ -178,6 +178,8 @@ sds sdsdup(const char *s) {
}
sds sdsdupshared(const char *s) {
if (s == NULL)
return NULL;
unsigned char flags = s[-1];
if ((flags & SDS_TYPE_MASK) != SDS_TYPE_REFCOUNTED)
return sdsnewlen(s, -sdslen(s));
@ -234,6 +236,9 @@ void sdsclear(sds s) {
* Note: this does not change the *length* of the sds string as returned
* by sdslen(), but only the free buffer space we have. */
sds sdsMakeRoomFor(sds s, size_t addlen) {
if (s == NULL)
return sdsnewlen(NULL, addlen);
void *sh, *newsh;
size_t avail = sdsavail(s);
size_t len, newlen;

View File

@ -554,17 +554,19 @@ public:
#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */
#define REPL_STATE_SEND_UUID 6 /* send our UUID */
#define REPL_STATE_RECEIVE_UUID 7 /* they should ack with their UUID */
#define REPL_STATE_SEND_PORT 8 /* Send REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 9 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_IP 10 /* Send REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 11 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_CAPA 12 /* Send REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 13 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 14 /* Send PSYNC */
#define REPL_STATE_RECEIVE_PSYNC 15 /* Wait for PSYNC reply */
#define REPL_STATE_SEND_KEY 8
#define REPL_STATE_KEY_ACK 9
#define REPL_STATE_SEND_PORT 10 /* Send REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 11 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_IP 12 /* Send REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 13 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_CAPA 14 /* Send REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 15 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 16 /* Send PSYNC */
#define REPL_STATE_RECEIVE_PSYNC 17 /* Wait for PSYNC reply */
/* --- End of handshake states --- */
#define REPL_STATE_TRANSFER 16 /* Receiving .rdb from master */
#define REPL_STATE_CONNECTED 17 /* Connected to master */
#define REPL_STATE_TRANSFER 18 /* Receiving .rdb from master */
#define REPL_STATE_CONNECTED 19 /* Connected to master */
/* State of slaves from the POV of the master. Used in client->replstate.
* In SEND_BULK and ONLINE state the replica receives new updates
@ -1163,6 +1165,13 @@ public:
pfatentry()->m_vecexpireEntries.begin() + itr.m_idx);
}
size_t size() const
{
if (FFat())
return u.m_pfatentry->size();
return 1;
}
bool FGetPrimaryExpire(long long *pwhen) const
{
*pwhen = -1;
@ -2633,7 +2642,8 @@ unsigned long long estimateObjectIdleTime(robj_roptr o);
void trimStringObjectIfNeeded(robj *o);
robj *deserializeStoredObject(const redisDbPersistentData *db, const char *key, const void *data, size_t cb);
sds serializeStoredObject(robj_roptr o);
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset);
sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr);
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)

View File

@ -283,7 +283,13 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
bool fContinue = true;
if (de == nullptr)
{
robj *o = fKeyOnly ? nullptr : deserializeStoredObject(this, sdsKey, data, cbData);
robj *o = nullptr;
if (!fKeyOnly)
{
size_t offset = 0;
deserializeExpire(sdsKey, (const char*)data, cbData, &offset);
o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data)+offset, cbData-offset);
}
fContinue = fn(sdsKey, o);
if (o != nullptr)
decrRefCount(o);

View File

@ -219,4 +219,37 @@ start_server {tags {"expire"}} {
set ttl [r ttl foo]
assert {$ttl <= 98 && $ttl > 90}
}
test { EXPIREMEMBER works (set) } {
r flushall
r sadd testkey foo bar baz
r expiremember testkey foo 1
after 1500
assert_equal {2} [r scard testkey]
}
test { EXPIREMEMBER works (hash) } {
r flushall
r hset testkey foo bar
r expiremember testkey foo 1
after 1500
r exists testkey
} {0}
test { EXPIREMEMBER works (zset) } {
r flushall
r zadd testkey 1 foo
r zadd testkey 2 bar
assert_equal {2} [r zcard testkey]
r expiremember testkey foo 1
after 1500
assert_equal {1} [r zcard testkey]
}
test { TTL for subkey expires works } {
r flushall
r sadd testkey foo bar baz
r expiremember testkey foo 10000
assert [expr [r ttl testkey foo] > 0]
}
}

View File

@ -67,6 +67,38 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.
assert_equal {0} [r dbsize]
}
test { SUBKEY EXPIRE persists after cache flush } {
r flushall
r sadd testkey foo bar baz
r expiremember testkey foo 10000
r flushall cache
assert [expr [r ttl testkey foo] > 0]
}
test { LIST pop works after flushing cache } {
r flushall
r lpush testkey foo
r flushall cache
assert_equal {foo} [r lpop testkey]
}
test { DIGEST string the same after flushing cache } {
r flushall
r set testkey foo
r set testkey1 foo ex 10000
set expectedDigest [r debug digest]
r flushall cache
assert_equal $expectedDigest [r debug digest]
}
test { DIGEST list the same after flushing cache } {
r flushall
r lpush testkey foo bar
set expectedDigest [r debug digest]
r flushall cache
assert_equal $expectedDigest [r debug digest]
}
r flushall
foreach policy {
allkeys-random allkeys-lru allkeys-lfu