Lock use after free
This commit is contained in:
parent
174e9fe809
commit
dbd82f6bfc
@ -161,10 +161,12 @@ void queueClientForReprocessing(client *c) {
|
||||
/* The client may already be into the unblocked list because of a previous
|
||||
* blocking operation, don't add back it into the list multiple times. */
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
fastlock_lock(&c->lock);
|
||||
if (!(c->flags & CLIENT_UNBLOCKED)) {
|
||||
c->flags |= CLIENT_UNBLOCKED;
|
||||
listAddNodeTail(server.rgthreadvar[c->iel].unblocked_clients,c);
|
||||
}
|
||||
fastlock_unlock(&c->lock);
|
||||
}
|
||||
|
||||
/* Unblock a client calling the right function depending on the kind
|
||||
@ -258,6 +260,7 @@ void disconnectAllBlockedClients(void) {
|
||||
* be used only for a single type, like virtually any Redis application will
|
||||
* do, the function is already fair. */
|
||||
void handleClientsBlockedOnKeys(void) {
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
while(listLength(server.ready_keys) != 0) {
|
||||
list *l;
|
||||
|
||||
|
@ -33,6 +33,7 @@
|
||||
#include <sys/types.h>
|
||||
#include <sched.h>
|
||||
#include <atomic>
|
||||
#include <assert.h>
|
||||
|
||||
/****************************************************
|
||||
*
|
||||
@ -56,6 +57,7 @@ extern "C" void fastlock_init(struct fastlock *lock)
|
||||
lock->m_ticket.m_active = 0;
|
||||
lock->m_ticket.m_avail = 0;
|
||||
lock->m_depth = 0;
|
||||
lock->m_pidOwner = -1;
|
||||
}
|
||||
|
||||
extern "C" void fastlock_lock(struct fastlock *lock)
|
||||
@ -111,6 +113,7 @@ extern "C" void fastlock_unlock(struct fastlock *lock)
|
||||
--lock->m_depth;
|
||||
if (lock->m_depth == 0)
|
||||
{
|
||||
assert((int)__atomic_load_4(&lock->m_pidOwner, __ATOMIC_RELAXED) >= 0); // unlock after free
|
||||
lock->m_pidOwner = -1;
|
||||
std::atomic_thread_fence(std::memory_order_acquire);
|
||||
__atomic_fetch_add(&lock->m_ticket.m_active, 1, __ATOMIC_ACQ_REL);
|
||||
@ -120,7 +123,9 @@ extern "C" void fastlock_unlock(struct fastlock *lock)
|
||||
extern "C" void fastlock_free(struct fastlock *lock)
|
||||
{
|
||||
// NOP
|
||||
(void)lock;
|
||||
assert((lock->m_ticket.m_active == lock->m_ticket.m_avail) // Asser the lock is unlocked
|
||||
|| (lock->m_pidOwner == gettid() && (lock->m_ticket.m_active == lock->m_ticket.m_avail-1))); // OR we own the lock and nobody else is waiting
|
||||
lock->m_pidOwner = -2; // sentinal value indicating free
|
||||
}
|
||||
|
||||
|
||||
|
@ -3642,6 +3642,7 @@ void moduleHandleBlockedClients(void) {
|
||||
if (c)
|
||||
{
|
||||
AssertCorrectThread(c);
|
||||
fastlock_lock(&c->lock);
|
||||
}
|
||||
|
||||
/* Release the lock during the loop, as long as we don't
|
||||
@ -3708,6 +3709,7 @@ void moduleHandleBlockedClients(void) {
|
||||
/* Free 'bc' only after unblocking the client, since it is
|
||||
* referenced in the client blocking context, and must be valid
|
||||
* when calling unblockClient(). */
|
||||
fastlock_unlock(&c->lock);
|
||||
zfree(bc);
|
||||
|
||||
/* Lock again before to iterate the loop. */
|
||||
|
@ -55,12 +55,23 @@ public:
|
||||
{
|
||||
serverAssert(!m_fArmed);
|
||||
serverAssert(c->lock.fOwnLock());
|
||||
|
||||
bool fClientLocked = true;
|
||||
while (!aeTryAcquireLock())
|
||||
{
|
||||
c->lock.unlock();
|
||||
// give a chance for the global lock to progress if they were waiting on the client
|
||||
c->lock.lock();
|
||||
if (fClientLocked) c->lock.unlock();
|
||||
fClientLocked = false;
|
||||
aeAcquireLock();
|
||||
if (!c->lock.try_lock())
|
||||
{
|
||||
aeReleaseLock();
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
m_fArmed = true;
|
||||
}
|
||||
else if (!m_fArmed)
|
||||
@ -239,6 +250,7 @@ void clientInstallWriteHandler(client *c) {
|
||||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
|
||||
{
|
||||
AssertCorrectThread(c);
|
||||
serverAssert(c->lock.fOwnLock());
|
||||
/* Here instead of installing the write handler, we just flag the
|
||||
* client and put it into a list of clients that have something
|
||||
* to write to the socket. This way before re-entering the event
|
||||
@ -324,6 +336,7 @@ int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) {
|
||||
int minsize = len + c->bufposAsync;
|
||||
c->buflenAsync = std::max(minsize, c->buflenAsync*2 - c->buflenAsync);
|
||||
c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL);
|
||||
c->buflenAsync = zmalloc_usable(c->bufAsync);
|
||||
}
|
||||
memcpy(c->bufAsync+c->bufposAsync,s,len);
|
||||
c->bufposAsync += len;
|
||||
@ -1185,6 +1198,7 @@ void unlinkClient(client *c) {
|
||||
listNode *ln;
|
||||
AssertCorrectThread(c);
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
serverAssert(c->lock.fOwnLock());
|
||||
|
||||
/* If this is marked as current client unset it. */
|
||||
if (server.current_client == c) server.current_client = NULL;
|
||||
@ -1227,15 +1241,17 @@ void unlinkClient(client *c) {
|
||||
|
||||
if (c->fPendingAsyncWrite) {
|
||||
ln = NULL;
|
||||
int iel = 0;
|
||||
for (; iel < server.cthreads; ++iel)
|
||||
bool fFound = false;
|
||||
for (int iel = 0; iel < server.cthreads; ++iel)
|
||||
{
|
||||
ln = listSearchKey(server.rgthreadvar[iel].clients_pending_asyncwrite,c);
|
||||
if (ln)
|
||||
break;
|
||||
{
|
||||
fFound = true;
|
||||
listDelNode(server.rgthreadvar[iel].clients_pending_asyncwrite,ln);
|
||||
}
|
||||
}
|
||||
serverAssert(ln != NULL);
|
||||
listDelNode(server.rgthreadvar[iel].clients_pending_asyncwrite,ln);
|
||||
serverAssert(fFound);
|
||||
c->fPendingAsyncWrite = FALSE;
|
||||
}
|
||||
}
|
||||
@ -1244,6 +1260,7 @@ void freeClient(client *c) {
|
||||
listNode *ln;
|
||||
serverAssert(aeThreadOwnsLock());
|
||||
AssertCorrectThread(c);
|
||||
std::unique_lock<decltype(c->lock)> ulock(c->lock);
|
||||
|
||||
/* If a client is protected, yet we need to free it right now, make sure
|
||||
* to at least use asynchronous freeing. */
|
||||
@ -1340,6 +1357,7 @@ void freeClient(client *c) {
|
||||
zfree(c->argv);
|
||||
freeClientMultiState(c);
|
||||
sdsfree(c->peerid);
|
||||
ulock.unlock();
|
||||
fastlock_free(&c->lock);
|
||||
zfree(c);
|
||||
}
|
||||
@ -1352,6 +1370,7 @@ void freeClientAsync(client *c) {
|
||||
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
|
||||
AeLocker lock;
|
||||
lock.arm(nullptr);
|
||||
std::lock_guard<decltype(c->lock)> clientlock(c->lock);
|
||||
c->flags |= CLIENT_CLOSE_ASAP;
|
||||
listAddNodeTail(server.clients_to_close,c);
|
||||
}
|
||||
@ -1456,6 +1475,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
||||
} else {
|
||||
serverLog(LL_VERBOSE,
|
||||
"Error writing to client: %s", strerror(errno));
|
||||
lock.unlock();
|
||||
if (aeTryAcquireLock())
|
||||
{
|
||||
freeClient(c);
|
||||
@ -1483,6 +1503,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
|
||||
|
||||
/* Close connection after entire reply has been sent. */
|
||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
|
||||
lock.unlock();
|
||||
if (aeTryAcquireLock())
|
||||
{
|
||||
freeClient(c);
|
||||
@ -1574,7 +1595,7 @@ int handleClientsWithPendingWrites(int iel) {
|
||||
listRewind(list,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *c = (client*)listNodeValue(ln);
|
||||
std::lock_guard<decltype(c->lock)> lock(c->lock);
|
||||
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
||||
|
||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||
listDelNode(list,ln);
|
||||
@ -1585,7 +1606,10 @@ int handleClientsWithPendingWrites(int iel) {
|
||||
if (c->flags & CLIENT_PROTECTED) continue;
|
||||
|
||||
/* Try to write buffers to the client socket. */
|
||||
if (writeToClient(c->fd,c,0) == C_ERR) continue;
|
||||
if (writeToClient(c->fd,c,0) == C_ERR) {
|
||||
lock.release(); // client is free'd
|
||||
continue;
|
||||
}
|
||||
|
||||
/* If after the synchronous writes above we still have data to
|
||||
* output to the client, we need to install the writable handler. */
|
||||
@ -1956,14 +1980,15 @@ void processInputBuffer(client *c) {
|
||||
} else {
|
||||
serverPanic("Unknown request type");
|
||||
}
|
||||
AeLocker locker;
|
||||
locker.arm(c);
|
||||
server.current_client = c;
|
||||
|
||||
/* Multibulk processing could see a <= 0 length. */
|
||||
if (c->argc == 0) {
|
||||
resetClient(c);
|
||||
} else {
|
||||
AeLocker locker;
|
||||
locker.arm(c);
|
||||
server.current_client = c;
|
||||
|
||||
/* Only reset the client when the command was executed. */
|
||||
if (processCommand(c) == C_OK) {
|
||||
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
|
||||
@ -1982,6 +2007,7 @@ void processInputBuffer(client *c) {
|
||||
* result into a slave, that may be the active client, to be
|
||||
* freed. */
|
||||
if (server.current_client == NULL) break;
|
||||
server.current_client = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1990,8 +2016,6 @@ void processInputBuffer(client *c) {
|
||||
sdsrange(c->querybuf,c->qb_pos,-1);
|
||||
c->qb_pos = 0;
|
||||
}
|
||||
|
||||
server.current_client = NULL;
|
||||
}
|
||||
|
||||
/* This is a wrapper for processInputBuffer that also cares about handling
|
||||
@ -2058,13 +2082,15 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
return;
|
||||
} else {
|
||||
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
|
||||
aelock.arm(c);
|
||||
lock.unlock();
|
||||
aelock.arm(nullptr);
|
||||
freeClient(c);
|
||||
return;
|
||||
}
|
||||
} else if (nread == 0) {
|
||||
serverLog(LL_VERBOSE, "Client closed connection");
|
||||
aelock.arm(c);
|
||||
lock.unlock();
|
||||
aelock.arm(nullptr);
|
||||
freeClient(c);
|
||||
return;
|
||||
} else if (c->flags & CLIENT_MASTER) {
|
||||
@ -2086,7 +2112,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
|
||||
sdsfree(ci);
|
||||
sdsfree(bytes);
|
||||
aelock.arm(c);
|
||||
lock.unlock();
|
||||
aelock.arm(nullptr);
|
||||
freeClient(c);
|
||||
return;
|
||||
}
|
||||
@ -2098,7 +2125,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
* corresponding part of the replication stream, will be propagated to
|
||||
* the sub-slaves and to the replication backlog. */
|
||||
processInputBufferAndReplicate(c);
|
||||
aelock.arm(c);
|
||||
aelock.arm(nullptr);
|
||||
ProcessPendingAsyncWrites();
|
||||
}
|
||||
|
||||
|
@ -82,10 +82,10 @@ robj *createRawStringObject(const char *ptr, size_t len) {
|
||||
* an object where the sds string is actually an unmodifiable string
|
||||
* allocated in the same chunk as the object itself. */
|
||||
robj *createEmbeddedStringObject(const char *ptr, size_t len) {
|
||||
size_t alloclen = len;
|
||||
if (len < sizeof(void*))
|
||||
alloclen = sizeof(void*);
|
||||
robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+alloclen+1-sizeof(o->m_ptr), MALLOC_SHARED);
|
||||
size_t allocsize = sizeof(struct sdshdr8)+len+1;
|
||||
if (allocsize < sizeof(void*))
|
||||
allocsize = sizeof(void*);
|
||||
robj *o = zmalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED);
|
||||
struct sdshdr8 *sh = (void*)(&o->m_ptr);
|
||||
|
||||
o->type = OBJ_STRING;
|
||||
|
@ -188,13 +188,6 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
* master replication history and has the same backlog and offsets). */
|
||||
if (server.masterhost != NULL) return;
|
||||
|
||||
/* If the instance is not a top level master, return ASAP: we'll just proxy
|
||||
* the stream of data we receive from our master instead, in order to
|
||||
* propagate *identical* replication stream. In this way this slave can
|
||||
* advertise the same replication ID as the master (since it shares the
|
||||
* master replication history and has the same backlog and offsets). */
|
||||
if (server.masterhost != NULL) return;
|
||||
|
||||
/* If there aren't slaves, and there is no backlog buffer to populate,
|
||||
* we can return ASAP. */
|
||||
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
|
||||
@ -889,7 +882,7 @@ void putSlaveOnline(client *slave) {
|
||||
slave->replstate = SLAVE_STATE_ONLINE;
|
||||
slave->repl_put_online_on_ack = 0;
|
||||
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
|
||||
//AssertCorrectThread(slave);
|
||||
AssertCorrectThread(slave);
|
||||
if (aeCreateFileEvent(server.rgthreadvar[slave->iel].el, slave->fd, AE_WRITABLE|AE_WRITE_THREADSAFE,
|
||||
sendReplyToClient, slave) == AE_ERR) {
|
||||
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
|
||||
@ -2042,7 +2035,12 @@ void replicationUnsetMaster(void) {
|
||||
* used as secondary ID up to the current offset, and a new replication
|
||||
* ID is created to continue with a new replication history. */
|
||||
shiftReplicationId();
|
||||
if (server.master) freeClientAsync(server.master);
|
||||
if (server.master) {
|
||||
if (FCorrectThread(server.master))
|
||||
freeClient(server.master);
|
||||
else
|
||||
freeClientAsync(server.master);
|
||||
}
|
||||
replicationDiscardCachedMaster();
|
||||
cancelReplicationHandshake();
|
||||
/* Disconnecting all the slaves is required: we need to inform slaves
|
||||
@ -2216,6 +2214,7 @@ void replicationCacheMaster(client *c) {
|
||||
serverAssert(server.master != NULL && server.cached_master == NULL);
|
||||
serverLog(LL_NOTICE,"Caching the disconnected master state.");
|
||||
AssertCorrectThread(c);
|
||||
std::lock_guard<decltype(c->lock)> clientlock(c->lock);
|
||||
|
||||
/* Unlink the client from the server structures. */
|
||||
unlinkClient(c);
|
||||
@ -2265,6 +2264,7 @@ void replicationCacheMasterUsingMyself(void) {
|
||||
* the new master will start its replication stream with SELECT. */
|
||||
server.master_initial_offset = server.master_repl_offset;
|
||||
replicationCreateMasterClient(-1,-1);
|
||||
std::lock_guard<decltype(server.master->lock)> lock(server.master->lock);
|
||||
|
||||
/* Use our own ID / offset. */
|
||||
memcpy(server.master->replid, server.replid, sizeof(server.replid));
|
||||
@ -2283,7 +2283,10 @@ void replicationDiscardCachedMaster(void) {
|
||||
|
||||
serverLog(LL_NOTICE,"Discarding previously cached master state.");
|
||||
server.cached_master->flags &= ~CLIENT_MASTER;
|
||||
freeClientAsync(server.cached_master);
|
||||
if (FCorrectThread(server.cached_master))
|
||||
freeClient(server.cached_master);
|
||||
else
|
||||
freeClientAsync(server.cached_master);
|
||||
server.cached_master = NULL;
|
||||
}
|
||||
|
||||
@ -2705,7 +2708,10 @@ void replicationCron(void) {
|
||||
{
|
||||
serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
|
||||
replicationGetSlaveName(slave));
|
||||
freeClientAsync(slave);
|
||||
if (FCorrectThread(slave))
|
||||
freeClient(slave);
|
||||
else
|
||||
freeClientAsync(slave);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
15
src/server.c
15
src/server.c
@ -2109,9 +2109,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
flushAppendOnlyFile(0);
|
||||
|
||||
/* Handle writes with pending output buffers. */
|
||||
aeReleaseLock();
|
||||
handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN);
|
||||
aeAcquireLock();
|
||||
|
||||
/* Before we are going to sleep, let the threads access the dataset by
|
||||
* releasing the GIL. Redis main thread will not touch anything at this
|
||||
@ -2128,10 +2126,10 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
|
||||
if (listLength(server.rgthreadvar[iel].unblocked_clients)) {
|
||||
processUnblockedClients(iel);
|
||||
}
|
||||
aeReleaseLock();
|
||||
|
||||
/* Handle writes with pending output buffers. */
|
||||
handleClientsWithPendingWrites(iel);
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
||||
/* This function is called immadiately after the event loop multiplexing
|
||||
@ -4065,6 +4063,7 @@ sds genRedisInfoString(char *section) {
|
||||
bytesToHuman(maxmemory_hmem,server.maxmemory);
|
||||
|
||||
if (sections++) info = sdscat(info,"\r\n");
|
||||
serverLog(LL_WARNING, "OOM max sent used_memory: %zu", zmalloc_used);
|
||||
info = sdscatprintf(info,
|
||||
"# Memory\r\n"
|
||||
"used_memory:%zu\r\n"
|
||||
@ -4499,6 +4498,7 @@ void infoCommand(client *c) {
|
||||
return;
|
||||
}
|
||||
addReplyBulkSds(c, genRedisInfoString(section));
|
||||
serverLog(LL_WARNING, "OOM max info command %zu", zmalloc_used_memory());
|
||||
}
|
||||
|
||||
void monitorCommand(client *c) {
|
||||
@ -5038,8 +5038,8 @@ int main(int argc, char **argv) {
|
||||
|
||||
initServer();
|
||||
|
||||
server.cthreads = 2; //testing
|
||||
initNetworking(1 /* fReusePort */);
|
||||
server.cthreads = 1; //testing
|
||||
initNetworking(0 /* fReusePort */);
|
||||
|
||||
if (background || server.pidfile) createPidFile();
|
||||
redisSetProcTitle(argv[0]);
|
||||
@ -5080,12 +5080,11 @@ int main(int argc, char **argv) {
|
||||
|
||||
serverAssert(server.cthreads > 0 && server.cthreads <= MAX_EVENT_LOOPS);
|
||||
pthread_t rgthread[MAX_EVENT_LOOPS];
|
||||
for (int iel = 0; iel < server.cthreads; ++iel)
|
||||
for (int iel = 1; iel < server.cthreads; ++iel)
|
||||
{
|
||||
pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel));
|
||||
}
|
||||
void *pretT;
|
||||
pthread_join(rgthread[IDX_EVENT_LOOP_MAIN], &pretT);
|
||||
workerThreadMain((void*)((int64_t)IDX_EVENT_LOOP_MAIN));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -74,6 +74,10 @@ void zlibc_free(void *ptr) {
|
||||
#define free(ptr) je_free(ptr)
|
||||
#define mallocx(size,flags) je_mallocx(size,flags)
|
||||
#define dallocx(ptr,flags) je_dallocx(ptr,flags)
|
||||
#else
|
||||
#define malloc(size, type) malloc(size)
|
||||
#define calloc(count,size,type) calloc(count,size)
|
||||
#define realloc(ptr,size,type) realloc(ptr,size)
|
||||
#endif
|
||||
|
||||
#define update_zmalloc_stat_alloc(__n) do { \
|
||||
|
@ -40,9 +40,6 @@
|
||||
#define ZMALLOC_LIB ("memkind")
|
||||
#undef USE_JEMALLOC
|
||||
#define USE_MALLOC_CLASS 1
|
||||
// Even though memkind supports malloc_usable_size we don't use it for performance reasons
|
||||
//#define HAVE_MALLOC_SIZE 0
|
||||
//#define zmalloc_size(p) salloc_usable_size(p)
|
||||
#elif defined(USE_TCMALLOC)
|
||||
#define ZMALLOC_LIB ("tcmalloc-" __xstr(TC_VERSION_MAJOR) "." __xstr(TC_VERSION_MINOR))
|
||||
#include <google/tcmalloc.h>
|
||||
|
Loading…
x
Reference in New Issue
Block a user