From f88592451d0fdbcadff537c830f11bdf13caeda1 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 14 Jul 2019 00:23:31 -0400 Subject: [PATCH] Plumb support for sub expires to all expire related code Former-commit-id: 184abac6942a9a6aa8783741b50b23210afddcc5 --- src/aof.cpp | 25 ++++++++++---- src/cluster.cpp | 5 ++- src/db.cpp | 87 ++++++++++++++++++++++++++++++++++++++++--------- src/debug.cpp | 6 +++- src/defrag.cpp | 4 +-- src/expire.cpp | 7 ++-- src/module.cpp | 6 +++- src/rdb.cpp | 28 +++++++++++----- src/server.h | 71 ++++++++++++++++++++++++++++++++++++++-- 9 files changed, 198 insertions(+), 41 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index c7160489b..5c6385c84 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1321,13 +1321,12 @@ int rewriteAppendOnlyFileRio(rio *aof) { while((de = dictNext(di)) != NULL) { sds keystr; robj key, *o; - long long expiretime; keystr = (sds)dictGetKey(de); o = (robj*)dictGetVal(de); initStaticStringObject(key,keystr); - expiretime = getExpire(db,&key); + expireEntry *pexpire = getExpire(db,&key); /* Save the key and associated value */ if (o->type == OBJ_STRING) { @@ -1353,11 +1352,23 @@ int rewriteAppendOnlyFileRio(rio *aof) { serverPanic("Unknown object type"); } /* Save the expire time */ - if (expiretime != -1) { - char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr; + if (pexpire != nullptr) { + for (auto &subExpire : *pexpire) { + if (subExpire.subkey() == nullptr) + { + char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) goto werr; + } + else + { + char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n"; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr; + } + if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) goto werr; // common + } } /* Read some diff from the parent process from time to time. */ if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { diff --git a/src/cluster.cpp b/src/cluster.cpp index 8978f184b..619ce3b3a 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -5194,7 +5194,10 @@ try_again: /* Create RESTORE payload and generate the protocol to call the command. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; - long long expireat = getExpire(c->db,kv[j]); + expireEntry *pexpire = getExpire(c->db,kv[j]); + long long expireat = -1; + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expireat); if (expireat != -1) { ttl = expireat-mstime(); diff --git a/src/db.cpp b/src/db.cpp index 40e44a7c1..a3dec518a 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -976,7 +976,6 @@ void shutdownCommand(client *c) { void renameGenericCommand(client *c, int nx) { robj *o; - long long expire; int samekey = 0; /* When source and dest key is the same, no operation is performed, @@ -992,7 +991,15 @@ void renameGenericCommand(client *c, int nx) { } incrRefCount(o); - expire = getExpire(c->db,c->argv[1]); + + std::unique_ptr spexpire; + + { // scope pexpireOld since it will be invalid soon + expireEntry *pexpireOld = getExpire(c->db,c->argv[1]); + if (pexpireOld != nullptr) + spexpire = std::make_unique(std::move(*pexpireOld)); + } + if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { if (nx) { decrRefCount(o); @@ -1005,8 +1012,8 @@ void renameGenericCommand(client *c, int nx) { } dbDelete(c->db,c->argv[1]); dbAdd(c->db,c->argv[2],o); - if (expire != -1) - setExpire(c,c->db,c->argv[2],nullptr,expire); + if (spexpire != nullptr) + setExpire(c,c->db,c->argv[2],std::move(*spexpire)); signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", @@ -1029,7 +1036,7 @@ void moveCommand(client *c) { robj *o; redisDb *src, *dst; int srcid; - long long dbid, expire; + long long dbid; if (g_pserver->cluster_enabled) { addReplyError(c,"MOVE is not allowed in cluster mode"); @@ -1063,7 +1070,13 @@ void moveCommand(client *c) { addReply(c,shared.czero); return; } - expire = getExpire(c->db,c->argv[1]); + + std::unique_ptr spexpire; + { // scope pexpireOld + expireEntry *pexpireOld = getExpire(c->db,c->argv[1]); + if (pexpireOld != nullptr) + spexpire = std::make_unique(std::move(*pexpireOld)); + } if (o->FExpires()) removeExpire(c->db,c->argv[1]); serverAssert(!o->FExpires()); @@ -1077,7 +1090,7 @@ void moveCommand(client *c) { return; } dbAdd(dst,c->argv[1],o); - if (expire != -1) setExpire(c,dst,c->argv[1],nullptr,expire); + if (spexpire != nullptr) setExpire(c,dst,c->argv[1],std::move(*spexpire)); addReply(c,shared.cone); } @@ -1251,24 +1264,53 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) rememberSlaveKeyWithExpire(db,key); } -/* Return the expire time of the specified key, or -1 if no expire +void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) +{ + dictEntry *kde; + + serverAssert(GlobalLocksAcquired()); + + /* Reuse the sds from the main dict in the expire dict */ + kde = dictFind(db->pdict,ptrFromObj(key)); + serverAssertWithInfo(NULL,key,kde != NULL); + + if (((robj*)dictGetVal(kde))->refcount == OBJ_SHARED_REFCOUNT) + { + // shared objects cannot have the expire bit set, create a real object + dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); + } + + if (((robj*)dictGetVal(kde))->FExpires()) + removeExpire(db, key); + + e.setKeyUnsafe((sds)dictGetKey(kde)); + db->setexpire->insert(e); + ((robj*)dictGetVal(kde))->SetFExpires(true); + + + int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; + if (c && writable_slave && !(c->flags & CLIENT_MASTER)) + rememberSlaveKeyWithExpire(db,key); +} + +/* Return the expire time of the specified key, or null if no expire * is associated with this key (i.e. the key is non volatile) */ -long long getExpire(redisDb *db, robj_roptr key) { +expireEntry *getExpire(redisDb *db, robj_roptr key) { dictEntry *de; /* No expire? return ASAP */ if (db->setexpire->size() == 0) - return -1; + return nullptr; de = dictFind(db->pdict, ptrFromObj(key)); if (de == NULL) - return -1; + return nullptr; robj *obj = (robj*)dictGetVal(de); if (!obj->FExpires()) - return -1; + return nullptr; auto itr = db->setexpire->find((sds)dictGetKey(de)); - return itr->when(); + return itr.operator->(); } /* Propagate expires into slaves and the AOF file. @@ -1296,15 +1338,28 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { decrRefCount(argv[1]); } -/* Check if the key is expired. */ +/* Check if the key is expired. Note, this does not check subexpires */ int keyIsExpired(redisDb *db, robj *key) { - mstime_t when = getExpire(db,key); + expireEntry *pexpire = getExpire(db,key); - if (when < 0) return 0; /* No expire for this key */ + if (pexpire == nullptr) return 0; /* No expire for this key */ /* Don't expire anything while loading. It will be done later. */ if (g_pserver->loading) return 0; + long long when = -1; + for (auto &exp : *pexpire) + { + if (exp.subkey() == nullptr) + { + when = exp.when(); + break; + } + } + + if (when == -1) + return 0; + /* If we are in the context of a Lua script, we pretend that time is * blocked to when the Lua script started. This way a key can expire * only the first time it is accessed and not in the middle of the diff --git a/src/debug.cpp b/src/debug.cpp index 4d2f4bbca..41c73b55c 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -124,9 +124,13 @@ void mixStringObjectDigest(unsigned char *digest, robj_roptr o) { void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj_roptr o) { uint32_t aux = htonl(o->type); mixDigest(digest,&aux,sizeof(aux)); - long long expiretime = getExpire(db,keyobj); + expireEntry *pexpire = getExpire(db,keyobj); + long long expiretime = -1; char buf[128]; + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expiretime); + /* Save the key and associated value */ if (o->type == OBJ_STRING) { mixStringObjectDigest(digest,o); diff --git a/src/defrag.cpp b/src/defrag.cpp index a6acb8e72..c49cd2665 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -409,10 +409,10 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { auto itr = set.find(oldkey); - serverAssert(false); if (itr != set.end()) { - expireEntry eNew(newkey, nullptr, itr->when()); + expireEntry eNew(std::move(*itr)); + eNew.setKeyUnsafe(newkey); set.erase(itr); set.insert(eNew); } diff --git a/src/expire.cpp b/src/expire.cpp index c10047d2c..5d257428d 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -535,7 +535,7 @@ void pexpireatCommand(client *c) { /* Implements TTL and PTTL */ void ttlGenericCommand(client *c, int output_ms) { - long long expire, ttl = -1; + long long expire = -1, ttl = -1; /* If the key does not exist at all, return -2 */ if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == nullptr) { @@ -544,7 +544,10 @@ void ttlGenericCommand(client *c, int output_ms) { } /* The key exists. Return -1 if it has no expire, or the actual * TTL value otherwise. */ - expire = getExpire(c->db,c->argv[1]); + expireEntry *pexpire = getExpire(c->db,c->argv[1]); + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expire); + if (expire != -1) { ttl = expire-mstime(); if (ttl < 0) ttl = 0; diff --git a/src/module.cpp b/src/module.cpp index 3ef824a7f..052c8744a 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -1644,7 +1644,11 @@ int RM_UnlinkKey(RedisModuleKey *key) { * If no TTL is associated with the key or if the key is empty, * REDISMODULE_NO_EXPIRE is returned. */ mstime_t RM_GetExpire(RedisModuleKey *key) { - mstime_t expire = getExpire(key->db,key->key); + expireEntry *pexpire = getExpire(key->db,key->key); + mstime_t expire = -1; + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expire); + if (expire == -1 || key->value == NULL) return -1; expire -= mstime(); return expire >= 0 ? expire : 0; diff --git a/src/rdb.cpp b/src/rdb.cpp index c1b15e2ca..bed797305 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1031,12 +1031,13 @@ size_t rdbSavedObjectLen(robj *o) { * On error -1 is returned. * On success if the key was actually saved 1 is returned, otherwise 0 * is returned (the key was already expired). */ -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { +int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) { int savelru = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU; int savelfu = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU; /* Save the expire time */ - if (expiretime != -1) { + long long expiretime = -1; + if (pexpire != nullptr && pexpire->FGetPrimaryExpire(&expiretime)) { if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1; if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1; } @@ -1061,9 +1062,21 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { if (rdbWriteRaw(rdb,buf,1) == -1) return -1; } - char szMvcc[32]; - snprintf(szMvcc, 32, "%" PRIu64, val->mvcc_tstamp); - if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szMvcc) == -1) return -1; + char szT[32]; + snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); + if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; + + if (pexpire != nullptr) + { + for (auto itr : *pexpire) + { + if (itr.subkey() == nullptr) + continue; // already saved + snprintf(szT, 32, "%lld", itr.when()); + rdbSaveAuxFieldStrStr(rdb,"keydb-subexpire-key",itr.subkey()); + rdbSaveAuxFieldStrStr(rdb,"keydb-subexpire-when",szT); + } + } /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; @@ -1099,12 +1112,11 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o) { robj key; - long long expire; initStaticStringObject(key,(char*)keystr); - expire = getExpire(db, &key); + expireEntry *pexpire = getExpire(db, &key); - if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) + if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1) return 0; /* When this RDB is produced as part of an AOF rewrite, move diff --git a/src/server.h b/src/server.h index 99104cf48..f350410d0 100644 --- a/src/server.h +++ b/src/server.h @@ -770,6 +770,7 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o) class expireEntryFat { + friend class expireEntry; public: struct subexpireEntry { @@ -806,7 +807,8 @@ public: bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); } void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); } - + const subexpireEntry &operator[](size_t idx) { return m_vecexpireEntries[idx]; } + size_t size() const noexcept { return m_vecexpireEntries.size(); } }; class expireEntry { @@ -818,6 +820,39 @@ class expireEntry { long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer public: + class iter + { + expireEntry *m_pentry = nullptr; + size_t m_idx = 0; + + public: + iter(expireEntry *pentry, size_t idx) + : m_pentry(pentry), m_idx(idx) + {} + + iter &operator++() { ++m_idx; return *this; } + + const char *subkey() const + { + if (m_pentry->FFat()) + return (*m_pentry->pfatentry())[m_idx].spsubkey.get(); + return nullptr; + } + long long when() const + { + if (m_pentry->FFat()) + return (*m_pentry->pfatentry())[m_idx].when; + return m_pentry->when(); + } + + bool operator!=(const iter &other) + { + return m_idx != other.m_idx; + } + + const iter &operator*() const { return *this; } + }; + expireEntry(sds key, const char *subkey, long long when) { if (subkey != nullptr) @@ -843,7 +878,7 @@ public: { u.m_key = e.u.m_key; m_when = e.m_when; - e.u.m_key = nullptr; + e.u.m_key = (char*)key(); // we do this so it can still be found in the set e.m_when = 0; } @@ -853,6 +888,14 @@ public: delete u.m_pfatentry; } + void setKeyUnsafe(sds key) + { + if (FFat()) + u.m_pfatentry->m_keyPrimary = key; + else + u.m_key = key; + } + inline bool FFat() const noexcept { return m_when == LLONG_MIN; } expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; } @@ -907,6 +950,27 @@ public: u.m_pfatentry->expireSubKey(subkey, when); } + iter begin() { return iter(this, 0); } + iter end() + { + if (FFat()) + return iter(this, u.m_pfatentry->size()); + return iter(this, 1); + } + + bool FGetPrimaryExpire(long long *pwhen) + { + *pwhen = -1; + for (auto itr : *this) + { + if (itr.subkey() == nullptr) + { + *pwhen = itr.when(); + return true; + } + } + return false; + } explicit operator const char*() const noexcept { return key(); } explicit operator long long() const noexcept { return when(); } @@ -2337,8 +2401,9 @@ int removeExpire(redisDb *db, robj *key); int removeExpireCore(redisDb *db, robj *key, dictEntry *de); void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); -long long getExpire(redisDb *db, robj_roptr key); +expireEntry *getExpire(redisDb *db, robj_roptr key); void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when); +void setExpire(client *c, redisDb *db, robj *key, expireEntry &&entry); robj_roptr lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply);