major bug and a deadlock fixed

antirez 2010-12-31 14:30:24 +01:00
parent 81b453e33f
commit e70f346651
2 changed files with 19 additions and 7 deletions


@@ -97,11 +97,14 @@ int dbAdd(redisDb *db, robj *key, robj *val) {
  *
  * On update (key already existed) 0 is returned. Otherwise 1. */
 int dbReplace(redisDb *db, robj *key, robj *val) {
-    if (dictFind(db->dict,key->ptr) == NULL) {
+    robj *oldval;
+
+    if ((oldval = dictFetchValue(db->dict,key->ptr)) == NULL) {
         sds copy = sdsdup(key->ptr);
         dictAdd(db->dict, copy, val);
         return 1;
     } else {
+        val->storage = oldval->storage;
         dictReplace(db->dict, key->ptr, val);
         return 0;
     }
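
This is the "major bug" of the commit message: when an existing key was overwritten, the new value object started out with a default storage flag, so the old value's dirty/saving state (e.g. REDIS_DS_DIRTY) was silently dropped. The standalone C sketch below mirrors that preserve-the-flag-on-replace pattern; entry_t and table_replace are hypothetical names used only for illustration, not Redis code.

#include <stdio.h>
#include <string.h>

typedef enum { CLEAN = 0, DIRTY = 1 } storage_t;

typedef struct {
    char key[32];
    int value;
    storage_t storage;              /* plays the role of robj->storage */
} entry_t;

/* Replace (or insert) an entry. On replace, the old entry's storage flag is
 * copied onto the new entry before it is stored, mirroring the added line
 * "val->storage = oldval->storage;". Returns 0 when the key already existed,
 * 1 when it was inserted, like dbReplace. */
static int table_replace(entry_t *table, int len, entry_t newent) {
    for (int i = 0; i < len; i++) {
        if (table[i].key[0] != '\0' && strcmp(table[i].key, newent.key) == 0) {
            newent.storage = table[i].storage;  /* the fix: keep the old flag */
            table[i] = newent;
            return 0;
        }
    }
    for (int i = 0; i < len; i++) {
        if (table[i].key[0] == '\0') {          /* first free slot */
            table[i] = newent;
            return 1;
        }
    }
    return -1;                                  /* table full */
}

int main(void) {
    entry_t table[4] = { { "foo", 1, DIRTY } };
    entry_t fresh = { "foo", 2, CLEAN };        /* a brand-new value object */
    table_replace(table, 4, fresh);
    /* Without the flag copy this would print CLEAN and the pending flush for
     * "foo" would be forgotten; with it the DIRTY state survives the update. */
    printf("foo=%d storage=%s\n", table[0].value,
           table[0].storage == DIRTY ? "DIRTY" : "CLEAN");
    return 0;
}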


@@ -356,12 +356,10 @@ void *IOThreadEntryPoint(void *arg) {
     pthread_detach(pthread_self());
     lockThreadedIO();
     while(1) {
-        /* Wait for more work to do */
-        pthread_cond_wait(&server.io_condvar,&server.io_mutex);
         /* Get a new job to process */
         if (listLength(server.io_newjobs) == 0) {
-            /* No new jobs in queue, reiterate. */
-            unlockThreadedIO();
+            /* Wait for more work to do */
+            pthread_cond_wait(&server.io_condvar,&server.io_mutex);
             continue;
         }
         ln = listFirst(server.io_newjobs);
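
This is the deadlock fix. The old loop waited on the condition variable before checking the queue and, when the queue turned out to be empty, called unlockThreadedIO() and looped back to pthread_cond_wait() on a mutex it no longer held, which is undefined behaviour and can hang the I/O thread. The corrected shape is the canonical pattern: check the predicate while holding the mutex and wait only when there is nothing to do, letting pthread_cond_wait() release the mutex while blocked and re-acquire it before returning. The self-contained sketch below shows that pattern; the job counter and worker are hypothetical, only the locking structure mirrors the fix.

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;
static int jobs = 0;            /* stands in for listLength(server.io_newjobs) */
static int done = 0;

static void *worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mutex);                 /* like lockThreadedIO() */
    while (1) {
        if (jobs == 0) {
            if (done) break;
            pthread_cond_wait(&cond, &mutex);   /* sleeps with the mutex released */
            continue;                           /* re-check the queue on wakeup */
        }
        jobs--;                                 /* take one job */
        pthread_mutex_unlock(&mutex);
        printf("processed a job\n");            /* real work happens unlocked */
        pthread_mutex_lock(&mutex);
    }
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int main(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);
    pthread_mutex_lock(&mutex);
    jobs = 3;                                   /* enqueue some work */
    done = 1;
    pthread_cond_signal(&cond);                 /* wake the worker if it is waiting */
    pthread_mutex_unlock(&mutex);
    pthread_join(tid, NULL);
    return 0;
}

Note that because the worker checks the predicate before waiting, a signal delivered while it is not yet waiting is harmlessly lost; this is exactly why the fixed code no longer needs the unlock/continue dance.
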
@@ -439,6 +437,16 @@ void waitEmptyIOJobsQueue(void) {
             unlockThreadedIO();
             return;
         }
+        /* If there are new jobs we need to signal the thread to
+         * process the next one. */
+        redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d",
+            listLength(server.io_newjobs),
+            listLength(server.io_processing));
+        /*
+        if (listLength(server.io_newjobs)) {
+            pthread_cond_signal(&server.io_condvar);
+        }
+        */
         /* While waiting for empty jobs queue condition we post-process some
          * finshed job, as I/O threads may be hanging trying to write against
          * the io_ready_pipe_write FD but there are so much pending jobs that
@@ -509,7 +517,8 @@ void cacheScheduleForFlush(redisDb *db, robj *key) {
         val->storage = REDIS_DS_DIRTY;
     }
 
-    redisLog(REDIS_DEBUG,"Scheduling key %s for saving",key->ptr);
+    redisLog(REDIS_DEBUG,"Scheduling key %s for saving (%s)",key->ptr,
+        de ? "key exists" : "key does not exist");
     dk = zmalloc(sizeof(*dk));
     dk->db = db;
     dk->key = key;
@@ -533,7 +542,7 @@ void cacheCron(void) {
         redisLog(REDIS_DEBUG,"Creating IO Job to save key %s",dk->key->ptr);
 
         /* Lookup the key, in order to put the current value in the IO
-         * Job and mark ti as DS_SAVING.
+         * Job and mark it as DS_SAVING.
          * Otherwise if the key does not exists we schedule a disk store
          * delete operation, setting the value to NULL. */
         de = dictFind(dk->db->dict,dk->key->ptr);
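
The context comment above describes cacheCron's decision rule: a key still present in memory gets a save job carrying its current value (which is then marked DS_SAVING), while a key that is gone gets a delete job whose value is NULL. A minimal standalone sketch of that rule follows; iojob and make_job are hypothetical stand-ins, not the real dscache.c types.

#include <stdio.h>

typedef struct {
    const char *key;
    const char *val;   /* NULL value means "delete this key from the disk store" */
} iojob;

/* Build a job for a scheduled key: a non-NULL current value becomes a save
 * job, a NULL one becomes a disk delete, as the comment above describes. */
static iojob make_job(const char *key, const char *current_val) {
    iojob job = { key, current_val };
    printf("%s job for %s\n", current_val ? "save" : "delete", key);
    return job;
}

int main(void) {
    make_job("foo", "bar");    /* key still in memory     -> save its value */
    make_job("gone", NULL);    /* key no longer in memory -> delete on disk */
    return 0;
}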