MONITOR command implemented.

This commit is contained in:
antirez 2009-03-23 19:43:39 +01:00
parent 5a039e3b95
commit 87eca72788
2 changed files with 24 additions and 9 deletions

1
.gitignore vendored
View File

@ -4,3 +4,4 @@ redis-cli
redis-server redis-server
redis-benchmark redis-benchmark
doc-tools doc-tools
mkrelease.sh

32
redis.c
View File

@ -89,6 +89,7 @@
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */ #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */ #define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */ #define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
/* Server replication state */ /* Server replication state */
#define REDIS_REPL_NONE 0 /* No active replication */ #define REDIS_REPL_NONE 0 /* No active replication */
@ -138,7 +139,7 @@ typedef struct redisClient {
list *reply; list *reply;
int sentlen; int sentlen;
time_t lastinteraction; /* time of the last interaction, used for timeout */ time_t lastinteraction; /* time of the last interaction, used for timeout */
int flags; /* REDIS_CLOSE | REDIS_SLAVE */ int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
int slaveseldb; /* slave selected db, if this client is a slave */ int slaveseldb; /* slave selected db, if this client is a slave */
} redisClient; } redisClient;
@ -154,7 +155,7 @@ struct redisServer {
dict **dict; dict **dict;
long long dirty; /* changes to DB from the last save */ long long dirty; /* changes to DB from the last save */
list *clients; list *clients;
list *slaves; list *slaves, *monitors;
char neterr[ANET_ERR_LEN]; char neterr[ANET_ERR_LEN];
aeEventLoop *el; aeEventLoop *el;
int cronloops; /* number of times the cron function run */ int cronloops; /* number of times the cron function run */
@ -235,7 +236,7 @@ static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o); static void incrRefCount(robj *o);
static int saveDbBackground(char *filename); static int saveDbBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len); static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(struct redisCommand *cmd, int dictid, robj **argv, int argc); static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void); static int syncWithMaster(void);
static void pingCommand(redisClient *c); static void pingCommand(redisClient *c);
@ -283,6 +284,7 @@ static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c); static void lremCommand(redisClient *c);
static void infoCommand(redisClient *c); static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c); static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
/*================================= Globals ================================= */ /*================================= Globals ================================= */
@ -335,6 +337,7 @@ static struct redisCommand cmdTable[] = {
{"flushall",flushallCommand,1,REDIS_CMD_INLINE}, {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
{"sort",sortCommand,-2,REDIS_CMD_INLINE}, {"sort",sortCommand,-2,REDIS_CMD_INLINE},
{"info",infoCommand,1,REDIS_CMD_INLINE}, {"info",infoCommand,1,REDIS_CMD_INLINE},
{"monitor",monitorCommand,1,REDIS_CMD_INLINE},
{NULL,NULL,0,0} {NULL,NULL,0,0}
}; };
@ -739,11 +742,12 @@ static void initServer() {
server.clients = listCreate(); server.clients = listCreate();
server.slaves = listCreate(); server.slaves = listCreate();
server.monitors = listCreate();
server.objfreelist = listCreate(); server.objfreelist = listCreate();
createSharedObjects(); createSharedObjects();
server.el = aeCreateEventLoop(); server.el = aeCreateEventLoop();
server.dict = zmalloc(sizeof(dict*)*server.dbnum); server.dict = zmalloc(sizeof(dict*)*server.dbnum);
if (!server.dict || !server.clients || !server.slaves || !server.el || !server.objfreelist) if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
oom("server initialization"); /* Fatal OOM */ oom("server initialization"); /* Fatal OOM */
server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
if (server.fd == -1) { if (server.fd == -1) {
@ -922,9 +926,10 @@ static void freeClient(redisClient *c) {
assert(ln != NULL); assert(ln != NULL);
listDelNode(server.clients,ln); listDelNode(server.clients,ln);
if (c->flags & REDIS_SLAVE) { if (c->flags & REDIS_SLAVE) {
ln = listSearchKey(server.slaves,c); list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
ln = listSearchKey(l,c);
assert(ln != NULL); assert(ln != NULL);
listDelNode(server.slaves,ln); listDelNode(l,ln);
} }
if (c->flags & REDIS_MASTER) { if (c->flags & REDIS_MASTER) {
server.master = NULL; server.master = NULL;
@ -1083,7 +1088,9 @@ static int processCommand(redisClient *c) {
dirty = server.dirty; dirty = server.dirty;
cmd->proc(c); cmd->proc(c);
if (server.dirty-dirty != 0 && listLength(server.slaves)) if (server.dirty-dirty != 0 && listLength(server.slaves))
replicationFeedSlaves(cmd,c->dictid,c->argv,c->argc); replicationFeedSlaves(server.slaves,cmd,c->dictid,c->argv,c->argc);
if (listLength(server.monitors))
replicationFeedSlaves(server.monitors,cmd,c->dictid,c->argv,c->argc);
server.stat_numcommands++; server.stat_numcommands++;
/* Prepare the client for the next command */ /* Prepare the client for the next command */
@ -1095,8 +1102,8 @@ static int processCommand(redisClient *c) {
return 1; return 1;
} }
static void replicationFeedSlaves(struct redisCommand *cmd, int dictid, robj **argv, int argc) { static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
listNode *ln = server.slaves->head; listNode *ln = slaves->head;
robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */ robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */
int outc = 0, j; int outc = 0, j;
@ -3021,6 +3028,13 @@ static int syncWithMaster(void) {
return REDIS_OK; return REDIS_OK;
} }
static void monitorCommand(redisClient *c) {
c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
c->slaveseldb = 0;
if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
addReply(c,shared.ok);
}
/* =================================== Main! ================================ */ /* =================================== Main! ================================ */
static void daemonize(void) { static void daemonize(void) {