Client side caching: fields and flags for tracking mode.
This commit is contained in:
parent
a28d7918d7
commit
45d64f229e
@ -158,6 +158,7 @@ client *createClient(int fd) {
|
|||||||
c->pubsub_patterns = listCreate();
|
c->pubsub_patterns = listCreate();
|
||||||
c->peerid = NULL;
|
c->peerid = NULL;
|
||||||
c->client_list_node = NULL;
|
c->client_list_node = NULL;
|
||||||
|
c->client_tracking_redirection = 0;
|
||||||
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
|
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
|
||||||
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
|
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
|
||||||
if (fd != -1) linkClient(c);
|
if (fd != -1) linkClient(c);
|
||||||
@ -966,6 +967,9 @@ void unlinkClient(client *c) {
|
|||||||
listDelNode(server.unblocked_clients,ln);
|
listDelNode(server.unblocked_clients,ln);
|
||||||
c->flags &= ~CLIENT_UNBLOCKED;
|
c->flags &= ~CLIENT_UNBLOCKED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Clear the tracking status. */
|
||||||
|
if (c->flags & CLIENT_TRACKING) disableTracking(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
void freeClient(client *c) {
|
void freeClient(client *c) {
|
||||||
@ -1849,6 +1853,7 @@ sds catClientInfoString(sds s, client *client) {
|
|||||||
if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
|
if (client->flags & CLIENT_PUBSUB) *p++ = 'P';
|
||||||
if (client->flags & CLIENT_MULTI) *p++ = 'x';
|
if (client->flags & CLIENT_MULTI) *p++ = 'x';
|
||||||
if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
|
if (client->flags & CLIENT_BLOCKED) *p++ = 'b';
|
||||||
|
if (client->flags & CLIENT_TRACKING) *p++ = 't';
|
||||||
if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
|
if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd';
|
||||||
if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
|
if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c';
|
||||||
if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
|
if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u';
|
||||||
|
21
src/server.h
21
src/server.h
@ -254,8 +254,8 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define AOF_WAIT_REWRITE 2 /* AOF waits rewrite to start appending */
|
#define AOF_WAIT_REWRITE 2 /* AOF waits rewrite to start appending */
|
||||||
|
|
||||||
/* Client flags */
|
/* Client flags */
|
||||||
#define CLIENT_SLAVE (1<<0) /* This client is a slave server */
|
#define CLIENT_SLAVE (1<<0) /* This client is a repliaca */
|
||||||
#define CLIENT_MASTER (1<<1) /* This client is a master server */
|
#define CLIENT_MASTER (1<<1) /* This client is a master */
|
||||||
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
|
#define CLIENT_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
|
||||||
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
|
#define CLIENT_MULTI (1<<3) /* This client is in a MULTI context */
|
||||||
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
|
#define CLIENT_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
|
||||||
@ -289,7 +289,12 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
|
#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
|
||||||
in the list of clients we can read
|
in the list of clients we can read
|
||||||
from. */
|
from. */
|
||||||
#define CLIENT_PENDING_COMMAND (1<<30) /* */
|
#define CLIENT_PENDING_COMMAND (1<<30) /* Used in threaded I/O to signal after
|
||||||
|
we return single threaded that the
|
||||||
|
client has already pending commands
|
||||||
|
to be executed. */
|
||||||
|
#define CLIENT_TRACKING (1<<31) /* Client enabled keys tracking in order to
|
||||||
|
perform client side caching. */
|
||||||
|
|
||||||
/* Client block type (btype field in client structure)
|
/* Client block type (btype field in client structure)
|
||||||
* if CLIENT_BLOCKED flag is set. */
|
* if CLIENT_BLOCKED flag is set. */
|
||||||
@ -845,6 +850,11 @@ typedef struct client {
|
|||||||
sds peerid; /* Cached peer ID. */
|
sds peerid; /* Cached peer ID. */
|
||||||
listNode *client_list_node; /* list node in client list */
|
listNode *client_list_node; /* list node in client list */
|
||||||
|
|
||||||
|
/* If this client is in tracking mode and this field is non zero,
|
||||||
|
* invalidation messages for keys fetched by this client will be send to
|
||||||
|
* the specified client ID. */
|
||||||
|
uint64_t client_tracking_redirection;
|
||||||
|
|
||||||
/* Response buffer */
|
/* Response buffer */
|
||||||
int bufpos;
|
int bufpos;
|
||||||
char buf[PROTO_REPLY_CHUNK_BYTES];
|
char buf[PROTO_REPLY_CHUNK_BYTES];
|
||||||
@ -1286,6 +1296,8 @@ struct redisServer {
|
|||||||
unsigned int blocked_clients_by_type[BLOCKED_NUM];
|
unsigned int blocked_clients_by_type[BLOCKED_NUM];
|
||||||
list *unblocked_clients; /* list of clients to unblock before next loop */
|
list *unblocked_clients; /* list of clients to unblock before next loop */
|
||||||
list *ready_keys; /* List of readyList structures for BLPOP & co */
|
list *ready_keys; /* List of readyList structures for BLPOP & co */
|
||||||
|
/* Client side caching. */
|
||||||
|
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
|
||||||
/* Sort parameters - qsort_r() is only available under BSD so we
|
/* Sort parameters - qsort_r() is only available under BSD so we
|
||||||
* have to take this state global, in order to pass it to sortCompare() */
|
* have to take this state global, in order to pass it to sortCompare() */
|
||||||
int sort_desc;
|
int sort_desc;
|
||||||
@ -1602,6 +1614,9 @@ void addReplyErrorFormat(client *c, const char *fmt, ...);
|
|||||||
void addReplyStatusFormat(client *c, const char *fmt, ...);
|
void addReplyStatusFormat(client *c, const char *fmt, ...);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/* Client side caching (tracking mode) */
|
||||||
|
void disableTracking(client *c);
|
||||||
|
|
||||||
/* List data type */
|
/* List data type */
|
||||||
void listTypeTryConversion(robj *subject, robj *value);
|
void listTypeTryConversion(robj *subject, robj *value);
|
||||||
void listTypePush(robj *subject, robj *value, int where);
|
void listTypePush(robj *subject, robj *value, int where);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user