Tracking: BCAST: registration in the prefix table.
This commit is contained in:
parent
dfe126f3e9
commit
3f7ba86255
@ -154,7 +154,7 @@ client *createClient(connection *conn) {
|
|||||||
c->peerid = NULL;
|
c->peerid = NULL;
|
||||||
c->client_list_node = NULL;
|
c->client_list_node = NULL;
|
||||||
c->client_tracking_redirection = 0;
|
c->client_tracking_redirection = 0;
|
||||||
c->client_tracking_prefix_nodes = NULL;
|
c->client_tracking_prefixes = NULL;
|
||||||
c->auth_callback = NULL;
|
c->auth_callback = NULL;
|
||||||
c->auth_callback_privdata = NULL;
|
c->auth_callback_privdata = NULL;
|
||||||
c->auth_module = NULL;
|
c->auth_module = NULL;
|
||||||
@ -2028,7 +2028,6 @@ int clientSetNameOrReply(client *c, robj *name) {
|
|||||||
void clientCommand(client *c) {
|
void clientCommand(client *c) {
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listIter li;
|
listIter li;
|
||||||
client *client;
|
|
||||||
|
|
||||||
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
|
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
|
||||||
const char *help[] = {
|
const char *help[] = {
|
||||||
@ -2142,7 +2141,7 @@ NULL
|
|||||||
/* Iterate clients killing all the matching clients. */
|
/* Iterate clients killing all the matching clients. */
|
||||||
listRewind(server.clients,&li);
|
listRewind(server.clients,&li);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
client = listNodeValue(ln);
|
client *client = listNodeValue(ln);
|
||||||
if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
|
if (addr && strcmp(getClientPeerId(client),addr) != 0) continue;
|
||||||
if (type != -1 && getClientType(client) != type) continue;
|
if (type != -1 && getClientType(client) != type) continue;
|
||||||
if (id != 0 && client->id != id) continue;
|
if (id != 0 && client->id != id) continue;
|
||||||
@ -2229,7 +2228,7 @@ NULL
|
|||||||
size_t numprefix = 0;
|
size_t numprefix = 0;
|
||||||
|
|
||||||
/* Parse the options. */
|
/* Parse the options. */
|
||||||
if (for int j = 3; j < argc; j++) {
|
for (int j = 3; j < c->argc; j++) {
|
||||||
int moreargs = (c->argc-1) - j;
|
int moreargs = (c->argc-1) - j;
|
||||||
|
|
||||||
if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
|
if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) {
|
||||||
@ -2246,10 +2245,10 @@ NULL
|
|||||||
}
|
}
|
||||||
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
|
} else if (!strcasecmp(c->argv[j]->ptr,"bcast")) {
|
||||||
bcast++;
|
bcast++;
|
||||||
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && morearg) {
|
} else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) {
|
||||||
j++;
|
j++;
|
||||||
prefix = zrealloc(sizeof(robj*)*(numprefix+1));
|
prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1));
|
||||||
prefix[numprefix++] = argv[j];
|
prefix[numprefix++] = c->argv[j];
|
||||||
} else {
|
} else {
|
||||||
addReply(c,shared.syntaxerr);
|
addReply(c,shared.syntaxerr);
|
||||||
return;
|
return;
|
||||||
@ -2259,16 +2258,16 @@ NULL
|
|||||||
/* Make sure options are compatible among each other and with the
|
/* Make sure options are compatible among each other and with the
|
||||||
* current state of the client. */
|
* current state of the client. */
|
||||||
if (!bcast && numprefix) {
|
if (!bcast && numprefix) {
|
||||||
addReplyError("PREFIX option requires BCAST mode to be enabled");
|
addReplyError(c,"PREFIX option requires BCAST mode to be enabled");
|
||||||
zfree(prefix);
|
zfree(prefix);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (client->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
int oldbcast = !!client->flags & CLIENT_TRACKING_BCAST;
|
int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST;
|
||||||
if (oldbcast != bcast) {
|
if (oldbcast != bcast) {
|
||||||
}
|
}
|
||||||
addReplyError(
|
addReplyError(c,
|
||||||
"You can't switch BCAST mode on/off before disabling "
|
"You can't switch BCAST mode on/off before disabling "
|
||||||
"tracking for this client, and then re-enabling it with "
|
"tracking for this client, and then re-enabling it with "
|
||||||
"a different mode.");
|
"a different mode.");
|
||||||
|
@ -823,12 +823,9 @@ typedef struct client {
|
|||||||
* invalidation messages for keys fetched by this client will be send to
|
* invalidation messages for keys fetched by this client will be send to
|
||||||
* the specified client ID. */
|
* the specified client ID. */
|
||||||
uint64_t client_tracking_redirection;
|
uint64_t client_tracking_redirection;
|
||||||
list *client_tracking_prefix_nodes; /* This list contains listNode pointers
|
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
|
||||||
to the nodes we have in every list
|
subscribed to in BCAST mode, in the
|
||||||
of clients in the tracking bcast
|
context of client side caching. */
|
||||||
table. This way we can remove our
|
|
||||||
client in O(1) for each list. */
|
|
||||||
|
|
||||||
/* Response buffer */
|
/* Response buffer */
|
||||||
int bufpos;
|
int bufpos;
|
||||||
char buf[PROTO_REPLY_CHUNK_BYTES];
|
char buf[PROTO_REPLY_CHUNK_BYTES];
|
||||||
|
@ -49,6 +49,15 @@ uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across
|
|||||||
are using server side for CSC. */
|
are using server side for CSC. */
|
||||||
robj *TrackingChannelName;
|
robj *TrackingChannelName;
|
||||||
|
|
||||||
|
/* This is the structure that we have as value of the PrefixTable, and
|
||||||
|
* represents the list of keys modified, and the list of clients that need
|
||||||
|
* to be notified, for a given prefix. */
|
||||||
|
typedef struct bcastState {
|
||||||
|
rax *keys; /* Keys modified in the current event loop cycle. */
|
||||||
|
rax *clients; /* Clients subscribed to the notification events for this
|
||||||
|
prefix. */
|
||||||
|
} bcastState;
|
||||||
|
|
||||||
/* Remove the tracking state from the client 'c'. Note that there is not much
|
/* Remove the tracking state from the client 'c'. Note that there is not much
|
||||||
* to do for us here, if not to decrement the counter of the clients in
|
* to do for us here, if not to decrement the counter of the clients in
|
||||||
* tracking mode, because we just store the ID of the client in the tracking
|
* tracking mode, because we just store the ID of the client in the tracking
|
||||||
@ -56,9 +65,51 @@ robj *TrackingChannelName;
|
|||||||
* client with many entries in the table is removed, it would cost a lot of
|
* client with many entries in the table is removed, it would cost a lot of
|
||||||
* time to do the cleanup. */
|
* time to do the cleanup. */
|
||||||
void disableTracking(client *c) {
|
void disableTracking(client *c) {
|
||||||
|
/* If this client is in broadcasting mode, we need to unsubscribe it
|
||||||
|
* from all the prefixes it is registered to. */
|
||||||
|
if (c->flags & CLIENT_TRACKING_BCAST) {
|
||||||
|
raxIterator ri;
|
||||||
|
raxStart(&ri,c->client_tracking_prefixes);
|
||||||
|
raxSeek(&ri,"^",NULL,0);
|
||||||
|
while(raxNext(&ri)) {
|
||||||
|
bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len);
|
||||||
|
serverAssert(bs != raxNotFound);
|
||||||
|
raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL);
|
||||||
|
/* Was it the last client? Remove the prefix from the
|
||||||
|
* table. */
|
||||||
|
if (raxSize(bs->clients) == 0) {
|
||||||
|
raxFree(bs->clients);
|
||||||
|
raxFree(bs->keys);
|
||||||
|
zfree(bs);
|
||||||
|
raxRemove(PrefixTable,ri.key,ri.key_len,NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
raxStop(&ri);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Clear flags and adjust the count. */
|
||||||
if (c->flags & CLIENT_TRACKING) {
|
if (c->flags & CLIENT_TRACKING) {
|
||||||
server.tracking_clients--;
|
server.tracking_clients--;
|
||||||
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR);
|
c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR|
|
||||||
|
CLIENT_TRACKING_BCAST);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
|
||||||
|
* already registered for the specified prefix, no operation is performed. */
|
||||||
|
void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
|
||||||
|
bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix));
|
||||||
|
/* If this is the first client subscribing to such prefix, create
|
||||||
|
* the prefix in the table. */
|
||||||
|
if (bs == raxNotFound) {
|
||||||
|
bs = zmalloc(sizeof(*bs));
|
||||||
|
bs->keys = raxNew();
|
||||||
|
bs->clients = raxNew();
|
||||||
|
raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
|
||||||
|
}
|
||||||
|
if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
|
||||||
|
raxInsert(c->client_tracking_prefixes,
|
||||||
|
(unsigned char*)prefix,plen,NULL,NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,9 +134,9 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s
|
|||||||
if (bcast) {
|
if (bcast) {
|
||||||
c->flags |= CLIENT_TRACKING_BCAST;
|
c->flags |= CLIENT_TRACKING_BCAST;
|
||||||
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
|
if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0);
|
||||||
for (int j = 0; j < numprefix; j++) {
|
for (size_t j = 0; j < numprefix; j++) {
|
||||||
sds sdsprefix = prefix[j]->ptr;
|
sds sdsprefix = prefix[j]->ptr;
|
||||||
enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix));
|
enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user