Removed more uses of fSynchronous and the use of condition variable and mutex on the control struct.
Former-commit-id: 6ab08cc3e1429178b26b55ed7aa8ba85240eb766
This commit is contained in:
parent
2addbe7e4a
commit
662037e3a3
51
src/ae.cpp
51
src/ae.cpp
@ -109,13 +109,6 @@ enum class AE_ASYNC_OP
|
|||||||
CreateFileEvent,
|
CreateFileEvent,
|
||||||
};
|
};
|
||||||
|
|
||||||
struct aeCommandControl
|
|
||||||
{
|
|
||||||
std::condition_variable cv;
|
|
||||||
std::atomic<int> rval;
|
|
||||||
std::mutex mutexcv;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct aeCommand
|
struct aeCommand
|
||||||
{
|
{
|
||||||
AE_ASYNC_OP op;
|
AE_ASYNC_OP op;
|
||||||
@ -128,7 +121,6 @@ struct aeCommand
|
|||||||
std::function<void()> *pfn;
|
std::function<void()> *pfn;
|
||||||
};
|
};
|
||||||
void *clientData;
|
void *clientData;
|
||||||
aeCommandControl *pctl;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
|
void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
|
||||||
@ -149,19 +141,7 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case AE_ASYNC_OP::CreateFileEvent:
|
case AE_ASYNC_OP::CreateFileEvent:
|
||||||
{
|
aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
|
||||||
if (cmd.pctl != nullptr)
|
|
||||||
{
|
|
||||||
cmd.pctl->mutexcv.lock();
|
|
||||||
std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData));
|
|
||||||
cmd.pctl->cv.notify_all();
|
|
||||||
cmd.pctl->mutexcv.unlock();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case AE_ASYNC_OP::PostFunction:
|
case AE_ASYNC_OP::PostFunction:
|
||||||
@ -175,19 +155,11 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
|
|||||||
|
|
||||||
case AE_ASYNC_OP::PostCppFunction:
|
case AE_ASYNC_OP::PostCppFunction:
|
||||||
{
|
{
|
||||||
if (cmd.pctl != nullptr)
|
|
||||||
cmd.pctl->mutexcv.lock();
|
|
||||||
|
|
||||||
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
|
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
|
||||||
if (cmd.fLock)
|
if (cmd.fLock)
|
||||||
ulock.lock();
|
ulock.lock();
|
||||||
(*cmd.pfn)();
|
(*cmd.pfn)();
|
||||||
|
|
||||||
if (cmd.pctl != nullptr)
|
|
||||||
{
|
|
||||||
cmd.pctl->cv.notify_all();
|
|
||||||
cmd.pctl->mutexcv.unlock();
|
|
||||||
}
|
|
||||||
delete cmd.pfn;
|
delete cmd.pfn;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -226,7 +198,7 @@ ssize_t safe_write(int fd, const void *pv, size_t cb)
|
|||||||
}
|
}
|
||||||
|
|
||||||
int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
||||||
aeFileProc *proc, void *clientData, int fSynchronous)
|
aeFileProc *proc, void *clientData)
|
||||||
{
|
{
|
||||||
if (eventLoop == g_eventLoopThisThread)
|
if (eventLoop == g_eventLoopThisThread)
|
||||||
return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData);
|
return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData);
|
||||||
@ -239,13 +211,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
|||||||
cmd.mask = mask;
|
cmd.mask = mask;
|
||||||
cmd.fproc = proc;
|
cmd.fproc = proc;
|
||||||
cmd.clientData = clientData;
|
cmd.clientData = clientData;
|
||||||
cmd.pctl = nullptr;
|
|
||||||
cmd.fLock = true;
|
cmd.fLock = true;
|
||||||
if (fSynchronous)
|
|
||||||
{
|
|
||||||
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl();
|
|
||||||
cmd.pctl->mutexcv.lock();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
if (size != sizeof(cmd))
|
if (size != sizeof(cmd))
|
||||||
@ -254,16 +220,6 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
|||||||
serverAssert(errno == EAGAIN);
|
serverAssert(errno == EAGAIN);
|
||||||
ret = AE_ERR;
|
ret = AE_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fSynchronous)
|
|
||||||
{
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
|
|
||||||
cmd.pctl->cv.wait(ulock);
|
|
||||||
ret = cmd.pctl->rval;
|
|
||||||
}
|
|
||||||
delete cmd.pctl;
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -297,7 +253,6 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock,
|
|||||||
aeCommand cmd = {};
|
aeCommand cmd = {};
|
||||||
cmd.op = AE_ASYNC_OP::PostCppFunction;
|
cmd.op = AE_ASYNC_OP::PostCppFunction;
|
||||||
cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn);
|
cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn);
|
||||||
cmd.pctl = nullptr;
|
|
||||||
cmd.fLock = fLock;
|
cmd.fLock = fLock;
|
||||||
|
|
||||||
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
|
@ -5045,7 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
|
|||||||
zfree(ctx);
|
zfree(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
__thread bool g_fModuleThread = false;
|
thread_local bool g_fModuleThread = false;
|
||||||
/* Acquire the server lock before executing a thread safe API call.
|
/* Acquire the server lock before executing a thread safe API call.
|
||||||
* This is not needed for `RedisModule_Reply*` calls when there is
|
* This is not needed for `RedisModule_Reply*` calls when there is
|
||||||
* a blocked client connected to the thread safe context. */
|
* a blocked client connected to the thread safe context. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user