diskstore cache bug fixing
This commit is contained in:
parent
273e9b905e
commit
aa694aae07
1
src/db.c
1
src/db.c
@ -21,6 +21,7 @@ robj *lookupKey(redisDb *db, robj *key) {
|
|||||||
/* FIXME: change this code to just wait for our object to
|
/* FIXME: change this code to just wait for our object to
|
||||||
* get out of the IO Job. */
|
* get out of the IO Job. */
|
||||||
waitEmptyIOJobsQueue();
|
waitEmptyIOJobsQueue();
|
||||||
|
processAllPendingIOJobs();
|
||||||
redisAssert(val->storage != REDIS_DS_SAVING);
|
redisAssert(val->storage != REDIS_DS_SAVING);
|
||||||
}
|
}
|
||||||
server.stat_keyspace_hits++;
|
server.stat_keyspace_hits++;
|
||||||
|
@ -404,16 +404,14 @@ void spawnIOThread(void) {
|
|||||||
server.io_active_threads++;
|
server.io_active_threads++;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We need to wait for the last thread to exit before we are able to
|
/* Wait that all the pending IO Jobs are processed */
|
||||||
* fork() in order to BGSAVE or BGREWRITEAOF. */
|
|
||||||
void waitEmptyIOJobsQueue(void) {
|
void waitEmptyIOJobsQueue(void) {
|
||||||
while(1) {
|
while(1) {
|
||||||
int io_processed_len;
|
int io_processed_len;
|
||||||
|
|
||||||
lockThreadedIO();
|
lockThreadedIO();
|
||||||
if (listLength(server.io_newjobs) == 0 &&
|
if (listLength(server.io_newjobs) == 0 &&
|
||||||
listLength(server.io_processing) == 0 &&
|
listLength(server.io_processing) == 0)
|
||||||
server.io_active_threads == 0)
|
|
||||||
{
|
{
|
||||||
unlockThreadedIO();
|
unlockThreadedIO();
|
||||||
return;
|
return;
|
||||||
@ -434,6 +432,21 @@ void waitEmptyIOJobsQueue(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Process all the IO Jobs already completed by threads but still waiting
|
||||||
|
* processing from the main thread. */
|
||||||
|
void processAllPendingIOJobs(void) {
|
||||||
|
while(1) {
|
||||||
|
int io_processed_len;
|
||||||
|
|
||||||
|
lockThreadedIO();
|
||||||
|
io_processed_len = listLength(server.io_processed);
|
||||||
|
unlockThreadedIO();
|
||||||
|
if (io_processed_len == 0) return;
|
||||||
|
vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
|
||||||
|
(void*)0xdeadbeef,0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* This function must be called while with threaded IO locked */
|
/* This function must be called while with threaded IO locked */
|
||||||
void queueIOJob(iojob *j) {
|
void queueIOJob(iojob *j) {
|
||||||
redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
|
redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
|
||||||
|
@ -799,6 +799,7 @@ void unlockThreadedIO(void);
|
|||||||
void freeIOJob(iojob *j);
|
void freeIOJob(iojob *j);
|
||||||
void queueIOJob(iojob *j);
|
void queueIOJob(iojob *j);
|
||||||
void waitEmptyIOJobsQueue(void);
|
void waitEmptyIOJobsQueue(void);
|
||||||
|
void processAllPendingIOJobs(void);
|
||||||
void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
|
void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
|
||||||
void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
|
void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
|
||||||
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
|
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user