Prevent mixed up client replies, and deadlocks
This commit is contained in:
parent
2788cf57b0
commit
30e8a859c0
@ -655,6 +655,8 @@ struct client *createFakeClient(void) {
|
||||
c->puser = NULL;
|
||||
listSetFreeMethod(c->reply,freeClientReplyValue);
|
||||
listSetDupMethod(c->reply,dupClientReplyValue);
|
||||
fastlock_init(&c->lock);
|
||||
fastlock_lock(&c->lock);
|
||||
initClientMultiState(c);
|
||||
return c;
|
||||
}
|
||||
@ -672,6 +674,8 @@ void freeFakeClient(struct client *c) {
|
||||
listRelease(c->reply);
|
||||
listRelease(c->watched_keys);
|
||||
freeClientMultiState(c);
|
||||
fastlock_unlock(&c->lock);
|
||||
fastlock_free(&c->lock);
|
||||
zfree(c);
|
||||
}
|
||||
|
||||
|
@ -223,7 +223,8 @@ void disconnectAllBlockedClients(void) {
|
||||
listRewind(server.clients,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *c = listNodeValue(ln);
|
||||
|
||||
|
||||
fastlock_lock(&c->lock);
|
||||
if (c->flags & CLIENT_BLOCKED) {
|
||||
addReplySdsAsync(c,sdsnew(
|
||||
"-UNBLOCKED force unblock from blocking operation, "
|
||||
@ -231,6 +232,7 @@ void disconnectAllBlockedClients(void) {
|
||||
unblockClient(c);
|
||||
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
|
||||
}
|
||||
fastlock_unlock(&c->lock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -309,6 +311,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
* freed by the next unblockClient()
|
||||
* call. */
|
||||
if (dstkey) incrRefCount(dstkey);
|
||||
fastlock_lock(&receiver->lock);
|
||||
unblockClient(receiver);
|
||||
|
||||
if (serveClientBlockedOnList(receiver,
|
||||
@ -321,6 +324,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
}
|
||||
|
||||
if (dstkey) decrRefCount(dstkey);
|
||||
fastlock_unlock(&receiver->lock);
|
||||
decrRefCount(value);
|
||||
} else {
|
||||
break;
|
||||
@ -360,6 +364,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
continue;
|
||||
}
|
||||
|
||||
fastlock_lock(&receiver->lock);
|
||||
int where = (receiver->lastcmd &&
|
||||
receiver->lastcmd->proc == bzpopminCommand)
|
||||
? ZSET_MIN : ZSET_MAX;
|
||||
@ -377,6 +382,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
incrRefCount(rl->key);
|
||||
propagate(cmd,receiver->db->id,
|
||||
argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
fastlock_unlock(&receiver->lock);
|
||||
decrRefCount(argv[0]);
|
||||
decrRefCount(argv[1]);
|
||||
}
|
||||
@ -419,10 +425,12 @@ void handleClientsBlockedOnKeys(void) {
|
||||
/* If the group was not found, send an error
|
||||
* to the consumer. */
|
||||
if (!group) {
|
||||
fastlock_lock(&receiver->lock);
|
||||
addReplyErrorAsync(receiver,
|
||||
"-NOGROUP the consumer group this client "
|
||||
"was blocked on no longer exists");
|
||||
unblockClient(receiver);
|
||||
fastlock_unlock(&receiver->lock);
|
||||
continue;
|
||||
} else {
|
||||
*gt = group->last_id;
|
||||
@ -444,6 +452,8 @@ void handleClientsBlockedOnKeys(void) {
|
||||
noack = receiver->bpop.xread_group_noack;
|
||||
}
|
||||
|
||||
fastlock_lock(&receiver->lock);
|
||||
|
||||
/* Emit the two elements sub-array consisting of
|
||||
* the name of the stream and the data we
|
||||
* extracted from it. Wrapped in a single-item
|
||||
@ -469,6 +479,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
* valid, so we must do the setup above before
|
||||
* this call. */
|
||||
unblockClient(receiver);
|
||||
fastlock_unlock(&receiver->lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,10 @@
|
||||
#ifndef __CLUSTER_H
|
||||
#define __CLUSTER_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
* Redis cluster data structures, defines, exported API.
|
||||
*----------------------------------------------------------------------------*/
|
||||
@ -287,4 +291,8 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||
int clusterRedirectBlockedClientIfNeeded(client *c);
|
||||
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* __CLUSTER_H */
|
||||
|
1
src/db.c
1
src/db.c
@ -1110,6 +1110,7 @@ long long getExpire(redisDb *db, robj *key) {
|
||||
* will be consistent even if we allow write operations against expiring
|
||||
* keys. */
|
||||
void propagateExpire(redisDb *db, robj *key, int lazy) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
robj *argv[2];
|
||||
|
||||
argv[0] = lazy ? shared.unlink : shared.del;
|
||||
|
@ -350,6 +350,7 @@ unsigned long LFUDecrAndReturn(robj *o) {
|
||||
* used memory: the eviction should use mostly data size. This function
|
||||
* returns the sum of AOF and slaves buffer. */
|
||||
size_t freeMemoryGetNotCountedMemory(void) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
size_t overhead = 0;
|
||||
int slaves = listLength(server.slaves);
|
||||
|
||||
@ -444,6 +445,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
|
||||
* Otehrwise if we are over the memory limit, but not enough memory
|
||||
* was freed to return back under the limit, the function returns C_ERR. */
|
||||
int freeMemoryIfNeeded(void) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
/* By default replicas should ignore maxmemory
|
||||
* and just be masters exact copies. */
|
||||
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
|
||||
|
@ -3696,7 +3696,7 @@ void moduleHandleBlockedClients(void) {
|
||||
/* Put the client in the list of clients that need to write
|
||||
* if there are pending replies here. This is needed since
|
||||
* during a non blocking command the client may receive output. */
|
||||
if (clientHasPendingReplies(c, FALSE) &&
|
||||
if (clientHasPendingReplies(c) &&
|
||||
!(c->flags & CLIENT_PENDING_WRITE))
|
||||
{
|
||||
c->flags |= CLIENT_PENDING_WRITE;
|
||||
|
@ -45,15 +45,25 @@ class AeLocker
|
||||
bool m_fArmed = false;
|
||||
|
||||
public:
|
||||
AeLocker(bool fArm = false)
|
||||
AeLocker()
|
||||
{
|
||||
if (fArm)
|
||||
arm();
|
||||
}
|
||||
|
||||
void arm()
|
||||
void arm(client *c) // if a client is passed, then the client is already locked
|
||||
{
|
||||
if (!m_fArmed)
|
||||
if (c != nullptr)
|
||||
{
|
||||
serverAssert(!m_fArmed);
|
||||
serverAssert(c->lock.fOwnLock());
|
||||
while (!aeTryAcquireLock())
|
||||
{
|
||||
c->lock.unlock();
|
||||
// give a chance for the global lock to progress if they were waiting on the client
|
||||
c->lock.lock();
|
||||
}
|
||||
m_fArmed = true;
|
||||
}
|
||||
else if (!m_fArmed)
|
||||
{
|
||||
m_fArmed = true;
|
||||
aeAcquireLock();
|
||||
@ -204,9 +214,6 @@ client *createClient(int fd, int iel) {
|
||||
c->bufAsync = NULL;
|
||||
c->buflenAsync = 0;
|
||||
c->bufposAsync = 0;
|
||||
c->listbufferDoneAsync = listCreate();
|
||||
listSetFreeMethod(c->listbufferDoneAsync,freeClientReplyValue);
|
||||
listSetDupMethod(c->listbufferDoneAsync,dupClientReplyValue);
|
||||
|
||||
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
|
||||
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
|
||||
@ -276,6 +283,7 @@ void clientInstallAsyncWriteHandler(client *c) {
|
||||
int prepareClientToWrite(client *c, bool fAsync) {
|
||||
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
|
||||
serverAssert(!fAsync || aeThreadOwnsLock());
|
||||
serverAssert(c->lock.fOwnLock());
|
||||
|
||||
/* If it's the Lua client we always return ok without installing any
|
||||
* handler since there is no socket at all. */
|
||||
@ -293,7 +301,7 @@ int prepareClientToWrite(client *c, bool fAsync) {
|
||||
|
||||
/* Schedule the client to write the output buffers to the socket, unless
|
||||
* it should already be setup to do so (it has already pending data). */
|
||||
if (!fAsync && !clientHasPendingReplies(c, FALSE)) clientInstallWriteHandler(c);
|
||||
if (!fAsync && !clientHasPendingReplies(c)) clientInstallWriteHandler(c);
|
||||
if (fAsync && !(c->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(c);
|
||||
|
||||
/* Authorize the caller to queue in the output buffer of this client. */
|
||||
@ -1014,8 +1022,8 @@ void copyClientOutputBuffer(client *dst, client *src) {
|
||||
|
||||
/* Return true if the specified client has pending reply buffers to write to
|
||||
* the socket. */
|
||||
int clientHasPendingReplies(client *c, int fIncludeAsync) {
|
||||
return c->bufpos || listLength(c->reply) || (fIncludeAsync && listLength(c->listbufferDoneAsync));
|
||||
int clientHasPendingReplies(client *c) {
|
||||
return c->bufpos || listLength(c->reply);
|
||||
}
|
||||
|
||||
#define MAX_ACCEPTS_PER_CALL 1000
|
||||
@ -1149,6 +1157,7 @@ static void freeClientArgv(client *c) {
|
||||
* when we resync with our own master and want to force all our slaves to
|
||||
* resync with us as well. */
|
||||
void disconnectSlaves(void) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
std::vector<client*> vecfreeImmediate;
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
@ -1327,7 +1336,6 @@ void freeClient(client *c) {
|
||||
/* Release other dynamically allocated client structure fields,
|
||||
* and finally release the client structure itself. */
|
||||
zfree(c->bufAsync);
|
||||
listRelease(c->listbufferDoneAsync);
|
||||
if (c->name) decrRefCount(c->name);
|
||||
zfree(c->argv);
|
||||
freeClientMultiState(c);
|
||||
@ -1342,10 +1350,10 @@ void freeClient(client *c) {
|
||||
* should be valid for the continuation of the flow of the program. */
|
||||
void freeClientAsync(client *c) {
|
||||
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
|
||||
aeAcquireLock();
|
||||
AeLocker lock;
|
||||
lock.arm(nullptr);
|
||||
c->flags |= CLIENT_CLOSE_ASAP;
|
||||
listAddNodeTail(server.clients_to_close,c);
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
||||
void freeClientsInAsyncFreeQueue(int iel) {
|
||||
@ -1381,48 +1389,10 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
||||
clientReplyBlock *o;
|
||||
AssertCorrectThread(c);
|
||||
|
||||
// Decide up front if we are sending the done buffer. This prevents us from completing
|
||||
// a transmission on another thread while transmitting the thread local buffer, resulting in us
|
||||
// overlapping messages
|
||||
AeLocker locker(true);
|
||||
std::lock_guard<decltype(c->lock)> lock(c->lock); // To prevent deadlocks this must be after we acquire the global lock
|
||||
int fSendAsyncBuffer = listLength(c->listbufferDoneAsync) && (c->sentlen == 0 || c->sentlenAsync > 0);
|
||||
if (!fSendAsyncBuffer)
|
||||
locker.disarm();
|
||||
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
||||
|
||||
while(fSendAsyncBuffer || clientHasPendingReplies(c, FALSE)) {
|
||||
if (fSendAsyncBuffer) {
|
||||
o = (clientReplyBlock*)listNodeValue(listFirst(c->listbufferDoneAsync));
|
||||
if (o->used == 0) {
|
||||
listDelNode(c->listbufferDoneAsync,listFirst(c->listbufferDoneAsync));
|
||||
if (listLength(c->listbufferDoneAsync) == 0)
|
||||
{
|
||||
fSendAsyncBuffer = 0;
|
||||
locker.disarm();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
nwritten = write(fd, o->buf() + c->sentlen, o->used - c->sentlen);
|
||||
if (nwritten <= 0)
|
||||
break;
|
||||
c->sentlenAsync += nwritten;
|
||||
totwritten += nwritten;
|
||||
|
||||
/* If we fully sent the object on head go to the next one */
|
||||
if (c->sentlenAsync == o->used) {
|
||||
listDelNode(c->listbufferDoneAsync,listFirst(c->listbufferDoneAsync));
|
||||
c->sentlenAsync = 0;
|
||||
/* If there are no longer objects in the list, we expect
|
||||
* the count of reply bytes to be exactly zero. */
|
||||
if (listLength(c->listbufferDoneAsync) == 0)
|
||||
{
|
||||
fSendAsyncBuffer = 0;
|
||||
locker.disarm();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else if (c->bufpos > 0) {
|
||||
while(clientHasPendingReplies(c)) {
|
||||
if (c->bufpos > 0) {
|
||||
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
|
||||
|
||||
if (nwritten <= 0) break;
|
||||
@ -1486,9 +1456,17 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
||||
} else {
|
||||
serverLog(LL_VERBOSE,
|
||||
"Error writing to client: %s", strerror(errno));
|
||||
aeAcquireLock();
|
||||
freeClient(c);
|
||||
aeReleaseLock();
|
||||
if (aeTryAcquireLock())
|
||||
{
|
||||
freeClient(c);
|
||||
aeReleaseLock();
|
||||
}
|
||||
else
|
||||
{
|
||||
lock.unlock();
|
||||
freeClientAsync(c);
|
||||
}
|
||||
|
||||
return C_ERR;
|
||||
}
|
||||
}
|
||||
@ -1499,15 +1477,22 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
||||
* We just rely on data / pings received for timeout detection. */
|
||||
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
|
||||
}
|
||||
if (!clientHasPendingReplies(c, TRUE)) {
|
||||
if (!clientHasPendingReplies(c)) {
|
||||
c->sentlen = 0;
|
||||
if (handler_installed) aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_WRITABLE);
|
||||
|
||||
/* Close connection after entire reply has been sent. */
|
||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
|
||||
aeAcquireLock();
|
||||
freeClient(c);
|
||||
aeReleaseLock();
|
||||
if (aeTryAcquireLock())
|
||||
{
|
||||
freeClient(c);
|
||||
aeReleaseLock();
|
||||
}
|
||||
else
|
||||
{
|
||||
lock.unlock();
|
||||
freeClientAsync(c);
|
||||
}
|
||||
return C_ERR;
|
||||
}
|
||||
}
|
||||
@ -1542,7 +1527,9 @@ void ProcessPendingAsyncWrites()
|
||||
reply->size = zmalloc_usable(reply) - sizeof(clientReplyBlock);
|
||||
reply->used = c->bufposAsync;
|
||||
memcpy(reply->buf(), c->bufAsync, c->bufposAsync);
|
||||
listAddNodeTail(c->listbufferDoneAsync, reply);
|
||||
listAddNodeTail(c->reply, reply);
|
||||
c->reply_bytes += reply->size;
|
||||
|
||||
c->bufposAsync = 0;
|
||||
c->buflenAsync = 0;
|
||||
zfree(c->bufAsync);
|
||||
@ -1566,6 +1553,7 @@ void ProcessPendingAsyncWrites()
|
||||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))))
|
||||
continue;
|
||||
|
||||
asyncCloseClientOnOutputBufferLimitReached(c);
|
||||
if (aeCreateRemoteFileEvent(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR)
|
||||
continue; // We can retry later in the cron
|
||||
}
|
||||
@ -1601,7 +1589,7 @@ int handleClientsWithPendingWrites(int iel) {
|
||||
|
||||
/* If after the synchronous writes above we still have data to
|
||||
* output to the client, we need to install the writable handler. */
|
||||
if (clientHasPendingReplies(c, TRUE)) {
|
||||
if (clientHasPendingReplies(c)) {
|
||||
int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
|
||||
/* For the fsync=always policy, we want that a given FD is never
|
||||
* served for reading and writing in the same event loop iteration,
|
||||
@ -1619,7 +1607,8 @@ int handleClientsWithPendingWrites(int iel) {
|
||||
}
|
||||
}
|
||||
|
||||
AeLocker locker(true);
|
||||
AeLocker locker;
|
||||
locker.arm(nullptr);
|
||||
ProcessPendingAsyncWrites();
|
||||
|
||||
return processed;
|
||||
@ -1675,7 +1664,7 @@ void unprotectClient(client *c) {
|
||||
if (c->flags & CLIENT_PROTECTED) {
|
||||
c->flags &= ~CLIENT_PROTECTED;
|
||||
aeCreateFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_READABLE|AE_READ_THREADSAFE,readQueryFromClient,c);
|
||||
if (clientHasPendingReplies(c, TRUE)) clientInstallWriteHandler(c);
|
||||
if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1967,7 +1956,8 @@ void processInputBuffer(client *c) {
|
||||
} else {
|
||||
serverPanic("Unknown request type");
|
||||
}
|
||||
AeLocker locker(true);
|
||||
AeLocker locker;
|
||||
locker.arm(c);
|
||||
server.current_client = c;
|
||||
|
||||
/* Multibulk processing could see a <= 0 length. */
|
||||
@ -2033,9 +2023,12 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
UNUSED(mask);
|
||||
serverAssert(mask & AE_READ_THREADSAFE);
|
||||
serverAssert(c->iel == ielFromEventLoop(el));
|
||||
AeLocker locker;
|
||||
|
||||
AeLocker aelock;
|
||||
AssertCorrectThread(c);
|
||||
std::lock_guard<decltype(c->lock)> lock(c->lock);
|
||||
std::unique_lock<decltype(c->lock)> lock(c->lock, std::defer_lock);
|
||||
if (!lock.try_lock())
|
||||
return; // Process something else while we wait
|
||||
|
||||
readlen = PROTO_IOBUF_LEN;
|
||||
/* If this is a multi bulk request, and we are processing a bulk reply
|
||||
@ -2065,16 +2058,14 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
return;
|
||||
} else {
|
||||
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
|
||||
aeAcquireLock();
|
||||
aelock.arm(c);
|
||||
freeClient(c);
|
||||
aeReleaseLock();
|
||||
return;
|
||||
}
|
||||
} else if (nread == 0) {
|
||||
serverLog(LL_VERBOSE, "Client closed connection");
|
||||
aeAcquireLock();
|
||||
aelock.arm(c);
|
||||
freeClient(c);
|
||||
aeReleaseLock();
|
||||
return;
|
||||
} else if (c->flags & CLIENT_MASTER) {
|
||||
/* Append the query buffer to the pending (not applied) buffer
|
||||
@ -2095,9 +2086,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
|
||||
sdsfree(ci);
|
||||
sdsfree(bytes);
|
||||
aeAcquireLock();
|
||||
aelock.arm(c);
|
||||
freeClient(c);
|
||||
aeReleaseLock();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -2108,9 +2098,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
* corresponding part of the replication stream, will be propagated to
|
||||
* the sub-slaves and to the replication backlog. */
|
||||
processInputBufferAndReplicate(c);
|
||||
aeAcquireLock();
|
||||
aelock.arm(c);
|
||||
ProcessPendingAsyncWrites();
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
||||
void getClientsMaxBuffers(unsigned long *longest_output_list,
|
||||
@ -2619,7 +2608,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
|
||||
* enforcing the client output length limits. */
|
||||
unsigned long getClientOutputBufferMemoryUsage(client *c) {
|
||||
unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
|
||||
return c->reply_bytes + (list_item_size*listLength(c->reply));
|
||||
return c->reply_bytes + (list_item_size*listLength(c->reply)) + c->buflenAsync;
|
||||
}
|
||||
|
||||
/* Get the class of a client, used in order to enforce limits to different
|
||||
@ -2727,6 +2716,7 @@ void asyncCloseClientOnOutputBufferLimitReached(client *c) {
|
||||
* This is also called by SHUTDOWN for a best-effort attempt to send
|
||||
* slaves the latest writes. */
|
||||
void flushSlavesOutputBuffers(void) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
||||
@ -2747,7 +2737,7 @@ void flushSlavesOutputBuffers(void) {
|
||||
events = aeGetFileEvents(server.rgthreadvar[slave->iel].el,slave->fd);
|
||||
if (events & AE_WRITABLE &&
|
||||
slave->replstate == SLAVE_STATE_ONLINE &&
|
||||
clientHasPendingReplies(slave, TRUE))
|
||||
clientHasPendingReplies(slave))
|
||||
{
|
||||
writeToClient(slave->fd,slave,0);
|
||||
}
|
||||
@ -2820,8 +2810,7 @@ int processEventsWhileBlocked(int iel) {
|
||||
int iterations = 4; /* See the function top-comment. */
|
||||
int count = 0;
|
||||
|
||||
// BUGBUG - This function isn't fair - why should clients on this thread get to run, but not clients elsewhere?
|
||||
// We mix up replies when releasing the lock here so more work is needed to fix this
|
||||
aeReleaseLock();
|
||||
while (iterations--) {
|
||||
int events = 0;
|
||||
events += aeProcessEvents(server.rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT);
|
||||
@ -2829,5 +2818,6 @@ int processEventsWhileBlocked(int iel) {
|
||||
if (!events) break;
|
||||
count += events;
|
||||
}
|
||||
aeAcquireLock();
|
||||
return count;
|
||||
}
|
||||
|
@ -940,6 +940,7 @@ void freeMemoryOverheadData(struct redisMemOverhead *mh) {
|
||||
* information used for the MEMORY OVERHEAD and INFO command. The returned
|
||||
* structure pointer should be freed calling freeMemoryOverheadData(). */
|
||||
struct redisMemOverhead *getMemoryOverheadData(void) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
int j;
|
||||
size_t mem_total = 0;
|
||||
size_t mem = 0;
|
||||
@ -1077,6 +1078,7 @@ void inputCatSds(void *result, const char *str) {
|
||||
/* This implements MEMORY DOCTOR. An human readable analysis of the Redis
|
||||
* memory condition. */
|
||||
sds getMemoryDoctorReport(void) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
int empty = 0; /* Instance is empty or almost empty. */
|
||||
int big_peak = 0; /* Memory peak is much larger than used mem. */
|
||||
int high_frag = 0; /* High fragmentation. */
|
||||
|
@ -293,7 +293,9 @@ int pubsubPublishMessage(robj *channel, robj *message) {
|
||||
listRewind(list,&li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
client *c = ln->value;
|
||||
fastlock_lock(&c->lock);
|
||||
addReplyPubsubMessage(c,channel,message);
|
||||
fastlock_unlock(&c->lock);
|
||||
receivers++;
|
||||
}
|
||||
}
|
||||
@ -309,8 +311,10 @@ int pubsubPublishMessage(robj *channel, robj *message) {
|
||||
(char*)ptrFromObj(channel),
|
||||
sdslen(ptrFromObj(channel)),0))
|
||||
{
|
||||
fastlock_lock(&pat->pclient->lock);
|
||||
addReplyPubsubPatMessage(pat->pclient,
|
||||
pat->pattern,channel,message);
|
||||
fastlock_unlock(&pat->pclient->lock);
|
||||
receivers++;
|
||||
}
|
||||
}
|
||||
|
@ -30,9 +30,17 @@
|
||||
#ifndef REDIS_RANDOM_H
|
||||
#define REDIS_RANDOM_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t redisLrand48();
|
||||
void redisSrand48(int32_t seedval);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#define REDIS_LRAND48_MAX INT32_MAX
|
||||
|
||||
#endif
|
||||
|
@ -2140,6 +2140,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
|
||||
* This function covers the case of RDB -> Salves socket transfers for
|
||||
* diskless replication. */
|
||||
void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
uint64_t *ok_slaves;
|
||||
|
||||
if (!bysignal && exitcode == 0) {
|
||||
@ -2259,6 +2260,7 @@ void killRDBChild(void) {
|
||||
/* Spawn an RDB child that writes the RDB to the sockets of the slaves
|
||||
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
|
||||
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
int *fds;
|
||||
uint64_t *clientids;
|
||||
int numfds;
|
||||
|
@ -37,6 +37,7 @@
|
||||
#include <fcntl.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/stat.h>
|
||||
#include <mutex>
|
||||
|
||||
void replicationDiscardCachedMaster(void);
|
||||
void replicationResurrectCachedMaster(int newfd);
|
||||
@ -115,6 +116,7 @@ void resizeReplicationBacklog(long long newsize) {
|
||||
}
|
||||
|
||||
void freeReplicationBacklog(void) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
serverAssert(listLength(server.slaves) == 0);
|
||||
zfree(server.repl_backlog);
|
||||
server.repl_backlog = NULL;
|
||||
@ -200,6 +202,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
/* We can't have slaves attached and no backlog. */
|
||||
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
|
||||
|
||||
/* Get the lock on all slaves */
|
||||
listRewind(slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
((client*)ln->value)->lock.lock();
|
||||
}
|
||||
|
||||
/* Send SELECT command to every slave if needed. */
|
||||
if (server.slaveseldb != dictid) {
|
||||
robj *selectcmd;
|
||||
@ -280,6 +288,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
for (j = 0; j < argc; j++)
|
||||
addReplyBulkAsync(slave,argv[j]);
|
||||
}
|
||||
|
||||
/* Release the lock on all slaves */
|
||||
listRewind(slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
((client*)ln->value)->lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/* This function is used in order to proxy what we receive from our master
|
||||
@ -303,6 +317,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
|
||||
listRewind(slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = (client*)ln->value;
|
||||
std::lock_guard<decltype(slave->lock)> ulock(slave->lock);
|
||||
|
||||
/* Don't feed slaves that are still waiting for BGSAVE to start */
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
@ -348,6 +363,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
|
||||
listRewind(monitors,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *monitor = (client*)ln->value;
|
||||
std::lock_guard<decltype(monitor->lock)> lock(monitor->lock);
|
||||
addReplyAsync(monitor,cmdobj);
|
||||
}
|
||||
decrRefCount(cmdobj);
|
||||
@ -459,6 +475,7 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) {
|
||||
* On success return C_OK, otherwise C_ERR is returned and we proceed
|
||||
* with the usual full resync. */
|
||||
int masterTryPartialResynchronization(client *c) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
long long psync_offset, psync_len;
|
||||
char *master_replid = (char*)ptrFromObj(c->argv[1]);
|
||||
char buf[128];
|
||||
@ -575,6 +592,7 @@ need_full_resync:
|
||||
*
|
||||
* Returns C_OK on success or C_ERR otherwise. */
|
||||
int startBgsaveForReplication(int mincapa) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
int retval;
|
||||
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
|
||||
listIter li;
|
||||
@ -653,7 +671,7 @@ void syncCommand(client *c) {
|
||||
* the client about already issued commands. We need a fresh reply
|
||||
* buffer registering the differences between the BGSAVE and the current
|
||||
* dataset, so that we can copy to other slaves if needed. */
|
||||
if (clientHasPendingReplies(c, TRUE)) {
|
||||
if (clientHasPendingReplies(c)) {
|
||||
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
|
||||
return;
|
||||
}
|
||||
@ -1637,6 +1655,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) {
|
||||
/* This handler fires when the non blocking connect was able to
|
||||
* establish a connection with the master. */
|
||||
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
char tmpfile[256], *err = NULL;
|
||||
int dfd = -1, maxtries = 5;
|
||||
int sockerr = 0, psync_result;
|
||||
@ -2211,7 +2230,6 @@ void replicationCacheMaster(client *c) {
|
||||
if (c->flags & CLIENT_MULTI) discardTransaction(c);
|
||||
listEmpty(c->reply);
|
||||
c->sentlen = 0;
|
||||
listEmpty(c->listbufferDoneAsync);
|
||||
c->sentlenAsync = 0;
|
||||
c->reply_bytes = 0;
|
||||
c->bufpos = 0;
|
||||
@ -2299,7 +2317,7 @@ void replicationResurrectCachedMaster(int newfd) {
|
||||
|
||||
/* We may also need to install the write handler as well if there is
|
||||
* pending data in the write buffers. */
|
||||
if (clientHasPendingReplies(server.master, TRUE)) {
|
||||
if (clientHasPendingReplies(server.master)) {
|
||||
if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_WRITABLE|AE_WRITE_THREADSAFE,
|
||||
sendReplyToClient, server.master)) {
|
||||
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
|
||||
@ -2527,6 +2545,7 @@ void processClientsWaitingReplicas(void) {
|
||||
listRewind(server.clients_waiting_acks,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *c = (client*)ln->value;
|
||||
fastlock_lock(&c->lock);
|
||||
|
||||
/* Every time we find a client that is satisfied for a given
|
||||
* offset and number of replicas, we remember it so the next client
|
||||
@ -2547,6 +2566,7 @@ void processClientsWaitingReplicas(void) {
|
||||
addReplyLongLongAsync(c,numreplicas);
|
||||
}
|
||||
}
|
||||
fastlock_unlock(&c->lock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2574,7 +2594,11 @@ long long replicationGetSlaveOffset(void) {
|
||||
|
||||
/* Replication cron function, called 1 time per second. */
|
||||
void replicationCron(void) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
static long long replication_cron_loops = 0;
|
||||
std::unique_lock<decltype(server.master->lock)> ulock;
|
||||
if (server.master != nullptr)
|
||||
ulock = decltype(ulock)(server.master->lock);
|
||||
|
||||
/* Non blocking connection timeout? */
|
||||
if (server.masterhost &&
|
||||
|
@ -32,11 +32,14 @@
|
||||
#include "rand.h"
|
||||
#include "cluster.h"
|
||||
|
||||
extern "C" {
|
||||
#include <lua.h>
|
||||
#include <lauxlib.h>
|
||||
#include <lualib.h>
|
||||
}
|
||||
#include <ctype.h>
|
||||
#include <math.h>
|
||||
#include <mutex>
|
||||
|
||||
char *redisProtocolToLuaType_Int(lua_State *lua, char *reply);
|
||||
char *redisProtocolToLuaType_Bulk(lua_State *lua, char *reply);
|
||||
@ -89,7 +92,7 @@ struct ldbState {
|
||||
void sha1hex(char *digest, char *script, size_t len) {
|
||||
SHA1_CTX ctx;
|
||||
unsigned char hash[20];
|
||||
char *cset = "0123456789abcdef";
|
||||
const char *cset = "0123456789abcdef";
|
||||
int j;
|
||||
|
||||
SHA1Init(&ctx);
|
||||
@ -223,7 +226,7 @@ char *redisProtocolToLuaType_MultiBulk(lua_State *lua, char *reply, int atype) {
|
||||
* with a single "err" field set to the error string. Note that this
|
||||
* table is never a valid reply by proper commands, since the returned
|
||||
* tables are otherwise always indexed by integers, never by strings. */
|
||||
void luaPushError(lua_State *lua, char *error) {
|
||||
void luaPushError(lua_State *lua, const char *error) {
|
||||
lua_Debug dbg;
|
||||
|
||||
/* If debugging is active and in step mode, log errors resulting from
|
||||
@ -365,6 +368,8 @@ void luaReplyToRedisReply(client *c, lua_State *lua) {
|
||||
#define LUA_CMD_OBJCACHE_MAX_LEN 64
|
||||
int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
int j, argc = lua_gettop(lua);
|
||||
int acl_retval = 0;
|
||||
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
|
||||
struct redisCommand *cmd;
|
||||
client *c = server.lua_client;
|
||||
sds reply;
|
||||
@ -394,7 +399,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
* To make this function reentrant is futile and makes it slower, but
|
||||
* we should at least detect such a misuse, and abort. */
|
||||
if (inuse) {
|
||||
char *recursion_warning =
|
||||
const char *recursion_warning =
|
||||
"luaRedisGenericCommand() recursive call detected. "
|
||||
"Are you doing funny stuff with Lua debug hooks?";
|
||||
serverLog(LL_WARNING,"%s",recursion_warning);
|
||||
@ -402,6 +407,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
return 1;
|
||||
}
|
||||
inuse++;
|
||||
std::unique_lock<decltype(c->lock)> ulock(c->lock);
|
||||
|
||||
/* Require at least one argument */
|
||||
if (argc == 0) {
|
||||
@ -413,7 +419,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
|
||||
/* Build the arguments vector */
|
||||
if (argv_size < argc) {
|
||||
argv = zrealloc(argv,sizeof(robj*)*argc, MALLOC_LOCAL);
|
||||
argv = (robj**)zrealloc(argv,sizeof(robj*)*argc, MALLOC_LOCAL);
|
||||
argv_size = argc;
|
||||
}
|
||||
|
||||
@ -438,7 +444,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
if (j < LUA_CMD_OBJCACHE_SIZE && cached_objects[j] &&
|
||||
cached_objects_len[j] >= obj_len)
|
||||
{
|
||||
sds s = ptrFromObj(cached_objects[j]);
|
||||
sds s = (sds)ptrFromObj(cached_objects[j]);
|
||||
argv[j] = cached_objects[j];
|
||||
cached_objects[j] = NULL;
|
||||
memcpy(s,obj_s,obj_len+1);
|
||||
@ -478,14 +484,14 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
break;
|
||||
} else {
|
||||
cmdlog = sdscatlen(cmdlog," ",1);
|
||||
cmdlog = sdscatsds(cmdlog,ptrFromObj(c->argv[j]));
|
||||
cmdlog = sdscatsds(cmdlog,(sds)ptrFromObj(c->argv[j]));
|
||||
}
|
||||
}
|
||||
ldbLog(cmdlog);
|
||||
}
|
||||
|
||||
/* Command lookup */
|
||||
cmd = lookupCommand(ptrFromObj(argv[0]));
|
||||
cmd = lookupCommand((sds)ptrFromObj(argv[0]));
|
||||
if (!cmd || ((cmd->arity > 0 && cmd->arity != argc) ||
|
||||
(argc < -cmd->arity)))
|
||||
{
|
||||
@ -505,7 +511,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
}
|
||||
|
||||
/* Check the ACLs. */
|
||||
int acl_retval = ACLCheckCommandPerm(c);
|
||||
acl_retval = ACLCheckCommandPerm(c);
|
||||
if (acl_retval != ACL_OK) {
|
||||
if (acl_retval == ACL_DENIED_CMD)
|
||||
luaPushError(lua, "The user executing the script can't run this "
|
||||
@ -530,11 +536,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
!server.loading &&
|
||||
!(server.lua_caller->flags & CLIENT_MASTER))
|
||||
{
|
||||
luaPushError(lua, ptrFromObj(shared.roslaveerr));
|
||||
luaPushError(lua, (char*)ptrFromObj(shared.roslaveerr));
|
||||
goto cleanup;
|
||||
} else if (deny_write_type != DISK_ERROR_TYPE_NONE) {
|
||||
if (deny_write_type == DISK_ERROR_TYPE_RDB) {
|
||||
luaPushError(lua, ptrFromObj(shared.bgsaveerr));
|
||||
luaPushError(lua, (char*)ptrFromObj(shared.bgsaveerr));
|
||||
} else {
|
||||
sds aof_write_err = sdscatfmt(sdsempty(),
|
||||
"-MISCONF Errors writing to the AOF file: %s\r\n",
|
||||
@ -557,7 +563,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
(cmd->flags & CMD_DENYOOM))
|
||||
{
|
||||
if (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK) {
|
||||
luaPushError(lua, ptrFromObj(shared.oomerr));
|
||||
luaPushError(lua, (char*)ptrFromObj(shared.oomerr));
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
@ -598,7 +604,6 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
}
|
||||
|
||||
/* Run the command */
|
||||
int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS;
|
||||
if (server.lua_replicate_commands) {
|
||||
/* Set flags according to redis.set_repl() settings. */
|
||||
if (server.lua_repl & PROPAGATE_AOF)
|
||||
@ -622,9 +627,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
|
||||
reply = sdsnewlen(c->buf,c->bufpos);
|
||||
c->bufpos = 0;
|
||||
while(listLength(c->reply)) {
|
||||
clientReplyBlock *o = listNodeValue(listFirst(c->reply));
|
||||
clientReplyBlock *o = (clientReplyBlock*)listNodeValue(listFirst(c->reply));
|
||||
|
||||
reply = sdscatlen(reply,o->buf,o->used);
|
||||
reply = sdscatlen(reply,o->buf(),o->used);
|
||||
listDelNode(c->reply,listFirst(c->reply));
|
||||
}
|
||||
}
|
||||
@ -658,9 +663,9 @@ cleanup:
|
||||
o->refcount == 1 &&
|
||||
(o->encoding == OBJ_ENCODING_RAW ||
|
||||
o->encoding == OBJ_ENCODING_EMBSTR) &&
|
||||
sdslen(ptrFromObj(o)) <= LUA_CMD_OBJCACHE_MAX_LEN)
|
||||
sdslen((sds)ptrFromObj(o)) <= LUA_CMD_OBJCACHE_MAX_LEN)
|
||||
{
|
||||
sds s = ptrFromObj(o);
|
||||
sds s = (sds)ptrFromObj(o);
|
||||
if (cached_objects[j]) decrRefCount(cached_objects[j]);
|
||||
cached_objects[j] = o;
|
||||
cached_objects_len[j] = sdsalloc(s);
|
||||
@ -724,7 +729,7 @@ int luaRedisSha1hexCommand(lua_State *lua) {
|
||||
* return redis.error_reply("ERR Some Error")
|
||||
* return redis.status_reply("ERR Some Error")
|
||||
*/
|
||||
int luaRedisReturnSingleFieldTable(lua_State *lua, char *field) {
|
||||
int luaRedisReturnSingleFieldTable(lua_State *lua, const char *field) {
|
||||
if (lua_gettop(lua) != 1 || lua_type(lua,-1) != LUA_TSTRING) {
|
||||
luaPushError(lua, "wrong number or type of arguments");
|
||||
return 1;
|
||||
@ -870,10 +875,12 @@ void luaLoadLib(lua_State *lua, const char *libname, lua_CFunction luafunc) {
|
||||
lua_call(lua, 1, 0);
|
||||
}
|
||||
|
||||
extern "C" {
|
||||
LUALIB_API int (luaopen_cjson) (lua_State *L);
|
||||
LUALIB_API int (luaopen_struct) (lua_State *L);
|
||||
LUALIB_API int (luaopen_cmsgpack) (lua_State *L);
|
||||
LUALIB_API int (luaopen_bit) (lua_State *L);
|
||||
}
|
||||
|
||||
void luaLoadLibraries(lua_State *lua) {
|
||||
luaLoadLib(lua, "", luaopen_base);
|
||||
@ -907,7 +914,7 @@ void luaRemoveUnsupportedFunctions(lua_State *lua) {
|
||||
* It should be the last to be called in the scripting engine initialization
|
||||
* sequence, because it may interact with creation of globals. */
|
||||
void scriptingEnableGlobalsProtection(lua_State *lua) {
|
||||
char *s[32];
|
||||
const char *s[32];
|
||||
sds code = sdsempty();
|
||||
int j = 0;
|
||||
|
||||
@ -1075,7 +1082,7 @@ void scriptingInit(int setup) {
|
||||
/* Add a helper function that we use to sort the multi bulk output of non
|
||||
* deterministic commands, when containing 'false' elements. */
|
||||
{
|
||||
char *compare_func = "function __redis__compare_helper(a,b)\n"
|
||||
const char *compare_func = "function __redis__compare_helper(a,b)\n"
|
||||
" if a == false then a = '' end\n"
|
||||
" if b == false then b = '' end\n"
|
||||
" return a<b\n"
|
||||
@ -1089,7 +1096,7 @@ void scriptingInit(int setup) {
|
||||
* information about the caller, that's what makes sense from the point
|
||||
* of view of the user debugging a script. */
|
||||
{
|
||||
char *errh_func = "local dbg = debug\n"
|
||||
const char *errh_func = "local dbg = debug\n"
|
||||
"function __redis__err__handler(err)\n"
|
||||
" local i = dbg.getinfo(2,'nSl')\n"
|
||||
" if i and i.what == 'C' then\n"
|
||||
@ -1137,12 +1144,12 @@ void scriptingReset(void) {
|
||||
|
||||
/* Set an array of Redis String Objects as a Lua array (table) stored into a
|
||||
* global variable. */
|
||||
void luaSetGlobalArray(lua_State *lua, char *var, robj **elev, int elec) {
|
||||
void luaSetGlobalArray(lua_State *lua, const char *var, robj **elev, int elec) {
|
||||
int j;
|
||||
|
||||
lua_newtable(lua);
|
||||
for (j = 0; j < elec; j++) {
|
||||
lua_pushlstring(lua,(char*)ptrFromObj(elev[j]),sdslen(ptrFromObj(elev[j])));
|
||||
lua_pushlstring(lua,(char*)ptrFromObj(elev[j]),sdslen((sds)ptrFromObj(elev[j])));
|
||||
lua_rawseti(lua,-2,j+1);
|
||||
}
|
||||
lua_setglobal(lua,var);
|
||||
@ -1218,19 +1225,19 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body) {
|
||||
|
||||
funcname[0] = 'f';
|
||||
funcname[1] = '_';
|
||||
sha1hex(funcname+2,ptrFromObj(body),sdslen(ptrFromObj(body)));
|
||||
sha1hex(funcname+2,(char*)ptrFromObj(body),sdslen((sds)ptrFromObj(body)));
|
||||
|
||||
sds sha = sdsnewlen(funcname+2,40);
|
||||
if ((de = dictFind(server.lua_scripts,sha)) != NULL) {
|
||||
sdsfree(sha);
|
||||
return dictGetKey(de);
|
||||
return (sds)dictGetKey(de);
|
||||
}
|
||||
|
||||
sds funcdef = sdsempty();
|
||||
funcdef = sdscat(funcdef,"function ");
|
||||
funcdef = sdscatlen(funcdef,funcname,42);
|
||||
funcdef = sdscatlen(funcdef,"() ",3);
|
||||
funcdef = sdscatlen(funcdef,ptrFromObj(body),sdslen(ptrFromObj(body)));
|
||||
funcdef = sdscatlen(funcdef,ptrFromObj(body),sdslen((sds)ptrFromObj(body)));
|
||||
funcdef = sdscatlen(funcdef,"\nend",4);
|
||||
|
||||
if (luaL_loadbuffer(lua,funcdef,sdslen(funcdef),"@user_script")) {
|
||||
@ -1334,11 +1341,11 @@ void evalGenericCommand(client *c, int evalsha) {
|
||||
funcname[1] = '_';
|
||||
if (!evalsha) {
|
||||
/* Hash the code if this is an EVAL call */
|
||||
sha1hex(funcname+2,ptrFromObj(c->argv[1]),sdslen(ptrFromObj(c->argv[1])));
|
||||
sha1hex(funcname+2,(char*)ptrFromObj(c->argv[1]),sdslen((sds)ptrFromObj(c->argv[1])));
|
||||
} else {
|
||||
/* We already have the SHA if it is a EVALSHA */
|
||||
int j;
|
||||
char *sha = ptrFromObj(c->argv[1]);
|
||||
char *sha = (char*)ptrFromObj(c->argv[1]);
|
||||
|
||||
/* Convert to lowercase. We don't use tolower since the function
|
||||
* managed to always show up in the profiler output consuming
|
||||
@ -1470,13 +1477,13 @@ void evalGenericCommand(client *c, int evalsha) {
|
||||
* flush our cache of scripts that can be replicated as EVALSHA, while
|
||||
* for AOF we need to do so every time we rewrite the AOF file. */
|
||||
if (evalsha && !server.lua_replicate_commands) {
|
||||
if (!replicationScriptCacheExists(ptrFromObj(c->argv[1]))) {
|
||||
if (!replicationScriptCacheExists((sds)ptrFromObj(c->argv[1]))) {
|
||||
/* This script is not in our script cache, replicate it as
|
||||
* EVAL, then add it into the script cache, as from now on
|
||||
* slaves and AOF know about it. */
|
||||
robj *script = dictFetchValue(server.lua_scripts,ptrFromObj(c->argv[1]));
|
||||
robj *script = (robj*)dictFetchValue(server.lua_scripts,ptrFromObj(c->argv[1]));
|
||||
|
||||
replicationScriptCacheAdd(ptrFromObj(c->argv[1]));
|
||||
replicationScriptCacheAdd((sds)ptrFromObj(c->argv[1]));
|
||||
serverAssertWithInfo(c,NULL,script != NULL);
|
||||
|
||||
/* If the script did not produce any changes in the dataset we want
|
||||
@ -1506,7 +1513,7 @@ void evalCommand(client *c) {
|
||||
}
|
||||
|
||||
void evalShaCommand(client *c) {
|
||||
if (sdslen(ptrFromObj(c->argv[1])) != 40) {
|
||||
if (sdslen((sds)ptrFromObj(c->argv[1])) != 40) {
|
||||
/* We know that a match is not possible if the provided SHA is
|
||||
* not the right length. So we return an error ASAP, this way
|
||||
* evalGenericCommand() can be implemented without string length
|
||||
@ -1523,7 +1530,7 @@ void evalShaCommand(client *c) {
|
||||
}
|
||||
|
||||
void scriptCommand(client *c) {
|
||||
if (c->argc == 2 && !strcasecmp(ptrFromObj(c->argv[1]),"help")) {
|
||||
if (c->argc == 2 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"help")) {
|
||||
const char *help[] = {
|
||||
"DEBUG (yes|sync|no) -- Set the debug mode for subsequent scripts executed.",
|
||||
"EXISTS <sha1> [<sha1> ...] -- Return information about the existence of the scripts in the script cache.",
|
||||
@ -1533,12 +1540,12 @@ void scriptCommand(client *c) {
|
||||
NULL
|
||||
};
|
||||
addReplyHelp(c, help);
|
||||
} else if (c->argc == 2 && !strcasecmp(ptrFromObj(c->argv[1]),"flush")) {
|
||||
} else if (c->argc == 2 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"flush")) {
|
||||
scriptingReset();
|
||||
addReply(c,shared.ok);
|
||||
replicationScriptCacheFlush();
|
||||
server.dirty++; /* Propagating this command is a good idea. */
|
||||
} else if (c->argc >= 2 && !strcasecmp(ptrFromObj(c->argv[1]),"exists")) {
|
||||
} else if (c->argc >= 2 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"exists")) {
|
||||
int j;
|
||||
|
||||
addReplyArrayLen(c, c->argc-2);
|
||||
@ -1548,12 +1555,12 @@ NULL
|
||||
else
|
||||
addReply(c,shared.czero);
|
||||
}
|
||||
} else if (c->argc == 3 && !strcasecmp(ptrFromObj(c->argv[1]),"load")) {
|
||||
} else if (c->argc == 3 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"load")) {
|
||||
sds sha = luaCreateFunction(c,server.lua,c->argv[2]);
|
||||
if (sha == NULL) return; /* The error was sent by luaCreateFunction(). */
|
||||
addReplyBulkCBuffer(c,sha,40);
|
||||
forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
|
||||
} else if (c->argc == 2 && !strcasecmp(ptrFromObj(c->argv[1]),"kill")) {
|
||||
} else if (c->argc == 2 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"kill")) {
|
||||
if (server.lua_caller == NULL) {
|
||||
addReplySds(c,sdsnew("-NOTBUSY No scripts in execution right now.\r\n"));
|
||||
} else if (server.lua_caller->flags & CLIENT_MASTER) {
|
||||
@ -1564,18 +1571,18 @@ NULL
|
||||
server.lua_kill = 1;
|
||||
addReply(c,shared.ok);
|
||||
}
|
||||
} else if (c->argc == 3 && !strcasecmp(ptrFromObj(c->argv[1]),"debug")) {
|
||||
if (clientHasPendingReplies(c, TRUE)) {
|
||||
} else if (c->argc == 3 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"debug")) {
|
||||
if (clientHasPendingReplies(c)) {
|
||||
addReplyError(c,"SCRIPT DEBUG must be called outside a pipeline");
|
||||
return;
|
||||
}
|
||||
if (!strcasecmp(ptrFromObj(c->argv[2]),"no")) {
|
||||
if (!strcasecmp((const char*)ptrFromObj(c->argv[2]),"no")) {
|
||||
ldbDisable(c);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(ptrFromObj(c->argv[2]),"yes")) {
|
||||
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[2]),"yes")) {
|
||||
ldbEnable(c);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(ptrFromObj(c->argv[2]),"sync")) {
|
||||
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[2]),"sync")) {
|
||||
ldbEnable(c);
|
||||
addReply(c,shared.ok);
|
||||
c->flags |= CLIENT_LUA_DEBUG_SYNC;
|
||||
@ -1666,8 +1673,8 @@ void ldbSendLogs(void) {
|
||||
while(listLength(ldb.logs)) {
|
||||
listNode *ln = listFirst(ldb.logs);
|
||||
proto = sdscatlen(proto,"+",1);
|
||||
sdsmapchars(ln->value,"\r\n"," ",2);
|
||||
proto = sdscatsds(proto,ln->value);
|
||||
sdsmapchars((sds)ln->value,"\r\n"," ",2);
|
||||
proto = sdscatsds(proto,(sds)ln->value);
|
||||
proto = sdscatlen(proto,"\r\n",2);
|
||||
listDelNode(ldb.logs,ln);
|
||||
}
|
||||
@ -1730,7 +1737,7 @@ int ldbStartSession(client *c) {
|
||||
|
||||
/* First argument of EVAL is the script itself. We split it into different
|
||||
* lines since this is the way the debugger accesses the source code. */
|
||||
sds srcstring = sdsdup(ptrFromObj(c->argv[1]));
|
||||
sds srcstring = sdsdup((sds)ptrFromObj(c->argv[1]));
|
||||
size_t srclen = sdslen(srcstring);
|
||||
while(srclen && (srcstring[srclen-1] == '\n' ||
|
||||
srcstring[srclen-1] == '\r'))
|
||||
@ -1820,7 +1827,7 @@ void evalGenericCommandWithDebugging(client *c, int evalsha) {
|
||||
|
||||
/* Return a pointer to ldb.src source code line, considering line to be
|
||||
* one-based, and returning a special string for out of range lines. */
|
||||
char *ldbGetSourceLine(int line) {
|
||||
const char *ldbGetSourceLine(int line) {
|
||||
int idx = line-1;
|
||||
if (idx < 0 || idx >= ldb.lines) return "<out of range source code line>";
|
||||
return ldb.src[idx];
|
||||
@ -1868,6 +1875,7 @@ int ldbDelBreakpoint(int line) {
|
||||
sds *ldbReplParseCommand(int *argcp) {
|
||||
sds *argv = NULL;
|
||||
int argc = 0;
|
||||
char *plen = NULL;
|
||||
if (sdslen(ldb.cbuf) == 0) return NULL;
|
||||
|
||||
/* Working on a copy is simpler in this case. We can modify it freely
|
||||
@ -1881,14 +1889,14 @@ sds *ldbReplParseCommand(int *argcp) {
|
||||
|
||||
/* Seek and parse *<count>\r\n. */
|
||||
p = strchr(p,'*'); if (!p) goto protoerr;
|
||||
char *plen = p+1; /* Multi bulk len pointer. */
|
||||
plen = p+1; /* Multi bulk len pointer. */
|
||||
p = strstr(p,"\r\n"); if (!p) goto protoerr;
|
||||
*p = '\0'; p += 2;
|
||||
*argcp = atoi(plen);
|
||||
if (*argcp <= 0 || *argcp > 1024) goto protoerr;
|
||||
|
||||
/* Parse each argument. */
|
||||
argv = zmalloc(sizeof(sds)*(*argcp), MALLOC_LOCAL);
|
||||
argv = (sds*)zmalloc(sizeof(sds)*(*argcp), MALLOC_LOCAL);
|
||||
argc = 0;
|
||||
while(argc < *argcp) {
|
||||
if (*p != '$') goto protoerr;
|
||||
@ -1913,8 +1921,8 @@ protoerr:
|
||||
|
||||
/* Log the specified line in the Lua debugger output. */
|
||||
void ldbLogSourceLine(int lnum) {
|
||||
char *line = ldbGetSourceLine(lnum);
|
||||
char *prefix;
|
||||
const char *line = ldbGetSourceLine(lnum);
|
||||
const char *prefix;
|
||||
int bp = ldbIsBreakpoint(lnum);
|
||||
int current = ldb.currentline == lnum;
|
||||
|
||||
@ -2020,12 +2028,12 @@ sds ldbCatStackValueRec(sds s, lua_State *lua, int idx, int level) {
|
||||
case LUA_TLIGHTUSERDATA:
|
||||
{
|
||||
const void *p = lua_topointer(lua,idx);
|
||||
char *typename = "unknown";
|
||||
if (t == LUA_TFUNCTION) typename = "function";
|
||||
else if (t == LUA_TUSERDATA) typename = "userdata";
|
||||
else if (t == LUA_TTHREAD) typename = "thread";
|
||||
else if (t == LUA_TLIGHTUSERDATA) typename = "light-userdata";
|
||||
s = sdscatprintf(s,"\"%s@%p\"",typename,p);
|
||||
const char *tname = "unknown";
|
||||
if (t == LUA_TFUNCTION) tname = "function";
|
||||
else if (t == LUA_TUSERDATA) tname = "userdata";
|
||||
else if (t == LUA_TTHREAD) tname = "thread";
|
||||
else if (t == LUA_TLIGHTUSERDATA) tname = "light-userdata";
|
||||
s = sdscatprintf(s,"\"%s@%p\"",tname,p);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@ -2044,7 +2052,7 @@ sds ldbCatStackValue(sds s, lua_State *lua, int idx) {
|
||||
/* Produce a debugger log entry representing the value of the Lua object
|
||||
* currently on the top of the stack. The element is ot popped nor modified.
|
||||
* Check ldbCatStackValue() for the actual implementation. */
|
||||
void ldbLogStackValue(lua_State *lua, char *prefix) {
|
||||
void ldbLogStackValue(lua_State *lua, const char *prefix) {
|
||||
sds s = sdsnew(prefix);
|
||||
s = ldbCatStackValue(s,lua,-1);
|
||||
ldbLogWithMaxLen(s);
|
||||
@ -2466,7 +2474,7 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
|
||||
}
|
||||
|
||||
if (ldb.step || bp) {
|
||||
char *reason = "step over";
|
||||
const char *reason = "step over";
|
||||
if (bp) reason = ldb.luabp ? "redis.breakpoint() called" :
|
||||
"break point";
|
||||
else if (timeout) reason = "timeout reached, infinite loop?";
|
||||
|
12
src/server.c
12
src/server.c
@ -1666,12 +1666,15 @@ void clientsCron(int iel) {
|
||||
c = listNodeValue(head);
|
||||
if (c->iel == iel)
|
||||
{
|
||||
fastlock_lock(&c->lock);
|
||||
/* The following functions do different service checks on the client.
|
||||
* The protocol is that they return non-zero if the client was
|
||||
* terminated. */
|
||||
if (clientsCronHandleTimeout(c,now)) continue;
|
||||
if (clientsCronResizeQueryBuffer(c)) continue;
|
||||
if (clientsCronTrackExpansiveClients(c)) continue;
|
||||
if (clientsCronHandleTimeout(c,now)) goto LContinue;
|
||||
if (clientsCronResizeQueryBuffer(c)) goto LContinue;
|
||||
if (clientsCronTrackExpansiveClients(c)) goto LContinue;
|
||||
LContinue:
|
||||
fastlock_unlock(&c->lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -3135,6 +3138,7 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
|
||||
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
|
||||
int flags)
|
||||
{
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
|
||||
feedAppendOnlyFile(cmd,dbid,argv,argc);
|
||||
if (flags & PROPAGATE_REPL)
|
||||
@ -5034,7 +5038,7 @@ int main(int argc, char **argv) {
|
||||
|
||||
initServer();
|
||||
|
||||
server.cthreads = 4; //testing
|
||||
server.cthreads = 2; //testing
|
||||
initNetworking(1 /* fReusePort */);
|
||||
|
||||
if (background || server.pidfile) createPidFile();
|
||||
|
10
src/server.h
10
src/server.h
@ -49,7 +49,13 @@
|
||||
#include <pthread.h>
|
||||
#include <syslog.h>
|
||||
#include <netinet/in.h>
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#include <lua.h>
|
||||
}
|
||||
#else
|
||||
#include <lua.h>
|
||||
#endif
|
||||
#include <signal.h>
|
||||
|
||||
typedef long long mstime_t; /* millisecond time type. */
|
||||
@ -869,8 +875,6 @@ typedef struct client {
|
||||
int bufposAsync;
|
||||
int buflenAsync;
|
||||
char *bufAsync;
|
||||
/* Async Done Buffer, moved after a thread is done async writing */
|
||||
list *listbufferDoneAsync;
|
||||
|
||||
int iel; /* the event loop index we're registered with */
|
||||
struct fastlock lock;
|
||||
@ -1621,7 +1625,7 @@ void pauseClients(mstime_t duration);
|
||||
int clientsArePaused(void);
|
||||
int processEventsWhileBlocked(int iel);
|
||||
int handleClientsWithPendingWrites(int iel);
|
||||
int clientHasPendingReplies(client *c, int fIncludeAsync);
|
||||
int clientHasPendingReplies(client *c);
|
||||
void unlinkClient(client *c);
|
||||
int writeToClient(int fd, client *c, int handler_installed);
|
||||
void linkClient(client *c);
|
||||
|
@ -7,6 +7,10 @@ By Steve Reid <steve@edmweb.com>
|
||||
100% Public Domain
|
||||
*/
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
uint32_t state[5];
|
||||
uint32_t count[2];
|
||||
@ -21,4 +25,9 @@ void SHA1Final(unsigned char digest[20], SHA1_CTX* context);
|
||||
#ifdef REDIS_TEST
|
||||
int sha1Test(int argc, char **argv);
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
@ -680,6 +680,7 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb
|
||||
} else {
|
||||
/* BRPOPLPUSH failed because of wrong
|
||||
* destination type. */
|
||||
fastlock_unlock(&receiver->lock);
|
||||
return C_ERR;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user