replica redirect read&write to primary in standalone mode (#325)
To implement #319 1. replica is able to redirect read and write commands to it's primary in standalone mode * reply with "-REDIRECT primary-ip:port" 2. add a subcommand `CLIENT CAPA redirect`, a client can announce the capability to handle redirection * if a client can handle redirection, the data access commands (read and write) will be redirected 3. allow `readonly` and `readwrite` command in standalone mode, may be a breaking change * a client with redirect capability cannot process read commands on a replica by default * use READONLY command can allow read commands on a replica --------- Signed-off-by: zhaozhao.zz <zhaozhao.zz@alibaba-inc.com>
This commit is contained in:
parent
ab3873011a
commit
28c5a17edf
@ -1449,20 +1449,12 @@ void askingCommand(client *c) {
|
|||||||
* In this mode replica will not redirect clients as long as clients access
|
* In this mode replica will not redirect clients as long as clients access
|
||||||
* with read-only commands to keys that are served by the replica's primary. */
|
* with read-only commands to keys that are served by the replica's primary. */
|
||||||
void readonlyCommand(client *c) {
|
void readonlyCommand(client *c) {
|
||||||
if (server.cluster_enabled == 0) {
|
|
||||||
addReplyError(c, "This instance has cluster support disabled");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
c->flags |= CLIENT_READONLY;
|
c->flags |= CLIENT_READONLY;
|
||||||
addReply(c, shared.ok);
|
addReply(c, shared.ok);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* The READWRITE command just clears the READONLY command state. */
|
/* The READWRITE command just clears the READONLY command state. */
|
||||||
void readwriteCommand(client *c) {
|
void readwriteCommand(client *c) {
|
||||||
if (server.cluster_enabled == 0) {
|
|
||||||
addReplyError(c, "This instance has cluster support disabled");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
c->flags &= ~CLIENT_READONLY;
|
c->flags &= ~CLIENT_READONLY;
|
||||||
addReply(c, shared.ok);
|
addReply(c, shared.ok);
|
||||||
}
|
}
|
||||||
|
@ -1089,6 +1089,28 @@ struct COMMAND_ARG CLIENT_CACHING_Args[] = {
|
|||||||
{MAKE_ARG("mode",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLIENT_CACHING_mode_Subargs},
|
{MAKE_ARG("mode",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLIENT_CACHING_mode_Subargs},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/********** CLIENT CAPA ********************/
|
||||||
|
|
||||||
|
#ifndef SKIP_CMD_HISTORY_TABLE
|
||||||
|
/* CLIENT CAPA history */
|
||||||
|
#define CLIENT_CAPA_History NULL
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifndef SKIP_CMD_TIPS_TABLE
|
||||||
|
/* CLIENT CAPA tips */
|
||||||
|
#define CLIENT_CAPA_Tips NULL
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifndef SKIP_CMD_KEY_SPECS_TABLE
|
||||||
|
/* CLIENT CAPA key specs */
|
||||||
|
#define CLIENT_CAPA_Keyspecs NULL
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* CLIENT CAPA argument table */
|
||||||
|
struct COMMAND_ARG CLIENT_CAPA_Args[] = {
|
||||||
|
{MAKE_ARG("capability",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE,0,NULL)},
|
||||||
|
};
|
||||||
|
|
||||||
/********** CLIENT GETNAME ********************/
|
/********** CLIENT GETNAME ********************/
|
||||||
|
|
||||||
#ifndef SKIP_CMD_HISTORY_TABLE
|
#ifndef SKIP_CMD_HISTORY_TABLE
|
||||||
@ -1552,6 +1574,7 @@ struct COMMAND_ARG CLIENT_UNBLOCK_Args[] = {
|
|||||||
/* CLIENT command table */
|
/* CLIENT command table */
|
||||||
struct COMMAND_STRUCT CLIENT_Subcommands[] = {
|
struct COMMAND_STRUCT CLIENT_Subcommands[] = {
|
||||||
{MAKE_CMD("caching","Instructs the server whether to track the keys in the next request.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CACHING_History,0,CLIENT_CACHING_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_CACHING_Keyspecs,0,NULL,1),.args=CLIENT_CACHING_Args},
|
{MAKE_CMD("caching","Instructs the server whether to track the keys in the next request.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CACHING_History,0,CLIENT_CACHING_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_CACHING_Keyspecs,0,NULL,1),.args=CLIENT_CACHING_Args},
|
||||||
|
{MAKE_CMD("capa","A client claims its capability.","O(1)","8.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CAPA_History,0,CLIENT_CAPA_Tips,0,clientCommand,-3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_CAPA_Keyspecs,0,NULL,1),.args=CLIENT_CAPA_Args},
|
||||||
{MAKE_CMD("getname","Returns the name of the connection.","O(1)","2.6.9",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETNAME_History,0,CLIENT_GETNAME_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETNAME_Keyspecs,0,NULL,0)},
|
{MAKE_CMD("getname","Returns the name of the connection.","O(1)","2.6.9",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETNAME_History,0,CLIENT_GETNAME_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETNAME_Keyspecs,0,NULL,0)},
|
||||||
{MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)},
|
{MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)},
|
||||||
{MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)},
|
{MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)},
|
||||||
|
29
src/commands/client-capa.json
Normal file
29
src/commands/client-capa.json
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
{
|
||||||
|
"CAPA": {
|
||||||
|
"summary": "A client claims its capability.",
|
||||||
|
"complexity": "O(1)",
|
||||||
|
"group": "connection",
|
||||||
|
"since": "8.0.0",
|
||||||
|
"arity": -3,
|
||||||
|
"container": "CLIENT",
|
||||||
|
"function": "clientCommand",
|
||||||
|
"command_flags": [
|
||||||
|
"NOSCRIPT",
|
||||||
|
"LOADING",
|
||||||
|
"STALE"
|
||||||
|
],
|
||||||
|
"acl_categories": [
|
||||||
|
"CONNECTION"
|
||||||
|
],
|
||||||
|
"reply_schema": {
|
||||||
|
"const": "OK"
|
||||||
|
},
|
||||||
|
"arguments": [
|
||||||
|
{
|
||||||
|
"multiple": "true",
|
||||||
|
"name": "capability",
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
@ -168,6 +168,7 @@ client *createClient(connection *conn) {
|
|||||||
c->bulklen = -1;
|
c->bulklen = -1;
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
c->flags = 0;
|
c->flags = 0;
|
||||||
|
c->capa = 0;
|
||||||
c->slot = -1;
|
c->slot = -1;
|
||||||
c->ctime = c->last_interaction = server.unixtime;
|
c->ctime = c->last_interaction = server.unixtime;
|
||||||
c->duration = 0;
|
c->duration = 0;
|
||||||
@ -3589,6 +3590,13 @@ NULL
|
|||||||
} else {
|
} else {
|
||||||
addReplyErrorObject(c, shared.syntaxerr);
|
addReplyErrorObject(c, shared.syntaxerr);
|
||||||
}
|
}
|
||||||
|
} else if (!strcasecmp(c->argv[1]->ptr, "capa") && c->argc >= 3) {
|
||||||
|
for (int i = 2; i < c->argc; i++) {
|
||||||
|
if (!strcasecmp(c->argv[i]->ptr, "redirect")) {
|
||||||
|
c->capa |= CLIENT_CAPA_REDIRECT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addReply(c, shared.ok);
|
||||||
} else {
|
} else {
|
||||||
addReplySubcommandSyntaxError(c);
|
addReplySubcommandSyntaxError(c);
|
||||||
}
|
}
|
||||||
|
@ -3867,6 +3867,12 @@ int processCommand(client *c) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !mustObeyClient(c) &&
|
||||||
|
(is_write_command || (is_read_command && !(c->flags & CLIENT_READONLY)))) {
|
||||||
|
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
|
||||||
|
return C_OK;
|
||||||
|
}
|
||||||
|
|
||||||
/* Disconnect some clients if total clients memory is too high. We do this
|
/* Disconnect some clients if total clients memory is too high. We do this
|
||||||
* before key eviction, after the last command was executed and consumed
|
* before key eviction, after the last command was executed and consumed
|
||||||
* some client output buffer memory. */
|
* some client output buffer memory. */
|
||||||
|
@ -429,6 +429,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
|||||||
#define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */
|
#define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */
|
||||||
#define CLIENT_AUTHENTICATED (1ULL << 52) /* Indicate a client has successfully authenticated */
|
#define CLIENT_AUTHENTICATED (1ULL << 52) /* Indicate a client has successfully authenticated */
|
||||||
|
|
||||||
|
/* Client capabilities */
|
||||||
|
#define CLIENT_CAPA_REDIRECT (1 << 0) /* Indicate that the client can handle redirection */
|
||||||
|
|
||||||
/* Client block type (btype field in client structure)
|
/* Client block type (btype field in client structure)
|
||||||
* if CLIENT_BLOCKED flag is set. */
|
* if CLIENT_BLOCKED flag is set. */
|
||||||
typedef enum blocking_type {
|
typedef enum blocking_type {
|
||||||
@ -1205,6 +1208,7 @@ typedef struct client {
|
|||||||
uint64_t flags; /* Client flags: CLIENT_* macros. */
|
uint64_t flags; /* Client flags: CLIENT_* macros. */
|
||||||
connection *conn;
|
connection *conn;
|
||||||
int resp; /* RESP protocol version. Can be 2 or 3. */
|
int resp; /* RESP protocol version. Can be 2 or 3. */
|
||||||
|
uint32_t capa; /* Client capabilities: CLIENT_CAPA* macros. */
|
||||||
serverDb *db; /* Pointer to currently SELECTed DB. */
|
serverDb *db; /* Pointer to currently SELECTed DB. */
|
||||||
robj *name; /* As set by CLIENT SETNAME. */
|
robj *name; /* As set by CLIENT SETNAME. */
|
||||||
robj *lib_name; /* The client library name as set by CLIENT SETINFO. */
|
robj *lib_name; /* The client library name as set by CLIENT SETINFO. */
|
||||||
|
36
tests/integration/replica-redirect.tcl
Normal file
36
tests/integration/replica-redirect.tcl
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
start_server {tags {needs:repl external:skip}} {
|
||||||
|
start_server {} {
|
||||||
|
set primary_host [srv -1 host]
|
||||||
|
set primary_port [srv -1 port]
|
||||||
|
|
||||||
|
r replicaof $primary_host $primary_port
|
||||||
|
wait_for_condition 50 100 {
|
||||||
|
[s 0 master_link_status] eq {up}
|
||||||
|
} else {
|
||||||
|
fail "Replicas not replicating from primary"
|
||||||
|
}
|
||||||
|
|
||||||
|
test {replica allow read command by default} {
|
||||||
|
r get foo
|
||||||
|
} {}
|
||||||
|
|
||||||
|
test {replica reply READONLY error for write command by default} {
|
||||||
|
assert_error {READONLY*} {r set foo bar}
|
||||||
|
}
|
||||||
|
|
||||||
|
test {replica redirect read and write command after CLIENT CAPA REDIRECT} {
|
||||||
|
r client capa redirect
|
||||||
|
assert_error "REDIRECT $primary_host:$primary_port" {r set foo bar}
|
||||||
|
assert_error "REDIRECT $primary_host:$primary_port" {r get foo}
|
||||||
|
}
|
||||||
|
|
||||||
|
test {non-data access commands are not redirected} {
|
||||||
|
r ping
|
||||||
|
} {PONG}
|
||||||
|
|
||||||
|
test {replica allow read command in READONLY mode} {
|
||||||
|
r readonly
|
||||||
|
r get foo
|
||||||
|
} {}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user