Thread safety fixes
This commit is contained in:
parent
e9c1d30749
commit
0914b52ffa
27
src/aof.c
27
src/aof.c
@ -96,6 +96,8 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
listNode *ln;
|
listNode *ln;
|
||||||
aofrwblock *block;
|
aofrwblock *block;
|
||||||
ssize_t nwritten;
|
ssize_t nwritten;
|
||||||
|
serverAssert(el == server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el);
|
||||||
|
|
||||||
UNUSED(el);
|
UNUSED(el);
|
||||||
UNUSED(fd);
|
UNUSED(fd);
|
||||||
UNUSED(privdata);
|
UNUSED(privdata);
|
||||||
@ -105,7 +107,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
ln = listFirst(server.aof_rewrite_buf_blocks);
|
ln = listFirst(server.aof_rewrite_buf_blocks);
|
||||||
block = ln ? ln->value : NULL;
|
block = ln ? ln->value : NULL;
|
||||||
if (server.aof_stop_sending_diff || !block) {
|
if (server.aof_stop_sending_diff || !block) {
|
||||||
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child,
|
aeDeleteFileEvent(el,server.aof_pipe_write_data_to_child,
|
||||||
AE_WRITABLE);
|
AE_WRITABLE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -121,6 +123,15 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void QueueAofPipeWrite(void *arg)
|
||||||
|
{
|
||||||
|
UNUSED(arg);
|
||||||
|
if (aeGetFileEvents(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_write_data_to_child) == 0) {
|
||||||
|
aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, server.aof_pipe_write_data_to_child,
|
||||||
|
AE_WRITABLE, aofChildWriteDiffData, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
|
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
|
||||||
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
|
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
|
||||||
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
|
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
|
||||||
@ -162,10 +173,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
|
|||||||
|
|
||||||
/* Install a file event to send data to the rewrite child if there is
|
/* Install a file event to send data to the rewrite child if there is
|
||||||
* not one already. */
|
* not one already. */
|
||||||
if (aeGetFileEvents(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child) == 0) {
|
aePostFunction(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, QueueAofPipeWrite, NULL);
|
||||||
aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.aof_pipe_write_data_to_child,
|
|
||||||
AE_WRITABLE, aofChildWriteDiffData, NULL);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Write the buffer (possibly composed of multiple blocks) into the specified
|
/* Write the buffer (possibly composed of multiple blocks) into the specified
|
||||||
@ -631,6 +639,7 @@ struct client *createFakeClient(void) {
|
|||||||
|
|
||||||
selectDb(c,0);
|
selectDb(c,0);
|
||||||
c->fd = -1;
|
c->fd = -1;
|
||||||
|
c->iel = IDX_EVENT_LOOP_MAIN;
|
||||||
c->name = NULL;
|
c->name = NULL;
|
||||||
c->querybuf = sdsempty();
|
c->querybuf = sdsempty();
|
||||||
c->querybuf_peak = 0;
|
c->querybuf_peak = 0;
|
||||||
@ -1470,7 +1479,7 @@ void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
/* Remove the handler since this can be called only one time during a
|
/* Remove the handler since this can be called only one time during a
|
||||||
* rewrite. */
|
* rewrite. */
|
||||||
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_read_ack_from_child,AE_READABLE);
|
aeDeleteFileEventAsync(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_read_ack_from_child,AE_READABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Create the pipes used for parent - child process IPC during rewrite.
|
/* Create the pipes used for parent - child process IPC during rewrite.
|
||||||
@ -1488,7 +1497,7 @@ int aofCreatePipes(void) {
|
|||||||
/* Parent -> children data is non blocking. */
|
/* Parent -> children data is non blocking. */
|
||||||
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
|
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
|
||||||
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
|
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
|
||||||
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
|
if (aeCreateRemoteFileEventSync(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
|
||||||
|
|
||||||
server.aof_pipe_write_data_to_child = fds[1];
|
server.aof_pipe_write_data_to_child = fds[1];
|
||||||
server.aof_pipe_read_data_from_parent = fds[0];
|
server.aof_pipe_read_data_from_parent = fds[0];
|
||||||
@ -1507,8 +1516,8 @@ error:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void aofClosePipes(void) {
|
void aofClosePipes(void) {
|
||||||
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_read_ack_from_child,AE_READABLE);
|
aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_read_ack_from_child,AE_READABLE);
|
||||||
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child,AE_WRITABLE);
|
aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
|
||||||
close(server.aof_pipe_write_data_to_child);
|
close(server.aof_pipe_write_data_to_child);
|
||||||
close(server.aof_pipe_read_data_from_parent);
|
close(server.aof_pipe_read_data_from_parent);
|
||||||
close(server.aof_pipe_write_ack_to_parent);
|
close(server.aof_pipe_write_ack_to_parent);
|
||||||
|
@ -112,12 +112,14 @@ void blockClient(client *c, int btype) {
|
|||||||
void processUnblockedClients(int iel) {
|
void processUnblockedClients(int iel) {
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
client *c;
|
client *c;
|
||||||
|
list *unblocked_clients = server.rgthreadvar[iel].unblocked_clients;
|
||||||
|
serverAssert(iel == (serverTL - server.rgthreadvar));
|
||||||
|
|
||||||
while (listLength(server.rgunblocked_clients[iel])) {
|
while (listLength(unblocked_clients)) {
|
||||||
ln = listFirst(server.rgunblocked_clients[iel]);
|
ln = listFirst(unblocked_clients);
|
||||||
serverAssert(ln != NULL);
|
serverAssert(ln != NULL);
|
||||||
c = ln->value;
|
c = ln->value;
|
||||||
listDelNode(server.rgunblocked_clients[iel],ln);
|
listDelNode(unblocked_clients,ln);
|
||||||
c->flags &= ~CLIENT_UNBLOCKED;
|
c->flags &= ~CLIENT_UNBLOCKED;
|
||||||
|
|
||||||
/* Process remaining data in the input buffer, unless the client
|
/* Process remaining data in the input buffer, unless the client
|
||||||
@ -125,9 +127,11 @@ void processUnblockedClients(int iel) {
|
|||||||
* client is not blocked before to proceed, but things may change and
|
* client is not blocked before to proceed, but things may change and
|
||||||
* the code is conceptually more correct this way. */
|
* the code is conceptually more correct this way. */
|
||||||
if (!(c->flags & CLIENT_BLOCKED)) {
|
if (!(c->flags & CLIENT_BLOCKED)) {
|
||||||
|
aeAcquireLock();
|
||||||
if (c->querybuf && sdslen(c->querybuf) > 0) {
|
if (c->querybuf && sdslen(c->querybuf) > 0) {
|
||||||
processInputBufferAndReplicate(c);
|
processInputBufferAndReplicate(c);
|
||||||
}
|
}
|
||||||
|
aeReleaseLock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -151,9 +155,10 @@ void processUnblockedClients(int iel) {
|
|||||||
void queueClientForReprocessing(client *c) {
|
void queueClientForReprocessing(client *c) {
|
||||||
/* The client may already be into the unblocked list because of a previous
|
/* The client may already be into the unblocked list because of a previous
|
||||||
* blocking operation, don't add back it into the list multiple times. */
|
* blocking operation, don't add back it into the list multiple times. */
|
||||||
|
AssertCorrectThread(c);
|
||||||
if (!(c->flags & CLIENT_UNBLOCKED)) {
|
if (!(c->flags & CLIENT_UNBLOCKED)) {
|
||||||
c->flags |= CLIENT_UNBLOCKED;
|
c->flags |= CLIENT_UNBLOCKED;
|
||||||
listAddNodeTail(server.rgunblocked_clients[c->iel],c);
|
listAddNodeTail(server.rgthreadvar[c->iel].unblocked_clients,c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -213,7 +218,7 @@ void disconnectAllBlockedClients(void) {
|
|||||||
client *c = listNodeValue(ln);
|
client *c = listNodeValue(ln);
|
||||||
|
|
||||||
if (c->flags & CLIENT_BLOCKED) {
|
if (c->flags & CLIENT_BLOCKED) {
|
||||||
addReplySds(c,sdsnew(
|
addReplySdsAsync(c,sdsnew(
|
||||||
"-UNBLOCKED force unblock from blocking operation, "
|
"-UNBLOCKED force unblock from blocking operation, "
|
||||||
"instance state changed (master -> replica?)\r\n"));
|
"instance state changed (master -> replica?)\r\n"));
|
||||||
unblockClient(c);
|
unblockClient(c);
|
||||||
@ -407,7 +412,7 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
/* If the group was not found, send an error
|
/* If the group was not found, send an error
|
||||||
* to the consumer. */
|
* to the consumer. */
|
||||||
if (!group) {
|
if (!group) {
|
||||||
addReplyError(receiver,
|
addReplyErrorAsync(receiver,
|
||||||
"-NOGROUP the consumer group this client "
|
"-NOGROUP the consumer group this client "
|
||||||
"was blocked on no longer exists");
|
"was blocked on no longer exists");
|
||||||
unblockClient(receiver);
|
unblockClient(receiver);
|
||||||
@ -437,12 +442,12 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
* extracted from it. Wrapped in a single-item
|
* extracted from it. Wrapped in a single-item
|
||||||
* array, since we have just one key. */
|
* array, since we have just one key. */
|
||||||
if (receiver->resp == 2) {
|
if (receiver->resp == 2) {
|
||||||
addReplyArrayLen(receiver,1);
|
addReplyArrayLenAsync(receiver,1);
|
||||||
addReplyArrayLen(receiver,2);
|
addReplyArrayLenAsync(receiver,2);
|
||||||
} else {
|
} else {
|
||||||
addReplyMapLen(receiver,1);
|
addReplyMapLenAsync(receiver,1);
|
||||||
}
|
}
|
||||||
addReplyBulk(receiver,rl->key);
|
addReplyBulkAsync(receiver,rl->key);
|
||||||
|
|
||||||
streamPropInfo pi = {
|
streamPropInfo pi = {
|
||||||
rl->key,
|
rl->key,
|
||||||
|
@ -493,7 +493,7 @@ void clusterInit(void) {
|
|||||||
int j;
|
int j;
|
||||||
|
|
||||||
for (j = 0; j < server.cfd_count; j++) {
|
for (j = 0; j < server.cfd_count; j++) {
|
||||||
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.cfd[j], AE_READABLE,
|
if (aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, server.cfd[j], AE_READABLE,
|
||||||
clusterAcceptHandler, NULL) == AE_ERR)
|
clusterAcceptHandler, NULL) == AE_ERR)
|
||||||
serverPanic("Unrecoverable error creating Redis Cluster "
|
serverPanic("Unrecoverable error creating Redis Cluster "
|
||||||
"file event.");
|
"file event.");
|
||||||
@ -601,7 +601,7 @@ clusterLink *createClusterLink(clusterNode *node) {
|
|||||||
* with this link will have the 'link' field set to NULL. */
|
* with this link will have the 'link' field set to NULL. */
|
||||||
void freeClusterLink(clusterLink *link) {
|
void freeClusterLink(clusterLink *link) {
|
||||||
if (link->fd != -1) {
|
if (link->fd != -1) {
|
||||||
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], link->fd, AE_READABLE|AE_WRITABLE);
|
aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_READABLE|AE_WRITABLE);
|
||||||
}
|
}
|
||||||
sdsfree(link->sndbuf);
|
sdsfree(link->sndbuf);
|
||||||
sdsfree(link->rcvbuf);
|
sdsfree(link->rcvbuf);
|
||||||
@ -645,7 +645,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
* node identity. */
|
* node identity. */
|
||||||
link = createClusterLink(NULL);
|
link = createClusterLink(NULL);
|
||||||
link->fd = cfd;
|
link->fd = cfd;
|
||||||
aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],cfd,AE_READABLE,clusterReadHandler,link);
|
aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,cfd,AE_READABLE,clusterReadHandler,link);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2132,7 +2132,7 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
}
|
}
|
||||||
sdsrange(link->sndbuf,nwritten,-1);
|
sdsrange(link->sndbuf,nwritten,-1);
|
||||||
if (sdslen(link->sndbuf) == 0)
|
if (sdslen(link->sndbuf) == 0)
|
||||||
aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], link->fd, AE_WRITABLE);
|
aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, link->fd, AE_WRITABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Read data. Try to read the first field of the header first to check the
|
/* Read data. Try to read the first field of the header first to check the
|
||||||
@ -2208,7 +2208,7 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
* from event handlers that will do stuff with the same link later. */
|
* from event handlers that will do stuff with the same link later. */
|
||||||
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
|
||||||
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
if (sdslen(link->sndbuf) == 0 && msglen != 0)
|
||||||
aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],link->fd,AE_WRITABLE|AE_BARRIER,
|
aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_WRITABLE|AE_BARRIER,
|
||||||
clusterWriteHandler,link);
|
clusterWriteHandler,link);
|
||||||
|
|
||||||
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
|
||||||
@ -3402,7 +3402,7 @@ void clusterCron(void) {
|
|||||||
link = createClusterLink(node);
|
link = createClusterLink(node);
|
||||||
link->fd = fd;
|
link->fd = fd;
|
||||||
node->link = link;
|
node->link = link;
|
||||||
aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],link->fd,AE_READABLE,
|
aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->fd,AE_READABLE,
|
||||||
clusterReadHandler,link);
|
clusterReadHandler,link);
|
||||||
/* Queue a PING in the new connection ASAP: this is crucial
|
/* Queue a PING in the new connection ASAP: this is crucial
|
||||||
* to avoid false positives in failure detection.
|
* to avoid false positives in failure detection.
|
||||||
|
@ -968,12 +968,12 @@ void configSetCommand(client *c) {
|
|||||||
server.maxclients = orig_value;
|
server.maxclients = orig_value;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if ((unsigned int) aeGetSetSize(server.rgel[IDX_EVENT_LOOP_MAIN]) <
|
if ((unsigned int) aeGetSetSize(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el) <
|
||||||
server.maxclients + CONFIG_FDSET_INCR)
|
server.maxclients + CONFIG_FDSET_INCR)
|
||||||
{
|
{
|
||||||
for (int iel = 0; iel < server.cel; ++iel)
|
for (int iel = 0; iel < server.cthreads; ++iel)
|
||||||
{
|
{
|
||||||
if (aeResizeSetSize(server.rgel[iel],
|
if (aeResizeSetSize(server.rgthreadvar[iel].el,
|
||||||
server.maxclients + CONFIG_FDSET_INCR) == AE_ERR)
|
server.maxclients + CONFIG_FDSET_INCR) == AE_ERR)
|
||||||
{
|
{
|
||||||
addReplyError(c,"The event loop API used by Redis is not able to handle the specified number of clients");
|
addReplyError(c,"The event loop API used by Redis is not able to handle the specified number of clients");
|
||||||
|
@ -51,3 +51,9 @@ extern "C" void fastlock_free(struct fastlock *lock)
|
|||||||
// NOP
|
// NOP
|
||||||
(void)lock;
|
(void)lock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool fastlock::fOwnLock()
|
||||||
|
{
|
||||||
|
return gettid() == m_pidOwner;
|
||||||
|
}
|
@ -37,5 +37,7 @@ struct fastlock
|
|||||||
{
|
{
|
||||||
fastlock_unlock(this);
|
fastlock_unlock(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool fOwnLock(); // true if this thread owns the lock, NOTE: not 100% reliable, use for debugging only
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
14
src/module.c
14
src/module.c
@ -3633,9 +3633,15 @@ void moduleHandleBlockedClients(void) {
|
|||||||
ln = listFirst(moduleUnblockedClients);
|
ln = listFirst(moduleUnblockedClients);
|
||||||
bc = ln->value;
|
bc = ln->value;
|
||||||
client *c = bc->client;
|
client *c = bc->client;
|
||||||
|
serverAssert(c->iel == IDX_EVENT_LOOP_MAIN);
|
||||||
listDelNode(moduleUnblockedClients,ln);
|
listDelNode(moduleUnblockedClients,ln);
|
||||||
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
|
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
|
||||||
|
|
||||||
|
if (c)
|
||||||
|
{
|
||||||
|
AssertCorrectThread(c);
|
||||||
|
}
|
||||||
|
|
||||||
/* Release the lock during the loop, as long as we don't
|
/* Release the lock during the loop, as long as we don't
|
||||||
* touch the shared list. */
|
* touch the shared list. */
|
||||||
|
|
||||||
@ -3688,11 +3694,11 @@ void moduleHandleBlockedClients(void) {
|
|||||||
/* Put the client in the list of clients that need to write
|
/* Put the client in the list of clients that need to write
|
||||||
* if there are pending replies here. This is needed since
|
* if there are pending replies here. This is needed since
|
||||||
* during a non blocking command the client may receive output. */
|
* during a non blocking command the client may receive output. */
|
||||||
if (clientHasPendingReplies(c) &&
|
if (clientHasPendingReplies(c, TRUE) &&
|
||||||
!(c->flags & CLIENT_PENDING_WRITE))
|
!(c->flags & CLIENT_PENDING_WRITE))
|
||||||
{
|
{
|
||||||
c->flags |= CLIENT_PENDING_WRITE;
|
c->flags |= CLIENT_PENDING_WRITE;
|
||||||
listAddNodeHead(server.rgclients_pending_write[IDX_EVENT_LOOP_MAIN],c);
|
listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4300,7 +4306,7 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
|
|||||||
if (memcmp(ri.key,&key,sizeof(key)) == 0) {
|
if (memcmp(ri.key,&key,sizeof(key)) == 0) {
|
||||||
/* This is the first key, we need to re-install the timer according
|
/* This is the first key, we need to re-install the timer according
|
||||||
* to the just added event. */
|
* to the just added event. */
|
||||||
aeDeleteTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN],aeTimer);
|
aeDeleteTimeEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
|
||||||
aeTimer = -1;
|
aeTimer = -1;
|
||||||
}
|
}
|
||||||
raxStop(&ri);
|
raxStop(&ri);
|
||||||
@ -4309,7 +4315,7 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
|
|||||||
/* If we have no main timer (the old one was invalidated, or this is the
|
/* If we have no main timer (the old one was invalidated, or this is the
|
||||||
* first module timer we have), install one. */
|
* first module timer we have), install one. */
|
||||||
if (aeTimer == -1)
|
if (aeTimer == -1)
|
||||||
aeTimer = aeCreateTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN],period,moduleTimerHandler,NULL,NULL);
|
aeTimer = aeCreateTimeEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL);
|
||||||
|
|
||||||
return key;
|
return key;
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,38 @@
|
|||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
|
|
||||||
static void setProtocolError(const char *errstr, client *c);
|
static void setProtocolError(const char *errstr, client *c);
|
||||||
|
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync);
|
||||||
|
void addReplyBulkCStringCore(client *c, const char *s, bool fAsync);
|
||||||
|
|
||||||
|
class AeLocker
|
||||||
|
{
|
||||||
|
bool m_fArmed = false;
|
||||||
|
|
||||||
|
public:
|
||||||
|
AeLocker(bool fArm = false)
|
||||||
|
{
|
||||||
|
if (fArm)
|
||||||
|
arm();
|
||||||
|
}
|
||||||
|
|
||||||
|
void arm()
|
||||||
|
{
|
||||||
|
m_fArmed = true;
|
||||||
|
aeAcquireLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
void disarm()
|
||||||
|
{
|
||||||
|
m_fArmed = false;
|
||||||
|
aeReleaseLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
~AeLocker()
|
||||||
|
{
|
||||||
|
if (m_fArmed)
|
||||||
|
aeReleaseLock();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/* Return the size consumed from the allocator, for the specified SDS string,
|
/* Return the size consumed from the allocator, for the specified SDS string,
|
||||||
* including internal fragmentation. This function is used in order to compute
|
* including internal fragmentation. This function is used in order to compute
|
||||||
@ -94,7 +126,7 @@ client *createClient(int fd, int iel) {
|
|||||||
anetEnableTcpNoDelay(NULL,fd);
|
anetEnableTcpNoDelay(NULL,fd);
|
||||||
if (server.tcpkeepalive)
|
if (server.tcpkeepalive)
|
||||||
anetKeepAlive(NULL,fd,server.tcpkeepalive);
|
anetKeepAlive(NULL,fd,server.tcpkeepalive);
|
||||||
if (aeCreateFileEvent(server.rgel[iel],fd,AE_READABLE|AE_READ_THREADSAFE,
|
if (aeCreateFileEvent(server.rgthreadvar[iel].el,fd,AE_READABLE|AE_READ_THREADSAFE,
|
||||||
readQueryFromClient, c) == AE_ERR)
|
readQueryFromClient, c) == AE_ERR)
|
||||||
{
|
{
|
||||||
close(fd);
|
close(fd);
|
||||||
@ -124,6 +156,7 @@ client *createClient(int fd, int iel) {
|
|||||||
c->multibulklen = 0;
|
c->multibulklen = 0;
|
||||||
c->bulklen = -1;
|
c->bulklen = -1;
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
|
c->sentlenAsync = 0;
|
||||||
c->flags = 0;
|
c->flags = 0;
|
||||||
c->ctime = c->lastinteraction = server.unixtime;
|
c->ctime = c->lastinteraction = server.unixtime;
|
||||||
/* If the default user does not require authentication, the user is
|
/* If the default user does not require authentication, the user is
|
||||||
@ -158,6 +191,13 @@ client *createClient(int fd, int iel) {
|
|||||||
c->pubsub_patterns = listCreate();
|
c->pubsub_patterns = listCreate();
|
||||||
c->peerid = NULL;
|
c->peerid = NULL;
|
||||||
c->client_list_node = NULL;
|
c->client_list_node = NULL;
|
||||||
|
c->bufAsync = NULL;
|
||||||
|
c->buflenAsync = 0;
|
||||||
|
c->bufposAsync = 0;
|
||||||
|
c->listbufferDoneAsync = listCreate();
|
||||||
|
listSetFreeMethod(c->listbufferDoneAsync,freeClientReplyValue);
|
||||||
|
listSetDupMethod(c->listbufferDoneAsync,dupClientReplyValue);
|
||||||
|
|
||||||
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
|
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
|
||||||
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
|
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
|
||||||
if (fd != -1) linkClient(c);
|
if (fd != -1) linkClient(c);
|
||||||
@ -180,6 +220,7 @@ void clientInstallWriteHandler(client *c) {
|
|||||||
(c->replstate == REPL_STATE_NONE ||
|
(c->replstate == REPL_STATE_NONE ||
|
||||||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
||||||
{
|
{
|
||||||
|
AssertCorrectThread(c);
|
||||||
/* Here instead of installing the write handler, we just flag the
|
/* Here instead of installing the write handler, we just flag the
|
||||||
* client and put it into a list of clients that have something
|
* client and put it into a list of clients that have something
|
||||||
* to write to the socket. This way before re-entering the event
|
* to write to the socket. This way before re-entering the event
|
||||||
@ -187,10 +228,20 @@ void clientInstallWriteHandler(client *c) {
|
|||||||
* a system call. We'll only really install the write handler if
|
* a system call. We'll only really install the write handler if
|
||||||
* we'll not be able to write the whole reply at once. */
|
* we'll not be able to write the whole reply at once. */
|
||||||
c->flags |= CLIENT_PENDING_WRITE;
|
c->flags |= CLIENT_PENDING_WRITE;
|
||||||
listAddNodeHead(server.rgclients_pending_write[c->iel],c);
|
listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void clientInstallAsyncWriteHandler(client *c) {
|
||||||
|
UNUSED(c);
|
||||||
|
#if 0 // Not yet
|
||||||
|
if (!(c->flags & CLIENT_PENDING_ASYNCWRITE)) {
|
||||||
|
c->flags |= CLIENT_PENDING_ASYNCWRITE;
|
||||||
|
listAddNodeHead(serverTL->clients_pending_asyncwrite,c);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
/* This function is called every time we are going to transmit new data
|
/* This function is called every time we are going to transmit new data
|
||||||
* to the client. The behavior is the following:
|
* to the client. The behavior is the following:
|
||||||
*
|
*
|
||||||
@ -213,7 +264,9 @@ void clientInstallWriteHandler(client *c) {
|
|||||||
* Typically gets called every time a reply is built, before adding more
|
* Typically gets called every time a reply is built, before adding more
|
||||||
* data to the clients output buffers. If the function returns C_ERR no
|
* data to the clients output buffers. If the function returns C_ERR no
|
||||||
* data should be appended to the output buffers. */
|
* data should be appended to the output buffers. */
|
||||||
int prepareClientToWrite(client *c) {
|
int prepareClientToWrite(client *c, bool fAsync) {
|
||||||
|
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
|
||||||
|
|
||||||
/* If it's the Lua client we always return ok without installing any
|
/* If it's the Lua client we always return ok without installing any
|
||||||
* handler since there is no socket at all. */
|
* handler since there is no socket at all. */
|
||||||
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
|
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
|
||||||
@ -230,7 +283,8 @@ int prepareClientToWrite(client *c) {
|
|||||||
|
|
||||||
/* Schedule the client to write the output buffers to the socket, unless
|
/* Schedule the client to write the output buffers to the socket, unless
|
||||||
* it should already be setup to do so (it has already pending data). */
|
* it should already be setup to do so (it has already pending data). */
|
||||||
if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);
|
if (!fAsync && !clientHasPendingReplies(c, FALSE)) clientInstallWriteHandler(c);
|
||||||
|
if (fAsync && !(c->flags & CLIENT_PENDING_ASYNCWRITE)) clientInstallAsyncWriteHandler(c);
|
||||||
|
|
||||||
/* Authorize the caller to queue in the output buffer of this client. */
|
/* Authorize the caller to queue in the output buffer of this client. */
|
||||||
return C_OK;
|
return C_OK;
|
||||||
@ -240,25 +294,42 @@ int prepareClientToWrite(client *c) {
|
|||||||
* Low level functions to add more data to output buffers.
|
* Low level functions to add more data to output buffers.
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
int _addReplyToBuffer(client *c, const char *s, size_t len) {
|
int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) {
|
||||||
size_t available = sizeof(c->buf)-c->bufpos;
|
|
||||||
|
|
||||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
|
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
|
||||||
|
|
||||||
/* If there already are entries in the reply list, we cannot
|
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
|
||||||
* add anything more to the static buffer. */
|
if (fAsync)
|
||||||
if (listLength(c->reply) > 0) return C_ERR;
|
{
|
||||||
|
serverAssert(aeThreadOwnsLock());
|
||||||
|
if ((c->buflenAsync - c->bufposAsync) < (int)len)
|
||||||
|
{
|
||||||
|
int minsize = len + c->bufposAsync;
|
||||||
|
c->buflenAsync = std::max(minsize, c->buflenAsync*2);
|
||||||
|
c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL);
|
||||||
|
}
|
||||||
|
memcpy(c->bufAsync+c->bufposAsync,s,len);
|
||||||
|
c->bufposAsync += len;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
size_t available = sizeof(c->buf)-c->bufpos;
|
||||||
|
|
||||||
/* Check that the buffer has enough space available for this string. */
|
/* If there already are entries in the reply list, we cannot
|
||||||
if (len > available) return C_ERR;
|
* add anything more to the static buffer. */
|
||||||
|
if (listLength(c->reply) > 0) return C_ERR;
|
||||||
|
|
||||||
memcpy(c->buf+c->bufpos,s,len);
|
/* Check that the buffer has enough space available for this string. */
|
||||||
c->bufpos+=len;
|
if (len > available) return C_ERR;
|
||||||
|
|
||||||
|
memcpy(c->buf+c->bufpos,s,len);
|
||||||
|
c->bufpos+=len;
|
||||||
|
}
|
||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
void _addReplyProtoToList(client *c, const char *s, size_t len) {
|
void _addReplyProtoToList(client *c, const char *s, size_t len) {
|
||||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
|
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
|
||||||
|
AssertCorrectThread(c);
|
||||||
|
|
||||||
listNode *ln = listLast(c->reply);
|
listNode *ln = listLast(c->reply);
|
||||||
clientReplyBlock *tail = (clientReplyBlock*) (ln? listNodeValue(ln): NULL);
|
clientReplyBlock *tail = (clientReplyBlock*) (ln? listNodeValue(ln): NULL);
|
||||||
@ -297,13 +368,11 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) {
|
|||||||
* Higher level functions to queue data on the client output buffer.
|
* Higher level functions to queue data on the client output buffer.
|
||||||
* The following functions are the ones that commands implementations will call.
|
* The following functions are the ones that commands implementations will call.
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
|
void addReplyCore(client *c, robj *obj, bool fAsync) {
|
||||||
/* Add the object 'obj' string representation to the client output buffer. */
|
if (prepareClientToWrite(c, fAsync) != C_OK) return;
|
||||||
void addReply(client *c, robj *obj) {
|
|
||||||
if (prepareClientToWrite(c) != C_OK) return;
|
|
||||||
|
|
||||||
if (sdsEncodedObject(obj)) {
|
if (sdsEncodedObject(obj)) {
|
||||||
if (_addReplyToBuffer(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj))) != C_OK)
|
if (_addReplyToBuffer(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj)),fAsync) != C_OK)
|
||||||
_addReplyProtoToList(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj)));
|
_addReplyProtoToList(c,(const char*)ptrFromObj(obj),sdslen((sds)ptrFromObj(obj)));
|
||||||
} else if (obj->encoding == OBJ_ENCODING_INT) {
|
} else if (obj->encoding == OBJ_ENCODING_INT) {
|
||||||
/* For integer encoded strings we just convert it into a string
|
/* For integer encoded strings we just convert it into a string
|
||||||
@ -311,26 +380,44 @@ void addReply(client *c, robj *obj) {
|
|||||||
* to the output buffer. */
|
* to the output buffer. */
|
||||||
char buf[32];
|
char buf[32];
|
||||||
size_t len = ll2string(buf,sizeof(buf),(long)ptrFromObj(obj));
|
size_t len = ll2string(buf,sizeof(buf),(long)ptrFromObj(obj));
|
||||||
if (_addReplyToBuffer(c,buf,len) != C_OK)
|
if (_addReplyToBuffer(c,buf,len,fAsync) != C_OK)
|
||||||
_addReplyProtoToList(c,buf,len);
|
_addReplyProtoToList(c,buf,len);
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Wrong obj->encoding in addReply()");
|
serverPanic("Wrong obj->encoding in addReply()");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Add the object 'obj' string representation to the client output buffer. */
|
||||||
|
void addReply(client *c, robj *obj)
|
||||||
|
{
|
||||||
|
addReplyCore(c, obj, false);
|
||||||
|
}
|
||||||
|
void addReplyAsync(client *c, robj *obj)
|
||||||
|
{
|
||||||
|
addReplyCore(c, obj, true);
|
||||||
|
}
|
||||||
|
|
||||||
/* Add the SDS 's' string to the client output buffer, as a side effect
|
/* Add the SDS 's' string to the client output buffer, as a side effect
|
||||||
* the SDS string is freed. */
|
* the SDS string is freed. */
|
||||||
void addReplySds(client *c, sds s) {
|
void addReplySdsCore(client *c, sds s, bool fAsync) {
|
||||||
if (prepareClientToWrite(c) != C_OK) {
|
if (prepareClientToWrite(c, fAsync) != C_OK) {
|
||||||
/* The caller expects the sds to be free'd. */
|
/* The caller expects the sds to be free'd. */
|
||||||
sdsfree(s);
|
sdsfree(s);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK)
|
if (_addReplyToBuffer(c,s,sdslen(s), fAsync) != C_OK)
|
||||||
_addReplyProtoToList(c,s,sdslen(s));
|
_addReplyProtoToList(c,s,sdslen(s));
|
||||||
sdsfree(s);
|
sdsfree(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addReplySds(client *c, sds s) {
|
||||||
|
addReplySdsCore(c, s, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplySdsAsync(client *c, sds s) {
|
||||||
|
addReplySdsCore(c, s, true);
|
||||||
|
}
|
||||||
|
|
||||||
/* This low level function just adds whatever protocol you send it to the
|
/* This low level function just adds whatever protocol you send it to the
|
||||||
* client buffer, trying the static buffer initially, and using the string
|
* client buffer, trying the static buffer initially, and using the string
|
||||||
* of objects if not possible.
|
* of objects if not possible.
|
||||||
@ -339,12 +426,16 @@ void addReplySds(client *c, sds s) {
|
|||||||
* if not needed. The object will only be created by calling
|
* if not needed. The object will only be created by calling
|
||||||
* _addReplyProtoToList() if we fail to extend the existing tail object
|
* _addReplyProtoToList() if we fail to extend the existing tail object
|
||||||
* in the list of objects. */
|
* in the list of objects. */
|
||||||
void addReplyProto(client *c, const char *s, size_t len) {
|
void addReplyProtoCore(client *c, const char *s, size_t len, bool fAsync) {
|
||||||
if (prepareClientToWrite(c) != C_OK) return;
|
if (prepareClientToWrite(c, fAsync) != C_OK) return;
|
||||||
if (_addReplyToBuffer(c,s,len) != C_OK)
|
if (_addReplyToBuffer(c,s,len,fAsync) != C_OK)
|
||||||
_addReplyProtoToList(c,s,len);
|
_addReplyProtoToList(c,s,len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addReplyProto(client *c, const char *s, size_t len) {
|
||||||
|
addReplyProtoCore(c, s, len, false);
|
||||||
|
}
|
||||||
|
|
||||||
/* Low level function called by the addReplyError...() functions.
|
/* Low level function called by the addReplyError...() functions.
|
||||||
* It emits the protocol for a Redis error, in the form:
|
* It emits the protocol for a Redis error, in the form:
|
||||||
*
|
*
|
||||||
@ -353,12 +444,12 @@ void addReplyProto(client *c, const char *s, size_t len) {
|
|||||||
* If the error code is already passed in the string 's', the error
|
* If the error code is already passed in the string 's', the error
|
||||||
* code provided is used, otherwise the string "-ERR " for the generic
|
* code provided is used, otherwise the string "-ERR " for the generic
|
||||||
* error code is automatically added. */
|
* error code is automatically added. */
|
||||||
void addReplyErrorLength(client *c, const char *s, size_t len) {
|
void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync) {
|
||||||
/* If the string already starts with "-..." then the error code
|
/* If the string already starts with "-..." then the error code
|
||||||
* is provided by the caller. Otherwise we use "-ERR". */
|
* is provided by the caller. Otherwise we use "-ERR". */
|
||||||
if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5);
|
if (!len || s[0] != '-') addReplyProtoCore(c,"-ERR ",5,fAsync);
|
||||||
addReplyProto(c,s,len);
|
addReplyProtoCore(c,s,len,fAsync);
|
||||||
addReplyProto(c,"\r\n",2);
|
addReplyProtoCore(c,"\r\n",2,fAsync);
|
||||||
|
|
||||||
/* Sometimes it could be normal that a slave replies to a master with
|
/* Sometimes it could be normal that a slave replies to a master with
|
||||||
* an error and this function gets called. Actually the error will never
|
* an error and this function gets called. Actually the error will never
|
||||||
@ -380,8 +471,17 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addReplyErrorLength(client *c, const char *s, size_t len)
|
||||||
|
{
|
||||||
|
addReplyErrorLengthCore(c, s, len, false);
|
||||||
|
}
|
||||||
|
|
||||||
void addReplyError(client *c, const char *err) {
|
void addReplyError(client *c, const char *err) {
|
||||||
addReplyErrorLength(c,err,strlen(err));
|
addReplyErrorLengthCore(c,err,strlen(err), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyErrorAsync(client *c, const char *err) {
|
||||||
|
addReplyErrorLengthCore(c, err, strlen(err), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyErrorFormat(client *c, const char *fmt, ...) {
|
void addReplyErrorFormat(client *c, const char *fmt, ...) {
|
||||||
@ -425,11 +525,18 @@ void *addReplyDeferredLen(client *c) {
|
|||||||
/* Note that we install the write event here even if the object is not
|
/* Note that we install the write event here even if the object is not
|
||||||
* ready to be sent, since we are sure that before returning to the
|
* ready to be sent, since we are sure that before returning to the
|
||||||
* event loop setDeferredAggregateLen() will be called. */
|
* event loop setDeferredAggregateLen() will be called. */
|
||||||
if (prepareClientToWrite(c) != C_OK) return NULL;
|
if (prepareClientToWrite(c, false) != C_OK) return NULL;
|
||||||
listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
|
listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
|
||||||
return listLast(c->reply);
|
return listLast(c->reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *addReplyDeferredLenAsync(client *c) {
|
||||||
|
if (FCorrectThread(c))
|
||||||
|
return addReplyDeferredLen(c);
|
||||||
|
|
||||||
|
return (void*)((ssize_t)c->bufposAsync);
|
||||||
|
}
|
||||||
|
|
||||||
/* Populate the length object and try gluing it to the next chunk. */
|
/* Populate the length object and try gluing it to the next chunk. */
|
||||||
void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
|
void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
|
||||||
listNode *ln = (listNode*)node;
|
listNode *ln = (listNode*)node;
|
||||||
@ -472,10 +579,37 @@ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
|
|||||||
asyncCloseClientOnOutputBufferLimitReached(c);
|
asyncCloseClientOnOutputBufferLimitReached(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setDeferredAggregateLenAsync(client *c, void *node, long length, char prefix)
|
||||||
|
{
|
||||||
|
if (FCorrectThread(c)) {
|
||||||
|
setDeferredAggregateLen(c, node, length, prefix);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
char lenstr[128];
|
||||||
|
int lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
|
||||||
|
|
||||||
|
ssize_t idxSplice = (ssize_t)node;
|
||||||
|
serverAssert(idxSplice <= c->bufposAsync);
|
||||||
|
if (c->buflenAsync < (c->bufposAsync + lenstr_len))
|
||||||
|
{
|
||||||
|
c->buflenAsync = std::max((int)(c->bufposAsync+lenstr_len), c->buflenAsync*2);
|
||||||
|
c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
memmove(c->bufAsync + idxSplice + lenstr_len, c->bufAsync + idxSplice, c->bufposAsync - idxSplice);
|
||||||
|
memcpy(c->bufAsync + idxSplice, lenstr, lenstr_len);
|
||||||
|
c->bufposAsync += lenstr_len;
|
||||||
|
}
|
||||||
|
|
||||||
void setDeferredArrayLen(client *c, void *node, long length) {
|
void setDeferredArrayLen(client *c, void *node, long length) {
|
||||||
setDeferredAggregateLen(c,node,length,'*');
|
setDeferredAggregateLen(c,node,length,'*');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setDeferredArrayLenAsync(client *c, void *node, long length) {
|
||||||
|
setDeferredAggregateLenAsync(c, node, length, '*');
|
||||||
|
}
|
||||||
|
|
||||||
void setDeferredMapLen(client *c, void *node, long length) {
|
void setDeferredMapLen(client *c, void *node, long length) {
|
||||||
int prefix = c->resp == 2 ? '*' : '%';
|
int prefix = c->resp == 2 ? '*' : '%';
|
||||||
if (c->resp == 2) length *= 2;
|
if (c->resp == 2) length *= 2;
|
||||||
@ -499,15 +633,15 @@ void setDeferredPushLen(client *c, void *node, long length) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Add a double as a bulk reply */
|
/* Add a double as a bulk reply */
|
||||||
void addReplyDouble(client *c, double d) {
|
void addReplyDoubleCore(client *c, double d, bool fAsync) {
|
||||||
if (isinf(d)) {
|
if (isinf(d)) {
|
||||||
/* Libc in odd systems (Hi Solaris!) will format infinite in a
|
/* Libc in odd systems (Hi Solaris!) will format infinite in a
|
||||||
* different way, so better to handle it in an explicit way. */
|
* different way, so better to handle it in an explicit way. */
|
||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
|
addReplyBulkCStringCore(c, d > 0 ? "inf" : "-inf", fAsync);
|
||||||
} else {
|
} else {
|
||||||
addReplyProto(c, d > 0 ? ",inf\r\n" : "-inf\r\n",
|
addReplyProtoCore(c, d > 0 ? ",inf\r\n" : "-inf\r\n",
|
||||||
d > 0 ? 6 : 7);
|
d > 0 ? 6 : 7, fAsync);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
char dbuf[MAX_LONG_DOUBLE_CHARS+3],
|
char dbuf[MAX_LONG_DOUBLE_CHARS+3],
|
||||||
@ -516,14 +650,22 @@ void addReplyDouble(client *c, double d) {
|
|||||||
if (c->resp == 2) {
|
if (c->resp == 2) {
|
||||||
dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
|
dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
|
||||||
slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
|
slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
|
||||||
addReplyProto(c,sbuf,slen);
|
addReplyProtoCore(c,sbuf,slen,fAsync);
|
||||||
} else {
|
} else {
|
||||||
dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d);
|
dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d);
|
||||||
addReplyProto(c,dbuf,dlen);
|
addReplyProtoCore(c,dbuf,dlen,fAsync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addReplyDouble(client *c, double d) {
|
||||||
|
addReplyDoubleCore(c, d, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyDoubleAsync(client *c, double d) {
|
||||||
|
addReplyDoubleCore(c, d, true);
|
||||||
|
}
|
||||||
|
|
||||||
/* Add a long double as a bulk reply, but uses a human readable formatting
|
/* Add a long double as a bulk reply, but uses a human readable formatting
|
||||||
* of the double instead of exposing the crude behavior of doubles to the
|
* of the double instead of exposing the crude behavior of doubles to the
|
||||||
* dear user. */
|
* dear user. */
|
||||||
@ -543,7 +685,7 @@ void addReplyHumanLongDouble(client *c, long double d) {
|
|||||||
|
|
||||||
/* Add a long long as integer reply or bulk len / multi bulk count.
|
/* Add a long long as integer reply or bulk len / multi bulk count.
|
||||||
* Basically this is used to output <prefix><long long><crlf>. */
|
* Basically this is used to output <prefix><long long><crlf>. */
|
||||||
void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
|
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) {
|
||||||
char buf[128];
|
char buf[128];
|
||||||
int len;
|
int len;
|
||||||
|
|
||||||
@ -551,10 +693,10 @@ void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
|
|||||||
* so we have a few shared objects to use if the integer is small
|
* so we have a few shared objects to use if the integer is small
|
||||||
* like it is most of the times. */
|
* like it is most of the times. */
|
||||||
if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
|
if (prefix == '*' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
|
||||||
addReply(c,shared.mbulkhdr[ll]);
|
addReplyCore(c,shared.mbulkhdr[ll], fAsync);
|
||||||
return;
|
return;
|
||||||
} else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
|
} else if (prefix == '$' && ll < OBJ_SHARED_BULKHDR_LEN && ll >= 0) {
|
||||||
addReply(c,shared.bulkhdr[ll]);
|
addReplyCore(c,shared.bulkhdr[ll], fAsync);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -562,7 +704,11 @@ void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
|
|||||||
len = ll2string(buf+1,sizeof(buf)-1,ll);
|
len = ll2string(buf+1,sizeof(buf)-1,ll);
|
||||||
buf[len+1] = '\r';
|
buf[len+1] = '\r';
|
||||||
buf[len+2] = '\n';
|
buf[len+2] = '\n';
|
||||||
addReplyProto(c,buf,len+3);
|
addReplyProtoCore(c,buf,len+3, fAsync);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyLongLongWithPrefix(client *c, long long ll, char prefix) {
|
||||||
|
addReplyLongLongWithPrefixCore(c, ll, prefix, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyLongLong(client *c, long long ll) {
|
void addReplyLongLong(client *c, long long ll) {
|
||||||
@ -574,21 +720,41 @@ void addReplyLongLong(client *c, long long ll) {
|
|||||||
addReplyLongLongWithPrefix(c,ll,':');
|
addReplyLongLongWithPrefix(c,ll,':');
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyAggregateLen(client *c, long length, int prefix) {
|
void addReplyAggregateLenCore(client *c, long length, int prefix, bool fAsync) {
|
||||||
if (prefix == '*' && length < OBJ_SHARED_BULKHDR_LEN)
|
if (prefix == '*' && length < OBJ_SHARED_BULKHDR_LEN)
|
||||||
addReply(c,shared.mbulkhdr[length]);
|
addReplyCore(c,shared.mbulkhdr[length], fAsync);
|
||||||
else
|
else
|
||||||
addReplyLongLongWithPrefix(c,length,prefix);
|
addReplyLongLongWithPrefixCore(c,length,prefix, fAsync);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyAggregateLen(client *c, long length, int prefix) {
|
||||||
|
addReplyAggregateLenCore(c, length, prefix, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyArrayLenCore(client *c, long length, bool fAsync) {
|
||||||
|
addReplyAggregateLenCore(c,length,'*', fAsync);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyArrayLen(client *c, long length) {
|
void addReplyArrayLen(client *c, long length) {
|
||||||
addReplyAggregateLen(c,length,'*');
|
addReplyArrayLenCore(c, length, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyArrayLenAsync(client *c, long length) {
|
||||||
|
addReplyArrayLenCore(c, length, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyMapLenCore(client *c, long length, bool fAsync) {
|
||||||
|
int prefix = c->resp == 2 ? '*' : '%';
|
||||||
|
if (c->resp == 2) length *= 2;
|
||||||
|
addReplyAggregateLenCore(c,length,prefix,fAsync);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyMapLen(client *c, long length) {
|
void addReplyMapLen(client *c, long length) {
|
||||||
int prefix = c->resp == 2 ? '*' : '%';
|
addReplyMapLenCore(c, length, false);
|
||||||
if (c->resp == 2) length *= 2;
|
}
|
||||||
addReplyAggregateLen(c,length,prefix);
|
|
||||||
|
void addReplyMapLenAsync(client *c, long length) {
|
||||||
|
addReplyMapLenCore(c, length, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplySetLen(client *c, long length) {
|
void addReplySetLen(client *c, long length) {
|
||||||
@ -602,17 +768,33 @@ void addReplyAttributeLen(client *c, long length) {
|
|||||||
addReplyAggregateLen(c,length,prefix);
|
addReplyAggregateLen(c,length,prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyPushLen(client *c, long length) {
|
void addReplyPushLenCore(client *c, long length, bool fAsync) {
|
||||||
int prefix = c->resp == 2 ? '*' : '>';
|
int prefix = c->resp == 2 ? '*' : '>';
|
||||||
addReplyAggregateLen(c,length,prefix);
|
addReplyAggregateLenCore(c,length,prefix, fAsync);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyPushLen(client *c, long length) {
|
||||||
|
addReplyPushLenCore(c, length, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyPushLenAsync(client *c, long length) {
|
||||||
|
addReplyPushLenCore(c, length, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyNullCore(client *c, bool fAsync) {
|
||||||
|
if (c->resp == 2) {
|
||||||
|
addReplyProtoCore(c,"$-1\r\n",5,fAsync);
|
||||||
|
} else {
|
||||||
|
addReplyProtoCore(c,"_\r\n",3,fAsync);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyNull(client *c) {
|
void addReplyNull(client *c) {
|
||||||
if (c->resp == 2) {
|
addReplyNullCore(c, false);
|
||||||
addReplyProto(c,"$-1\r\n",5);
|
}
|
||||||
} else {
|
|
||||||
addReplyProto(c,"_\r\n",3);
|
void addReplyNullAsync(client *c) {
|
||||||
}
|
addReplyNullCore(c, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addReplyBool(client *c, int b) {
|
void addReplyBool(client *c, int b) {
|
||||||
@ -636,7 +818,7 @@ void addReplyNullArray(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Create the length prefix of a bulk reply, example: $2234 */
|
/* Create the length prefix of a bulk reply, example: $2234 */
|
||||||
void addReplyBulkLen(client *c, robj *obj) {
|
void addReplyBulkLenCore(client *c, robj *obj, bool fAsync) {
|
||||||
size_t len;
|
size_t len;
|
||||||
|
|
||||||
if (sdsEncodedObject(obj)) {
|
if (sdsEncodedObject(obj)) {
|
||||||
@ -656,41 +838,76 @@ void addReplyBulkLen(client *c, robj *obj) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (len < OBJ_SHARED_BULKHDR_LEN)
|
if (len < OBJ_SHARED_BULKHDR_LEN)
|
||||||
addReply(c,shared.bulkhdr[len]);
|
addReplyCore(c,shared.bulkhdr[len], fAsync);
|
||||||
else
|
else
|
||||||
addReplyLongLongWithPrefix(c,len,'$');
|
addReplyLongLongWithPrefixCore(c,len,'$', fAsync);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyBulkLen(client *c, robj *obj)
|
||||||
|
{
|
||||||
|
addReplyBulkLenCore(c, obj, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add a Redis Object as a bulk reply */
|
/* Add a Redis Object as a bulk reply */
|
||||||
void addReplyBulk(client *c, robj *obj) {
|
void addReplyBulkCore(client *c, robj *obj, bool fAsync) {
|
||||||
addReplyBulkLen(c,obj);
|
addReplyBulkLenCore(c,obj,fAsync);
|
||||||
addReply(c,obj);
|
addReplyCore(c,obj,fAsync);
|
||||||
addReply(c,shared.crlf);
|
addReplyCore(c,shared.crlf,fAsync);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyBulk(client *c, robj *obj)
|
||||||
|
{
|
||||||
|
addReplyBulkCore(c, obj, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyBulkAsync(client *c, robj *obj)
|
||||||
|
{
|
||||||
|
addReplyBulkCore(c, obj, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add a C buffer as bulk reply */
|
/* Add a C buffer as bulk reply */
|
||||||
|
void addReplyBulkCBufferCore(client *c, const void *p, size_t len, bool fAsync) {
|
||||||
|
addReplyLongLongWithPrefixCore(c,len,'$',fAsync);
|
||||||
|
addReplyProtoCore(c,(const char*)p,len,fAsync);
|
||||||
|
addReplyCore(c,shared.crlf,fAsync);
|
||||||
|
}
|
||||||
|
|
||||||
void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
|
void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
|
||||||
addReplyLongLongWithPrefix(c,len,'$');
|
addReplyBulkCBufferCore(c, p, len, false);
|
||||||
addReplyProto(c,(const char*)p,len);
|
}
|
||||||
addReply(c,shared.crlf);
|
|
||||||
|
void addReplyBulkCBufferAsync(client *c, const void *p, size_t len) {
|
||||||
|
addReplyBulkCBufferCore(c, p, len, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add sds to reply (takes ownership of sds and frees it) */
|
/* Add sds to reply (takes ownership of sds and frees it) */
|
||||||
void addReplyBulkSds(client *c, sds s) {
|
void addReplyBulkSdsCore(client *c, sds s, bool fAsync) {
|
||||||
addReplyLongLongWithPrefix(c,sdslen(s),'$');
|
addReplyLongLongWithPrefixCore(c,sdslen(s),'$', fAsync);
|
||||||
addReplySds(c,s);
|
addReplySdsCore(c,s,fAsync);
|
||||||
addReply(c,shared.crlf);
|
addReplyCore(c,shared.crlf,fAsync);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyBulkSds(client *c, sds s) {
|
||||||
|
addReplyBulkSdsCore(c, s, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void addReplyBulkSdsAsync(client *c, sds s) {
|
||||||
|
addReplyBulkSdsCore(c, s, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add a C null term string as bulk reply */
|
/* Add a C null term string as bulk reply */
|
||||||
void addReplyBulkCString(client *c, const char *s) {
|
void addReplyBulkCStringCore(client *c, const char *s, bool fAsync) {
|
||||||
if (s == NULL) {
|
if (s == NULL) {
|
||||||
addReplyNull(c);
|
addReplyNullCore(c,fAsync);
|
||||||
} else {
|
} else {
|
||||||
addReplyBulkCBuffer(c,s,strlen(s));
|
addReplyBulkCBufferCore(c,s,strlen(s),fAsync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void addReplyBulkCString(client *c, const char *s) {
|
||||||
|
addReplyBulkCStringCore(c, s, false);
|
||||||
|
}
|
||||||
|
|
||||||
/* Add a long long as a bulk reply */
|
/* Add a long long as a bulk reply */
|
||||||
void addReplyBulkLongLong(client *c, long long ll) {
|
void addReplyBulkLongLong(client *c, long long ll) {
|
||||||
char buf[64];
|
char buf[64];
|
||||||
@ -775,8 +992,8 @@ void copyClientOutputBuffer(client *dst, client *src) {
|
|||||||
|
|
||||||
/* Return true if the specified client has pending reply buffers to write to
|
/* Return true if the specified client has pending reply buffers to write to
|
||||||
* the socket. */
|
* the socket. */
|
||||||
int clientHasPendingReplies(client *c) {
|
int clientHasPendingReplies(client *c, int fIncludeAsync) {
|
||||||
return c->bufpos || listLength(c->reply);
|
return c->bufpos || listLength(c->reply) || (fIncludeAsync && listLength(c->listbufferDoneAsync));
|
||||||
}
|
}
|
||||||
|
|
||||||
#define MAX_ACCEPTS_PER_CALL 1000
|
#define MAX_ACCEPTS_PER_CALL 1000
|
||||||
@ -929,6 +1146,8 @@ void unlinkClient(client *c) {
|
|||||||
* If the client was already unlinked or if it's a "fake client" the
|
* If the client was already unlinked or if it's a "fake client" the
|
||||||
* fd is already set to -1. */
|
* fd is already set to -1. */
|
||||||
if (c->fd != -1) {
|
if (c->fd != -1) {
|
||||||
|
AssertCorrectThread(c);
|
||||||
|
|
||||||
/* Remove from the list of active clients. */
|
/* Remove from the list of active clients. */
|
||||||
if (c->client_list_node) {
|
if (c->client_list_node) {
|
||||||
uint64_t id = htonu64(c->id);
|
uint64_t id = htonu64(c->id);
|
||||||
@ -938,32 +1157,35 @@ void unlinkClient(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Unregister async I/O handlers and close the socket. */
|
/* Unregister async I/O handlers and close the socket. */
|
||||||
aeDeleteFileEventAsync(server.rgel[c->iel],c->fd,AE_READABLE);
|
aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_READABLE);
|
||||||
aeDeleteFileEventAsync(server.rgel[c->iel],c->fd,AE_WRITABLE);
|
aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_WRITABLE);
|
||||||
close(c->fd);
|
close(c->fd);
|
||||||
c->fd = -1;
|
c->fd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Remove from the list of pending writes if needed. */
|
/* Remove from the list of pending writes if needed. */
|
||||||
if (c->flags & CLIENT_PENDING_WRITE) {
|
if (c->flags & CLIENT_PENDING_WRITE) {
|
||||||
ln = listSearchKey(server.rgclients_pending_write[c->iel],c);
|
ln = listSearchKey(server.rgthreadvar[c->iel].clients_pending_write,c);
|
||||||
serverAssert(ln != NULL);
|
serverAssert(ln != NULL);
|
||||||
listDelNode(server.rgclients_pending_write[c->iel],ln);
|
listDelNode(server.rgthreadvar[c->iel].clients_pending_write,ln);
|
||||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* When client was just unblocked because of a blocking operation,
|
/* When client was just unblocked because of a blocking operation,
|
||||||
* remove it from the list of unblocked clients. */
|
* remove it from the list of unblocked clients. */
|
||||||
if (c->flags & CLIENT_UNBLOCKED) {
|
if (c->flags & CLIENT_UNBLOCKED) {
|
||||||
ln = listSearchKey(server.rgunblocked_clients[c->iel],c);
|
AssertCorrectThread(c);
|
||||||
|
ln = listSearchKey(server.rgthreadvar[c->iel].unblocked_clients,c);
|
||||||
serverAssert(ln != NULL);
|
serverAssert(ln != NULL);
|
||||||
listDelNode(server.rgunblocked_clients[c->iel],ln);
|
listDelNode(server.rgthreadvar[c->iel].unblocked_clients,ln);
|
||||||
c->flags &= ~CLIENT_UNBLOCKED;
|
c->flags &= ~CLIENT_UNBLOCKED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void freeClient(client *c) {
|
void freeClient(client *c) {
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
|
serverAssert(aeThreadOwnsLock());
|
||||||
|
AssertCorrectThread(c);
|
||||||
|
|
||||||
/* If a client is protected, yet we need to free it right now, make sure
|
/* If a client is protected, yet we need to free it right now, make sure
|
||||||
* to at least use asynchronous freeing. */
|
* to at least use asynchronous freeing. */
|
||||||
@ -1055,6 +1277,8 @@ void freeClient(client *c) {
|
|||||||
|
|
||||||
/* Release other dynamically allocated client structure fields,
|
/* Release other dynamically allocated client structure fields,
|
||||||
* and finally release the client structure itself. */
|
* and finally release the client structure itself. */
|
||||||
|
zfree(c->bufAsync);
|
||||||
|
listRelease(c->listbufferDoneAsync);
|
||||||
if (c->name) decrRefCount(c->name);
|
if (c->name) decrRefCount(c->name);
|
||||||
zfree(c->argv);
|
zfree(c->argv);
|
||||||
freeClientMultiState(c);
|
freeClientMultiState(c);
|
||||||
@ -1069,17 +1293,25 @@ void freeClient(client *c) {
|
|||||||
void freeClientAsync(client *c) {
|
void freeClientAsync(client *c) {
|
||||||
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
|
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
|
||||||
c->flags |= CLIENT_CLOSE_ASAP;
|
c->flags |= CLIENT_CLOSE_ASAP;
|
||||||
|
aeAcquireLock();
|
||||||
listAddNodeTail(server.clients_to_close,c);
|
listAddNodeTail(server.clients_to_close,c);
|
||||||
|
aeReleaseLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
void freeClientsInAsyncFreeQueue(void) {
|
void freeClientsInAsyncFreeQueue(int iel) {
|
||||||
while (listLength(server.clients_to_close)) {
|
listIter li;
|
||||||
listNode *ln = listFirst(server.clients_to_close);
|
listNode *ln;
|
||||||
|
listRewind(server.clients_to_close,&li);
|
||||||
|
|
||||||
|
while((ln = listNext(&li))) {
|
||||||
client *c = (client*)listNodeValue(ln);
|
client *c = (client*)listNodeValue(ln);
|
||||||
|
if (c->iel != iel)
|
||||||
|
continue; // wrong thread
|
||||||
|
|
||||||
c->flags &= ~CLIENT_CLOSE_ASAP;
|
c->flags &= ~CLIENT_CLOSE_ASAP;
|
||||||
freeClient(c);
|
freeClient(c);
|
||||||
listDelNode(server.clients_to_close,ln);
|
listDelNode(server.clients_to_close,ln);
|
||||||
|
listRewind(server.clients_to_close,&li);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1096,11 +1328,45 @@ client *lookupClientByID(uint64_t id) {
|
|||||||
* is still valid after the call, C_ERR if it was freed. */
|
* is still valid after the call, C_ERR if it was freed. */
|
||||||
int writeToClient(int fd, client *c, int handler_installed) {
|
int writeToClient(int fd, client *c, int handler_installed) {
|
||||||
ssize_t nwritten = 0, totwritten = 0;
|
ssize_t nwritten = 0, totwritten = 0;
|
||||||
size_t objlen;
|
|
||||||
clientReplyBlock *o;
|
clientReplyBlock *o;
|
||||||
|
AssertCorrectThread(c);
|
||||||
|
|
||||||
while(clientHasPendingReplies(c)) {
|
// Decide up front if we are sending the done buffer. This prevents us from completing
|
||||||
if (c->bufpos > 0) {
|
// a transmission on another thread while transmitting the thread local buffer, resulting in us
|
||||||
|
// overlapping messages
|
||||||
|
AeLocker locker(true);
|
||||||
|
int fSendAsyncBuffer = listLength(c->listbufferDoneAsync) && (c->sentlen == 0 || c->sentlenAsync > 0);
|
||||||
|
if (!fSendAsyncBuffer)
|
||||||
|
locker.disarm();
|
||||||
|
|
||||||
|
while(fSendAsyncBuffer || clientHasPendingReplies(c, FALSE)) {
|
||||||
|
if (fSendAsyncBuffer) {
|
||||||
|
o = (clientReplyBlock*)listNodeValue(listFirst(c->listbufferDoneAsync));
|
||||||
|
if (o->used == 0) {
|
||||||
|
listDelNode(c->listbufferDoneAsync,listFirst(c->listbufferDoneAsync));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
nwritten = write(fd, o->buf() + c->sentlen, o->used - c->sentlen);
|
||||||
|
if (nwritten <= 0)
|
||||||
|
break;
|
||||||
|
c->sentlenAsync += nwritten;
|
||||||
|
totwritten += nwritten;
|
||||||
|
|
||||||
|
/* If we fully sent the object on head go to the next one */
|
||||||
|
if (c->sentlenAsync == o->used) {
|
||||||
|
listDelNode(c->listbufferDoneAsync,listFirst(c->listbufferDoneAsync));
|
||||||
|
c->sentlenAsync = 0;
|
||||||
|
/* If there are no longer objects in the list, we expect
|
||||||
|
* the count of reply bytes to be exactly zero. */
|
||||||
|
if (listLength(c->listbufferDoneAsync) == 0)
|
||||||
|
{
|
||||||
|
fSendAsyncBuffer = 0;
|
||||||
|
locker.disarm();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (c->bufpos > 0) {
|
||||||
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
|
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
|
||||||
|
|
||||||
if (nwritten <= 0) break;
|
if (nwritten <= 0) break;
|
||||||
@ -1115,27 +1381,26 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
o = (clientReplyBlock*)listNodeValue(listFirst(c->reply));
|
o = (clientReplyBlock*)listNodeValue(listFirst(c->reply));
|
||||||
objlen = o->used;
|
if (o->used == 0) {
|
||||||
|
|
||||||
if (objlen == 0) {
|
|
||||||
c->reply_bytes -= o->size;
|
c->reply_bytes -= o->size;
|
||||||
listDelNode(c->reply,listFirst(c->reply));
|
listDelNode(c->reply,listFirst(c->reply));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
nwritten = write(fd, o->buf() + c->sentlen, objlen - c->sentlen);
|
nwritten = write(fd, o->buf() + c->sentlen, o->used - c->sentlen);
|
||||||
|
if (nwritten <= 0)
|
||||||
|
break;
|
||||||
|
|
||||||
if (nwritten <= 0) break;
|
|
||||||
c->sentlen += nwritten;
|
c->sentlen += nwritten;
|
||||||
totwritten += nwritten;
|
totwritten += nwritten;
|
||||||
|
|
||||||
/* If we fully sent the object on head go to the next one */
|
/* If we fully sent the object on head go to the next one */
|
||||||
if (c->sentlen == objlen) {
|
if (c->sentlen == o->used) {
|
||||||
c->reply_bytes -= o->size;
|
c->reply_bytes -= o->size;
|
||||||
listDelNode(c->reply,listFirst(c->reply));
|
listDelNode(c->reply,listFirst(c->reply));
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
/* If there are no longer objects in the list, we expect
|
/* If there are no longer objects in the list, we expect
|
||||||
* the count of reply bytes to be exactly zero. */
|
* the count of reply bytes to be exactly zero. */
|
||||||
if (listLength(c->reply) == 0)
|
if (listLength(c->reply) == 0)
|
||||||
serverAssert(c->reply_bytes == 0);
|
serverAssert(c->reply_bytes == 0);
|
||||||
}
|
}
|
||||||
@ -1165,7 +1430,9 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
|||||||
} else {
|
} else {
|
||||||
serverLog(LL_VERBOSE,
|
serverLog(LL_VERBOSE,
|
||||||
"Error writing to client: %s", strerror(errno));
|
"Error writing to client: %s", strerror(errno));
|
||||||
|
aeAcquireLock();
|
||||||
freeClient(c);
|
freeClient(c);
|
||||||
|
aeReleaseLock();
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1176,13 +1443,15 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
|||||||
* We just rely on data / pings received for timeout detection. */
|
* We just rely on data / pings received for timeout detection. */
|
||||||
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
|
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
|
||||||
}
|
}
|
||||||
if (!clientHasPendingReplies(c)) {
|
if (!clientHasPendingReplies(c, TRUE)) {
|
||||||
c->sentlen = 0;
|
c->sentlen = 0;
|
||||||
if (handler_installed) aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_WRITABLE);
|
if (handler_installed) aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_WRITABLE);
|
||||||
|
|
||||||
/* Close connection after entire reply has been sent. */
|
/* Close connection after entire reply has been sent. */
|
||||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
|
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
|
||||||
|
aeAcquireLock();
|
||||||
freeClient(c);
|
freeClient(c);
|
||||||
|
aeReleaseLock();
|
||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1191,9 +1460,16 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
|||||||
|
|
||||||
/* Write event handler. Just send data to the client. */
|
/* Write event handler. Just send data to the client. */
|
||||||
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||||
UNUSED(el);
|
|
||||||
UNUSED(mask);
|
UNUSED(mask);
|
||||||
writeToClient(fd,(client*)privdata,1);
|
AeLocker locker;
|
||||||
|
client *c = (client*)privdata;
|
||||||
|
|
||||||
|
serverAssert(ielFromEventLoop(el) == c->iel);
|
||||||
|
|
||||||
|
if (c->flags | CLIENT_SLAVE)
|
||||||
|
locker.arm();
|
||||||
|
|
||||||
|
writeToClient(fd,c,1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is called just before entering the event loop, in the hope
|
/* This function is called just before entering the event loop, in the hope
|
||||||
@ -1203,14 +1479,16 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
int handleClientsWithPendingWrites(int iel) {
|
int handleClientsWithPendingWrites(int iel) {
|
||||||
listIter li;
|
listIter li;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
list *pending_writes = server.rgclients_pending_write[iel];
|
|
||||||
int processed = listLength(pending_writes);
|
|
||||||
|
|
||||||
listRewind(pending_writes,&li);
|
list *list = server.rgthreadvar[iel].clients_pending_write;
|
||||||
|
int processed = listLength(list);
|
||||||
|
|
||||||
|
listRewind(list,&li);
|
||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
client *c = (client*)listNodeValue(ln);
|
client *c = (client*)listNodeValue(ln);
|
||||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||||
listDelNode(pending_writes,ln);
|
listDelNode(list,ln);
|
||||||
|
AssertCorrectThread(c);
|
||||||
|
|
||||||
/* If a client is protected, don't do anything,
|
/* If a client is protected, don't do anything,
|
||||||
* that may trigger write error or recreate handler. */
|
* that may trigger write error or recreate handler. */
|
||||||
@ -1221,7 +1499,7 @@ int handleClientsWithPendingWrites(int iel) {
|
|||||||
|
|
||||||
/* If after the synchronous writes above we still have data to
|
/* If after the synchronous writes above we still have data to
|
||||||
* output to the client, we need to install the writable handler. */
|
* output to the client, we need to install the writable handler. */
|
||||||
if (clientHasPendingReplies(c)) {
|
if (clientHasPendingReplies(c, TRUE)) {
|
||||||
int ae_flags = AE_WRITABLE;
|
int ae_flags = AE_WRITABLE;
|
||||||
/* For the fsync=always policy, we want that a given FD is never
|
/* For the fsync=always policy, we want that a given FD is never
|
||||||
* served for reading and writing in the same event loop iteration,
|
* served for reading and writing in the same event loop iteration,
|
||||||
@ -1233,11 +1511,9 @@ int handleClientsWithPendingWrites(int iel) {
|
|||||||
{
|
{
|
||||||
ae_flags |= AE_BARRIER;
|
ae_flags |= AE_BARRIER;
|
||||||
}
|
}
|
||||||
if (aeCreateFileEvent(server.rgel[c->iel], c->fd, ae_flags,
|
|
||||||
sendReplyToClient, c) == AE_ERR)
|
if (aeCreateFileEvent(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR)
|
||||||
{
|
freeClientAsync(c);
|
||||||
freeClientAsync(c);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return processed;
|
return processed;
|
||||||
@ -1282,16 +1558,18 @@ void resetClient(client *c) {
|
|||||||
* path, it is not really released, but only marked for later release. */
|
* path, it is not really released, but only marked for later release. */
|
||||||
void protectClient(client *c) {
|
void protectClient(client *c) {
|
||||||
c->flags |= CLIENT_PROTECTED;
|
c->flags |= CLIENT_PROTECTED;
|
||||||
aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_READABLE);
|
AssertCorrectThread(c);
|
||||||
aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_WRITABLE);
|
aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_READABLE);
|
||||||
|
aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_WRITABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This will undo the client protection done by protectClient() */
|
/* This will undo the client protection done by protectClient() */
|
||||||
void unprotectClient(client *c) {
|
void unprotectClient(client *c) {
|
||||||
|
AssertCorrectThread(c);
|
||||||
if (c->flags & CLIENT_PROTECTED) {
|
if (c->flags & CLIENT_PROTECTED) {
|
||||||
c->flags &= ~CLIENT_PROTECTED;
|
c->flags &= ~CLIENT_PROTECTED;
|
||||||
aeCreateFileEvent(server.rgel[c->iel],c->fd,AE_READABLE|AE_READ_THREADSAFE,readQueryFromClient,c);
|
aeCreateFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_READABLE|AE_READ_THREADSAFE,readQueryFromClient,c);
|
||||||
if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
|
if (clientHasPendingReplies(c, TRUE)) clientInstallWriteHandler(c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1646,6 +1924,10 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
serverAssert(mask & AE_READ_THREADSAFE);
|
serverAssert(mask & AE_READ_THREADSAFE);
|
||||||
serverAssert(c->iel == ielFromEventLoop(el));
|
serverAssert(c->iel == ielFromEventLoop(el));
|
||||||
|
|
||||||
|
AeLocker lockerSlave;
|
||||||
|
if (c->flags & CLIENT_SLAVE) // slaves are not async capable
|
||||||
|
lockerSlave.arm();
|
||||||
|
|
||||||
readlen = PROTO_IOBUF_LEN;
|
readlen = PROTO_IOBUF_LEN;
|
||||||
/* If this is a multi bulk request, and we are processing a bulk reply
|
/* If this is a multi bulk request, and we are processing a bulk reply
|
||||||
* that is large enough, try to maximize the probability that the query
|
* that is large enough, try to maximize the probability that the query
|
||||||
@ -1801,7 +2083,7 @@ sds catClientInfoString(sds s, client *client) {
|
|||||||
if (p == flags) *p++ = 'N';
|
if (p == flags) *p++ = 'N';
|
||||||
*p++ = '\0';
|
*p++ = '\0';
|
||||||
|
|
||||||
emask = client->fd == -1 ? 0 : aeGetFileEvents(server.rgel[client->iel],client->fd);
|
emask = client->fd == -1 ? 0 : aeGetFileEvents(server.rgthreadvar[client->iel].el,client->fd);
|
||||||
p = events;
|
p = events;
|
||||||
if (emask & AE_READABLE) *p++ = 'r';
|
if (emask & AE_READABLE) *p++ = 'r';
|
||||||
if (emask & AE_WRITABLE) *p++ = 'w';
|
if (emask & AE_WRITABLE) *p++ = 'w';
|
||||||
@ -1969,7 +2251,7 @@ NULL
|
|||||||
if (c == client) {
|
if (c == client) {
|
||||||
close_this_client = 1;
|
close_this_client = 1;
|
||||||
} else {
|
} else {
|
||||||
freeClient(client);
|
freeClientAsync(client);
|
||||||
}
|
}
|
||||||
killed++;
|
killed++;
|
||||||
}
|
}
|
||||||
@ -2349,10 +2631,10 @@ void flushSlavesOutputBuffers(void) {
|
|||||||
* of put_online_on_ack is to postpone the moment it is installed.
|
* of put_online_on_ack is to postpone the moment it is installed.
|
||||||
* This is what we want since slaves in this state should not receive
|
* This is what we want since slaves in this state should not receive
|
||||||
* writes before the first ACK. */
|
* writes before the first ACK. */
|
||||||
events = aeGetFileEvents(server.rgel[IDX_EVENT_LOOP_MAIN],slave->fd);
|
events = aeGetFileEvents(server.rgthreadvar[slave->iel].el,slave->fd);
|
||||||
if (events & AE_WRITABLE &&
|
if (events & AE_WRITABLE &&
|
||||||
slave->replstate == SLAVE_STATE_ONLINE &&
|
slave->replstate == SLAVE_STATE_ONLINE &&
|
||||||
clientHasPendingReplies(slave))
|
clientHasPendingReplies(slave, TRUE))
|
||||||
{
|
{
|
||||||
writeToClient(slave->fd,slave,0);
|
writeToClient(slave->fd,slave,0);
|
||||||
}
|
}
|
||||||
@ -2426,7 +2708,7 @@ int processEventsWhileBlocked(int iel) {
|
|||||||
int count = 0;
|
int count = 0;
|
||||||
while (iterations--) {
|
while (iterations--) {
|
||||||
int events = 0;
|
int events = 0;
|
||||||
events += aeProcessEvents(server.rgel[iel], AE_FILE_EVENTS|AE_DONT_WAIT);
|
events += aeProcessEvents(server.rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT);
|
||||||
events += handleClientsWithPendingWrites(iel);
|
events += handleClientsWithPendingWrites(iel);
|
||||||
if (!events) break;
|
if (!events) break;
|
||||||
count += events;
|
count += events;
|
||||||
|
10
src/pubsub.c
10
src/pubsub.c
@ -38,12 +38,12 @@ int clientSubscriptionsCount(client *c);
|
|||||||
/* Send a pubsub message of type "message" to the client. */
|
/* Send a pubsub message of type "message" to the client. */
|
||||||
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
|
||||||
if (c->resp == 2)
|
if (c->resp == 2)
|
||||||
addReply(c,shared.mbulkhdr[3]);
|
addReplyAsync(c,shared.mbulkhdr[3]);
|
||||||
else
|
else
|
||||||
addReplyPushLen(c,3);
|
addReplyPushLenAsync(c,3);
|
||||||
addReply(c,shared.messagebulk);
|
addReplyAsync(c,shared.messagebulk);
|
||||||
addReplyBulk(c,channel);
|
addReplyBulkAsync(c,channel);
|
||||||
addReplyBulk(c,msg);
|
addReplyBulkAsync(c,msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send a pubsub message of type "pmessage" to the client. The difference
|
/* Send a pubsub message of type "pmessage" to the client. The difference
|
||||||
|
2720
src/replication.c
2720
src/replication.c
File diff suppressed because it is too large
Load Diff
@ -856,8 +856,8 @@ void putSlaveOnline(client *slave) {
|
|||||||
slave->replstate = SLAVE_STATE_ONLINE;
|
slave->replstate = SLAVE_STATE_ONLINE;
|
||||||
slave->repl_put_online_on_ack = 0;
|
slave->repl_put_online_on_ack = 0;
|
||||||
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
|
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
|
||||||
AssertCorrectThread(slave);
|
//AssertCorrectThread(slave);
|
||||||
if (aeCreateFileEvent(serverTL->el, slave->fd, AE_WRITABLE,
|
if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE,
|
||||||
sendReplyToClient, slave) == AE_ERR) {
|
sendReplyToClient, slave) == AE_ERR) {
|
||||||
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
|
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
|
||||||
freeClient(slave);
|
freeClient(slave);
|
||||||
@ -954,7 +954,6 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
|
|||||||
startbgsave = 1;
|
startbgsave = 1;
|
||||||
mincapa = (mincapa == -1) ? slave->slave_capa :
|
mincapa = (mincapa == -1) ? slave->slave_capa :
|
||||||
(mincapa & slave->slave_capa);
|
(mincapa & slave->slave_capa);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1559,7 +1559,7 @@ NULL
|
|||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
}
|
}
|
||||||
} else if (c->argc == 3 && !strcasecmp(ptrFromObj(c->argv[1]),"debug")) {
|
} else if (c->argc == 3 && !strcasecmp(ptrFromObj(c->argv[1]),"debug")) {
|
||||||
if (clientHasPendingReplies(c)) {
|
if (clientHasPendingReplies(c, TRUE)) {
|
||||||
addReplyError(c,"SCRIPT DEBUG must be called outside a pipeline");
|
addReplyError(c,"SCRIPT DEBUG must be called outside a pipeline");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -2013,7 +2013,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
|||||||
link->pending_commands = 0;
|
link->pending_commands = 0;
|
||||||
link->cc_conn_time = mstime();
|
link->cc_conn_time = mstime();
|
||||||
link->cc->data = link;
|
link->cc->data = link;
|
||||||
redisAeAttach(server.rgel[IDX_EVENT_LOOP_MAIN],link->cc);
|
redisAeAttach(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->cc);
|
||||||
redisAsyncSetConnectCallback(link->cc,
|
redisAsyncSetConnectCallback(link->cc,
|
||||||
sentinelLinkEstablishedCallback);
|
sentinelLinkEstablishedCallback);
|
||||||
redisAsyncSetDisconnectCallback(link->cc,
|
redisAsyncSetDisconnectCallback(link->cc,
|
||||||
@ -2037,7 +2037,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
|||||||
|
|
||||||
link->pc_conn_time = mstime();
|
link->pc_conn_time = mstime();
|
||||||
link->pc->data = link;
|
link->pc->data = link;
|
||||||
redisAeAttach(server.rgel[IDX_EVENT_LOOP_MAIN],link->pc);
|
redisAeAttach(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,link->pc);
|
||||||
redisAsyncSetConnectCallback(link->pc,
|
redisAsyncSetConnectCallback(link->pc,
|
||||||
sentinelLinkEstablishedCallback);
|
sentinelLinkEstablishedCallback);
|
||||||
redisAsyncSetDisconnectCallback(link->pc,
|
redisAsyncSetDisconnectCallback(link->pc,
|
||||||
|
157
src/server.c
157
src/server.c
@ -71,6 +71,7 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan;
|
|||||||
|
|
||||||
/* Global vars */
|
/* Global vars */
|
||||||
struct redisServer server; /* Server global state */
|
struct redisServer server; /* Server global state */
|
||||||
|
__thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars
|
||||||
volatile unsigned long lru_clock; /* Server global current LRU time. */
|
volatile unsigned long lru_clock; /* Server global current LRU time. */
|
||||||
|
|
||||||
/* Our command table.
|
/* Our command table.
|
||||||
@ -1682,7 +1683,7 @@ void clientsCron(void) {
|
|||||||
}
|
}
|
||||||
else if (IDX_EVENT_LOOP_MAIN > 1)
|
else if (IDX_EVENT_LOOP_MAIN > 1)
|
||||||
{
|
{
|
||||||
aePostFunction(server.rgel[c->iel], AsyncClientCron, c);
|
aePostFunction(server.rgthreadvar[c->iel].el, AsyncClientCron, c);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -2002,7 +2003,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Close clients that need to be closed asynchronous */
|
/* Close clients that need to be closed asynchronous */
|
||||||
freeClientsInAsyncFreeQueue();
|
freeClientsInAsyncFreeQueue(IDX_EVENT_LOOP_MAIN);
|
||||||
|
|
||||||
/* Clear the paused clients flag if needed. */
|
/* Clear the paused clients flag if needed. */
|
||||||
clientsArePaused(); /* Don't check return value, just use the side effect.*/
|
clientsArePaused(); /* Don't check return value, just use the side effect.*/
|
||||||
@ -2046,6 +2047,19 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
return 1000/server.hz;
|
return 1000/server.hz;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serverCron for worker threads other than the main thread
|
||||||
|
int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData)
|
||||||
|
{
|
||||||
|
UNUSED(id);
|
||||||
|
UNUSED(clientData);
|
||||||
|
|
||||||
|
int iel = ielFromEventLoop(eventLoop);
|
||||||
|
serverAssert(iel != IDX_EVENT_LOOP_MAIN);
|
||||||
|
freeClientsInAsyncFreeQueue(iel);
|
||||||
|
|
||||||
|
return 1000/server.hz;
|
||||||
|
}
|
||||||
|
|
||||||
/* This function gets called every time Redis is entering the
|
/* This function gets called every time Redis is entering the
|
||||||
* main loop of the event driven library, that is, before to sleep
|
* main loop of the event driven library, that is, before to sleep
|
||||||
* for ready file descriptors. */
|
* for ready file descriptors. */
|
||||||
@ -2088,7 +2102,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
moduleHandleBlockedClients();
|
moduleHandleBlockedClients();
|
||||||
|
|
||||||
/* Try to process pending commands for clients that were just unblocked. */
|
/* Try to process pending commands for clients that were just unblocked. */
|
||||||
if (listLength(server.rgunblocked_clients[IDX_EVENT_LOOP_MAIN]))
|
if (listLength(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients))
|
||||||
{
|
{
|
||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
processUnblockedClients(IDX_EVENT_LOOP_MAIN);
|
processUnblockedClients(IDX_EVENT_LOOP_MAIN);
|
||||||
@ -2114,8 +2128,9 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
|
|||||||
int iel = ielFromEventLoop(eventLoop);
|
int iel = ielFromEventLoop(eventLoop);
|
||||||
|
|
||||||
/* Try to process pending commands for clients that were just unblocked. */
|
/* Try to process pending commands for clients that were just unblocked. */
|
||||||
if (listLength(server.rgunblocked_clients[iel]))
|
if (listLength(server.rgthreadvar[iel].unblocked_clients)) {
|
||||||
processUnblockedClients(iel);
|
processUnblockedClients(iel);
|
||||||
|
}
|
||||||
|
|
||||||
/* Handle writes with pending output buffers. */
|
/* Handle writes with pending output buffers. */
|
||||||
handleClientsWithPendingWrites(iel);
|
handleClientsWithPendingWrites(iel);
|
||||||
@ -2257,10 +2272,6 @@ void initServerConfig(void) {
|
|||||||
server.bindaddr_count = 0;
|
server.bindaddr_count = 0;
|
||||||
server.unixsocket = NULL;
|
server.unixsocket = NULL;
|
||||||
server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
|
server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
|
||||||
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
|
|
||||||
{
|
|
||||||
server.rgbindinfo[iel].ipfd_count = 0;
|
|
||||||
}
|
|
||||||
server.sofd = -1;
|
server.sofd = -1;
|
||||||
server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
|
server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
|
||||||
server.dbnum = CONFIG_DEFAULT_DBNUM;
|
server.dbnum = CONFIG_DEFAULT_DBNUM;
|
||||||
@ -2450,7 +2461,7 @@ void initServerConfig(void) {
|
|||||||
server.lua_always_replicate_commands = 1;
|
server.lua_always_replicate_commands = 1;
|
||||||
|
|
||||||
/* Multithreading */
|
/* Multithreading */
|
||||||
server.cel = CONFIG_DEFAULT_THREADS;
|
server.cthreads = CONFIG_DEFAULT_THREADS;
|
||||||
}
|
}
|
||||||
|
|
||||||
extern char **environ;
|
extern char **environ;
|
||||||
@ -2741,12 +2752,12 @@ static void initNetworkingThread(int iel, int fReusePort)
|
|||||||
{
|
{
|
||||||
/* Open the TCP listening socket for the user commands. */
|
/* Open the TCP listening socket for the user commands. */
|
||||||
if (server.port != 0 &&
|
if (server.port != 0 &&
|
||||||
listenToPort(server.port,server.rgbindinfo[iel].ipfd,&server.rgbindinfo[iel].ipfd_count, fReusePort) == C_ERR)
|
listenToPort(server.port,server.rgthreadvar[iel].ipfd,&server.rgthreadvar[iel].ipfd_count, fReusePort) == C_ERR)
|
||||||
exit(1);
|
exit(1);
|
||||||
|
|
||||||
/* Create an event handler for accepting new connections in TCP */
|
/* Create an event handler for accepting new connections in TCP */
|
||||||
for (int j = 0; j < server.rgbindinfo[iel].ipfd_count; j++) {
|
for (int j = 0; j < server.rgthreadvar[iel].ipfd_count; j++) {
|
||||||
if (aeCreateFileEvent(server.rgel[iel], server.rgbindinfo[iel].ipfd[j], AE_READABLE|AE_READ_THREADSAFE,
|
if (aeCreateFileEvent(server.rgthreadvar[iel].el, server.rgthreadvar[iel].ipfd[j], AE_READABLE|AE_READ_THREADSAFE,
|
||||||
acceptTcpHandler,NULL) == AE_ERR)
|
acceptTcpHandler,NULL) == AE_ERR)
|
||||||
{
|
{
|
||||||
serverPanic(
|
serverPanic(
|
||||||
@ -2757,7 +2768,7 @@ static void initNetworkingThread(int iel, int fReusePort)
|
|||||||
|
|
||||||
static void initNetworking(int fReusePort)
|
static void initNetworking(int fReusePort)
|
||||||
{
|
{
|
||||||
int celListen = (fReusePort) ? server.cel : 1;
|
int celListen = (fReusePort) ? server.cthreads : 1;
|
||||||
for (int iel = 0; iel < celListen; ++iel)
|
for (int iel = 0; iel < celListen; ++iel)
|
||||||
initNetworkingThread(iel, fReusePort);
|
initNetworkingThread(iel, fReusePort);
|
||||||
|
|
||||||
@ -2774,15 +2785,38 @@ static void initNetworking(int fReusePort)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Abort if there are no listening sockets at all. */
|
/* Abort if there are no listening sockets at all. */
|
||||||
if (server.rgbindinfo[IDX_EVENT_LOOP_MAIN].ipfd_count == 0 && server.sofd < 0) {
|
if (server.rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd_count == 0 && server.sofd < 0) {
|
||||||
serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
|
serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (server.sofd > 0 && aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.sofd,AE_READABLE|AE_READ_THREADSAFE,
|
if (server.sofd > 0 && aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,server.sofd,AE_READABLE|AE_READ_THREADSAFE,
|
||||||
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
|
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
|
||||||
|
{
|
||||||
|
pvar->clients_pending_write = listCreate();
|
||||||
|
pvar->unblocked_clients = listCreate();
|
||||||
|
pvar->clients_pending_asyncwrite = listCreate();
|
||||||
|
pvar->ipfd_count = 0;
|
||||||
|
pvar->el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
|
||||||
|
if (pvar->el == NULL) {
|
||||||
|
serverLog(LL_WARNING,
|
||||||
|
"Failed creating the event loop. Error message: '%s'",
|
||||||
|
strerror(errno));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!fMain)
|
||||||
|
{
|
||||||
|
if (aeCreateTimeEvent(pvar->el, 1, serverCronLite, NULL, NULL) == AE_ERR) {
|
||||||
|
serverPanic("Can't create event loop timers.");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void initServer(void) {
|
void initServer(void) {
|
||||||
signal(SIGHUP, SIG_IGN);
|
signal(SIGHUP, SIG_IGN);
|
||||||
signal(SIGPIPE, SIG_IGN);
|
signal(SIGPIPE, SIG_IGN);
|
||||||
@ -2803,11 +2837,6 @@ void initServer(void) {
|
|||||||
server.clients_to_close = listCreate();
|
server.clients_to_close = listCreate();
|
||||||
server.slaves = listCreate();
|
server.slaves = listCreate();
|
||||||
server.monitors = listCreate();
|
server.monitors = listCreate();
|
||||||
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
|
|
||||||
{
|
|
||||||
server.rgclients_pending_write[iel] = listCreate();
|
|
||||||
server.rgunblocked_clients[iel] = listCreate();
|
|
||||||
}
|
|
||||||
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
|
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
|
||||||
server.ready_keys = listCreate();
|
server.ready_keys = listCreate();
|
||||||
server.clients_waiting_acks = listCreate();
|
server.clients_waiting_acks = listCreate();
|
||||||
@ -2817,16 +2846,7 @@ void initServer(void) {
|
|||||||
|
|
||||||
createSharedObjects();
|
createSharedObjects();
|
||||||
adjustOpenFilesLimit();
|
adjustOpenFilesLimit();
|
||||||
for (int i = 0; i < MAX_EVENT_LOOPS; ++i)
|
|
||||||
{
|
|
||||||
server.rgel[i] = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
|
|
||||||
if (server.rgel[i] == NULL) {
|
|
||||||
serverLog(LL_WARNING,
|
|
||||||
"Failed creating the event loop. Error message: '%s'",
|
|
||||||
strerror(errno));
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
server.db = zmalloc(sizeof(redisDb)*server.dbnum, MALLOC_LOCAL);
|
server.db = zmalloc(sizeof(redisDb)*server.dbnum, MALLOC_LOCAL);
|
||||||
|
|
||||||
/* Create the Redis databases, and initialize other internal state. */
|
/* Create the Redis databases, and initialize other internal state. */
|
||||||
@ -2879,14 +2899,14 @@ void initServer(void) {
|
|||||||
/* Create the timer callback, this is our way to process many background
|
/* Create the timer callback, this is our way to process many background
|
||||||
* operations incrementally, like clients timeout, eviction of unaccessed
|
* operations incrementally, like clients timeout, eviction of unaccessed
|
||||||
* expired keys and so forth. */
|
* expired keys and so forth. */
|
||||||
if (aeCreateTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN], 1, serverCron, NULL, NULL) == AE_ERR) {
|
if (aeCreateTimeEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, 1, serverCron, NULL, NULL) == AE_ERR) {
|
||||||
serverPanic("Can't create event loop timers.");
|
serverPanic("Can't create event loop timers.");
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Register a readable event for the pipe used to awake the event loop
|
/* Register a readable event for the pipe used to awake the event loop
|
||||||
* when a blocked client in a module needs attention. */
|
* when a blocked client in a module needs attention. */
|
||||||
if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.module_blocked_pipe[0], AE_READABLE,
|
if (aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, server.module_blocked_pipe[0], AE_READABLE,
|
||||||
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
|
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
|
||||||
serverPanic(
|
serverPanic(
|
||||||
"Error registering the readable event for the module "
|
"Error registering the readable event for the module "
|
||||||
@ -3160,6 +3180,44 @@ void preventCommandReplication(client *c) {
|
|||||||
c->flags |= CLIENT_PREVENT_REPL_PROP;
|
c->flags |= CLIENT_PREVENT_REPL_PROP;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void ProcessPendingAsyncWrites()
|
||||||
|
{
|
||||||
|
while(listLength(serverTL->clients_pending_asyncwrite)) {
|
||||||
|
client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite));
|
||||||
|
listDelNode(serverTL->clients_pending_asyncwrite, listFirst(serverTL->clients_pending_asyncwrite));
|
||||||
|
|
||||||
|
serverAssert(c->flags & CLIENT_PENDING_ASYNCWRITE);
|
||||||
|
|
||||||
|
// TODO: Append to end of reply block?
|
||||||
|
|
||||||
|
size_t size = c->bufposAsync;
|
||||||
|
clientReplyBlock *reply = (clientReplyBlock*)zmalloc(size + sizeof(clientReplyBlock), MALLOC_LOCAL);
|
||||||
|
/* take over the allocation's internal fragmentation */
|
||||||
|
reply->size = zmalloc_usable(reply) - sizeof(clientReplyBlock);
|
||||||
|
reply->used = c->bufposAsync;
|
||||||
|
memcpy(reply->buf, c->bufAsync, c->bufposAsync);
|
||||||
|
listAddNodeTail(c->listbufferDoneAsync, reply);
|
||||||
|
c->bufposAsync = 0;
|
||||||
|
c->flags &= ~CLIENT_PENDING_ASYNCWRITE;
|
||||||
|
|
||||||
|
// Now install the write event handler
|
||||||
|
int ae_flags = AE_WRITABLE;
|
||||||
|
/* For the fsync=always policy, we want that a given FD is never
|
||||||
|
* served for reading and writing in the same event loop iteration,
|
||||||
|
* so that in the middle of receiving the query, and serving it
|
||||||
|
* to the client, we'll call beforeSleep() that will do the
|
||||||
|
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
|
||||||
|
if (server.aof_state == AOF_ON &&
|
||||||
|
server.aof_fsync == AOF_FSYNC_ALWAYS)
|
||||||
|
{
|
||||||
|
ae_flags |= AE_BARRIER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aeCreateRemoteFileEventSync(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR)
|
||||||
|
freeClientAsync(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Call() is the core of Redis execution of a command.
|
/* Call() is the core of Redis execution of a command.
|
||||||
*
|
*
|
||||||
* The following flags can be passed:
|
* The following flags can be passed:
|
||||||
@ -3314,6 +3372,9 @@ void call(client *c, int flags) {
|
|||||||
}
|
}
|
||||||
redisOpArrayFree(&server.also_propagate);
|
redisOpArrayFree(&server.also_propagate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ProcessPendingAsyncWrites();
|
||||||
|
|
||||||
server.also_propagate = prev_also_propagate;
|
server.also_propagate = prev_also_propagate;
|
||||||
server.stat_numcommands++;
|
server.stat_numcommands++;
|
||||||
}
|
}
|
||||||
@ -3337,6 +3398,9 @@ int processCommand(client *c) {
|
|||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AssertCorrectThread(c);
|
||||||
|
serverAssert(aeThreadOwnsLock());
|
||||||
|
|
||||||
/* Now lookup the command and check ASAP about trivial error conditions
|
/* Now lookup the command and check ASAP about trivial error conditions
|
||||||
* such as wrong arity, bad command name and so forth. */
|
* such as wrong arity, bad command name and so forth. */
|
||||||
c->cmd = c->lastcmd = lookupCommand(ptrFromObj(c->argv[0]));
|
c->cmd = c->lastcmd = lookupCommand(ptrFromObj(c->argv[0]));
|
||||||
@ -3547,10 +3611,10 @@ int processCommand(client *c) {
|
|||||||
void closeListeningSockets(int unlink_unix_socket) {
|
void closeListeningSockets(int unlink_unix_socket) {
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
for (int iel = 0; iel < server.cel; ++iel)
|
for (int iel = 0; iel < server.cthreads; ++iel)
|
||||||
{
|
{
|
||||||
for (j = 0; j < server.rgbindinfo[iel].ipfd_count; j++)
|
for (j = 0; j < server.rgthreadvar[iel].ipfd_count; j++)
|
||||||
close(server.rgbindinfo[iel].ipfd[j]);
|
close(server.rgthreadvar[iel].ipfd[j]);
|
||||||
}
|
}
|
||||||
if (server.sofd != -1) close(server.sofd);
|
if (server.sofd != -1) close(server.sofd);
|
||||||
if (server.cluster_enabled)
|
if (server.cluster_enabled)
|
||||||
@ -4803,12 +4867,14 @@ void *workerThreadMain(void *parg)
|
|||||||
{
|
{
|
||||||
int iel = (int)((int64_t)parg);
|
int iel = (int)((int64_t)parg);
|
||||||
serverLog(LOG_INFO, "Thread %d alive.", iel);
|
serverLog(LOG_INFO, "Thread %d alive.", iel);
|
||||||
|
serverTL = server.rgthreadvar+iel; // set the TLS threadsafe global
|
||||||
|
|
||||||
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
|
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
|
||||||
aeSetBeforeSleepProc(server.rgel[iel], isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
|
aeEventLoop *el = server.rgthreadvar[iel].el;
|
||||||
aeSetAfterSleepProc(server.rgel[iel], isMainThread ? afterSleep : NULL, 0);
|
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
|
||||||
aeMain(server.rgel[iel]);
|
aeSetAfterSleepProc(el, isMainThread ? afterSleep : NULL, 0);
|
||||||
aeDeleteEventLoop(server.rgel[iel]);
|
aeMain(el);
|
||||||
|
aeDeleteEventLoop(el);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4861,6 +4927,10 @@ int main(int argc, char **argv) {
|
|||||||
dictSetHashFunctionSeed((uint8_t*)hashseed);
|
dictSetHashFunctionSeed((uint8_t*)hashseed);
|
||||||
server.sentinel_mode = checkForSentinelMode(argc,argv);
|
server.sentinel_mode = checkForSentinelMode(argc,argv);
|
||||||
initServerConfig();
|
initServerConfig();
|
||||||
|
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
|
||||||
|
{
|
||||||
|
initServerThread(server.rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN);
|
||||||
|
}
|
||||||
ACLInit(); /* The ACL subsystem must be initialized ASAP because the
|
ACLInit(); /* The ACL subsystem must be initialized ASAP because the
|
||||||
basic networking code and client creation depends on it. */
|
basic networking code and client creation depends on it. */
|
||||||
moduleInitModulesSystem();
|
moduleInitModulesSystem();
|
||||||
@ -4974,7 +5044,8 @@ int main(int argc, char **argv) {
|
|||||||
if (background) daemonize();
|
if (background) daemonize();
|
||||||
|
|
||||||
initServer();
|
initServer();
|
||||||
server.cel = 4; //testing
|
|
||||||
|
server.cthreads = 1; //testing
|
||||||
initNetworking(1 /* fReusePort */);
|
initNetworking(1 /* fReusePort */);
|
||||||
|
|
||||||
if (background || server.pidfile) createPidFile();
|
if (background || server.pidfile) createPidFile();
|
||||||
@ -5003,7 +5074,7 @@ int main(int argc, char **argv) {
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (server.rgbindinfo[IDX_EVENT_LOOP_MAIN].ipfd_count > 0)
|
if (server.rgthreadvar[IDX_EVENT_LOOP_MAIN].ipfd_count > 0)
|
||||||
serverLog(LL_NOTICE,"Ready to accept connections");
|
serverLog(LL_NOTICE,"Ready to accept connections");
|
||||||
if (server.sofd > 0)
|
if (server.sofd > 0)
|
||||||
serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
|
serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
|
||||||
@ -5016,9 +5087,9 @@ int main(int argc, char **argv) {
|
|||||||
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
|
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
|
||||||
}
|
}
|
||||||
|
|
||||||
serverAssert(server.cel > 0 && server.cel <= MAX_EVENT_LOOPS);
|
serverAssert(server.cthreads > 0 && server.cthreads <= MAX_EVENT_LOOPS);
|
||||||
pthread_t rgthread[MAX_EVENT_LOOPS];
|
pthread_t rgthread[MAX_EVENT_LOOPS];
|
||||||
for (int iel = 0; iel < server.cel; ++iel)
|
for (int iel = 0; iel < server.cthreads; ++iel)
|
||||||
{
|
{
|
||||||
pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel));
|
pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel));
|
||||||
}
|
}
|
||||||
|
62
src/server.h
62
src/server.h
@ -30,6 +30,9 @@
|
|||||||
#ifndef __REDIS_H
|
#ifndef __REDIS_H
|
||||||
#define __REDIS_H
|
#define __REDIS_H
|
||||||
|
|
||||||
|
#define TRUE 1
|
||||||
|
#define FALSE 0
|
||||||
|
|
||||||
#include "fmacros.h"
|
#include "fmacros.h"
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
#include "solarisfixes.h"
|
#include "solarisfixes.h"
|
||||||
@ -290,6 +293,7 @@ extern "C" {
|
|||||||
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
|
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
|
||||||
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
|
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
|
||||||
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
|
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
|
||||||
|
#define CLIENT_PENDING_ASYNCWRITE (1<<29) /* client is in the async write list */
|
||||||
|
|
||||||
/* Client block type (btype field in client structure)
|
/* Client block type (btype field in client structure)
|
||||||
* if CLIENT_BLOCKED flag is set. */
|
* if CLIENT_BLOCKED flag is set. */
|
||||||
@ -824,6 +828,7 @@ typedef struct client {
|
|||||||
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
|
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
|
||||||
size_t sentlen; /* Amount of bytes already sent in the current
|
size_t sentlen; /* Amount of bytes already sent in the current
|
||||||
buffer or object being sent. */
|
buffer or object being sent. */
|
||||||
|
size_t sentlenAsync; /* same as sentlen buf for async buffers (which are a different stream) */
|
||||||
time_t ctime; /* Client creation time. */
|
time_t ctime; /* Client creation time. */
|
||||||
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
||||||
time_t obuf_soft_limit_reached_time;
|
time_t obuf_soft_limit_reached_time;
|
||||||
@ -860,6 +865,13 @@ typedef struct client {
|
|||||||
int bufpos;
|
int bufpos;
|
||||||
char buf[PROTO_REPLY_CHUNK_BYTES];
|
char buf[PROTO_REPLY_CHUNK_BYTES];
|
||||||
|
|
||||||
|
/* Async Response Buffer - other threads write here */
|
||||||
|
int bufposAsync;
|
||||||
|
int buflenAsync;
|
||||||
|
char *bufAsync;
|
||||||
|
/* Async Done Buffer, moved after a thread is done async writing */
|
||||||
|
list *listbufferDoneAsync;
|
||||||
|
|
||||||
int iel; /* the event loop index we're registered with */
|
int iel; /* the event loop index we're registered with */
|
||||||
} client;
|
} client;
|
||||||
|
|
||||||
@ -1026,9 +1038,14 @@ struct clusterState;
|
|||||||
#define MAX_EVENT_LOOPS 16
|
#define MAX_EVENT_LOOPS 16
|
||||||
#define IDX_EVENT_LOOP_MAIN 0
|
#define IDX_EVENT_LOOP_MAIN 0
|
||||||
|
|
||||||
struct bindinfo {
|
// Per-thread variabels that may be accessed without a lock
|
||||||
|
struct redisServerThreadVars {
|
||||||
|
aeEventLoop *el;
|
||||||
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
|
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
|
||||||
int ipfd_count; /* Used slots in ipfd[] */
|
int ipfd_count; /* Used slots in ipfd[] */
|
||||||
|
list *clients_pending_write; /* There is to write or install handler. */
|
||||||
|
list *unblocked_clients; /* list of clients to unblock before next loop */
|
||||||
|
list *clients_pending_asyncwrite;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct redisServer {
|
struct redisServer {
|
||||||
@ -1045,8 +1062,10 @@ struct redisServer {
|
|||||||
redisDb *db;
|
redisDb *db;
|
||||||
dict *commands; /* Command table */
|
dict *commands; /* Command table */
|
||||||
dict *orig_commands; /* Command table before command renaming. */
|
dict *orig_commands; /* Command table before command renaming. */
|
||||||
int cel;
|
|
||||||
aeEventLoop *rgel[MAX_EVENT_LOOPS];
|
int cthreads; /* Number of main worker threads */
|
||||||
|
struct redisServerThreadVars rgthreadvar[MAX_EVENT_LOOPS];
|
||||||
|
|
||||||
unsigned int lruclock; /* Clock for LRU eviction */
|
unsigned int lruclock; /* Clock for LRU eviction */
|
||||||
int shutdown_asap; /* SHUTDOWN needed ASAP */
|
int shutdown_asap; /* SHUTDOWN needed ASAP */
|
||||||
int activerehashing; /* Incremental rehash in serverCron() */
|
int activerehashing; /* Incremental rehash in serverCron() */
|
||||||
@ -1071,13 +1090,11 @@ struct redisServer {
|
|||||||
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
|
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
|
||||||
char *unixsocket; /* UNIX socket path */
|
char *unixsocket; /* UNIX socket path */
|
||||||
mode_t unixsocketperm; /* UNIX socket permission */
|
mode_t unixsocketperm; /* UNIX socket permission */
|
||||||
struct bindinfo rgbindinfo[MAX_EVENT_LOOPS];
|
|
||||||
int sofd; /* Unix socket file descriptor */
|
int sofd; /* Unix socket file descriptor */
|
||||||
int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */
|
int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */
|
||||||
int cfd_count; /* Used slots in cfd[] */
|
int cfd_count; /* Used slots in cfd[] */
|
||||||
list *clients; /* List of active clients */
|
list *clients; /* List of active clients */
|
||||||
list *clients_to_close; /* Clients to close asynchronously */
|
list *clients_to_close; /* Clients to close asynchronously */
|
||||||
list *rgclients_pending_write[MAX_EVENT_LOOPS]; /* There is to write or install handler. */
|
|
||||||
list *slaves, *monitors; /* List of slaves and MONITORs */
|
list *slaves, *monitors; /* List of slaves and MONITORs */
|
||||||
client *current_client; /* Current client, only used on crash report */
|
client *current_client; /* Current client, only used on crash report */
|
||||||
rax *clients_index; /* Active clients dictionary by client ID. */
|
rax *clients_index; /* Active clients dictionary by client ID. */
|
||||||
@ -1298,7 +1315,6 @@ struct redisServer {
|
|||||||
/* Blocked clients */
|
/* Blocked clients */
|
||||||
unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/
|
unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/
|
||||||
unsigned int blocked_clients_by_type[BLOCKED_NUM];
|
unsigned int blocked_clients_by_type[BLOCKED_NUM];
|
||||||
list *rgunblocked_clients[MAX_EVENT_LOOPS]; /* list of clients to unblock before next loop */
|
|
||||||
list *ready_keys; /* List of readyList structures for BLPOP & co */
|
list *ready_keys; /* List of readyList structures for BLPOP & co */
|
||||||
/* Sort parameters - qsort_r() is only available under BSD so we
|
/* Sort parameters - qsort_r() is only available under BSD so we
|
||||||
* have to take this state global, in order to pass it to sortCompare() */
|
* have to take this state global, in order to pass it to sortCompare() */
|
||||||
@ -1484,6 +1500,7 @@ typedef struct {
|
|||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
|
|
||||||
extern struct redisServer server;
|
extern struct redisServer server;
|
||||||
|
extern __thread struct redisServerThreadVars *serverTL; // thread local server vars
|
||||||
extern struct sharedObjectsStruct shared;
|
extern struct sharedObjectsStruct shared;
|
||||||
extern dictType objectKeyPointerValueDictType;
|
extern dictType objectKeyPointerValueDictType;
|
||||||
extern dictType objectKeyHeapPointerValueDictType;
|
extern dictType objectKeyHeapPointerValueDictType;
|
||||||
@ -1589,7 +1606,7 @@ void rewriteClientCommandVector(client *c, int argc, ...);
|
|||||||
void rewriteClientCommandArgument(client *c, int i, robj *newval);
|
void rewriteClientCommandArgument(client *c, int i, robj *newval);
|
||||||
void replaceClientCommandVector(client *c, int argc, robj **argv);
|
void replaceClientCommandVector(client *c, int argc, robj **argv);
|
||||||
unsigned long getClientOutputBufferMemoryUsage(client *c);
|
unsigned long getClientOutputBufferMemoryUsage(client *c);
|
||||||
void freeClientsInAsyncFreeQueue(void);
|
void freeClientsInAsyncFreeQueue(int iel);
|
||||||
void asyncCloseClientOnOutputBufferLimitReached(client *c);
|
void asyncCloseClientOnOutputBufferLimitReached(client *c);
|
||||||
int getClientType(client *c);
|
int getClientType(client *c);
|
||||||
int getClientTypeByName(const char *name);
|
int getClientTypeByName(const char *name);
|
||||||
@ -1601,13 +1618,28 @@ void pauseClients(mstime_t duration);
|
|||||||
int clientsArePaused(void);
|
int clientsArePaused(void);
|
||||||
int processEventsWhileBlocked(int iel);
|
int processEventsWhileBlocked(int iel);
|
||||||
int handleClientsWithPendingWrites(int iel);
|
int handleClientsWithPendingWrites(int iel);
|
||||||
int clientHasPendingReplies(client *c);
|
int clientHasPendingReplies(client *c, int fIncludeAsync);
|
||||||
void unlinkClient(client *c);
|
void unlinkClient(client *c);
|
||||||
int writeToClient(int fd, client *c, int handler_installed);
|
int writeToClient(int fd, client *c, int handler_installed);
|
||||||
void linkClient(client *c);
|
void linkClient(client *c);
|
||||||
void protectClient(client *c);
|
void protectClient(client *c);
|
||||||
void unprotectClient(client *c);
|
void unprotectClient(client *c);
|
||||||
|
|
||||||
|
// Special Thread-safe addReply() commands for posting messages to clients from a different thread
|
||||||
|
void addReplyAsync(client *c, robj *obj);
|
||||||
|
void addReplyArrayLenAsync(client *c, long length);
|
||||||
|
void addReplyBulkAsync(client *c, robj *obj);
|
||||||
|
void addReplyBulkCBufferAsync(client *c, const void *p, size_t len);
|
||||||
|
void addReplyErrorAsync(client *c, const char *err);
|
||||||
|
void addReplyMapLenAsync(client *c, long length);
|
||||||
|
void addReplyNullAsync(client *c);
|
||||||
|
void addReplyDoubleAsync(client *c, double d);
|
||||||
|
void *addReplyDeferredLenAsync(client *c);
|
||||||
|
void setDeferredArrayLenAsync(client *c, void *node, long length);
|
||||||
|
void addReplySdsAsync(client *c, sds s);
|
||||||
|
void addReplyBulkSdsAsync(client *c, sds s);
|
||||||
|
void addReplyPushLenAsync(client *c, long length);
|
||||||
|
|
||||||
#ifdef __GNUC__
|
#ifdef __GNUC__
|
||||||
void addReplyErrorFormat(client *c, const char *fmt, ...)
|
void addReplyErrorFormat(client *c, const char *fmt, ...)
|
||||||
__attribute__((format(printf, 2, 3)));
|
__attribute__((format(printf, 2, 3)));
|
||||||
@ -1694,7 +1726,7 @@ unsigned long long estimateObjectIdleTime(robj *o);
|
|||||||
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
|
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
|
||||||
|
|
||||||
/* Synchronous I/O with timeout */
|
/* Synchronous I/O with timeout */
|
||||||
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout);
|
ssize_t syncWrite(int fd, const char *ptr, ssize_t size, long long timeout);
|
||||||
ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
|
ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
|
||||||
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
|
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
|
||||||
|
|
||||||
@ -2301,15 +2333,21 @@ void xorDigest(unsigned char *digest, void *ptr, size_t len);
|
|||||||
inline int ielFromEventLoop(const aeEventLoop *eventLoop)
|
inline int ielFromEventLoop(const aeEventLoop *eventLoop)
|
||||||
{
|
{
|
||||||
int iel = 0;
|
int iel = 0;
|
||||||
for (; iel < server.cel; ++iel)
|
for (; iel < server.cthreads; ++iel)
|
||||||
{
|
{
|
||||||
if (server.rgel[iel] == eventLoop)
|
if (server.rgthreadvar[iel].el == eventLoop)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
serverAssert(iel < server.cel);
|
serverAssert(iel < server.cthreads);
|
||||||
return iel;
|
return iel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline int FCorrectThread(client *c)
|
||||||
|
{
|
||||||
|
return server.rgthreadvar[c->iel].el == serverTL->el;
|
||||||
|
}
|
||||||
|
#define AssertCorrectThread(c) serverAssert(FCorrectThread(c))
|
||||||
|
|
||||||
#define redisDebug(fmt, ...) \
|
#define redisDebug(fmt, ...) \
|
||||||
printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__)
|
printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__)
|
||||||
#define redisDebugMark() \
|
#define redisDebugMark() \
|
||||||
|
@ -46,7 +46,7 @@
|
|||||||
* done within 'timeout' milliseconds the operation succeeds and 'size' is
|
* done within 'timeout' milliseconds the operation succeeds and 'size' is
|
||||||
* returned. Otherwise the operation fails, -1 is returned, and an unspecified
|
* returned. Otherwise the operation fails, -1 is returned, and an unspecified
|
||||||
* partial write could be performed against the file descriptor. */
|
* partial write could be performed against the file descriptor. */
|
||||||
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout) {
|
ssize_t syncWrite(int fd, const char *ptr, ssize_t size, long long timeout) {
|
||||||
ssize_t nwritten, ret = size;
|
ssize_t nwritten, ret = size;
|
||||||
long long start = mstime();
|
long long start = mstime();
|
||||||
long long remaining = timeout;
|
long long remaining = timeout;
|
||||||
|
10
src/t_list.c
10
src/t_list.c
@ -547,7 +547,7 @@ void lremCommand(client *c) {
|
|||||||
* as well. This command was originally proposed by Ezra Zygmuntowicz.
|
* as well. This command was originally proposed by Ezra Zygmuntowicz.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
|
static void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
|
||||||
/* Create the list if the key does not exist */
|
/* Create the list if the key does not exist */
|
||||||
if (!dstobj) {
|
if (!dstobj) {
|
||||||
dstobj = createQuicklistObject();
|
dstobj = createQuicklistObject();
|
||||||
@ -559,7 +559,7 @@ void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
|
|||||||
listTypePush(dstobj,value,LIST_HEAD);
|
listTypePush(dstobj,value,LIST_HEAD);
|
||||||
notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);
|
notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);
|
||||||
/* Always send the pushed value to the client. */
|
/* Always send the pushed value to the client. */
|
||||||
addReplyBulk(c,value);
|
addReplyBulkAsync(c,value);
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpoplpushCommand(client *c) {
|
void rpoplpushCommand(client *c) {
|
||||||
@ -639,9 +639,9 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
|
|||||||
db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
|
db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
|
|
||||||
/* BRPOP/BLPOP */
|
/* BRPOP/BLPOP */
|
||||||
addReplyArrayLen(receiver,2);
|
addReplyArrayLenAsync(receiver,2);
|
||||||
addReplyBulk(receiver,key);
|
addReplyBulkAsync(receiver,key);
|
||||||
addReplyBulk(receiver,value);
|
addReplyBulkAsync(receiver,value);
|
||||||
|
|
||||||
/* Notify event. */
|
/* Notify event. */
|
||||||
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
|
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
|
||||||
|
@ -776,11 +776,16 @@ int streamDeleteItem(stream *s, streamID *id) {
|
|||||||
/* Emit a reply in the client output buffer by formatting a Stream ID
|
/* Emit a reply in the client output buffer by formatting a Stream ID
|
||||||
* in the standard <ms>-<seq> format, using the simple string protocol
|
* in the standard <ms>-<seq> format, using the simple string protocol
|
||||||
* of REPL. */
|
* of REPL. */
|
||||||
void addReplyStreamID(client *c, streamID *id) {
|
static void addReplyStreamID(client *c, streamID *id) {
|
||||||
sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
|
sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
|
||||||
addReplyBulkSds(c,replyid);
|
addReplyBulkSds(c,replyid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void addReplyStreamIDAsync(client *c, streamID *id) {
|
||||||
|
sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
|
||||||
|
addReplyBulkSdsAsync(c,replyid);
|
||||||
|
}
|
||||||
|
|
||||||
/* Similar to the above function, but just creates an object, usually useful
|
/* Similar to the above function, but just creates an object, usually useful
|
||||||
* for replication purposes to create arguments. */
|
* for replication purposes to create arguments. */
|
||||||
robj *createObjectFromStreamID(streamID *id) {
|
robj *createObjectFromStreamID(streamID *id) {
|
||||||
@ -914,7 +919,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!(flags & STREAM_RWR_RAWENTRIES))
|
if (!(flags & STREAM_RWR_RAWENTRIES))
|
||||||
arraylen_ptr = addReplyDeferredLen(c);
|
arraylen_ptr = addReplyDeferredLenAsync(c);
|
||||||
streamIteratorStart(&si,s,start,end,rev);
|
streamIteratorStart(&si,s,start,end,rev);
|
||||||
while(streamIteratorGetID(&si,&id,&numfields)) {
|
while(streamIteratorGetID(&si,&id,&numfields)) {
|
||||||
/* Update the group last_id if needed. */
|
/* Update the group last_id if needed. */
|
||||||
@ -925,18 +930,18 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
|
|
||||||
/* Emit a two elements array for each item. The first is
|
/* Emit a two elements array for each item. The first is
|
||||||
* the ID, the second is an array of field-value pairs. */
|
* the ID, the second is an array of field-value pairs. */
|
||||||
addReplyArrayLen(c,2);
|
addReplyArrayLenAsync(c,2);
|
||||||
addReplyStreamID(c,&id);
|
addReplyStreamIDAsync(c,&id);
|
||||||
|
|
||||||
addReplyMapLen(c,numfields);
|
addReplyMapLenAsync(c,numfields);
|
||||||
|
|
||||||
/* Emit the field-value pairs. */
|
/* Emit the field-value pairs. */
|
||||||
while(numfields--) {
|
while(numfields--) {
|
||||||
unsigned char *key, *value;
|
unsigned char *key, *value;
|
||||||
int64_t key_len, value_len;
|
int64_t key_len, value_len;
|
||||||
streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
|
streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
|
||||||
addReplyBulkCBuffer(c,key,key_len);
|
addReplyBulkCBufferAsync(c,key,key_len);
|
||||||
addReplyBulkCBuffer(c,value,value_len);
|
addReplyBulkCBufferAsync(c,value,value_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If a group is passed, we need to create an entry in the
|
/* If a group is passed, we need to create an entry in the
|
||||||
@ -994,7 +999,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||||||
if (count && count == arraylen) break;
|
if (count && count == arraylen) break;
|
||||||
}
|
}
|
||||||
streamIteratorStop(&si);
|
streamIteratorStop(&si);
|
||||||
if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
|
if (arraylen_ptr) setDeferredArrayLenAsync(c,arraylen_ptr,arraylen);
|
||||||
return arraylen;
|
return arraylen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
12
src/t_zset.c
12
src/t_zset.c
@ -3155,15 +3155,15 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
|
|||||||
|
|
||||||
/* No candidate for zpopping, return empty. */
|
/* No candidate for zpopping, return empty. */
|
||||||
if (!zobj) {
|
if (!zobj) {
|
||||||
addReplyNull(c);
|
addReplyNullAsync(c);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *arraylen_ptr = addReplyDeferredLen(c);
|
void *arraylen_ptr = addReplyDeferredLenAsync(c);
|
||||||
long arraylen = 0;
|
long arraylen = 0;
|
||||||
|
|
||||||
/* We emit the key only for the blocking variant. */
|
/* We emit the key only for the blocking variant. */
|
||||||
if (emitkey) addReplyBulk(c,key);
|
if (emitkey) addReplyBulkAsync(c,key);
|
||||||
|
|
||||||
/* Remove the element. */
|
/* Remove the element. */
|
||||||
do {
|
do {
|
||||||
@ -3213,8 +3213,8 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
|
|||||||
signalModifiedKey(c->db,key);
|
signalModifiedKey(c->db,key);
|
||||||
}
|
}
|
||||||
|
|
||||||
addReplyBulkCBuffer(c,ele,sdslen(ele));
|
addReplyBulkCBufferAsync(c,ele,sdslen(ele));
|
||||||
addReplyDouble(c,score);
|
addReplyDoubleAsync(c,score);
|
||||||
sdsfree(ele);
|
sdsfree(ele);
|
||||||
arraylen += 2;
|
arraylen += 2;
|
||||||
|
|
||||||
@ -3226,7 +3226,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey
|
|||||||
}
|
}
|
||||||
} while(--count);
|
} while(--count);
|
||||||
|
|
||||||
setDeferredArrayLen(c,arraylen_ptr,arraylen + (emitkey != 0));
|
setDeferredArrayLenAsync(c,arraylen_ptr,arraylen + (emitkey != 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ZPOPMIN key [<count>] */
|
/* ZPOPMIN key [<count>] */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user