From 8e7d88c514bf6299eca400f9d5a472b5f2c2349e Mon Sep 17 00:00:00 2001 From: malavan Date: Thu, 2 Dec 2021 17:11:26 +0000 Subject: [PATCH] add async to hget and fix bug where a command can hold a key from main db without lock Former-commit-id: b0b8f426fda7cdabd3898a2d0b38d0fae47d1e69 --- src/db.cpp | 13 ++++++++++--- src/server.cpp | 2 +- src/server.h | 4 +++- src/t_hash.cpp | 14 +++++++++----- src/t_string.cpp | 6 ++++-- 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 163c12468..6a820c6e7 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -206,7 +206,7 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) { serverAssert(GlobalLocksAcquired()); return lookupKeyReadWithFlags(db,key,LOOKUP_NONE); } -robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) { +robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint, AeLocker &locker) { robj_roptr o; if (aeThreadOwnsLock()) { @@ -217,7 +217,6 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) { return nullptr; int idb = db->id; if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) { - AeLocker locker; locker.arm(serverTL->current_client); if (serverTL->rgdbSnapshot[idb] != nullptr) { db->endSnapshot(serverTL->rgdbSnapshot[idb]); @@ -230,6 +229,9 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) { o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE); serverTL->disable_async_commands = true; // don't try this again } + else { + locker.disarm(); + } } if (serverTL->rgdbSnapshot[idb] != nullptr) { o = serverTL->rgdbSnapshot[idb]->find_cached_threadsafe(szFromObj(key)).val(); @@ -263,7 +265,12 @@ static void SentReplyOnKeyMiss(client *c, robj *reply){ } } robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) { - robj_roptr o = lookupKeyRead(c->db, key, c->mvccCheckpoint); + robj_roptr o = lookupKeyRead(c->db, key); + if (!o) SentReplyOnKeyMiss(c, reply); + return o; +} +robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply, AeLocker &locker) { + robj_roptr o = lookupKeyRead(c->db, key, c->mvccCheckpoint, locker); if (!o) SentReplyOnKeyMiss(c, reply); return o; } diff --git a/src/server.cpp b/src/server.cpp index 57469bf09..d4fe5caa2 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -599,7 +599,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,1,1,1,0,0,0}, {"hget",hgetCommand,3, - "read-only fast @hash", + "read-only fast async @hash", 0,NULL,1,1,1,0,0,0}, {"hmset",hsetCommand,-4, diff --git a/src/server.h b/src/server.h index e014e606a..466221f61 100644 --- a/src/server.h +++ b/src/server.h @@ -3363,6 +3363,7 @@ int rewriteConfig(char *path, int force_all); void initConfigValues(); /* db.c -- Keyspace access API */ +class AeLocker; int removeExpire(redisDb *db, robj *key); int removeSubkeyExpire(redisDb *db, robj *key, robj *subkey); void propagateExpire(redisDb *db, robj *key, int lazy); @@ -3370,10 +3371,11 @@ void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey); int expireIfNeeded(redisDb *db, robj *key); void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when); void setExpire(client *c, redisDb *db, robj *key, expireEntry &&entry); -robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint); +robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint, AeLocker &locker); robj_roptr lookupKeyRead(redisDb *db, robj *key); int checkAlreadyExpired(long long when); robj *lookupKeyWrite(redisDb *db, robj *key); +robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply, AeLocker &locker); robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply); robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply); robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags); diff --git a/src/t_hash.cpp b/src/t_hash.cpp index e724e406e..829719435 100644 --- a/src/t_hash.cpp +++ b/src/t_hash.cpp @@ -28,6 +28,7 @@ */ #include "server.h" +#include "aelocker.h" #include /*----------------------------------------------------------------------------- @@ -799,8 +800,8 @@ static void addHashFieldToReply(client *c, robj_roptr o, sds field) { void hgetCommand(client *c) { robj_roptr o; - - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == nullptr || + AeLocker locker; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp], locker)) == nullptr || checkType(c,o,OBJ_HASH)) return; addHashFieldToReply(c, o, szFromObj(c->argv[2])); @@ -809,10 +810,11 @@ void hgetCommand(client *c) { void hmgetCommand(client *c) { robj_roptr o; int i; + AeLocker locker; /* Don't abort when the key cannot be found. Non-existing keys are empty * hashes, where HMGET should respond with a series of null bulks. */ - o = lookupKeyRead(c->db, c->argv[1], c->mvccCheckpoint); + o = lookupKeyRead(c->db, c->argv[1], c->mvccCheckpoint, locker); if (checkType(c,o,OBJ_HASH)) return; addReplyArrayLen(c, c->argc-2); @@ -889,10 +891,11 @@ void genericHgetallCommand(client *c, int flags) { robj_roptr o; hashTypeIterator *hi; int length, count = 0; + AeLocker locker; robj *emptyResp = (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) ? shared.emptymap[c->resp] : shared.emptyarray; - if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyResp)) + if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyResp,locker)) == nullptr || checkType(c,o,OBJ_HASH)) return; /* We return a map if the user requested keys and values, like in the @@ -946,9 +949,10 @@ void hexistsCommand(client *c) { void hscanCommand(client *c) { robj_roptr o; unsigned long cursor; + AeLocker locker; if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == nullptr || + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan,locker)) == nullptr || checkType(c,o,OBJ_HASH)) return; scanGenericCommand(c,o,cursor); } diff --git a/src/t_string.cpp b/src/t_string.cpp index 08fc78edd..b6c989982 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -285,8 +285,9 @@ void psetexCommand(client *c) { int getGenericCommand(client *c) { robj_roptr o; + AeLocker locker; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == nullptr) + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp],locker)) == nullptr) return C_OK; if (checkType(c,o,OBJ_STRING)) { @@ -525,9 +526,10 @@ void getrangeCommand(client *c) { } void mgetCommand(client *c) { + AeLocker locker; addReplyArrayLen(c,c->argc-1); for (int i = 1; i < c->argc; i++) { - robj_roptr o = lookupKeyRead(c->db,c->argv[i],c->mvccCheckpoint); + robj_roptr o = lookupKeyRead(c->db,c->argv[i],c->mvccCheckpoint,locker); if (o == nullptr || o->type != OBJ_STRING) { addReplyNull(c); } else {