From 62090d0a972348153570d4dee912e167783d1816 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 15 Feb 2019 14:11:34 -0500 Subject: [PATCH] make networking.c a C++ file --- src/Makefile | 2 +- src/ae.cpp | 6 +- src/config.c | 2 +- src/fmacros.h | 8 +- src/intset.h | 10 +- src/networking.c | 2433 -------------------------------------------- src/networking.cpp | 13 +- src/quicklist.h | 10 +- src/rax.h | 10 +- src/replication.c | 2 +- src/sds.h | 26 +- src/server.h | 17 +- 12 files changed, 53 insertions(+), 2486 deletions(-) delete mode 100644 src/networking.c diff --git a/src/Makefile b/src/Makefile index b2b5f5833..e25338116 100644 --- a/src/Makefile +++ b/src/Makefile @@ -27,7 +27,7 @@ ifneq (,$(findstring FreeBSD,$(uname_S))) STD+=-Wno-c11-extensions endif endif -WARN=-Wall -Werror -W -Wno-missing-field-initializers +WARN=-Wall -Wextra -Werror -W -Wno-missing-field-initializers OPT=$(OPTIMIZATION) PREFIX?=/usr/local diff --git a/src/ae.cpp b/src/ae.cpp index d1a220e7c..f02ad1e2c 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -473,7 +473,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma * inverted. */ if (!invert && fe->mask & mask & AE_READABLE) { LOCK_IF_NECESSARY(fe, AE_READ_THREADSAFE); - fe->rfileProc(eventLoop,fd,fe->clientData,mask); + fe->rfileProc(eventLoop,fd,fe->clientData,mask | (fe->mask & AE_READ_THREADSAFE)); fired++; } @@ -481,7 +481,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { LOCK_IF_NECESSARY(fe, AE_WRITE_THREADSAFE); - fe->wfileProc(eventLoop,fd,fe->clientData,mask); + fe->wfileProc(eventLoop,fd,fe->clientData,mask | (fe->mask & AE_WRITE_THREADSAFE)); fired++; } } @@ -491,7 +491,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma if (invert && fe->mask & mask & AE_READABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { LOCK_IF_NECESSARY(fe, AE_READ_THREADSAFE); - fe->rfileProc(eventLoop,fd,fe->clientData,mask); + fe->rfileProc(eventLoop,fd,fe->clientData,mask | (fe->mask & AE_READ_THREADSAFE)); fired++; } } diff --git a/src/config.c b/src/config.c index e0d7e4042..bc1b88cc9 100644 --- a/src/config.c +++ b/src/config.c @@ -2022,7 +2022,7 @@ void rewriteConfigClientoutputbufferlimitOption(struct rewriteConfigState *state rewriteConfigFormatMemory(soft,sizeof(soft), server.client_obuf_limits[j].soft_limit_bytes); - char *typename = getClientTypeName(j); + const char *typename = getClientTypeName(j); if (!strcmp(typename,"slave")) typename = "replica"; line = sdscatprintf(sdsempty(),"%s %s %s %s %ld", option, typename, hard, soft, diff --git a/src/fmacros.h b/src/fmacros.h index 3b1bc5eb8..a56bb9331 100644 --- a/src/fmacros.h +++ b/src/fmacros.h @@ -30,13 +30,11 @@ #ifndef _REDIS_FMACRO_H #define _REDIS_FMACRO_H -#define _BSD_SOURCE +#define _DEFAULT_SOURCE 1 #if defined(__linux__) -#ifndef __cplusplus -#define _GNU_SOURCE -#define _DEFAULT_SOURCE -#endif +#define _GNU_SOURCE 1 +#define _DEFAULT_SOURCE 1 #endif #if defined(_AIX) diff --git a/src/intset.h b/src/intset.h index 6849abff9..2431f6c7a 100644 --- a/src/intset.h +++ b/src/intset.h @@ -32,16 +32,12 @@ #define __INTSET_H #include -#ifdef __cplusplus -#define ZERO_LENGTH_ARRAY_LENGTH 1 -#else -#define ZERO_LENGTH_ARRAY_LENGTH -#endif - typedef struct intset { uint32_t encoding; uint32_t length; - int8_t contents[ZERO_LENGTH_ARRAY_LENGTH]; +#ifndef __cplusplus + int8_t contents[]; +#endif } intset; intset *intsetNew(void); diff --git a/src/networking.c b/src/networking.c deleted file mode 100644 index ada1fe662..000000000 --- a/src/networking.c +++ /dev/null @@ -1,2433 +0,0 @@ -/* - * Copyright (c) 2009-2012, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "server.h" -#include "atomicvar.h" -#include -#include -#include - -static void setProtocolError(const char *errstr, client *c); - -/* Return the size consumed from the allocator, for the specified SDS string, - * including internal fragmentation. This function is used in order to compute - * the client output buffer size. */ -size_t sdsZmallocSize(sds s) { - void *sh = sdsAllocPtr(s); - return zmalloc_size(sh); -} - -/* Return the amount of memory used by the sds string at object->ptr - * for a string object. */ -size_t getStringObjectSdsUsedMemory(robj *o) { - serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); - switch(o->encoding) { - case OBJ_ENCODING_RAW: return sdsZmallocSize(ptrFromObj(o)); - case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj); - default: return 0; /* Just integer encoding for now. */ - } -} - -/* Client.reply list dup and free methods. */ -void *dupClientReplyValue(void *o) { - clientReplyBlock *old = o; - clientReplyBlock *buf = zmalloc(sizeof(clientReplyBlock) + old->size, MALLOC_LOCAL); - memcpy(buf, o, sizeof(clientReplyBlock) + old->size); - return buf; -} - -void freeClientReplyValue(void *o) { - zfree(o); -} - -int listMatchObjects(void *a, void *b) { - return equalStringObjects(a,b); -} - -/* This function links the client to the global linked list of clients. - * unlinkClient() does the opposite, among other things. */ -void linkClient(client *c) { - listAddNodeTail(server.clients,c); - /* Note that we remember the linked list node where the client is stored, - * this way removing the client in unlinkClient() will not require - * a linear scan, but just a constant time operation. */ - c->client_list_node = listLast(server.clients); - uint64_t id = htonu64(c->id); - raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL); -} - -client *createClient(int fd, int iel) { - client *c = zmalloc(sizeof(client), MALLOC_LOCAL); - - /* passing -1 as fd it is possible to create a non connected client. - * This is useful since all the commands needs to be executed - * in the context of a client. When commands are executed in other - * contexts (for instance a Lua script) we need a non connected client. */ - if (fd != -1) { - anetNonBlock(NULL,fd); - anetEnableTcpNoDelay(NULL,fd); - if (server.tcpkeepalive) - anetKeepAlive(NULL,fd,server.tcpkeepalive); - if (aeCreateFileEvent(server.rgel[iel],fd,AE_READABLE|AE_READ_THREADSAFE, - readQueryFromClient, c) == AE_ERR) - { - close(fd); - zfree(c); - return NULL; - } - } - - selectDb(c,0); - uint64_t client_id; - atomicGetIncr(server.next_client_id,client_id,1); - c->iel = iel; - c->id = client_id; - c->resp = 2; - c->fd = fd; - c->name = NULL; - c->bufpos = 0; - c->qb_pos = 0; - c->querybuf = sdsempty(); - c->pending_querybuf = sdsempty(); - c->querybuf_peak = 0; - c->reqtype = 0; - c->argc = 0; - c->argv = NULL; - c->cmd = c->lastcmd = NULL; - c->puser = DefaultUser; - c->multibulklen = 0; - c->bulklen = -1; - c->sentlen = 0; - c->flags = 0; - c->ctime = c->lastinteraction = server.unixtime; - /* If the default user does not require authentication, the user is - * directly authenticated. */ - c->authenticated = (c->puser->flags & USER_FLAG_NOPASS) != 0; - c->replstate = REPL_STATE_NONE; - c->repl_put_online_on_ack = 0; - c->reploff = 0; - c->read_reploff = 0; - c->repl_ack_off = 0; - c->repl_ack_time = 0; - c->slave_listening_port = 0; - c->slave_ip[0] = '\0'; - c->slave_capa = SLAVE_CAPA_NONE; - c->reply = listCreate(); - c->reply_bytes = 0; - c->obuf_soft_limit_reached_time = 0; - listSetFreeMethod(c->reply,freeClientReplyValue); - listSetDupMethod(c->reply,dupClientReplyValue); - c->btype = BLOCKED_NONE; - c->bpop.timeout = 0; - c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL); - c->bpop.target = NULL; - c->bpop.xread_group = NULL; - c->bpop.xread_consumer = NULL; - c->bpop.xread_group_noack = 0; - c->bpop.numreplicas = 0; - c->bpop.reploffset = 0; - c->woff = 0; - c->watched_keys = listCreate(); - c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL); - c->pubsub_patterns = listCreate(); - c->peerid = NULL; - c->client_list_node = NULL; - listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); - listSetMatchMethod(c->pubsub_patterns,listMatchObjects); - if (fd != -1) linkClient(c); - initClientMultiState(c); - return c; -} - -/* This funciton puts the client in the queue of clients that should write - * their output buffers to the socket. Note that it does not *yet* install - * the write handler, to start clients are put in a queue of clients that need - * to write, so we try to do that before returning in the event loop (see the - * handleClientsWithPendingWrites() function). - * If we fail and there is more data to write, compared to what the socket - * buffers can hold, then we'll really install the handler. */ -void clientInstallWriteHandler(client *c) { - /* Schedule the client to write the output buffers to the socket only - * if not already done and, for slaves, if the slave can actually receive - * writes at this stage. */ - if (!(c->flags & CLIENT_PENDING_WRITE) && - (c->replstate == REPL_STATE_NONE || - (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) - { - /* Here instead of installing the write handler, we just flag the - * client and put it into a list of clients that have something - * to write to the socket. This way before re-entering the event - * loop, we can try to directly write to the client sockets avoiding - * a system call. We'll only really install the write handler if - * we'll not be able to write the whole reply at once. */ - c->flags |= CLIENT_PENDING_WRITE; - listAddNodeHead(server.rgclients_pending_write[c->iel],c); - } -} - -/* This function is called every time we are going to transmit new data - * to the client. The behavior is the following: - * - * If the client should receive new data (normal clients will) the function - * returns C_OK, and make sure to install the write handler in our event - * loop so that when the socket is writable new data gets written. - * - * If the client should not receive new data, because it is a fake client - * (used to load AOF in memory), a master or because the setup of the write - * handler failed, the function returns C_ERR. - * - * The function may return C_OK without actually installing the write - * event handler in the following cases: - * - * 1) The event handler should already be installed since the output buffer - * already contains something. - * 2) The client is a slave but not yet online, so we want to just accumulate - * writes in the buffer but not actually sending them yet. - * - * Typically gets called every time a reply is built, before adding more - * data to the clients output buffers. If the function returns C_ERR no - * data should be appended to the output buffers. */ -int prepareClientToWrite(client *c) { - /* If it's the Lua client we always return ok without installing any - * handler since there is no socket at all. */ - if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK; - - /* CLIENT REPLY OFF / SKIP handling: don't send replies. */ - if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR; - - /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag - * is set. */ - if ((c->flags & CLIENT_MASTER) && - !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR; - - if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */ - - /* Schedule the client to write the output buffers to the socket, unless - * it should already be setup to do so (it has already pending data). */ - if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c); - - /* Authorize the caller to queue in the output buffer of this client. */ - return C_OK; -} - -/* ----------------------------------------------------------------------------- - * Low level functions to add more data to output buffers. - * -------------------------------------------------------------------------- */ - -int _addReplyToBuffer(client *c, const char *s, size_t len) { - size_t available = sizeof(c->buf)-c->bufpos; - - if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK; - - /* If there already are entries in the reply list, we cannot - * add anything more to the static buffer. */ - if (listLength(c->reply) > 0) return C_ERR; - - /* Check that the buffer has enough space available for this string. */ - if (len > available) return C_ERR; - - memcpy(c->buf+c->bufpos,s,len); - c->bufpos+=len; - return C_OK; -} - -void _addReplyProtoToList(client *c, const char *s, size_t len) { - if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; - - listNode *ln = listLast(c->reply); - clientReplyBlock *tail = ln? listNodeValue(ln): NULL; - - /* Note that 'tail' may be NULL even if we have a tail node, becuase when - * addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just - * fo fill it later, when the size of the bulk length is set. */ - - /* Append to tail string when possible. */ - if (tail) { - /* Copy the part we can fit into the tail, and leave the rest for a - * new node */ - size_t avail = tail->size - tail->used; - size_t copy = avail >= len? len: avail; - memcpy(tail->buf + tail->used, s, copy); - tail->used += copy; - s += copy; - len -= copy; - } - if (len) { - /* Create a new node, make sure it is allocated to at - * least PROTO_REPLY_CHUNK_BYTES */ - size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len; - tail = zmalloc(size + sizeof(clientReplyBlock), MALLOC_LOCAL); - /* take over the allocation's internal fragmentation */ - tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); - tail->used = len; - memcpy(tail->buf, s, len); - listAddNodeTail(c->reply, tail); - c->reply_bytes += tail->size; - } - asyncCloseClientOnOutputBufferLimitReached(c); -} - -/* ----------------------------------------------------------------------------- - * Higher level functions to queue data on the client output buffer. - * The following functions are the ones that commands implementations will call. - * -------------------------------------------------------------------------- */ - -/* Add the object 'obj' string representation to the client output buffer. */ -void addReply(client *c, robj *obj) { - if (prepareClientToWrite(c) != C_OK) return; - - if (sdsEncodedObject(obj)) { - if (_addReplyToBuffer(c,ptrFromObj(obj),sdslen(ptrFromObj(obj))) != C_OK) - _addReplyProtoToList(c,ptrFromObj(obj),sdslen(ptrFromObj(obj))); - } else if (obj->encoding == OBJ_ENCODING_INT) { - /* For integer encoded strings we just convert it into a string - * using our optimized function, and attach the resulting string - * to the output buffer. */ - char buf[32]; - size_t len = ll2string(buf,sizeof(buf),(long)ptrFromObj(obj)); - if (_addReplyToBuffer(c,buf,len) != C_OK) - _addReplyProtoToList(c,buf,len); - } else { - serverPanic("Wrong obj->encoding in addReply()"); - } -} - -/* Add the SDS 's' string to the client output buffer, as a side effect - * the SDS string is freed. */ -void addReplySds(client *c, sds s) { - if (prepareClientToWrite(c) != C_OK) { - /* The caller expects the sds to be free'd. */ - sdsfree(s); - return; - } - if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK) - _addReplyProtoToList(c,s,sdslen(s)); - sdsfree(s); -} - -/* This low level function just adds whatever protocol you send it to the - * client buffer, trying the static buffer initially, and using the string - * of objects if not possible. - * - * It is efficient because does not create an SDS object nor an Redis object - * if not needed. The object will only be created by calling - * _addReplyProtoToList() if we fail to extend the existing tail object - * in the list of objects. */ -void addReplyProto(client *c, const char *s, size_t len) { - if (prepareClientToWrite(c) != C_OK) return; - if (_addReplyToBuffer(c,s,len) != C_OK) - _addReplyProtoToList(c,s,len); -} - -/* Low level function called by the addReplyError...() functions. - * It emits the protocol for a Redis error, in the form: - * - * -ERRORCODE Error Message - * - * If the error code is already passed in the string 's', the error - * code provided is used, otherwise the string "-ERR " for the generic - * error code is automatically added. */ -void addReplyErrorLength(client *c, const char *s, size_t len) { - /* If the string already starts with "-..." then the error code - * is provided by the caller. Otherwise we use "-ERR". */ - if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5); - addReplyProto(c,s,len); - addReplyProto(c,"\r\n",2); - - /* Sometimes it could be normal that a slave replies to a master with - * an error and this function gets called. Actually the error will never - * be sent because addReply*() against master clients has no effect... - * A notable example is: - * - * EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x - * - * Where the master must propagate the first change even if the second - * will produce an error. However it is useful to log such events since - * they are rare and may hint at errors in a script or a bug in Redis. */ - if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { - char* to = c->flags & CLIENT_MASTER? "master": "replica"; - char* from = c->flags & CLIENT_MASTER? "replica": "master"; - char *cmdname = c->lastcmd ? c->lastcmd->name : ""; - serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error " - "to its %s: '%s' after processing the command " - "'%s'", from, to, s, cmdname); - } -} - -void addReplyError(client *c, const char *err) { - addReplyErrorLength(c,err,strlen(err)); -} - -void addReplyErrorFormat(client *c, const char *fmt, ...) { - size_t l, j; - va_list ap; - va_start(ap,fmt); - sds s = sdscatvprintf(sdsempty(),fmt,ap); - va_end(ap); - /* Make sure there are no newlines in the string, otherwise invalid protocol - * is emitted. */ - l = sdslen(s); - for (j = 0; j < l; j++) { - if (s[j] == '\r' || s[j] == '\n') s[j] = ' '; - } - addReplyErrorLength(c,s,sdslen(s)); - sdsfree(s); -} - -void addReplyStatusLength(client *c, const char *s, size_t len) { - addReplyProto(c,"+",1); - addReplyProto(c,s,len); - addReplyProto(c,"\r\n",2); -} - -void addReplyStatus(client *c, const char *status) { - addReplyStatusLength(c,status,strlen(status)); -} - -void addReplyStatusFormat(client *c, const char *fmt, ...) { - va_list ap; - va_start(ap,fmt); - sds s = sdscatvprintf(sdsempty(),fmt,ap); - va_end(ap); - addReplyStatusLength(c,s,sdslen(s)); - sdsfree(s); -} - -/* Adds an empty object to the reply list that will contain the multi bulk - * length, which is not known when this function is called. */ -void *addReplyDeferredLen(client *c) { - /* Note that we install the write event here even if the object is not - * ready to be sent, since we are sure that before returning to the - * event loop setDeferredAggregateLen() will be called. */ - if (prepareClientToWrite(c) != C_OK) return NULL; - listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ - return listLast(c->reply); -} - -/* Populate the length object and try gluing it to the next chunk. */ -void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { - listNode *ln = (listNode*)node; - clientReplyBlock *next; - char lenstr[128]; - size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length); - - /* Abort when *node is NULL: when the client should not accept writes - * we return NULL in addReplyDeferredLen() */ - if (node == NULL) return; - serverAssert(!listNodeValue(ln)); - - /* Normally we fill this dummy NULL node, added by addReplyDeferredLen(), - * with a new buffer structure containing the protocol needed to specify - * the length of the array following. However sometimes when there is - * little memory to move, we may instead remove this NULL node, and prefix - * our protocol in the node immediately after to it, in order to save a - * write(2) syscall later. Conditions needed to do it: - * - * - The next node is non-NULL, - * - It has enough room already allocated - * - And not too large (avoid large memmove) */ - if (ln->next != NULL && (next = listNodeValue(ln->next)) && - next->size - next->used >= lenstr_len && - next->used < PROTO_REPLY_CHUNK_BYTES * 4) { - memmove(next->buf + lenstr_len, next->buf, next->used); - memcpy(next->buf, lenstr, lenstr_len); - next->used += lenstr_len; - listDelNode(c->reply,ln); - } else { - /* Create a new node */ - clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock), MALLOC_LOCAL); - /* Take over the allocation's internal fragmentation */ - buf->size = zmalloc_usable(buf) - sizeof(clientReplyBlock); - buf->used = lenstr_len; - memcpy(buf->buf, lenstr, lenstr_len); - listNodeValue(ln) = buf; - c->reply_bytes += buf->size; - } - asyncCloseClientOnOutputBufferLimitReached(c); -} - -void setDeferredArrayLen(client *c, void *node, long length) { - setDeferredAggregateLen(c,node,length,'*'); -} - -void setDeferredMapLen(client *c, void *node, long length) { - int prefix = c->resp == 2 ? '*' : '%'; - if (c->resp == 2) length *= 2; - setDeferredAggregateLen(c,node,length,prefix); -} - -void setDeferredSetLen(client *c, void *node, long length) { - int prefix = c->resp == 2 ? '*' : '~'; - setDeferredAggregateLen(c,node,length,prefix); -} - -void setDeferredAttributeLen(client *c, void *node, long length) { - int prefix = c->resp == 2 ? '*' : '|'; - if (c->resp == 2) length *= 2; - setDeferredAggregateLen(c,node,length,prefix); -} - -void setDeferredPushLen(client *c, void *node, long length) { - int prefix = c->resp == 2 ? '*' : '>'; - setDeferredAggregateLen(c,node,length,prefix); -} - -/* Add a double as a bulk reply */ -void addReplyDouble(client *c, double d) { - if (isinf(d)) { - /* Libc in odd systems (Hi Solaris!) will format infinite in a - * different way, so better to handle it in an explicit way. */ - if (c->resp == 2) { - addReplyBulkCString(c, d > 0 ? "inf" : "-inf"); - } else { - addReplyProto(c, d > 0 ? ",inf\r\n" : "-inf\r\n", - d > 0 ? 6 : 7); - } - } else { - char dbuf[MAX_LONG_DOUBLE_CHARS+3], - sbuf[MAX_LONG_DOUBLE_CHARS+32]; - int dlen, slen; - if (c->resp == 2) { - dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); - slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf); - addReplyProto(c,sbuf,slen); - } else { - dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d); - addReplyProto(c,dbuf,dlen); - } - } -} - -/* Add a long double as a bulk reply, but uses a human readable formatting - * of the double instead of exposing the crude behavior of doubles to the - * dear user. */ -void addReplyHumanLongDouble(client *c, long double d) { - if (c->resp == 2) { - robj *o = createStringObjectFromLongDouble(d,1); - addReplyBulk(c,o); - decrRefCount(o); - } else { - char buf[MAX_LONG_DOUBLE_CHARS]; - int len = ld2string(buf,sizeof(buf),d,1); - addReplyProto(c,",",1); - addReplyProto(c,buf,len); - addReplyProto(c,"\r\n",2); - } -} - -/* Add a long long as integer reply or bulk len / multi bulk count. - * Basically this is used to output . */ -void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) { - char buf[128]; - int len; - - /* Things like $3\r\n or *2\r\n are emitted very often by the protocol - * so we have a few shared objects to use if the integer is small - * like it is most of the times. */ - if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) { - addReply(c,shared.mbulkhdr[ll]); - return; - } else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) { - addReply(c,shared.bulkhdr[ll]); - return; - } - - buf[0] = prefix; - len = ll2string(buf+1,sizeof(buf)-1,ll); - buf[len+1] = '\r'; - buf[len+2] = '\n'; - addReplyProto(c,buf,len+3); -} - -void addReplyLongLong(client *c, long long ll) { - if (ll == 0) - addReply(c,shared.czero); - else if (ll == 1) - addReply(c,shared.cone); - else - addReplyLongLongWithPrefix(c,ll,':'); -} - -void addReplyAggregateLen(client *c, long length, int prefix) { - if (prefix == '*' && length < OBJ_SHARED_BULKHDR_LEN) - addReply(c,shared.mbulkhdr[length]); - else - addReplyLongLongWithPrefix(c,length,prefix); -} - -void addReplyArrayLen(client *c, long length) { - addReplyAggregateLen(c,length,'*'); -} - -void addReplyMapLen(client *c, long length) { - int prefix = c->resp == 2 ? '*' : '%'; - if (c->resp == 2) length *= 2; - addReplyAggregateLen(c,length,prefix); -} - -void addReplySetLen(client *c, long length) { - int prefix = c->resp == 2 ? '*' : '~'; - addReplyAggregateLen(c,length,prefix); -} - -void addReplyAttributeLen(client *c, long length) { - int prefix = c->resp == 2 ? '*' : '|'; - if (c->resp == 2) length *= 2; - addReplyAggregateLen(c,length,prefix); -} - -void addReplyPushLen(client *c, long length) { - int prefix = c->resp == 2 ? '*' : '>'; - addReplyAggregateLen(c,length,prefix); -} - -void addReplyNull(client *c) { - if (c->resp == 2) { - addReplyProto(c,"$-1\r\n",5); - } else { - addReplyProto(c,"_\r\n",3); - } -} - -void addReplyBool(client *c, int b) { - if (c->resp == 2) { - addReply(c, b ? shared.cone : shared.czero); - } else { - addReplyProto(c, b ? "#t\r\n" : "#f\r\n",4); - } -} - -/* A null array is a concept that no longer exists in RESP3. However - * RESP2 had it, so API-wise we have this call, that will emit the correct - * RESP2 protocol, however for RESP3 the reply will always be just the - * Null type "_\r\n". */ -void addReplyNullArray(client *c) { - if (c->resp == 2) { - addReplyProto(c,"*-1\r\n",5); - } else { - addReplyProto(c,"_\r\n",3); - } -} - -/* Create the length prefix of a bulk reply, example: $2234 */ -void addReplyBulkLen(client *c, robj *obj) { - size_t len; - - if (sdsEncodedObject(obj)) { - len = sdslen(ptrFromObj(obj)); - } else { - long n = (long)ptrFromObj(obj); - - /* Compute how many bytes will take this integer as a radix 10 string */ - len = 1; - if (n < 0) { - len++; - n = -n; - } - while((n = n/10) != 0) { - len++; - } - } - - if (len < OBJ_SHARED_BULKHDR_LEN) - addReply(c,shared.bulkhdr[len]); - else - addReplyLongLongWithPrefix(c,len,'$'); -} - -/* Add a Redis Object as a bulk reply */ -void addReplyBulk(client *c, robj *obj) { - addReplyBulkLen(c,obj); - addReply(c,obj); - addReply(c,shared.crlf); -} - -/* Add a C buffer as bulk reply */ -void addReplyBulkCBuffer(client *c, const void *p, size_t len) { - addReplyLongLongWithPrefix(c,len,'$'); - addReplyProto(c,p,len); - addReply(c,shared.crlf); -} - -/* Add sds to reply (takes ownership of sds and frees it) */ -void addReplyBulkSds(client *c, sds s) { - addReplyLongLongWithPrefix(c,sdslen(s),'$'); - addReplySds(c,s); - addReply(c,shared.crlf); -} - -/* Add a C null term string as bulk reply */ -void addReplyBulkCString(client *c, const char *s) { - if (s == NULL) { - addReplyNull(c); - } else { - addReplyBulkCBuffer(c,s,strlen(s)); - } -} - -/* Add a long long as a bulk reply */ -void addReplyBulkLongLong(client *c, long long ll) { - char buf[64]; - int len; - - len = ll2string(buf,64,ll); - addReplyBulkCBuffer(c,buf,len); -} - -/* Reply with a verbatim type having the specified extension. - * - * The 'ext' is the "extension" of the file, actually just a three - * character type that describes the format of the verbatim string. - * For instance "txt" means it should be interpreted as a text only - * file by the receiver, "md " as markdown, and so forth. Only the - * three first characters of the extension are used, and if the - * provided one is shorter than that, the remaining is filled with - * spaces. */ -void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { - if (c->resp == 2) { - addReplyBulkCBuffer(c,s,len); - } else { - char buf[32]; - size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4); - char *p = buf+preflen-4; - for (int i = 0; i < 3; i++) { - if (*ext == '\0') { - p[i] = ' '; - } else { - p[i] = *ext++; - } - } - addReplyProto(c,buf,preflen); - addReplyProto(c,s,len); - addReplyProto(c,"\r\n",2); - } -} - -/* Add an array of C strings as status replies with a heading. - * This function is typically invoked by from commands that support - * subcommands in response to the 'help' subcommand. The help array - * is terminated by NULL sentinel. */ -void addReplyHelp(client *c, const char **help) { - sds cmd = sdsnew((char*) ptrFromObj(c->argv[0])); - void *blenp = addReplyDeferredLen(c); - int blen = 0; - - sdstoupper(cmd); - addReplyStatusFormat(c, - "%s arg arg ... arg. Subcommands are:",cmd); - sdsfree(cmd); - - while (help[blen]) addReplyStatus(c,help[blen++]); - - blen++; /* Account for the header line(s). */ - setDeferredArrayLen(c,blenp,blen); -} - -/* Add a suggestive error reply. - * This function is typically invoked by from commands that support - * subcommands in response to an unknown subcommand or argument error. */ -void addReplySubcommandSyntaxError(client *c) { - sds cmd = sdsnew((char*) ptrFromObj(c->argv[0])); - sdstoupper(cmd); - addReplyErrorFormat(c, - "Unknown subcommand or wrong number of arguments for '%s'. Try %s HELP.", - (char*)ptrFromObj(c->argv[1]),cmd); - sdsfree(cmd); -} - -/* Copy 'src' client output buffers into 'dst' client output buffers. - * The function takes care of freeing the old output buffers of the - * destination client. */ -void copyClientOutputBuffer(client *dst, client *src) { - listRelease(dst->reply); - dst->sentlen = 0; - dst->reply = listDup(src->reply); - memcpy(dst->buf,src->buf,src->bufpos); - dst->bufpos = src->bufpos; - dst->reply_bytes = src->reply_bytes; -} - -/* Return true if the specified client has pending reply buffers to write to - * the socket. */ -int clientHasPendingReplies(client *c) { - return c->bufpos || listLength(c->reply); -} - -#define MAX_ACCEPTS_PER_CALL 1000 -static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { - client *c; - if ((c = createClient(fd, iel)) == NULL) { - serverLog(LL_WARNING, - "Error registering fd event for the new client: %s (fd=%d)", - strerror(errno),fd); - close(fd); /* May be already closed, just ignore errors */ - return; - } - /* If maxclient directive is set and this is one client more... close the - * connection. Note that we create the client instead to check before - * for this condition, since now the socket is already set in non-blocking - * mode and we can send an error for free using the Kernel I/O */ - if (listLength(server.clients) > server.maxclients) { - char *err = "-ERR max number of clients reached\r\n"; - - /* That's a best effort error message, don't check write errors */ - if (write(c->fd,err,strlen(err)) == -1) { - /* Nothing to do, Just to avoid the warning... */ - } - server.stat_rejected_conn++; - freeClient(c); - return; - } - - /* If the server is running in protected mode (the default) and there - * is no password set, nor a specific interface is bound, we don't accept - * requests from non loopback interfaces. Instead we try to explain the - * user what to do to fix it if needed. */ - if (server.protected_mode && - server.bindaddr_count == 0 && - DefaultUser->flags & USER_FLAG_NOPASS && - !(flags & CLIENT_UNIX_SOCKET) && - ip != NULL) - { - if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) { - char *err = - "-DENIED Redis is running in protected mode because protected " - "mode is enabled, no bind address was specified, no " - "authentication password is requested to clients. In this mode " - "connections are only accepted from the loopback interface. " - "If you want to connect from external computers to Redis you " - "may adopt one of the following solutions: " - "1) Just disable protected mode sending the command " - "'CONFIG SET protected-mode no' from the loopback interface " - "by connecting to Redis from the same host the server is " - "running, however MAKE SURE Redis is not publicly accessible " - "from internet if you do so. Use CONFIG REWRITE to make this " - "change permanent. " - "2) Alternatively you can just disable the protected mode by " - "editing the Redis configuration file, and setting the protected " - "mode option to 'no', and then restarting the server. " - "3) If you started the server manually just for testing, restart " - "it with the '--protected-mode no' option. " - "4) Setup a bind address or an authentication password. " - "NOTE: You only need to do one of the above things in order for " - "the server to start accepting connections from the outside.\r\n"; - if (write(c->fd,err,strlen(err)) == -1) { - /* Nothing to do, Just to avoid the warning... */ - } - server.stat_rejected_conn++; - freeClient(c); - return; - } - } - - server.stat_numconnections++; - c->flags |= flags; -} - -void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cport, cfd, max = MAX_ACCEPTS_PER_CALL; - char cip[NET_IP_STR_LEN]; - UNUSED(mask); - UNUSED(privdata); - - while(max--) { - cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); - if (cfd == ANET_ERR) { - if (errno != EWOULDBLOCK) - serverLog(LL_WARNING, - "Accepting client connection: %s", server.neterr); - return; - } - serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); - int ielCur = ielFromEventLoop(el); - - // We always accept on the same thread - aeAcquireLock(); - acceptCommonHandler(cfd,0,cip, ielCur); - aeReleaseLock(); - } -} - -void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cfd, max = MAX_ACCEPTS_PER_CALL; - UNUSED(el); - UNUSED(mask); - UNUSED(privdata); - - while(max--) { - cfd = anetUnixAccept(server.neterr, fd); - if (cfd == ANET_ERR) { - if (errno != EWOULDBLOCK) - serverLog(LL_WARNING, - "Accepting client connection: %s", server.neterr); - return; - } - int ielCur = ielFromEventLoop(el); - serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket); - - aeAcquireLock(); - acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur); - aeReleaseLock(); - - } -} - -static void freeClientArgv(client *c) { - int j; - for (j = 0; j < c->argc; j++) - decrRefCount(c->argv[j]); - c->argc = 0; - c->cmd = NULL; -} - -/* Close all the slaves connections. This is useful in chained replication - * when we resync with our own master and want to force all our slaves to - * resync with us as well. */ -void disconnectSlaves(void) { - while (listLength(server.slaves)) { - listNode *ln = listFirst(server.slaves); - freeClient((client*)ln->value); - } -} - -/* Remove the specified client from global lists where the client could - * be referenced, not including the Pub/Sub channels. - * This is used by freeClient() and replicationCacheMaster(). */ -void unlinkClient(client *c) { - listNode *ln; - - /* If this is marked as current client unset it. */ - if (server.current_client == c) server.current_client = NULL; - - /* Certain operations must be done only if the client has an active socket. - * If the client was already unlinked or if it's a "fake client" the - * fd is already set to -1. */ - if (c->fd != -1) { - /* Remove from the list of active clients. */ - if (c->client_list_node) { - uint64_t id = htonu64(c->id); - raxRemove(server.clients_index,(unsigned char*)&id,sizeof(id),NULL); - listDelNode(server.clients,c->client_list_node); - c->client_list_node = NULL; - } - - /* Unregister async I/O handlers and close the socket. */ - aeDeleteFileEventAsync(server.rgel[c->iel],c->fd,AE_READABLE); - aeDeleteFileEventAsync(server.rgel[c->iel],c->fd,AE_WRITABLE); - close(c->fd); - c->fd = -1; - } - - /* Remove from the list of pending writes if needed. */ - if (c->flags & CLIENT_PENDING_WRITE) { - ln = listSearchKey(server.rgclients_pending_write[c->iel],c); - serverAssert(ln != NULL); - listDelNode(server.rgclients_pending_write[c->iel],ln); - c->flags &= ~CLIENT_PENDING_WRITE; - } - - /* When client was just unblocked because of a blocking operation, - * remove it from the list of unblocked clients. */ - if (c->flags & CLIENT_UNBLOCKED) { - ln = listSearchKey(server.rgunblocked_clients[c->iel],c); - serverAssert(ln != NULL); - listDelNode(server.rgunblocked_clients[c->iel],ln); - c->flags &= ~CLIENT_UNBLOCKED; - } -} - -void freeClient(client *c) { - listNode *ln; - - /* If a client is protected, yet we need to free it right now, make sure - * to at least use asynchronous freeing. */ - if (c->flags & CLIENT_PROTECTED) { - freeClientAsync(c); - return; - } - - /* If it is our master that's beging disconnected we should make sure - * to cache the state to try a partial resynchronization later. - * - * Note that before doing this we make sure that the client is not in - * some unexpected state, by checking its flags. */ - if (server.master && c->flags & CLIENT_MASTER) { - serverLog(LL_WARNING,"Connection with master lost."); - if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY| - CLIENT_CLOSE_ASAP| - CLIENT_BLOCKED))) - { - replicationCacheMaster(c); - return; - } - } - - /* Log link disconnection with slave */ - if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) { - serverLog(LL_WARNING,"Connection with replica %s lost.", - replicationGetSlaveName(c)); - } - - /* Free the query buffer */ - sdsfree(c->querybuf); - sdsfree(c->pending_querybuf); - c->querybuf = NULL; - - /* Deallocate structures used to block on blocking ops. */ - if (c->flags & CLIENT_BLOCKED) unblockClient(c); - dictRelease(c->bpop.keys); - - /* UNWATCH all the keys */ - unwatchAllKeys(c); - listRelease(c->watched_keys); - - /* Unsubscribe from all the pubsub channels */ - pubsubUnsubscribeAllChannels(c,0); - pubsubUnsubscribeAllPatterns(c,0); - dictRelease(c->pubsub_channels); - listRelease(c->pubsub_patterns); - - /* Free data structures. */ - listRelease(c->reply); - freeClientArgv(c); - - /* Unlink the client: this will close the socket, remove the I/O - * handlers, and remove references of the client from different - * places where active clients may be referenced. */ - unlinkClient(c); - - /* Master/slave cleanup Case 1: - * we lost the connection with a slave. */ - if (c->flags & CLIENT_SLAVE) { - if (c->replstate == SLAVE_STATE_SEND_BULK) { - if (c->repldbfd != -1) close(c->repldbfd); - if (c->replpreamble) sdsfree(c->replpreamble); - } - list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves; - ln = listSearchKey(l,c); - serverAssert(ln != NULL); - listDelNode(l,ln); - /* We need to remember the time when we started to have zero - * attached slaves, as after some time we'll free the replication - * backlog. */ - if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0) - server.repl_no_slaves_since = server.unixtime; - refreshGoodSlavesCount(); - } - - /* Master/slave cleanup Case 2: - * we lost the connection with the master. */ - if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); - - /* If this client was scheduled for async freeing we need to remove it - * from the queue. */ - if (c->flags & CLIENT_CLOSE_ASAP) { - ln = listSearchKey(server.clients_to_close,c); - serverAssert(ln != NULL); - listDelNode(server.clients_to_close,ln); - } - - /* Release other dynamically allocated client structure fields, - * and finally release the client structure itself. */ - if (c->name) decrRefCount(c->name); - zfree(c->argv); - freeClientMultiState(c); - sdsfree(c->peerid); - zfree(c); -} - -/* Schedule a client to free it at a safe time in the serverCron() function. - * This function is useful when we need to terminate a client but we are in - * a context where calling freeClient() is not possible, because the client - * should be valid for the continuation of the flow of the program. */ -void freeClientAsync(client *c) { - if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; - c->flags |= CLIENT_CLOSE_ASAP; - listAddNodeTail(server.clients_to_close,c); -} - -void freeClientsInAsyncFreeQueue(void) { - while (listLength(server.clients_to_close)) { - listNode *ln = listFirst(server.clients_to_close); - client *c = listNodeValue(ln); - - c->flags &= ~CLIENT_CLOSE_ASAP; - freeClient(c); - listDelNode(server.clients_to_close,ln); - } -} - -/* Return a client by ID, or NULL if the client ID is not in the set - * of registered clients. Note that "fake clients", created with -1 as FD, - * are not registered clients. */ -client *lookupClientByID(uint64_t id) { - id = htonu64(id); - client *c = raxFind(server.clients_index,(unsigned char*)&id,sizeof(id)); - return (c == raxNotFound) ? NULL : c; -} - -/* Write data in output buffers to client. Return C_OK if the client - * is still valid after the call, C_ERR if it was freed. */ -int writeToClient(int fd, client *c, int handler_installed) { - ssize_t nwritten = 0, totwritten = 0; - size_t objlen; - clientReplyBlock *o; - - while(clientHasPendingReplies(c)) { - if (c->bufpos > 0) { - nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); - - if (nwritten <= 0) break; - c->sentlen += nwritten; - totwritten += nwritten; - - /* If the buffer was sent, set bufpos to zero to continue with - * the remainder of the reply. */ - if ((int)c->sentlen == c->bufpos) { - c->bufpos = 0; - c->sentlen = 0; - } - } else { - o = listNodeValue(listFirst(c->reply)); - objlen = o->used; - - if (objlen == 0) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - continue; - } - - nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen); - - if (nwritten <= 0) break; - c->sentlen += nwritten; - totwritten += nwritten; - - /* If we fully sent the object on head go to the next one */ - if (c->sentlen == objlen) { - c->reply_bytes -= o->size; - listDelNode(c->reply,listFirst(c->reply)); - c->sentlen = 0; - /* If there are no longer objects in the list, we expect - * the count of reply bytes to be exactly zero. */ - if (listLength(c->reply) == 0) - serverAssert(c->reply_bytes == 0); - } - } - /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT - * bytes, in a single threaded server it's a good idea to serve - * other clients as well, even if a very large request comes from - * super fast link that is always able to accept data (in real world - * scenario think about 'KEYS *' against the loopback interface). - * - * However if we are over the maxmemory limit we ignore that and - * just deliver as much data as it is possible to deliver. - * - * Moreover, we also send as much as possible if the client is - * a slave (otherwise, on high-speed traffic, the replication - * buffer will grow indefinitely) */ - if (totwritten > NET_MAX_WRITES_PER_EVENT && - (server.maxmemory == 0 || - zmalloc_used_memory() < server.maxmemory) && - !(c->flags & CLIENT_SLAVE)) break; - } - - __atomic_fetch_add(&server.stat_net_output_bytes, totwritten, __ATOMIC_RELAXED); - if (nwritten == -1) { - if (errno == EAGAIN) { - nwritten = 0; - } else { - serverLog(LL_VERBOSE, - "Error writing to client: %s", strerror(errno)); - freeClient(c); - return C_ERR; - } - } - if (totwritten > 0) { - /* For clients representing masters we don't count sending data - * as an interaction, since we always send REPLCONF ACK commands - * that take some time to just fill the socket output buffer. - * We just rely on data / pings received for timeout detection. */ - if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime; - } - if (!clientHasPendingReplies(c)) { - c->sentlen = 0; - if (handler_installed) aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_WRITABLE); - - /* Close connection after entire reply has been sent. */ - if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { - freeClient(c); - return C_ERR; - } - } - return C_OK; -} - -/* Write event handler. Just send data to the client. */ -void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { - UNUSED(el); - UNUSED(mask); - writeToClient(fd,privdata,1); -} - -/* This function is called just before entering the event loop, in the hope - * we can just write the replies to the client output buffer without any - * need to use a syscall in order to install the writable event handler, - * get it called, and so forth. */ -int handleClientsWithPendingWrites(int iel) { - listIter li; - listNode *ln; - list *pending_writes = server.rgclients_pending_write[iel]; - int processed = listLength(pending_writes); - - listRewind(pending_writes,&li); - while((ln = listNext(&li))) { - client *c = listNodeValue(ln); - c->flags &= ~CLIENT_PENDING_WRITE; - listDelNode(pending_writes,ln); - - /* If a client is protected, don't do anything, - * that may trigger write error or recreate handler. */ - if (c->flags & CLIENT_PROTECTED) continue; - - /* Try to write buffers to the client socket. */ - if (writeToClient(c->fd,c,0) == C_ERR) continue; - - /* If after the synchronous writes above we still have data to - * output to the client, we need to install the writable handler. */ - if (clientHasPendingReplies(c)) { - int ae_flags = AE_WRITABLE; - /* For the fsync=always policy, we want that a given FD is never - * served for reading and writing in the same event loop iteration, - * so that in the middle of receiving the query, and serving it - * to the client, we'll call beforeSleep() that will do the - * actual fsync of AOF to disk. AE_BARRIER ensures that. */ - if (server.aof_state == AOF_ON && - server.aof_fsync == AOF_FSYNC_ALWAYS) - { - ae_flags |= AE_BARRIER; - } - if (aeCreateFileEvent(server.rgel[c->iel], c->fd, ae_flags, - sendReplyToClient, c) == AE_ERR) - { - freeClientAsync(c); - } - } - } - return processed; -} - -/* resetClient prepare the client to process the next command */ -void resetClient(client *c) { - redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL; - - freeClientArgv(c); - c->reqtype = 0; - c->multibulklen = 0; - c->bulklen = -1; - - /* We clear the ASKING flag as well if we are not inside a MULTI, and - * if what we just executed is not the ASKING command itself. */ - if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand) - c->flags &= ~CLIENT_ASKING; - - /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply - * to the next command will be sent, but set the flag if the command - * we just processed was "CLIENT REPLY SKIP". */ - c->flags &= ~CLIENT_REPLY_SKIP; - if (c->flags & CLIENT_REPLY_SKIP_NEXT) { - c->flags |= CLIENT_REPLY_SKIP; - c->flags &= ~CLIENT_REPLY_SKIP_NEXT; - } -} - -/* This funciton is used when we want to re-enter the event loop but there - * is the risk that the client we are dealing with will be freed in some - * way. This happens for instance in: - * - * * DEBUG RELOAD and similar. - * * When a Lua script is in -BUSY state. - * - * So the function will protect the client by doing two things: - * - * 1) It removes the file events. This way it is not possible that an - * error is signaled on the socket, freeing the client. - * 2) Moreover it makes sure that if the client is freed in a different code - * path, it is not really released, but only marked for later release. */ -void protectClient(client *c) { - c->flags |= CLIENT_PROTECTED; - aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_READABLE); - aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_WRITABLE); -} - -/* This will undo the client protection done by protectClient() */ -void unprotectClient(client *c) { - if (c->flags & CLIENT_PROTECTED) { - c->flags &= ~CLIENT_PROTECTED; - aeCreateFileEvent(server.rgel[c->iel],c->fd,AE_READABLE|AE_READ_THREADSAFE,readQueryFromClient,c); - if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); - } -} - -/* Like processMultibulkBuffer(), but for the inline protocol instead of RESP, - * this function consumes the client query buffer and creates a command ready - * to be executed inside the client structure. Returns C_OK if the command - * is ready to be executed, or C_ERR if there is still protocol to read to - * have a well formed command. The function also returns C_ERR when there is - * a protocol error: in such a case the client structure is setup to reply - * with the error and close the connection. */ -int processInlineBuffer(client *c) { - char *newline; - int argc, j, linefeed_chars = 1; - sds *argv, aux; - size_t querylen; - - /* Search for end of line */ - newline = strchr(c->querybuf+c->qb_pos,'\n'); - - /* Nothing to do without a \r\n */ - if (newline == NULL) { - if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - addReplyError(c,"Protocol error: too big inline request"); - setProtocolError("too big inline request",c); - } - return C_ERR; - } - - /* Handle the \r\n case. */ - if (newline && newline != c->querybuf+c->qb_pos && *(newline-1) == '\r') - newline--, linefeed_chars++; - - /* Split the input buffer up to the \r\n */ - querylen = newline-(c->querybuf+c->qb_pos); - aux = sdsnewlen(c->querybuf+c->qb_pos,querylen); - argv = sdssplitargs(aux,&argc); - sdsfree(aux); - if (argv == NULL) { - addReplyError(c,"Protocol error: unbalanced quotes in request"); - setProtocolError("unbalanced quotes in inline request",c); - return C_ERR; - } - - /* Newline from slaves can be used to refresh the last ACK time. - * This is useful for a slave to ping back while loading a big - * RDB file. */ - if (querylen == 0 && c->flags & CLIENT_SLAVE) - c->repl_ack_time = server.unixtime; - - /* Move querybuffer position to the next query in the buffer. */ - c->qb_pos += querylen+linefeed_chars; - - /* Setup argv array on client structure */ - if (argc) { - if (c->argv) zfree(c->argv); - c->argv = zmalloc(sizeof(robj*)*argc, MALLOC_LOCAL); - } - - /* Create redis objects for all arguments. */ - for (c->argc = 0, j = 0; j < argc; j++) { - if (sdslen(argv[j])) { - c->argv[c->argc] = createObject(OBJ_STRING,argv[j]); - c->argc++; - } else { - sdsfree(argv[j]); - } - } - sds_free(argv); - return C_OK; -} - -/* Helper function. Record protocol erro details in server log, - * and set the client as CLIENT_CLOSE_AFTER_REPLY. */ -#define PROTO_DUMP_LEN 128 -static void setProtocolError(const char *errstr, client *c) { - if (server.verbosity <= LL_VERBOSE) { - sds client = catClientInfoString(sdsempty(),c); - - /* Sample some protocol to given an idea about what was inside. */ - char buf[256]; - if (sdslen(c->querybuf)-c->qb_pos < PROTO_DUMP_LEN) { - snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%s'", c->querybuf+c->qb_pos); - } else { - snprintf(buf,sizeof(buf),"Query buffer during protocol error: '%.*s' (... more %zu bytes ...) '%.*s'", PROTO_DUMP_LEN/2, c->querybuf+c->qb_pos, sdslen(c->querybuf)-c->qb_pos-PROTO_DUMP_LEN, PROTO_DUMP_LEN/2, c->querybuf+sdslen(c->querybuf)-PROTO_DUMP_LEN/2); - } - - /* Remove non printable chars. */ - char *p = buf; - while (*p != '\0') { - if (!isprint(*p)) *p = '.'; - p++; - } - - /* Log all the client and protocol info. */ - serverLog(LL_VERBOSE, - "Protocol error (%s) from client: %s. %s", errstr, client, buf); - sdsfree(client); - } - c->flags |= CLIENT_CLOSE_AFTER_REPLY; -} - -/* Process the query buffer for client 'c', setting up the client argument - * vector for command execution. Returns C_OK if after running the function - * the client has a well-formed ready to be processed command, otherwise - * C_ERR if there is still to read more buffer to get the full command. - * The function also returns C_ERR when there is a protocol error: in such a - * case the client structure is setup to reply with the error and close - * the connection. - * - * This function is called if processInputBuffer() detects that the next - * command is in RESP format, so the first byte in the command is found - * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */ -int processMultibulkBuffer(client *c) { - char *newline = NULL; - int ok; - long long ll; - - if (c->multibulklen == 0) { - /* The client should have been reset */ - serverAssertWithInfo(c,NULL,c->argc == 0); - - /* Multi bulk length cannot be read without a \r\n */ - newline = strchr(c->querybuf+c->qb_pos,'\r'); - if (newline == NULL) { - if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - addReplyError(c,"Protocol error: too big mbulk count string"); - setProtocolError("too big mbulk count string",c); - } - return C_ERR; - } - - /* Buffer should also contain \n */ - if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) - return C_ERR; - - /* We know for sure there is a whole line since newline != NULL, - * so go ahead and find out the multi bulk length. */ - serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*'); - ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); - if (!ok || ll > 1024*1024) { - addReplyError(c,"Protocol error: invalid multibulk length"); - setProtocolError("invalid mbulk count",c); - return C_ERR; - } - - c->qb_pos = (newline-c->querybuf)+2; - - if (ll <= 0) return C_OK; - - c->multibulklen = ll; - - /* Setup argv array on client structure */ - if (c->argv) zfree(c->argv); - c->argv = zmalloc(sizeof(robj*)*c->multibulklen, MALLOC_LOCAL); - } - - serverAssertWithInfo(c,NULL,c->multibulklen > 0); - while(c->multibulklen) { - /* Read bulk length if unknown */ - if (c->bulklen == -1) { - newline = strchr(c->querybuf+c->qb_pos,'\r'); - if (newline == NULL) { - if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) { - addReplyError(c, - "Protocol error: too big bulk count string"); - setProtocolError("too big bulk count string",c); - return C_ERR; - } - break; - } - - /* Buffer should also contain \n */ - if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2)) - break; - - if (c->querybuf[c->qb_pos] != '$') { - addReplyErrorFormat(c, - "Protocol error: expected '$', got '%c'", - c->querybuf[c->qb_pos]); - setProtocolError("expected $ but got something else",c); - return C_ERR; - } - - ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); - if (!ok || ll < 0 || ll > server.proto_max_bulk_len) { - addReplyError(c,"Protocol error: invalid bulk length"); - setProtocolError("invalid bulk length",c); - return C_ERR; - } - - c->qb_pos = newline-c->querybuf+2; - if (ll >= PROTO_MBULK_BIG_ARG) { - /* If we are going to read a large object from network - * try to make it likely that it will start at c->querybuf - * boundary so that we can optimize object creation - * avoiding a large copy of data. - * - * But only when the data we have not parsed is less than - * or equal to ll+2. If the data length is greater than - * ll+2, trimming querybuf is just a waste of time, because - * at this time the querybuf contains not only our bulk. */ - if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) { - sdsrange(c->querybuf,c->qb_pos,-1); - c->qb_pos = 0; - /* Hint the sds library about the amount of bytes this string is - * going to contain. */ - c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2); - } - } - c->bulklen = ll; - } - - /* Read bulk argument */ - if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) { - /* Not enough data (+2 == trailing \r\n) */ - break; - } else { - /* Optimization: if the buffer contains JUST our bulk element - * instead of creating a new object by *copying* the sds we - * just use the current sds string. */ - if (c->qb_pos == 0 && - c->bulklen >= PROTO_MBULK_BIG_ARG && - sdslen(c->querybuf) == (size_t)(c->bulklen+2)) - { - c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf); - sdsIncrLen(c->querybuf,-2); /* remove CRLF */ - /* Assume that if we saw a fat argument we'll see another one - * likely... */ - c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2); - sdsclear(c->querybuf); - } else { - c->argv[c->argc++] = - createStringObject(c->querybuf+c->qb_pos,c->bulklen); - c->qb_pos += c->bulklen+2; - } - c->bulklen = -1; - c->multibulklen--; - } - } - - /* We're done when c->multibulk == 0 */ - if (c->multibulklen == 0) return C_OK; - - /* Still not ready to process the command */ - return C_ERR; -} - -/* This function is called every time, in the client structure 'c', there is - * more query buffer to process, because we read more data from the socket - * or because a client was blocked and later reactivated, so there could be - * pending query buffer, already representing a full command, to process. */ -void processInputBuffer(client *c) { - server.current_client = c; - - /* Keep processing while there is something in the input buffer */ - while(c->qb_pos < sdslen(c->querybuf)) { - /* Return if clients are paused. */ - if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; - - /* Immediately abort if the client is in the middle of something. */ - if (c->flags & CLIENT_BLOCKED) break; - - /* Don't process input from the master while there is a busy script - * condition on the slave. We want just to accumulate the replication - * stream (instead of replying -BUSY like we do with other clients) and - * later resume the processing. */ - if (server.lua_timedout && c->flags & CLIENT_MASTER) break; - - /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is - * written to the client. Make sure to not let the reply grow after - * this flag has been set (i.e. don't process more commands). - * - * The same applies for clients we want to terminate ASAP. */ - if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; - - /* Determine request type when unknown. */ - if (!c->reqtype) { - if (c->querybuf[c->qb_pos] == '*') { - c->reqtype = PROTO_REQ_MULTIBULK; - } else { - c->reqtype = PROTO_REQ_INLINE; - } - } - - if (c->reqtype == PROTO_REQ_INLINE) { - if (processInlineBuffer(c) != C_OK) break; - } else if (c->reqtype == PROTO_REQ_MULTIBULK) { - if (processMultibulkBuffer(c) != C_OK) break; - } else { - serverPanic("Unknown request type"); - } - - /* Multibulk processing could see a <= 0 length. */ - if (c->argc == 0) { - resetClient(c); - } else { - /* Only reset the client when the command was executed. */ - if (processCommand(c) == C_OK) { - if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { - /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; - } - - /* Don't reset the client structure for clients blocked in a - * module blocking command, so that the reply callback will - * still be able to access the client argv and argc field. - * The client will be reset in unblockClientFromModule(). */ - if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) - resetClient(c); - } - /* freeMemoryIfNeeded may flush slave output buffers. This may - * result into a slave, that may be the active client, to be - * freed. */ - if (server.current_client == NULL) break; - } - } - - /* Trim to pos */ - if (server.current_client != NULL && c->qb_pos) { - sdsrange(c->querybuf,c->qb_pos,-1); - c->qb_pos = 0; - } - - server.current_client = NULL; -} - -/* This is a wrapper for processInputBuffer that also cares about handling - * the replication forwarding to the sub-slaves, in case the client 'c' - * is flagged as master. Usually you want to call this instead of the - * raw processInputBuffer(). */ -void processInputBufferAndReplicate(client *c) { - if (!(c->flags & CLIENT_MASTER)) { - processInputBuffer(c); - } else { - size_t prev_offset = c->reploff; - processInputBuffer(c); - size_t applied = c->reploff - prev_offset; - if (applied) { - replicationFeedSlavesFromMasterStream(server.slaves, - c->pending_querybuf, applied); - sdsrange(c->pending_querybuf,applied,-1); - } - } -} - -void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { - client *c = (client*) privdata; - int nread, readlen; - size_t qblen; - UNUSED(el); - UNUSED(mask); - - readlen = PROTO_IOBUF_LEN; - /* If this is a multi bulk request, and we are processing a bulk reply - * that is large enough, try to maximize the probability that the query - * buffer contains exactly the SDS string representing the object, even - * at the risk of requiring more read(2) calls. This way the function - * processMultiBulkBuffer() can avoid copying buffers to create the - * Redis Object representing the argument. */ - if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 - && c->bulklen >= PROTO_MBULK_BIG_ARG) - { - ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); - - /* Note that the 'remaining' variable may be zero in some edge case, - * for example once we resume a blocked client after CLIENT PAUSE. */ - if (remaining > 0 && remaining < readlen) readlen = remaining; - } - - qblen = sdslen(c->querybuf); - if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; - c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); - - nread = read(fd, c->querybuf+qblen, readlen); - - if (nread == -1) { - if (errno == EAGAIN) { - return; - } else { - serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); - aeAcquireLock(); - freeClient(c); - aeReleaseLock(); - return; - } - } else if (nread == 0) { - serverLog(LL_VERBOSE, "Client closed connection"); - aeAcquireLock(); - freeClient(c); - aeReleaseLock(); - return; - } else if (c->flags & CLIENT_MASTER) { - /* Append the query buffer to the pending (not applied) buffer - * of the master. We'll use this buffer later in order to have a - * copy of the string applied by the last command executed. */ - c->pending_querybuf = sdscatlen(c->pending_querybuf, - c->querybuf+qblen,nread); - } - - sdsIncrLen(c->querybuf,nread); - c->lastinteraction = server.unixtime; - if (c->flags & CLIENT_MASTER) c->read_reploff += nread; - server.stat_net_input_bytes += nread; - if (sdslen(c->querybuf) > server.client_max_querybuf_len) { - sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); - - bytes = sdscatrepr(bytes,c->querybuf,64); - serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); - sdsfree(ci); - sdsfree(bytes); - aeAcquireLock(); - freeClient(c); - aeReleaseLock(); - return; - } - - /* Time to process the buffer. If the client is a master we need to - * compute the difference between the applied offset before and after - * processing the buffer, to understand how much of the replication stream - * was actually applied to the master state: this quantity, and its - * corresponding part of the replication stream, will be propagated to - * the sub-slaves and to the replication backlog. */ - aeAcquireLock(); - processInputBufferAndReplicate(c); - aeReleaseLock(); -} - -void getClientsMaxBuffers(unsigned long *longest_output_list, - unsigned long *biggest_input_buffer) { - client *c; - listNode *ln; - listIter li; - unsigned long lol = 0, bib = 0; - - listRewind(server.clients,&li); - while ((ln = listNext(&li)) != NULL) { - c = listNodeValue(ln); - - if (listLength(c->reply) > lol) lol = listLength(c->reply); - if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf); - } - *longest_output_list = lol; - *biggest_input_buffer = bib; -} - -/* A Redis "Peer ID" is a colon separated ip:port pair. - * For IPv4 it's in the form x.y.z.k:port, example: "127.0.0.1:1234". - * For IPv6 addresses we use [] around the IP part, like in "[::1]:1234". - * For Unix sockets we use path:0, like in "/tmp/redis:0". - * - * A Peer ID always fits inside a buffer of NET_PEER_ID_LEN bytes, including - * the null term. - * - * On failure the function still populates 'peerid' with the "?:0" string - * in case you want to relax error checking or need to display something - * anyway (see anetPeerToString implementation for more info). */ -void genClientPeerId(client *client, char *peerid, - size_t peerid_len) { - if (client->flags & CLIENT_UNIX_SOCKET) { - /* Unix socket client. */ - snprintf(peerid,peerid_len,"%s:0",server.unixsocket); - } else { - /* TCP client. */ - anetFormatPeer(client->fd,peerid,peerid_len); - } -} - -/* This function returns the client peer id, by creating and caching it - * if client->peerid is NULL, otherwise returning the cached value. - * The Peer ID never changes during the life of the client, however it - * is expensive to compute. */ -char *getClientPeerId(client *c) { - char peerid[NET_PEER_ID_LEN]; - - if (c->peerid == NULL) { - genClientPeerId(c,peerid,sizeof(peerid)); - c->peerid = sdsnew(peerid); - } - return c->peerid; -} - -/* Concatenate a string representing the state of a client in an human - * readable format, into the sds string 's'. */ -sds catClientInfoString(sds s, client *client) { - char flags[16], events[3], *p; - int emask; - - p = flags; - if (client->flags & CLIENT_SLAVE) { - if (client->flags & CLIENT_MONITOR) - *p++ = 'O'; - else - *p++ = 'S'; - } - if (client->flags & CLIENT_MASTER) *p++ = 'M'; - if (client->flags & CLIENT_PUBSUB) *p++ = 'P'; - if (client->flags & CLIENT_MULTI) *p++ = 'x'; - if (client->flags & CLIENT_BLOCKED) *p++ = 'b'; - if (client->flags & CLIENT_DIRTY_CAS) *p++ = 'd'; - if (client->flags & CLIENT_CLOSE_AFTER_REPLY) *p++ = 'c'; - if (client->flags & CLIENT_UNBLOCKED) *p++ = 'u'; - if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A'; - if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U'; - if (client->flags & CLIENT_READONLY) *p++ = 'r'; - if (p == flags) *p++ = 'N'; - *p++ = '\0'; - - emask = client->fd == -1 ? 0 : aeGetFileEvents(server.rgel[client->iel],client->fd); - p = events; - if (emask & AE_READABLE) *p++ = 'r'; - if (emask & AE_WRITABLE) *p++ = 'w'; - *p = '\0'; - return sdscatfmt(s, - "id=%U addr=%s fd=%i name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U obl=%U oll=%U omem=%U events=%s cmd=%s", - (unsigned long long) client->id, - getClientPeerId(client), - client->fd, - client->name ? (char*)ptrFromObj(client->name) : "", - (long long)(server.unixtime - client->ctime), - (long long)(server.unixtime - client->lastinteraction), - flags, - client->db->id, - (int) dictSize(client->pubsub_channels), - (int) listLength(client->pubsub_patterns), - (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, - (unsigned long long) sdslen(client->querybuf), - (unsigned long long) sdsavail(client->querybuf), - (unsigned long long) client->bufpos, - (unsigned long long) listLength(client->reply), - (unsigned long long) getClientOutputBufferMemoryUsage(client), - events, - client->lastcmd ? client->lastcmd->name : "NULL"); -} - -sds getAllClientsInfoString(int type) { - listNode *ln; - listIter li; - client *client; - sds o = sdsnewlen(SDS_NOINIT,200*listLength(server.clients)); - sdsclear(o); - listRewind(server.clients,&li); - while ((ln = listNext(&li)) != NULL) { - client = listNodeValue(ln); - if (type != -1 && getClientType(client) != type) continue; - o = catClientInfoString(o,client); - o = sdscatlen(o,"\n",1); - } - return o; -} - -void clientCommand(client *c) { - listNode *ln; - listIter li; - client *client; - - if (c->argc == 2 && !strcasecmp(ptrFromObj(c->argv[1]),"help")) { - const char *help[] = { -"id -- Return the ID of the current connection.", -"getname -- Return the name of the current connection.", -"kill -- Kill connection made from .", -"kill