Add RESET command. (#7982)
Perform full reset of all client connection states, is if the client was disconnected and re-connected. This affects: * MULTI state * Watched keys * MONITOR mode * Pub/Sub subscription * ACL/Authenticated state * Client tracking state * Cluster read-only/asking state * RESP version (reset to 2) * Selected database * CLIENT REPLY state The response is +RESET to make it easily distinguishable from other responses. Co-authored-by: Oran Agra <oran@redislabs.com> Co-authored-by: Itamar Haber <itamar@redislabs.com>
This commit is contained in:
parent
f6546eff45
commit
1fd456f91a
@ -97,6 +97,16 @@ void linkClient(client *c) {
|
|||||||
raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
|
raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Initialize client authentication state.
|
||||||
|
*/
|
||||||
|
static void clientSetDefaultAuth(client *c) {
|
||||||
|
/* If the default user does not require authentication, the user is
|
||||||
|
* directly authenticated. */
|
||||||
|
c->user = DefaultUser;
|
||||||
|
c->authenticated = (c->user->flags & USER_FLAG_NOPASS) &&
|
||||||
|
!(c->user->flags & USER_FLAG_DISABLED);
|
||||||
|
}
|
||||||
|
|
||||||
client *createClient(connection *conn) {
|
client *createClient(connection *conn) {
|
||||||
client *c = zmalloc(sizeof(client));
|
client *c = zmalloc(sizeof(client));
|
||||||
|
|
||||||
@ -130,16 +140,12 @@ client *createClient(connection *conn) {
|
|||||||
c->argv = NULL;
|
c->argv = NULL;
|
||||||
c->argv_len_sum = 0;
|
c->argv_len_sum = 0;
|
||||||
c->cmd = c->lastcmd = NULL;
|
c->cmd = c->lastcmd = NULL;
|
||||||
c->user = DefaultUser;
|
|
||||||
c->multibulklen = 0;
|
c->multibulklen = 0;
|
||||||
c->bulklen = -1;
|
c->bulklen = -1;
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
c->flags = 0;
|
c->flags = 0;
|
||||||
c->ctime = c->lastinteraction = server.unixtime;
|
c->ctime = c->lastinteraction = server.unixtime;
|
||||||
/* If the default user does not require authentication, the user is
|
clientSetDefaultAuth(c);
|
||||||
* directly authenticated. */
|
|
||||||
c->authenticated = (c->user->flags & USER_FLAG_NOPASS) &&
|
|
||||||
!(c->user->flags & USER_FLAG_DISABLED);
|
|
||||||
c->replstate = REPL_STATE_NONE;
|
c->replstate = REPL_STATE_NONE;
|
||||||
c->repl_put_online_on_ack = 0;
|
c->repl_put_online_on_ack = 0;
|
||||||
c->reploff = 0;
|
c->reploff = 0;
|
||||||
@ -2257,6 +2263,50 @@ int clientSetNameOrReply(client *c, robj *name) {
|
|||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Reset the client state to resemble a newly connected client.
|
||||||
|
*/
|
||||||
|
void resetCommand(client *c) {
|
||||||
|
listNode *ln;
|
||||||
|
|
||||||
|
/* MONITOR clients are also marked with CLIENT_SLAVE, we need to
|
||||||
|
* distinguish between the two.
|
||||||
|
*/
|
||||||
|
if (c->flags & CLIENT_MONITOR) {
|
||||||
|
ln = listSearchKey(server.monitors,c);
|
||||||
|
serverAssert(ln != NULL);
|
||||||
|
listDelNode(server.monitors,ln);
|
||||||
|
|
||||||
|
c->flags &= ~(CLIENT_MONITOR|CLIENT_SLAVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c->flags & (CLIENT_SLAVE|CLIENT_MASTER|CLIENT_MODULE)) {
|
||||||
|
addReplyError(c,"can only reset normal client connections");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c->flags & CLIENT_TRACKING) disableTracking(c);
|
||||||
|
selectDb(c,0);
|
||||||
|
c->resp = 2;
|
||||||
|
|
||||||
|
clientSetDefaultAuth(c);
|
||||||
|
moduleNotifyUserChanged(c);
|
||||||
|
discardTransaction(c);
|
||||||
|
|
||||||
|
pubsubUnsubscribeAllChannels(c,0);
|
||||||
|
pubsubUnsubscribeAllPatterns(c,0);
|
||||||
|
|
||||||
|
if (c->name) {
|
||||||
|
decrRefCount(c->name);
|
||||||
|
c->name = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Selectively clear state flags not covered above */
|
||||||
|
c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB|
|
||||||
|
CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP_NEXT);
|
||||||
|
|
||||||
|
addReplyStatus(c,"RESET");
|
||||||
|
}
|
||||||
|
|
||||||
void clientCommand(client *c) {
|
void clientCommand(client *c) {
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listIter li;
|
listIter li;
|
||||||
|
17
src/server.c
17
src/server.c
@ -1036,7 +1036,11 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
|
|
||||||
{"stralgo",stralgoCommand,-2,
|
{"stralgo",stralgoCommand,-2,
|
||||||
"read-only @string",
|
"read-only @string",
|
||||||
0,lcsGetKeys,0,0,0,0,0,0}
|
0,lcsGetKeys,0,0,0,0,0,0},
|
||||||
|
|
||||||
|
{"reset",resetCommand,-1,
|
||||||
|
"no-script ok-stale ok-loading fast @connection",
|
||||||
|
0,NULL,0,0,0,0,0,0}
|
||||||
};
|
};
|
||||||
|
|
||||||
/*============================ Utility functions ============================ */
|
/*============================ Utility functions ============================ */
|
||||||
@ -3740,7 +3744,8 @@ int processCommand(client *c) {
|
|||||||
* set. */
|
* set. */
|
||||||
if (c->flags & CLIENT_MULTI &&
|
if (c->flags & CLIENT_MULTI &&
|
||||||
c->cmd->proc != execCommand &&
|
c->cmd->proc != execCommand &&
|
||||||
c->cmd->proc != discardCommand) {
|
c->cmd->proc != discardCommand &&
|
||||||
|
c->cmd->proc != resetCommand) {
|
||||||
reject_cmd_on_oom = 1;
|
reject_cmd_on_oom = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3806,10 +3811,11 @@ int processCommand(client *c) {
|
|||||||
c->cmd->proc != subscribeCommand &&
|
c->cmd->proc != subscribeCommand &&
|
||||||
c->cmd->proc != unsubscribeCommand &&
|
c->cmd->proc != unsubscribeCommand &&
|
||||||
c->cmd->proc != psubscribeCommand &&
|
c->cmd->proc != psubscribeCommand &&
|
||||||
c->cmd->proc != punsubscribeCommand) {
|
c->cmd->proc != punsubscribeCommand &&
|
||||||
|
c->cmd->proc != resetCommand) {
|
||||||
rejectCommandFormat(c,
|
rejectCommandFormat(c,
|
||||||
"Can't execute '%s': only (P)SUBSCRIBE / "
|
"Can't execute '%s': only (P)SUBSCRIBE / "
|
||||||
"(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
|
"(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
|
||||||
c->cmd->name);
|
c->cmd->name);
|
||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
@ -3860,7 +3866,8 @@ int processCommand(client *c) {
|
|||||||
/* Exec the command */
|
/* Exec the command */
|
||||||
if (c->flags & CLIENT_MULTI &&
|
if (c->flags & CLIENT_MULTI &&
|
||||||
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
|
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
|
||||||
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
|
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
|
||||||
|
c->cmd->proc != resetCommand)
|
||||||
{
|
{
|
||||||
queueMultiCommand(c);
|
queueMultiCommand(c);
|
||||||
addReply(c,shared.queued);
|
addReply(c,shared.queued);
|
||||||
|
@ -2493,6 +2493,7 @@ void xtrimCommand(client *c);
|
|||||||
void lolwutCommand(client *c);
|
void lolwutCommand(client *c);
|
||||||
void aclCommand(client *c);
|
void aclCommand(client *c);
|
||||||
void stralgoCommand(client *c);
|
void stralgoCommand(client *c);
|
||||||
|
void resetCommand(client *c);
|
||||||
|
|
||||||
#if defined(__GNUC__)
|
#if defined(__GNUC__)
|
||||||
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
||||||
|
@ -249,6 +249,57 @@ start_server {tags {"other"}} {
|
|||||||
waitForBgsave r
|
waitForBgsave r
|
||||||
r save
|
r save
|
||||||
} {OK}
|
} {OK}
|
||||||
|
|
||||||
|
test {RESET clears client state} {
|
||||||
|
r client setname test-client
|
||||||
|
r client tracking on
|
||||||
|
|
||||||
|
assert_equal [r reset] "RESET"
|
||||||
|
set client [r client list]
|
||||||
|
assert_match {*name= *} $client
|
||||||
|
assert_match {*flags=N *} $client
|
||||||
|
}
|
||||||
|
|
||||||
|
test {RESET clears MONITOR state} {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
$rd monitor
|
||||||
|
assert_equal [$rd read] "OK"
|
||||||
|
|
||||||
|
$rd reset
|
||||||
|
|
||||||
|
# skip reset ouptut
|
||||||
|
$rd read
|
||||||
|
assert_equal [$rd read] "RESET"
|
||||||
|
|
||||||
|
assert_no_match {*flags=O*} [r client list]
|
||||||
|
}
|
||||||
|
|
||||||
|
test {RESET clears and discards MULTI state} {
|
||||||
|
r multi
|
||||||
|
r set key-a a
|
||||||
|
|
||||||
|
r reset
|
||||||
|
catch {r exec} err
|
||||||
|
assert_match {*EXEC without MULTI*} $err
|
||||||
|
}
|
||||||
|
|
||||||
|
test {RESET clears Pub/Sub state} {
|
||||||
|
r subscribe channel-1
|
||||||
|
r reset
|
||||||
|
|
||||||
|
# confirm we're not subscribed by executing another command
|
||||||
|
r set key val
|
||||||
|
}
|
||||||
|
|
||||||
|
test {RESET clears authenticated state} {
|
||||||
|
r acl setuser user1 on >secret +@all
|
||||||
|
r auth user1 secret
|
||||||
|
assert_equal [r acl whoami] user1
|
||||||
|
|
||||||
|
r reset
|
||||||
|
|
||||||
|
assert_equal [r acl whoami] default
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
start_server {tags {"other"}} {
|
start_server {tags {"other"}} {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user