add async to hget and fix bug where a command can hold a key from main db without lock
Former-commit-id: b0b8f426fda7cdabd3898a2d0b38d0fae47d1e69
This commit is contained in:
parent
c61f8822cf
commit
8e7d88c514
13
src/db.cpp
13
src/db.cpp
@ -206,7 +206,7 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) {
|
|||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
|
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
|
||||||
}
|
}
|
||||||
robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) {
|
robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint, AeLocker &locker) {
|
||||||
robj_roptr o;
|
robj_roptr o;
|
||||||
|
|
||||||
if (aeThreadOwnsLock()) {
|
if (aeThreadOwnsLock()) {
|
||||||
@ -217,7 +217,6 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) {
|
|||||||
return nullptr;
|
return nullptr;
|
||||||
int idb = db->id;
|
int idb = db->id;
|
||||||
if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) {
|
if (serverTL->rgdbSnapshot[idb] == nullptr || serverTL->rgdbSnapshot[idb]->mvccCheckpoint() < mvccCheckpoint) {
|
||||||
AeLocker locker;
|
|
||||||
locker.arm(serverTL->current_client);
|
locker.arm(serverTL->current_client);
|
||||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||||
db->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
db->endSnapshot(serverTL->rgdbSnapshot[idb]);
|
||||||
@ -230,6 +229,9 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint) {
|
|||||||
o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
|
o = lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
|
||||||
serverTL->disable_async_commands = true; // don't try this again
|
serverTL->disable_async_commands = true; // don't try this again
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
locker.disarm();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
if (serverTL->rgdbSnapshot[idb] != nullptr) {
|
||||||
o = serverTL->rgdbSnapshot[idb]->find_cached_threadsafe(szFromObj(key)).val();
|
o = serverTL->rgdbSnapshot[idb]->find_cached_threadsafe(szFromObj(key)).val();
|
||||||
@ -263,7 +265,12 @@ static void SentReplyOnKeyMiss(client *c, robj *reply){
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
|
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
|
||||||
robj_roptr o = lookupKeyRead(c->db, key, c->mvccCheckpoint);
|
robj_roptr o = lookupKeyRead(c->db, key);
|
||||||
|
if (!o) SentReplyOnKeyMiss(c, reply);
|
||||||
|
return o;
|
||||||
|
}
|
||||||
|
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply, AeLocker &locker) {
|
||||||
|
robj_roptr o = lookupKeyRead(c->db, key, c->mvccCheckpoint, locker);
|
||||||
if (!o) SentReplyOnKeyMiss(c, reply);
|
if (!o) SentReplyOnKeyMiss(c, reply);
|
||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
|
@ -599,7 +599,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
0,NULL,1,1,1,0,0,0},
|
0,NULL,1,1,1,0,0,0},
|
||||||
|
|
||||||
{"hget",hgetCommand,3,
|
{"hget",hgetCommand,3,
|
||||||
"read-only fast @hash",
|
"read-only fast async @hash",
|
||||||
0,NULL,1,1,1,0,0,0},
|
0,NULL,1,1,1,0,0,0},
|
||||||
|
|
||||||
{"hmset",hsetCommand,-4,
|
{"hmset",hsetCommand,-4,
|
||||||
|
@ -3363,6 +3363,7 @@ int rewriteConfig(char *path, int force_all);
|
|||||||
void initConfigValues();
|
void initConfigValues();
|
||||||
|
|
||||||
/* db.c -- Keyspace access API */
|
/* db.c -- Keyspace access API */
|
||||||
|
class AeLocker;
|
||||||
int removeExpire(redisDb *db, robj *key);
|
int removeExpire(redisDb *db, robj *key);
|
||||||
int removeSubkeyExpire(redisDb *db, robj *key, robj *subkey);
|
int removeSubkeyExpire(redisDb *db, robj *key, robj *subkey);
|
||||||
void propagateExpire(redisDb *db, robj *key, int lazy);
|
void propagateExpire(redisDb *db, robj *key, int lazy);
|
||||||
@ -3370,10 +3371,11 @@ void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey);
|
|||||||
int expireIfNeeded(redisDb *db, robj *key);
|
int expireIfNeeded(redisDb *db, robj *key);
|
||||||
void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when);
|
void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when);
|
||||||
void setExpire(client *c, redisDb *db, robj *key, expireEntry &&entry);
|
void setExpire(client *c, redisDb *db, robj *key, expireEntry &&entry);
|
||||||
robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint);
|
robj_roptr lookupKeyRead(redisDb *db, robj *key, uint64_t mvccCheckpoint, AeLocker &locker);
|
||||||
robj_roptr lookupKeyRead(redisDb *db, robj *key);
|
robj_roptr lookupKeyRead(redisDb *db, robj *key);
|
||||||
int checkAlreadyExpired(long long when);
|
int checkAlreadyExpired(long long when);
|
||||||
robj *lookupKeyWrite(redisDb *db, robj *key);
|
robj *lookupKeyWrite(redisDb *db, robj *key);
|
||||||
|
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply, AeLocker &locker);
|
||||||
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply);
|
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply);
|
||||||
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply);
|
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply);
|
||||||
robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags);
|
robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags);
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
#include "aelocker.h"
|
||||||
#include <math.h>
|
#include <math.h>
|
||||||
|
|
||||||
/*-----------------------------------------------------------------------------
|
/*-----------------------------------------------------------------------------
|
||||||
@ -799,8 +800,8 @@ static void addHashFieldToReply(client *c, robj_roptr o, sds field) {
|
|||||||
|
|
||||||
void hgetCommand(client *c) {
|
void hgetCommand(client *c) {
|
||||||
robj_roptr o;
|
robj_roptr o;
|
||||||
|
AeLocker locker;
|
||||||
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == nullptr ||
|
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp], locker)) == nullptr ||
|
||||||
checkType(c,o,OBJ_HASH)) return;
|
checkType(c,o,OBJ_HASH)) return;
|
||||||
|
|
||||||
addHashFieldToReply(c, o, szFromObj(c->argv[2]));
|
addHashFieldToReply(c, o, szFromObj(c->argv[2]));
|
||||||
@ -809,10 +810,11 @@ void hgetCommand(client *c) {
|
|||||||
void hmgetCommand(client *c) {
|
void hmgetCommand(client *c) {
|
||||||
robj_roptr o;
|
robj_roptr o;
|
||||||
int i;
|
int i;
|
||||||
|
AeLocker locker;
|
||||||
|
|
||||||
/* Don't abort when the key cannot be found. Non-existing keys are empty
|
/* Don't abort when the key cannot be found. Non-existing keys are empty
|
||||||
* hashes, where HMGET should respond with a series of null bulks. */
|
* hashes, where HMGET should respond with a series of null bulks. */
|
||||||
o = lookupKeyRead(c->db, c->argv[1], c->mvccCheckpoint);
|
o = lookupKeyRead(c->db, c->argv[1], c->mvccCheckpoint, locker);
|
||||||
if (checkType(c,o,OBJ_HASH)) return;
|
if (checkType(c,o,OBJ_HASH)) return;
|
||||||
|
|
||||||
addReplyArrayLen(c, c->argc-2);
|
addReplyArrayLen(c, c->argc-2);
|
||||||
@ -889,10 +891,11 @@ void genericHgetallCommand(client *c, int flags) {
|
|||||||
robj_roptr o;
|
robj_roptr o;
|
||||||
hashTypeIterator *hi;
|
hashTypeIterator *hi;
|
||||||
int length, count = 0;
|
int length, count = 0;
|
||||||
|
AeLocker locker;
|
||||||
|
|
||||||
robj *emptyResp = (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) ?
|
robj *emptyResp = (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) ?
|
||||||
shared.emptymap[c->resp] : shared.emptyarray;
|
shared.emptymap[c->resp] : shared.emptyarray;
|
||||||
if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyResp))
|
if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyResp,locker))
|
||||||
== nullptr || checkType(c,o,OBJ_HASH)) return;
|
== nullptr || checkType(c,o,OBJ_HASH)) return;
|
||||||
|
|
||||||
/* We return a map if the user requested keys and values, like in the
|
/* We return a map if the user requested keys and values, like in the
|
||||||
@ -946,9 +949,10 @@ void hexistsCommand(client *c) {
|
|||||||
void hscanCommand(client *c) {
|
void hscanCommand(client *c) {
|
||||||
robj_roptr o;
|
robj_roptr o;
|
||||||
unsigned long cursor;
|
unsigned long cursor;
|
||||||
|
AeLocker locker;
|
||||||
|
|
||||||
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
|
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
|
||||||
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == nullptr ||
|
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan,locker)) == nullptr ||
|
||||||
checkType(c,o,OBJ_HASH)) return;
|
checkType(c,o,OBJ_HASH)) return;
|
||||||
scanGenericCommand(c,o,cursor);
|
scanGenericCommand(c,o,cursor);
|
||||||
}
|
}
|
||||||
|
@ -285,8 +285,9 @@ void psetexCommand(client *c) {
|
|||||||
|
|
||||||
int getGenericCommand(client *c) {
|
int getGenericCommand(client *c) {
|
||||||
robj_roptr o;
|
robj_roptr o;
|
||||||
|
AeLocker locker;
|
||||||
|
|
||||||
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == nullptr)
|
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp],locker)) == nullptr)
|
||||||
return C_OK;
|
return C_OK;
|
||||||
|
|
||||||
if (checkType(c,o,OBJ_STRING)) {
|
if (checkType(c,o,OBJ_STRING)) {
|
||||||
@ -525,9 +526,10 @@ void getrangeCommand(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void mgetCommand(client *c) {
|
void mgetCommand(client *c) {
|
||||||
|
AeLocker locker;
|
||||||
addReplyArrayLen(c,c->argc-1);
|
addReplyArrayLen(c,c->argc-1);
|
||||||
for (int i = 1; i < c->argc; i++) {
|
for (int i = 1; i < c->argc; i++) {
|
||||||
robj_roptr o = lookupKeyRead(c->db,c->argv[i],c->mvccCheckpoint);
|
robj_roptr o = lookupKeyRead(c->db,c->argv[i],c->mvccCheckpoint,locker);
|
||||||
if (o == nullptr || o->type != OBJ_STRING) {
|
if (o == nullptr || o->type != OBJ_STRING) {
|
||||||
addReplyNull(c);
|
addReplyNull(c);
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user