From 30e8a859c0300b244bbf3065fd460ae8192efd55 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 22 Feb 2019 01:24:16 -0500 Subject: [PATCH] Prevent mixed up client replies, and deadlocks --- src/aof.c | 4 ++ src/blocked.c | 13 +++- src/cluster.h | 8 +++ src/db.c | 1 + src/evict.c | 2 + src/module.c | 2 +- src/networking.cpp | 152 +++++++++++++++++++++----------------------- src/object.c | 2 + src/pubsub.c | 4 ++ src/rand.h | 8 +++ src/rdb.c | 2 + src/replication.cpp | 30 ++++++++- src/scripting.cpp | 122 ++++++++++++++++++----------------- src/server.c | 12 ++-- src/server.h | 10 ++- src/sha1.h | 9 +++ src/t_list.c | 1 + 17 files changed, 232 insertions(+), 150 deletions(-) diff --git a/src/aof.c b/src/aof.c index 133d1e7e9..f19affc64 100644 --- a/src/aof.c +++ b/src/aof.c @@ -655,6 +655,8 @@ struct client *createFakeClient(void) { c->puser = NULL; listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); + fastlock_init(&c->lock); + fastlock_lock(&c->lock); initClientMultiState(c); return c; } @@ -672,6 +674,8 @@ void freeFakeClient(struct client *c) { listRelease(c->reply); listRelease(c->watched_keys); freeClientMultiState(c); + fastlock_unlock(&c->lock); + fastlock_free(&c->lock); zfree(c); } diff --git a/src/blocked.c b/src/blocked.c index 9344cfeb2..c467ebf2c 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -223,7 +223,8 @@ void disconnectAllBlockedClients(void) { listRewind(server.clients,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); - + + fastlock_lock(&c->lock); if (c->flags & CLIENT_BLOCKED) { addReplySdsAsync(c,sdsnew( "-UNBLOCKED force unblock from blocking operation, " @@ -231,6 +232,7 @@ void disconnectAllBlockedClients(void) { unblockClient(c); c->flags |= CLIENT_CLOSE_AFTER_REPLY; } + fastlock_unlock(&c->lock); } } @@ -309,6 +311,7 @@ void handleClientsBlockedOnKeys(void) { * freed by the next unblockClient() * call. */ if (dstkey) incrRefCount(dstkey); + fastlock_lock(&receiver->lock); unblockClient(receiver); if (serveClientBlockedOnList(receiver, @@ -321,6 +324,7 @@ void handleClientsBlockedOnKeys(void) { } if (dstkey) decrRefCount(dstkey); + fastlock_unlock(&receiver->lock); decrRefCount(value); } else { break; @@ -360,6 +364,7 @@ void handleClientsBlockedOnKeys(void) { continue; } + fastlock_lock(&receiver->lock); int where = (receiver->lastcmd && receiver->lastcmd->proc == bzpopminCommand) ? ZSET_MIN : ZSET_MAX; @@ -377,6 +382,7 @@ void handleClientsBlockedOnKeys(void) { incrRefCount(rl->key); propagate(cmd,receiver->db->id, argv,2,PROPAGATE_AOF|PROPAGATE_REPL); + fastlock_unlock(&receiver->lock); decrRefCount(argv[0]); decrRefCount(argv[1]); } @@ -419,10 +425,12 @@ void handleClientsBlockedOnKeys(void) { /* If the group was not found, send an error * to the consumer. */ if (!group) { + fastlock_lock(&receiver->lock); addReplyErrorAsync(receiver, "-NOGROUP the consumer group this client " "was blocked on no longer exists"); unblockClient(receiver); + fastlock_unlock(&receiver->lock); continue; } else { *gt = group->last_id; @@ -444,6 +452,8 @@ void handleClientsBlockedOnKeys(void) { noack = receiver->bpop.xread_group_noack; } + fastlock_lock(&receiver->lock); + /* Emit the two elements sub-array consisting of * the name of the stream and the data we * extracted from it. Wrapped in a single-item @@ -469,6 +479,7 @@ void handleClientsBlockedOnKeys(void) { * valid, so we must do the setup above before * this call. */ unblockClient(receiver); + fastlock_unlock(&receiver->lock); } } } diff --git a/src/cluster.h b/src/cluster.h index 571b9c543..ea4f51c78 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -1,6 +1,10 @@ #ifndef __CLUSTER_H #define __CLUSTER_H +#ifdef __cplusplus +extern "C" { +#endif + /*----------------------------------------------------------------------------- * Redis cluster data structures, defines, exported API. *----------------------------------------------------------------------------*/ @@ -287,4 +291,8 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in int clusterRedirectBlockedClientIfNeeded(client *c); void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code); +#ifdef __cplusplus +} +#endif + #endif /* __CLUSTER_H */ diff --git a/src/db.c b/src/db.c index c31d4898e..67631a597 100644 --- a/src/db.c +++ b/src/db.c @@ -1110,6 +1110,7 @@ long long getExpire(redisDb *db, robj *key) { * will be consistent even if we allow write operations against expiring * keys. */ void propagateExpire(redisDb *db, robj *key, int lazy) { + serverAssert(aeThreadOwnsLock()); robj *argv[2]; argv[0] = lazy ? shared.unlink : shared.del; diff --git a/src/evict.c b/src/evict.c index 28cd73f6f..48d6d0387 100644 --- a/src/evict.c +++ b/src/evict.c @@ -350,6 +350,7 @@ unsigned long LFUDecrAndReturn(robj *o) { * used memory: the eviction should use mostly data size. This function * returns the sum of AOF and slaves buffer. */ size_t freeMemoryGetNotCountedMemory(void) { + serverAssert(aeThreadOwnsLock()); size_t overhead = 0; int slaves = listLength(server.slaves); @@ -444,6 +445,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev * Otehrwise if we are over the memory limit, but not enough memory * was freed to return back under the limit, the function returns C_ERR. */ int freeMemoryIfNeeded(void) { + serverAssert(aeThreadOwnsLock()); /* By default replicas should ignore maxmemory * and just be masters exact copies. */ if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK; diff --git a/src/module.c b/src/module.c index d2601e16f..ea5f3271c 100644 --- a/src/module.c +++ b/src/module.c @@ -3696,7 +3696,7 @@ void moduleHandleBlockedClients(void) { /* Put the client in the list of clients that need to write * if there are pending replies here. This is needed since * during a non blocking command the client may receive output. */ - if (clientHasPendingReplies(c, FALSE) && + if (clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_WRITE)) { c->flags |= CLIENT_PENDING_WRITE; diff --git a/src/networking.cpp b/src/networking.cpp index b629a4e2c..435241a7b 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -45,15 +45,25 @@ class AeLocker bool m_fArmed = false; public: - AeLocker(bool fArm = false) + AeLocker() { - if (fArm) - arm(); } - void arm() + void arm(client *c) // if a client is passed, then the client is already locked { - if (!m_fArmed) + if (c != nullptr) + { + serverAssert(!m_fArmed); + serverAssert(c->lock.fOwnLock()); + while (!aeTryAcquireLock()) + { + c->lock.unlock(); + // give a chance for the global lock to progress if they were waiting on the client + c->lock.lock(); + } + m_fArmed = true; + } + else if (!m_fArmed) { m_fArmed = true; aeAcquireLock(); @@ -204,9 +214,6 @@ client *createClient(int fd, int iel) { c->bufAsync = NULL; c->buflenAsync = 0; c->bufposAsync = 0; - c->listbufferDoneAsync = listCreate(); - listSetFreeMethod(c->listbufferDoneAsync,freeClientReplyValue); - listSetDupMethod(c->listbufferDoneAsync,dupClientReplyValue); listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); @@ -276,6 +283,7 @@ void clientInstallAsyncWriteHandler(client *c) { int prepareClientToWrite(client *c, bool fAsync) { fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread serverAssert(!fAsync || aeThreadOwnsLock()); + serverAssert(c->lock.fOwnLock()); /* If it's the Lua client we always return ok without installing any * handler since there is no socket at all. */ @@ -293,7 +301,7 @@ int prepareClientToWrite(client *c, bool fAsync) { /* Schedule the client to write the output buffers to the socket, unless * it should already be setup to do so (it has already pending data). */ - if (!fAsync && !clientHasPendingReplies(c, FALSE)) clientInstallWriteHandler(c); + if (!fAsync && !clientHasPendingReplies(c)) clientInstallWriteHandler(c); if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ @@ -1014,8 +1022,8 @@ void copyClientOutputBuffer(client *dst, client *src) { /* Return true if the specified client has pending reply buffers to write to * the socket. */ -int clientHasPendingReplies(client *c, int fIncludeAsync) { - return c->bufpos || listLength(c->reply) || (fIncludeAsync && listLength(c->listbufferDoneAsync)); +int clientHasPendingReplies(client *c) { + return c->bufpos || listLength(c->reply); } #define MAX_ACCEPTS_PER_CALL 1000 @@ -1149,6 +1157,7 @@ static void freeClientArgv(client *c) { * when we resync with our own master and want to force all our slaves to * resync with us as well. */ void disconnectSlaves(void) { + serverAssert(aeThreadOwnsLock()); std::vector vecfreeImmediate; listNode *ln; listIter li; @@ -1327,7 +1336,6 @@ void freeClient(client *c) { /* Release other dynamically allocated client structure fields, * and finally release the client structure itself. */ zfree(c->bufAsync); - listRelease(c->listbufferDoneAsync); if (c->name) decrRefCount(c->name); zfree(c->argv); freeClientMultiState(c); @@ -1342,10 +1350,10 @@ void freeClient(client *c) { * should be valid for the continuation of the flow of the program. */ void freeClientAsync(client *c) { if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; - aeAcquireLock(); + AeLocker lock; + lock.arm(nullptr); c->flags |= CLIENT_CLOSE_ASAP; listAddNodeTail(server.clients_to_close,c); - aeReleaseLock(); } void freeClientsInAsyncFreeQueue(int iel) { @@ -1381,48 +1389,10 @@ int writeToClient(int fd, client *c, int handler_installed) { clientReplyBlock *o; AssertCorrectThread(c); - // Decide up front if we are sending the done buffer. This prevents us from completing - // a transmission on another thread while transmitting the thread local buffer, resulting in us - // overlapping messages - AeLocker locker(true); - std::lock_guardlock)> lock(c->lock); // To prevent deadlocks this must be after we acquire the global lock - int fSendAsyncBuffer = listLength(c->listbufferDoneAsync) && (c->sentlen == 0 || c->sentlenAsync > 0); - if (!fSendAsyncBuffer) - locker.disarm(); + std::unique_locklock)> lock(c->lock); - while(fSendAsyncBuffer || clientHasPendingReplies(c, FALSE)) { - if (fSendAsyncBuffer) { - o = (clientReplyBlock*)listNodeValue(listFirst(c->listbufferDoneAsync)); - if (o->used == 0) { - listDelNode(c->listbufferDoneAsync,listFirst(c->listbufferDoneAsync)); - if (listLength(c->listbufferDoneAsync) == 0) - { - fSendAsyncBuffer = 0; - locker.disarm(); - } - continue; - } - - nwritten = write(fd, o->buf() + c->sentlen, o->used - c->sentlen); - if (nwritten <= 0) - break; - c->sentlenAsync += nwritten; - totwritten += nwritten; - - /* If we fully sent the object on head go to the next one */ - if (c->sentlenAsync == o->used) { - listDelNode(c->listbufferDoneAsync,listFirst(c->listbufferDoneAsync)); - c->sentlenAsync = 0; - /* If there are no longer objects in the list, we expect - * the count of reply bytes to be exactly zero. */ - if (listLength(c->listbufferDoneAsync) == 0) - { - fSendAsyncBuffer = 0; - locker.disarm(); - continue; - } - } - } else if (c->bufpos > 0) { + while(clientHasPendingReplies(c)) { + if (c->bufpos > 0) { nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); if (nwritten <= 0) break; @@ -1486,9 +1456,17 @@ int writeToClient(int fd, client *c, int handler_installed) { } else { serverLog(LL_VERBOSE, "Error writing to client: %s", strerror(errno)); - aeAcquireLock(); - freeClient(c); - aeReleaseLock(); + if (aeTryAcquireLock()) + { + freeClient(c); + aeReleaseLock(); + } + else + { + lock.unlock(); + freeClientAsync(c); + } + return C_ERR; } } @@ -1499,15 +1477,22 @@ int writeToClient(int fd, client *c, int handler_installed) { * We just rely on data / pings received for timeout detection. */ if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime; } - if (!clientHasPendingReplies(c, TRUE)) { + if (!clientHasPendingReplies(c)) { c->sentlen = 0; if (handler_installed) aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { - aeAcquireLock(); - freeClient(c); - aeReleaseLock(); + if (aeTryAcquireLock()) + { + freeClient(c); + aeReleaseLock(); + } + else + { + lock.unlock(); + freeClientAsync(c); + } return C_ERR; } } @@ -1542,7 +1527,9 @@ void ProcessPendingAsyncWrites() reply->size = zmalloc_usable(reply) - sizeof(clientReplyBlock); reply->used = c->bufposAsync; memcpy(reply->buf(), c->bufAsync, c->bufposAsync); - listAddNodeTail(c->listbufferDoneAsync, reply); + listAddNodeTail(c->reply, reply); + c->reply_bytes += reply->size; + c->bufposAsync = 0; c->buflenAsync = 0; zfree(c->bufAsync); @@ -1566,6 +1553,7 @@ void ProcessPendingAsyncWrites() (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) continue; + asyncCloseClientOnOutputBufferLimitReached(c); if (aeCreateRemoteFileEvent(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR) continue; // We can retry later in the cron } @@ -1601,7 +1589,7 @@ int handleClientsWithPendingWrites(int iel) { /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. */ - if (clientHasPendingReplies(c, TRUE)) { + if (clientHasPendingReplies(c)) { int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; /* For the fsync=always policy, we want that a given FD is never * served for reading and writing in the same event loop iteration, @@ -1619,7 +1607,8 @@ int handleClientsWithPendingWrites(int iel) { } } - AeLocker locker(true); + AeLocker locker; + locker.arm(nullptr); ProcessPendingAsyncWrites(); return processed; @@ -1675,7 +1664,7 @@ void unprotectClient(client *c) { if (c->flags & CLIENT_PROTECTED) { c->flags &= ~CLIENT_PROTECTED; aeCreateFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_READABLE|AE_READ_THREADSAFE,readQueryFromClient,c); - if (clientHasPendingReplies(c, TRUE)) clientInstallWriteHandler(c); + if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); } } @@ -1967,7 +1956,8 @@ void processInputBuffer(client *c) { } else { serverPanic("Unknown request type"); } - AeLocker locker(true); + AeLocker locker; + locker.arm(c); server.current_client = c; /* Multibulk processing could see a <= 0 length. */ @@ -2033,9 +2023,12 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(mask); serverAssert(mask & AE_READ_THREADSAFE); serverAssert(c->iel == ielFromEventLoop(el)); - AeLocker locker; + + AeLocker aelock; AssertCorrectThread(c); - std::lock_guardlock)> lock(c->lock); + std::unique_locklock)> lock(c->lock, std::defer_lock); + if (!lock.try_lock()) + return; // Process something else while we wait readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply @@ -2065,16 +2058,14 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); - aeAcquireLock(); + aelock.arm(c); freeClient(c); - aeReleaseLock(); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); - aeAcquireLock(); + aelock.arm(c); freeClient(c); - aeReleaseLock(); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer @@ -2095,9 +2086,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); - aeAcquireLock(); + aelock.arm(c); freeClient(c); - aeReleaseLock(); return; } @@ -2108,9 +2098,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { * corresponding part of the replication stream, will be propagated to * the sub-slaves and to the replication backlog. */ processInputBufferAndReplicate(c); - aeAcquireLock(); + aelock.arm(c); ProcessPendingAsyncWrites(); - aeReleaseLock(); } void getClientsMaxBuffers(unsigned long *longest_output_list, @@ -2619,7 +2608,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * enforcing the client output length limits. */ unsigned long getClientOutputBufferMemoryUsage(client *c) { unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); - return c->reply_bytes + (list_item_size*listLength(c->reply)); + return c->reply_bytes + (list_item_size*listLength(c->reply)) + c->buflenAsync; } /* Get the class of a client, used in order to enforce limits to different @@ -2727,6 +2716,7 @@ void asyncCloseClientOnOutputBufferLimitReached(client *c) { * This is also called by SHUTDOWN for a best-effort attempt to send * slaves the latest writes. */ void flushSlavesOutputBuffers(void) { + serverAssert(aeThreadOwnsLock()); listIter li; listNode *ln; @@ -2747,7 +2737,7 @@ void flushSlavesOutputBuffers(void) { events = aeGetFileEvents(server.rgthreadvar[slave->iel].el,slave->fd); if (events & AE_WRITABLE && slave->replstate == SLAVE_STATE_ONLINE && - clientHasPendingReplies(slave, TRUE)) + clientHasPendingReplies(slave)) { writeToClient(slave->fd,slave,0); } @@ -2820,8 +2810,7 @@ int processEventsWhileBlocked(int iel) { int iterations = 4; /* See the function top-comment. */ int count = 0; - // BUGBUG - This function isn't fair - why should clients on this thread get to run, but not clients elsewhere? - // We mix up replies when releasing the lock here so more work is needed to fix this + aeReleaseLock(); while (iterations--) { int events = 0; events += aeProcessEvents(server.rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); @@ -2829,5 +2818,6 @@ int processEventsWhileBlocked(int iel) { if (!events) break; count += events; } + aeAcquireLock(); return count; } diff --git a/src/object.c b/src/object.c index ec211cbf0..97d4887ae 100644 --- a/src/object.c +++ b/src/object.c @@ -940,6 +940,7 @@ void freeMemoryOverheadData(struct redisMemOverhead *mh) { * information used for the MEMORY OVERHEAD and INFO command. The returned * structure pointer should be freed calling freeMemoryOverheadData(). */ struct redisMemOverhead *getMemoryOverheadData(void) { + serverAssert(aeThreadOwnsLock()); int j; size_t mem_total = 0; size_t mem = 0; @@ -1077,6 +1078,7 @@ void inputCatSds(void *result, const char *str) { /* This implements MEMORY DOCTOR. An human readable analysis of the Redis * memory condition. */ sds getMemoryDoctorReport(void) { + serverAssert(aeThreadOwnsLock()); int empty = 0; /* Instance is empty or almost empty. */ int big_peak = 0; /* Memory peak is much larger than used mem. */ int high_frag = 0; /* High fragmentation. */ diff --git a/src/pubsub.c b/src/pubsub.c index ccd631c28..af064d06a 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -293,7 +293,9 @@ int pubsubPublishMessage(robj *channel, robj *message) { listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = ln->value; + fastlock_lock(&c->lock); addReplyPubsubMessage(c,channel,message); + fastlock_unlock(&c->lock); receivers++; } } @@ -309,8 +311,10 @@ int pubsubPublishMessage(robj *channel, robj *message) { (char*)ptrFromObj(channel), sdslen(ptrFromObj(channel)),0)) { + fastlock_lock(&pat->pclient->lock); addReplyPubsubPatMessage(pat->pclient, pat->pattern,channel,message); + fastlock_unlock(&pat->pclient->lock); receivers++; } } diff --git a/src/rand.h b/src/rand.h index 1dce3e8b0..c6a9ae454 100644 --- a/src/rand.h +++ b/src/rand.h @@ -30,9 +30,17 @@ #ifndef REDIS_RANDOM_H #define REDIS_RANDOM_H +#ifdef __cplusplus +extern "C" { +#endif + int32_t redisLrand48(); void redisSrand48(int32_t seedval); +#ifdef __cplusplus +} +#endif + #define REDIS_LRAND48_MAX INT32_MAX #endif diff --git a/src/rdb.c b/src/rdb.c index f863da6f5..9940a0d52 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2140,6 +2140,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { * This function covers the case of RDB -> Salves socket transfers for * diskless replication. */ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { + serverAssert(aeThreadOwnsLock()); uint64_t *ok_slaves; if (!bysignal && exitcode == 0) { @@ -2259,6 +2260,7 @@ void killRDBChild(void) { /* Spawn an RDB child that writes the RDB to the sockets of the slaves * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { + serverAssert(aeThreadOwnsLock()); int *fds; uint64_t *clientids; int numfds; diff --git a/src/replication.cpp b/src/replication.cpp index cf5e02395..7b4409e50 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -37,6 +37,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(int newfd); @@ -115,6 +116,7 @@ void resizeReplicationBacklog(long long newsize) { } void freeReplicationBacklog(void) { + serverAssert(aeThreadOwnsLock()); serverAssert(listLength(server.slaves) == 0); zfree(server.repl_backlog); server.repl_backlog = NULL; @@ -200,6 +202,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); + /* Get the lock on all slaves */ + listRewind(slaves,&li); + while((ln = listNext(&li))) { + ((client*)ln->value)->lock.lock(); + } + /* Send SELECT command to every slave if needed. */ if (server.slaveseldb != dictid) { robj *selectcmd; @@ -280,6 +288,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { for (j = 0; j < argc; j++) addReplyBulkAsync(slave,argv[j]); } + + /* Release the lock on all slaves */ + listRewind(slaves,&li); + while((ln = listNext(&li))) { + ((client*)ln->value)->lock.unlock(); + } } /* This function is used in order to proxy what we receive from our master @@ -303,6 +317,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = (client*)ln->value; + std::lock_guardlock)> ulock(slave->lock); /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; @@ -348,6 +363,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, listRewind(monitors,&li); while((ln = listNext(&li))) { client *monitor = (client*)ln->value; + std::lock_guardlock)> lock(monitor->lock); addReplyAsync(monitor,cmdobj); } decrRefCount(cmdobj); @@ -459,6 +475,7 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) { * On success return C_OK, otherwise C_ERR is returned and we proceed * with the usual full resync. */ int masterTryPartialResynchronization(client *c) { + serverAssert(aeThreadOwnsLock()); long long psync_offset, psync_len; char *master_replid = (char*)ptrFromObj(c->argv[1]); char buf[128]; @@ -575,6 +592,7 @@ need_full_resync: * * Returns C_OK on success or C_ERR otherwise. */ int startBgsaveForReplication(int mincapa) { + serverAssert(aeThreadOwnsLock()); int retval; int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); listIter li; @@ -653,7 +671,7 @@ void syncCommand(client *c) { * the client about already issued commands. We need a fresh reply * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ - if (clientHasPendingReplies(c, TRUE)) { + if (clientHasPendingReplies(c)) { addReplyError(c,"SYNC and PSYNC are invalid with pending output"); return; } @@ -1637,6 +1655,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { /* This handler fires when the non blocking connect was able to * establish a connection with the master. */ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { + serverAssert(aeThreadOwnsLock()); char tmpfile[256], *err = NULL; int dfd = -1, maxtries = 5; int sockerr = 0, psync_result; @@ -2211,7 +2230,6 @@ void replicationCacheMaster(client *c) { if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); c->sentlen = 0; - listEmpty(c->listbufferDoneAsync); c->sentlenAsync = 0; c->reply_bytes = 0; c->bufpos = 0; @@ -2299,7 +2317,7 @@ void replicationResurrectCachedMaster(int newfd) { /* We may also need to install the write handler as well if there is * pending data in the write buffers. */ - if (clientHasPendingReplies(server.master, TRUE)) { + if (clientHasPendingReplies(server.master)) { if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_WRITABLE|AE_WRITE_THREADSAFE, sendReplyToClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); @@ -2527,6 +2545,7 @@ void processClientsWaitingReplicas(void) { listRewind(server.clients_waiting_acks,&li); while((ln = listNext(&li))) { client *c = (client*)ln->value; + fastlock_lock(&c->lock); /* Every time we find a client that is satisfied for a given * offset and number of replicas, we remember it so the next client @@ -2547,6 +2566,7 @@ void processClientsWaitingReplicas(void) { addReplyLongLongAsync(c,numreplicas); } } + fastlock_unlock(&c->lock); } } @@ -2574,7 +2594,11 @@ long long replicationGetSlaveOffset(void) { /* Replication cron function, called 1 time per second. */ void replicationCron(void) { + serverAssert(aeThreadOwnsLock()); static long long replication_cron_loops = 0; + std::unique_locklock)> ulock; + if (server.master != nullptr) + ulock = decltype(ulock)(server.master->lock); /* Non blocking connection timeout? */ if (server.masterhost && diff --git a/src/scripting.cpp b/src/scripting.cpp index 87577fb7d..71d1a2815 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -32,11 +32,14 @@ #include "rand.h" #include "cluster.h" +extern "C" { #include #include #include +} #include #include +#include char *redisProtocolToLuaType_Int(lua_State *lua, char *reply); char *redisProtocolToLuaType_Bulk(lua_State *lua, char *reply); @@ -89,7 +92,7 @@ struct ldbState { void sha1hex(char *digest, char *script, size_t len) { SHA1_CTX ctx; unsigned char hash[20]; - char *cset = "0123456789abcdef"; + const char *cset = "0123456789abcdef"; int j; SHA1Init(&ctx); @@ -223,7 +226,7 @@ char *redisProtocolToLuaType_MultiBulk(lua_State *lua, char *reply, int atype) { * with a single "err" field set to the error string. Note that this * table is never a valid reply by proper commands, since the returned * tables are otherwise always indexed by integers, never by strings. */ -void luaPushError(lua_State *lua, char *error) { +void luaPushError(lua_State *lua, const char *error) { lua_Debug dbg; /* If debugging is active and in step mode, log errors resulting from @@ -365,6 +368,8 @@ void luaReplyToRedisReply(client *c, lua_State *lua) { #define LUA_CMD_OBJCACHE_MAX_LEN 64 int luaRedisGenericCommand(lua_State *lua, int raise_error) { int j, argc = lua_gettop(lua); + int acl_retval = 0; + int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; struct redisCommand *cmd; client *c = server.lua_client; sds reply; @@ -394,7 +399,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { * To make this function reentrant is futile and makes it slower, but * we should at least detect such a misuse, and abort. */ if (inuse) { - char *recursion_warning = + const char *recursion_warning = "luaRedisGenericCommand() recursive call detected. " "Are you doing funny stuff with Lua debug hooks?"; serverLog(LL_WARNING,"%s",recursion_warning); @@ -402,6 +407,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { return 1; } inuse++; + std::unique_locklock)> ulock(c->lock); /* Require at least one argument */ if (argc == 0) { @@ -413,7 +419,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { /* Build the arguments vector */ if (argv_size < argc) { - argv = zrealloc(argv,sizeof(robj*)*argc, MALLOC_LOCAL); + argv = (robj**)zrealloc(argv,sizeof(robj*)*argc, MALLOC_LOCAL); argv_size = argc; } @@ -438,7 +444,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { if (j < LUA_CMD_OBJCACHE_SIZE && cached_objects[j] && cached_objects_len[j] >= obj_len) { - sds s = ptrFromObj(cached_objects[j]); + sds s = (sds)ptrFromObj(cached_objects[j]); argv[j] = cached_objects[j]; cached_objects[j] = NULL; memcpy(s,obj_s,obj_len+1); @@ -478,14 +484,14 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { break; } else { cmdlog = sdscatlen(cmdlog," ",1); - cmdlog = sdscatsds(cmdlog,ptrFromObj(c->argv[j])); + cmdlog = sdscatsds(cmdlog,(sds)ptrFromObj(c->argv[j])); } } ldbLog(cmdlog); } /* Command lookup */ - cmd = lookupCommand(ptrFromObj(argv[0])); + cmd = lookupCommand((sds)ptrFromObj(argv[0])); if (!cmd || ((cmd->arity > 0 && cmd->arity != argc) || (argc < -cmd->arity))) { @@ -505,7 +511,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { } /* Check the ACLs. */ - int acl_retval = ACLCheckCommandPerm(c); + acl_retval = ACLCheckCommandPerm(c); if (acl_retval != ACL_OK) { if (acl_retval == ACL_DENIED_CMD) luaPushError(lua, "The user executing the script can't run this " @@ -530,11 +536,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { !server.loading && !(server.lua_caller->flags & CLIENT_MASTER)) { - luaPushError(lua, ptrFromObj(shared.roslaveerr)); + luaPushError(lua, (char*)ptrFromObj(shared.roslaveerr)); goto cleanup; } else if (deny_write_type != DISK_ERROR_TYPE_NONE) { if (deny_write_type == DISK_ERROR_TYPE_RDB) { - luaPushError(lua, ptrFromObj(shared.bgsaveerr)); + luaPushError(lua, (char*)ptrFromObj(shared.bgsaveerr)); } else { sds aof_write_err = sdscatfmt(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", @@ -557,7 +563,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { (cmd->flags & CMD_DENYOOM)) { if (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK) { - luaPushError(lua, ptrFromObj(shared.oomerr)); + luaPushError(lua, (char*)ptrFromObj(shared.oomerr)); goto cleanup; } } @@ -598,7 +604,6 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { } /* Run the command */ - int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; if (server.lua_replicate_commands) { /* Set flags according to redis.set_repl() settings. */ if (server.lua_repl & PROPAGATE_AOF) @@ -622,9 +627,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { reply = sdsnewlen(c->buf,c->bufpos); c->bufpos = 0; while(listLength(c->reply)) { - clientReplyBlock *o = listNodeValue(listFirst(c->reply)); + clientReplyBlock *o = (clientReplyBlock*)listNodeValue(listFirst(c->reply)); - reply = sdscatlen(reply,o->buf,o->used); + reply = sdscatlen(reply,o->buf(),o->used); listDelNode(c->reply,listFirst(c->reply)); } } @@ -658,9 +663,9 @@ cleanup: o->refcount == 1 && (o->encoding == OBJ_ENCODING_RAW || o->encoding == OBJ_ENCODING_EMBSTR) && - sdslen(ptrFromObj(o)) <= LUA_CMD_OBJCACHE_MAX_LEN) + sdslen((sds)ptrFromObj(o)) <= LUA_CMD_OBJCACHE_MAX_LEN) { - sds s = ptrFromObj(o); + sds s = (sds)ptrFromObj(o); if (cached_objects[j]) decrRefCount(cached_objects[j]); cached_objects[j] = o; cached_objects_len[j] = sdsalloc(s); @@ -724,7 +729,7 @@ int luaRedisSha1hexCommand(lua_State *lua) { * return redis.error_reply("ERR Some Error") * return redis.status_reply("ERR Some Error") */ -int luaRedisReturnSingleFieldTable(lua_State *lua, char *field) { +int luaRedisReturnSingleFieldTable(lua_State *lua, const char *field) { if (lua_gettop(lua) != 1 || lua_type(lua,-1) != LUA_TSTRING) { luaPushError(lua, "wrong number or type of arguments"); return 1; @@ -870,10 +875,12 @@ void luaLoadLib(lua_State *lua, const char *libname, lua_CFunction luafunc) { lua_call(lua, 1, 0); } +extern "C" { LUALIB_API int (luaopen_cjson) (lua_State *L); LUALIB_API int (luaopen_struct) (lua_State *L); LUALIB_API int (luaopen_cmsgpack) (lua_State *L); LUALIB_API int (luaopen_bit) (lua_State *L); +} void luaLoadLibraries(lua_State *lua) { luaLoadLib(lua, "", luaopen_base); @@ -907,7 +914,7 @@ void luaRemoveUnsupportedFunctions(lua_State *lua) { * It should be the last to be called in the scripting engine initialization * sequence, because it may interact with creation of globals. */ void scriptingEnableGlobalsProtection(lua_State *lua) { - char *s[32]; + const char *s[32]; sds code = sdsempty(); int j = 0; @@ -1075,7 +1082,7 @@ void scriptingInit(int setup) { /* Add a helper function that we use to sort the multi bulk output of non * deterministic commands, when containing 'false' elements. */ { - char *compare_func = "function __redis__compare_helper(a,b)\n" + const char *compare_func = "function __redis__compare_helper(a,b)\n" " if a == false then a = '' end\n" " if b == false then b = '' end\n" " return aargv[1]),sdslen(ptrFromObj(c->argv[1]))); + sha1hex(funcname+2,(char*)ptrFromObj(c->argv[1]),sdslen((sds)ptrFromObj(c->argv[1]))); } else { /* We already have the SHA if it is a EVALSHA */ int j; - char *sha = ptrFromObj(c->argv[1]); + char *sha = (char*)ptrFromObj(c->argv[1]); /* Convert to lowercase. We don't use tolower since the function * managed to always show up in the profiler output consuming @@ -1470,13 +1477,13 @@ void evalGenericCommand(client *c, int evalsha) { * flush our cache of scripts that can be replicated as EVALSHA, while * for AOF we need to do so every time we rewrite the AOF file. */ if (evalsha && !server.lua_replicate_commands) { - if (!replicationScriptCacheExists(ptrFromObj(c->argv[1]))) { + if (!replicationScriptCacheExists((sds)ptrFromObj(c->argv[1]))) { /* This script is not in our script cache, replicate it as * EVAL, then add it into the script cache, as from now on * slaves and AOF know about it. */ - robj *script = dictFetchValue(server.lua_scripts,ptrFromObj(c->argv[1])); + robj *script = (robj*)dictFetchValue(server.lua_scripts,ptrFromObj(c->argv[1])); - replicationScriptCacheAdd(ptrFromObj(c->argv[1])); + replicationScriptCacheAdd((sds)ptrFromObj(c->argv[1])); serverAssertWithInfo(c,NULL,script != NULL); /* If the script did not produce any changes in the dataset we want @@ -1506,7 +1513,7 @@ void evalCommand(client *c) { } void evalShaCommand(client *c) { - if (sdslen(ptrFromObj(c->argv[1])) != 40) { + if (sdslen((sds)ptrFromObj(c->argv[1])) != 40) { /* We know that a match is not possible if the provided SHA is * not the right length. So we return an error ASAP, this way * evalGenericCommand() can be implemented without string length @@ -1523,7 +1530,7 @@ void evalShaCommand(client *c) { } void scriptCommand(client *c) { - if (c->argc == 2 && !strcasecmp(ptrFromObj(c->argv[1]),"help")) { + if (c->argc == 2 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"help")) { const char *help[] = { "DEBUG (yes|sync|no) -- Set the debug mode for subsequent scripts executed.", "EXISTS [ ...] -- Return information about the existence of the scripts in the script cache.", @@ -1533,12 +1540,12 @@ void scriptCommand(client *c) { NULL }; addReplyHelp(c, help); - } else if (c->argc == 2 && !strcasecmp(ptrFromObj(c->argv[1]),"flush")) { + } else if (c->argc == 2 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"flush")) { scriptingReset(); addReply(c,shared.ok); replicationScriptCacheFlush(); server.dirty++; /* Propagating this command is a good idea. */ - } else if (c->argc >= 2 && !strcasecmp(ptrFromObj(c->argv[1]),"exists")) { + } else if (c->argc >= 2 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"exists")) { int j; addReplyArrayLen(c, c->argc-2); @@ -1548,12 +1555,12 @@ NULL else addReply(c,shared.czero); } - } else if (c->argc == 3 && !strcasecmp(ptrFromObj(c->argv[1]),"load")) { + } else if (c->argc == 3 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"load")) { sds sha = luaCreateFunction(c,server.lua,c->argv[2]); if (sha == NULL) return; /* The error was sent by luaCreateFunction(). */ addReplyBulkCBuffer(c,sha,40); forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF); - } else if (c->argc == 2 && !strcasecmp(ptrFromObj(c->argv[1]),"kill")) { + } else if (c->argc == 2 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"kill")) { if (server.lua_caller == NULL) { addReplySds(c,sdsnew("-NOTBUSY No scripts in execution right now.\r\n")); } else if (server.lua_caller->flags & CLIENT_MASTER) { @@ -1564,18 +1571,18 @@ NULL server.lua_kill = 1; addReply(c,shared.ok); } - } else if (c->argc == 3 && !strcasecmp(ptrFromObj(c->argv[1]),"debug")) { - if (clientHasPendingReplies(c, TRUE)) { + } else if (c->argc == 3 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"debug")) { + if (clientHasPendingReplies(c)) { addReplyError(c,"SCRIPT DEBUG must be called outside a pipeline"); return; } - if (!strcasecmp(ptrFromObj(c->argv[2]),"no")) { + if (!strcasecmp((const char*)ptrFromObj(c->argv[2]),"no")) { ldbDisable(c); addReply(c,shared.ok); - } else if (!strcasecmp(ptrFromObj(c->argv[2]),"yes")) { + } else if (!strcasecmp((const char*)ptrFromObj(c->argv[2]),"yes")) { ldbEnable(c); addReply(c,shared.ok); - } else if (!strcasecmp(ptrFromObj(c->argv[2]),"sync")) { + } else if (!strcasecmp((const char*)ptrFromObj(c->argv[2]),"sync")) { ldbEnable(c); addReply(c,shared.ok); c->flags |= CLIENT_LUA_DEBUG_SYNC; @@ -1666,8 +1673,8 @@ void ldbSendLogs(void) { while(listLength(ldb.logs)) { listNode *ln = listFirst(ldb.logs); proto = sdscatlen(proto,"+",1); - sdsmapchars(ln->value,"\r\n"," ",2); - proto = sdscatsds(proto,ln->value); + sdsmapchars((sds)ln->value,"\r\n"," ",2); + proto = sdscatsds(proto,(sds)ln->value); proto = sdscatlen(proto,"\r\n",2); listDelNode(ldb.logs,ln); } @@ -1730,7 +1737,7 @@ int ldbStartSession(client *c) { /* First argument of EVAL is the script itself. We split it into different * lines since this is the way the debugger accesses the source code. */ - sds srcstring = sdsdup(ptrFromObj(c->argv[1])); + sds srcstring = sdsdup((sds)ptrFromObj(c->argv[1])); size_t srclen = sdslen(srcstring); while(srclen && (srcstring[srclen-1] == '\n' || srcstring[srclen-1] == '\r')) @@ -1820,7 +1827,7 @@ void evalGenericCommandWithDebugging(client *c, int evalsha) { /* Return a pointer to ldb.src source code line, considering line to be * one-based, and returning a special string for out of range lines. */ -char *ldbGetSourceLine(int line) { +const char *ldbGetSourceLine(int line) { int idx = line-1; if (idx < 0 || idx >= ldb.lines) return ""; return ldb.src[idx]; @@ -1868,6 +1875,7 @@ int ldbDelBreakpoint(int line) { sds *ldbReplParseCommand(int *argcp) { sds *argv = NULL; int argc = 0; + char *plen = NULL; if (sdslen(ldb.cbuf) == 0) return NULL; /* Working on a copy is simpler in this case. We can modify it freely @@ -1881,14 +1889,14 @@ sds *ldbReplParseCommand(int *argcp) { /* Seek and parse *\r\n. */ p = strchr(p,'*'); if (!p) goto protoerr; - char *plen = p+1; /* Multi bulk len pointer. */ + plen = p+1; /* Multi bulk len pointer. */ p = strstr(p,"\r\n"); if (!p) goto protoerr; *p = '\0'; p += 2; *argcp = atoi(plen); if (*argcp <= 0 || *argcp > 1024) goto protoerr; /* Parse each argument. */ - argv = zmalloc(sizeof(sds)*(*argcp), MALLOC_LOCAL); + argv = (sds*)zmalloc(sizeof(sds)*(*argcp), MALLOC_LOCAL); argc = 0; while(argc < *argcp) { if (*p != '$') goto protoerr; @@ -1913,8 +1921,8 @@ protoerr: /* Log the specified line in the Lua debugger output. */ void ldbLogSourceLine(int lnum) { - char *line = ldbGetSourceLine(lnum); - char *prefix; + const char *line = ldbGetSourceLine(lnum); + const char *prefix; int bp = ldbIsBreakpoint(lnum); int current = ldb.currentline == lnum; @@ -2020,12 +2028,12 @@ sds ldbCatStackValueRec(sds s, lua_State *lua, int idx, int level) { case LUA_TLIGHTUSERDATA: { const void *p = lua_topointer(lua,idx); - char *typename = "unknown"; - if (t == LUA_TFUNCTION) typename = "function"; - else if (t == LUA_TUSERDATA) typename = "userdata"; - else if (t == LUA_TTHREAD) typename = "thread"; - else if (t == LUA_TLIGHTUSERDATA) typename = "light-userdata"; - s = sdscatprintf(s,"\"%s@%p\"",typename,p); + const char *tname = "unknown"; + if (t == LUA_TFUNCTION) tname = "function"; + else if (t == LUA_TUSERDATA) tname = "userdata"; + else if (t == LUA_TTHREAD) tname = "thread"; + else if (t == LUA_TLIGHTUSERDATA) tname = "light-userdata"; + s = sdscatprintf(s,"\"%s@%p\"",tname,p); } break; default: @@ -2044,7 +2052,7 @@ sds ldbCatStackValue(sds s, lua_State *lua, int idx) { /* Produce a debugger log entry representing the value of the Lua object * currently on the top of the stack. The element is ot popped nor modified. * Check ldbCatStackValue() for the actual implementation. */ -void ldbLogStackValue(lua_State *lua, char *prefix) { +void ldbLogStackValue(lua_State *lua, const char *prefix) { sds s = sdsnew(prefix); s = ldbCatStackValue(s,lua,-1); ldbLogWithMaxLen(s); @@ -2466,7 +2474,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) { } if (ldb.step || bp) { - char *reason = "step over"; + const char *reason = "step over"; if (bp) reason = ldb.luabp ? "redis.breakpoint() called" : "break point"; else if (timeout) reason = "timeout reached, infinite loop?"; diff --git a/src/server.c b/src/server.c index bad6c2557..6b49c5809 100644 --- a/src/server.c +++ b/src/server.c @@ -1666,12 +1666,15 @@ void clientsCron(int iel) { c = listNodeValue(head); if (c->iel == iel) { + fastlock_lock(&c->lock); /* The following functions do different service checks on the client. * The protocol is that they return non-zero if the client was * terminated. */ - if (clientsCronHandleTimeout(c,now)) continue; - if (clientsCronResizeQueryBuffer(c)) continue; - if (clientsCronTrackExpansiveClients(c)) continue; + if (clientsCronHandleTimeout(c,now)) goto LContinue; + if (clientsCronResizeQueryBuffer(c)) goto LContinue; + if (clientsCronTrackExpansiveClients(c)) goto LContinue; + LContinue: + fastlock_unlock(&c->lock); } } } @@ -3135,6 +3138,7 @@ struct redisCommand *lookupCommandOrOriginal(sds name) { void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { + serverAssert(aeThreadOwnsLock()); if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & PROPAGATE_REPL) @@ -5034,7 +5038,7 @@ int main(int argc, char **argv) { initServer(); - server.cthreads = 4; //testing + server.cthreads = 2; //testing initNetworking(1 /* fReusePort */); if (background || server.pidfile) createPidFile(); diff --git a/src/server.h b/src/server.h index 7d95402d0..b8e3fa4e7 100644 --- a/src/server.h +++ b/src/server.h @@ -49,7 +49,13 @@ #include #include #include +#ifdef __cplusplus +extern "C" { #include +} +#else +#include +#endif #include typedef long long mstime_t; /* millisecond time type. */ @@ -869,8 +875,6 @@ typedef struct client { int bufposAsync; int buflenAsync; char *bufAsync; - /* Async Done Buffer, moved after a thread is done async writing */ - list *listbufferDoneAsync; int iel; /* the event loop index we're registered with */ struct fastlock lock; @@ -1621,7 +1625,7 @@ void pauseClients(mstime_t duration); int clientsArePaused(void); int processEventsWhileBlocked(int iel); int handleClientsWithPendingWrites(int iel); -int clientHasPendingReplies(client *c, int fIncludeAsync); +int clientHasPendingReplies(client *c); void unlinkClient(client *c); int writeToClient(int fd, client *c, int handler_installed); void linkClient(client *c); diff --git a/src/sha1.h b/src/sha1.h index f41691258..e42d4d2d4 100644 --- a/src/sha1.h +++ b/src/sha1.h @@ -7,6 +7,10 @@ By Steve Reid 100% Public Domain */ +#ifdef __cplusplus +extern "C" { +#endif + typedef struct { uint32_t state[5]; uint32_t count[2]; @@ -21,4 +25,9 @@ void SHA1Final(unsigned char digest[20], SHA1_CTX* context); #ifdef REDIS_TEST int sha1Test(int argc, char **argv); #endif + +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/t_list.c b/src/t_list.c index fc143cf34..c7350887f 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -680,6 +680,7 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb } else { /* BRPOPLPUSH failed because of wrong * destination type. */ + fastlock_unlock(&receiver->lock); return C_ERR; } }