Tracking: BCAST: parsing of the options + skeleton.
This commit is contained in:
parent
f53cc00c09
commit
dfe126f3e9
@ -154,6 +154,7 @@ client *createClient(connection *conn) {
|
||||
c->peerid = NULL;
|
||||
c->client_list_node = NULL;
|
||||
c->client_tracking_redirection = 0;
|
||||
c->client_tracking_prefix_nodes = NULL;
|
||||
c->auth_callback = NULL;
|
||||
c->auth_callback_privdata = NULL;
|
||||
c->auth_module = NULL;
|
||||
@ -2219,38 +2220,72 @@ NULL
|
||||
UNIT_MILLISECONDS) != C_OK) return;
|
||||
pauseClients(duration);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") &&
|
||||
(c->argc == 3 || c->argc == 5))
|
||||
{
|
||||
/* CLIENT TRACKING (on|off) [REDIRECT <id>] */
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) {
|
||||
/* CLIENT TRACKING (on|off) [REDIRECT <id>] [BCAST] [PREFIX first]
|
||||
* [PREFIX second] ... */
|
||||
long long redir = 0;
|
||||
int bcast = 0;
|
||||
robj **prefix;
|
||||
size_t numprefix = 0;
|
||||
|
||||
/* Parse the redirection option: we'll require the client with
|
||||
* the specified ID to exist right now, even if it is possible
|
||||
* it will get disconnected later. */
|
||||
if (c->argc == 5) {
|
||||
if (strcasecmp(c->argv[3]->ptr,"redirect") != 0) {
|
||||
addReply(c,shared.syntaxerr);
|
||||
return;
|
||||
} else {
|
||||
if (getLongLongFromObjectOrReply(c,c->argv[4],&redir,NULL) !=
|
||||
/* Parse the options. */
|
||||
if (for int j = 3; j < argc; j++) {
|
||||
int moreargs = (c->argc-1) - j;
|
||||
|
||||
if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
|
||||
j++;
|
||||
if (getLongLongFromObjectOrReply(c,c->argv[j],&redir,NULL) !=
|
||||
C_OK) return;
|
||||
/* We will require the client with the specified ID to exist
|
||||
* right now, even if it is possible that it gets disconnected
|
||||
* later. Still a valid sanity check. */
|
||||
if (lookupClientByID(redir) == NULL) {
|
||||
addReplyError(c,"The client ID you want redirect to "
|
||||
"does not exist");
|
||||
return;
|
||||
}
|
||||
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
|
||||
bcast++;
|
||||
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && morearg) {
|
||||
j++;
|
||||
prefix = zrealloc(sizeof(robj*)*(numprefix+1));
|
||||
prefix[numprefix++] = argv[j];
|
||||
} else {
|
||||
addReply(c,shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* Make sure options are compatible among each other and with the
|
||||
* current state of the client. */
|
||||
if (!bcast && numprefix) {
|
||||
addReplyError("PREFIX option requires BCAST mode to be enabled");
|
||||
zfree(prefix);
|
||||
return;
|
||||
}
|
||||
|
||||
if (client->flags & CLIENT_TRACKING) {
|
||||
int oldbcast = !!client->flags & CLIENT_TRACKING_BCAST;
|
||||
if (oldbcast != bcast) {
|
||||
}
|
||||
addReplyError(
|
||||
"You can't switch BCAST mode on/off before disabling "
|
||||
"tracking for this client, and then re-enabling it with "
|
||||
"a different mode.");
|
||||
zfree(prefix);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Options are ok: enable or disable the tracking for this client. */
|
||||
if (!strcasecmp(c->argv[2]->ptr,"on")) {
|
||||
enableTracking(c,redir);
|
||||
enableTracking(c,redir,bcast,prefix,numprefix);
|
||||
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
|
||||
disableTracking(c);
|
||||
} else {
|
||||
addReply(c,shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
zfree(prefix);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"getredir") && c->argc == 2) {
|
||||
/* CLIENT GETREDIR */
|
||||
|
@ -3310,8 +3310,11 @@ void call(client *c, int flags) {
|
||||
if (c->cmd->flags & CMD_READONLY) {
|
||||
client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
|
||||
server.lua_caller : c;
|
||||
if (caller->flags & CLIENT_TRACKING)
|
||||
if (caller->flags & CLIENT_TRACKING &&
|
||||
!(caller->flags & CLIENT_TRACKING_BCAST))
|
||||
{
|
||||
trackingRememberKeys(caller);
|
||||
}
|
||||
}
|
||||
|
||||
server.fixed_time_expire--;
|
||||
|
@ -247,6 +247,7 @@ typedef long long ustime_t; /* microsecond time type. */
|
||||
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
|
||||
perform client side caching. */
|
||||
#define CLIENT_TRACKING_BROKEN_REDIR (1ULL<<32) /* Target client is invalid. */
|
||||
#define CLIENT_TRACKING_BCAST (1ULL<<33) /* Tracking in BCAST mode. */
|
||||
|
||||
/* Client block type (btype field in client structure)
|
||||
* if CLIENT_BLOCKED flag is set. */
|
||||
@ -822,6 +823,11 @@ typedef struct client {
|
||||
* invalidation messages for keys fetched by this client will be send to
|
||||
* the specified client ID. */
|
||||
uint64_t client_tracking_redirection;
|
||||
list *client_tracking_prefix_nodes; /* This list contains listNode pointers
|
||||
to the nodes we have in every list
|
||||
of clients in the tracking bcast
|
||||
table. This way we can remove our
|
||||
client in O(1) for each list. */
|
||||
|
||||
/* Response buffer */
|
||||
int bufpos;
|
||||
@ -1648,7 +1654,7 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
|
||||
#endif
|
||||
|
||||
/* Client side caching (tracking mode) */
|
||||
void enableTracking(client *c, uint64_t redirect_to);
|
||||
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix);
|
||||
void disableTracking(client *c);
|
||||
void trackingRememberKeys(client *c);
|
||||
void trackingInvalidateKey(robj *keyobj);
|
||||
|
@ -42,6 +42,7 @@
|
||||
* Clients will normally take frequently requested objects in memory, removing
|
||||
* them when invalidation messages are received. */
|
||||
rax *TrackingTable = NULL;
|
||||
rax *PrefixTable = NULL;
|
||||
uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
|
||||
the whole tracking table. This givesn
|
||||
an hint about the total memory we
|
||||
@ -68,16 +69,25 @@ void disableTracking(client *c) {
|
||||
* eventually get freed, we'll send a message to the original client to
|
||||
* inform it of the condition. Multiple clients can redirect the invalidation
|
||||
* messages to the same client ID. */
|
||||
void enableTracking(client *c, uint64_t redirect_to) {
|
||||
if (c->flags & CLIENT_TRACKING) return;
|
||||
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) {
|
||||
c->flags |= CLIENT_TRACKING;
|
||||
c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
|
||||
c->client_tracking_redirection = redirect_to;
|
||||
server.tracking_clients++;
|
||||
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
|
||||
if (TrackingTable == NULL) {
|
||||
TrackingTable = raxNew();
|
||||
PrefixTable = raxNew();
|
||||
TrackingChannelName = createStringObject("__redis__:invalidate",20);
|
||||
}
|
||||
|
||||
if (bcast) {
|
||||
c->flags |= CLIENT_TRACKING_BCAST;
|
||||
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
|
||||
for (int j = 0; j < numprefix; j++) {
|
||||
sds sdsprefix = prefix[j]->ptr;
|
||||
enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* This function is called after the excution of a readonly command in the
|
||||
|
Loading…
x
Reference in New Issue
Block a user