a lot of code reworked/removed to implement object caching

This commit is contained in:
antirez 2010-12-28 18:06:40 +01:00
parent 697af434fb
commit 16d778780e
8 changed files with 75 additions and 174 deletions

View File

@ -242,7 +242,6 @@ int loadAppendOnlyFile(char *filename) {
char buf[128];
sds argsds;
struct redisCommand *cmd;
int force_swapout;
/* Serve the clients from time to time */
if (!(loops++ % 1000)) {
@ -286,17 +285,6 @@ int loadAppendOnlyFile(char *filename) {
/* Clean up, ready for the next command */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
zfree(argv);
/* Handle swapping while loading big datasets when VM is on */
force_swapout = 0;
if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
force_swapout = 1;
if (server.vm_enabled && force_swapout) {
while (zmalloc_used_memory() > server.vm_max_memory) {
if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
}
}
}
/* This point can only be reached when EOF is reached without errors.
@ -359,22 +347,11 @@ int rewriteAppendOnlyFile(char *filename) {
sds keystr = dictGetEntryKey(de);
robj key, *o;
time_t expiretime;
int swapped;
keystr = dictGetEntryKey(de);
o = dictGetEntryVal(de);
initStaticStringObject(key,keystr);
/* If the value for this key is swapped, load a preview in memory.
* We use a "swapped" flag to remember if we need to free the
* value object instead to just increment the ref count anyway
* in order to avoid copy-on-write of pages if we are forked() */
if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
o->storage == REDIS_VM_SWAPPING) {
swapped = 0;
} else {
o = vmPreviewObject(o);
swapped = 1;
}
expiretime = getExpire(db,&key);
/* Save the key and associated value */
@ -509,7 +486,6 @@ int rewriteAppendOnlyFile(char *filename) {
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
}
if (swapped) decrRefCount(o);
}
dictReleaseIterator(di);
}
@ -553,12 +529,11 @@ int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
if (server.bgrewritechildpid != -1) return REDIS_ERR;
if (server.vm_enabled) waitEmptyIOJobsQueue();
redisAssert(server.ds_enabled == 0);
if ((childpid = fork()) == 0) {
/* Child */
char tmpfile[256];
if (server.vm_enabled) vmReopenSwapFile();
if (server.ipfd > 0) close(server.ipfd);
if (server.sofd > 0) close(server.sofd);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());

View File

@ -241,21 +241,15 @@ void loadServerConfig(char *filename) {
} else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
zfree(server.dbfilename);
server.dbfilename = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
} else if (!strcasecmp(argv[0],"diskstore-enabled") && argc == 2) {
if ((server.ds_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"vm-swap-file") && argc == 2) {
zfree(server.vm_swap_file);
server.vm_swap_file = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) {
server.vm_max_memory = memtoll(argv[1],NULL);
} else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) {
server.vm_page_size = memtoll(argv[1], NULL);
} else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) {
server.vm_pages = memtoll(argv[1], NULL);
} else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) {
server.vm_max_threads = strtoll(argv[1], NULL, 10);
} else if (!strcasecmp(argv[0],"diskstore-path") && argc == 2) {
zfree(server.ds_path);
server.ds_path = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"cache-max-memory") && argc == 2) {
server.cache_max_memory = memtoll(argv[1],NULL);
} else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2) {
server.hash_max_zipmap_entries = memtoll(argv[1], NULL);
} else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2) {

View File

@ -17,29 +17,17 @@ robj *lookupKey(redisDb *db, robj *key) {
if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
val->lru = server.lruclock;
if (server.vm_enabled) {
if (val->storage == REDIS_VM_MEMORY ||
val->storage == REDIS_VM_SWAPPING)
{
/* If we were swapping the object out, cancel the operation */
if (val->storage == REDIS_VM_SWAPPING)
vmCancelThreadedIOJob(val);
} else {
int notify = (val->storage == REDIS_VM_LOADING);
/* Our value was swapped on disk. Bring it at home. */
redisAssert(val->type == REDIS_VMPOINTER);
val = vmLoadObject(val);
dictGetEntryVal(de) = val;
/* Clients blocked by the VM subsystem may be waiting for
* this key... */
if (notify) handleClientsBlockedOnSwappedKey(db,key);
}
if (server.ds_enabled && val->storage == REDIS_DS_SAVING) {
/* FIXME: change this code to just wait for our object to
* get out of the IO Job. */
waitEmptyIOJobsQueue();
redisAssert(val->storage != REDIS_DS_SAVING);
}
server.stat_keyspace_hits++;
return val;
} else {
/* FIXME: Check if the object is on disk, if it is, load it
* in a blocking way now. */
server.stat_keyspace_misses++;
return NULL;
}
@ -133,7 +121,11 @@ int dbDelete(redisDb *db, robj *key) {
* deleting the key will kill the I/O thread bringing the key from swap
* to memory, so the client will never be notified and unblocked if we
* don't do it now. */
if (server.vm_enabled) handleClientsBlockedOnSwappedKey(db,key);
if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
/* FIXME: we need to delete the IO Job loading the key, or simply we can
* wait for it to finish. */
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

View File

@ -26,26 +26,17 @@
*
* - cron() checks if there are elements on this list. When there are things
* to flush, we create an IO Job for the I/O thread.
* FIXME: how to mark this key as "busy"? With VM we used to change the
* object->storage field, but this time we need this to work with every
* kind of object, including shared ones. One possibility is just killing
* object sharing at all. So let's assume this will be our solution.
*
* So we set keys that are in the process of being saved as
* object->storage = REDIS_STORAGE_SAVING;
* NOTE: We disable object sharing when server.ds_enabled == 1 so objects
* that are referenced by an IO job for flushing on disk are marked as
* o->storage == REDIS_DS_SAVING.
*
* - This is what we do on key lookup:
* 1) The key already exists in memory. object->storage == REDIS_DS_MEMORY.
* 1) The key already exists in memory. object->storage == REDIS_DS_MEMORY
* or it is object->storage == REDIS_DS_DIRTY:
* We don't do anything special: lookup, return value object pointer.
* 2) The key is in memory but object->storage == REDIS_DS_SAVING.
* This is an explicit lookup so we have to abort the saving operation.
* We kill the IO Job, set the storage to REDIS_DS_MEMORY but
* re-queue the object in the server.ds_cache_dirty list.
*
* Btw here we need some protection against the problem of continuously
* writing against a value having the effect of this value to be never
* saved on disk. That is, at some point we need to block and write it
* if there is too much delay.
* When this happens we block waiting for the I/O thread to process
* this object. Then continue.
* 3) The key is not in memory. We block to load the key from disk.
* Of course the key may not be present at all on the disk store as well,
* in such case we just detect this condition and continue, returning
@ -56,20 +47,43 @@
* keys a client is going to use. We block the client, load keys
* using the I/O thread, unblock the client. Same code as VM more or less.
*
* - Transferring keys from memory to disk.
* Again while in cron() we detect our memory limit was reached. What we
* do is transfer random keys that are not set as dirty on disk, using
* LRU to select the key.
* - Reclaiming memory.
* In cron() we detect our memory limit was reached. What we
* do is delete keys that are REDIS_DS_MEMORY, using LRU.
*
* If this is not enough to return again under the memory limits we also
* start to flush keys that need to be synched on disk synchronously,
* removing them from the memory.
* removing them from the memory. We do this blocking as memory limit is a
* much "harder" barrier in the new design.
*
* - IO thread operations are no longer stopped for sync loading/saving of
* things. When a key is found to be in the process of being saved or
* loaded we simply wait for the IO thread to end its work.
* things. When a key is found to be in the process of being saved
* we simply wait for the IO thread to end its work.
*
* Otherwise, if a key has to be loaded and no IO thread operation was
* started for it, the key is loaded in a blocking way in the lookup function.
*
* - What happens when an object is destroyed?
*
* If o->storage == REDIS_DS_MEMORY then we simply destroy the object.
* If o->storage == REDIS_DS_DIRTY we can still remove the object. It had
* changes not flushed on disk, but is being removed so
* who cares.
* if o->storage == REDIS_DS_SAVING then the object is being saved so
* it is impossible that its refcount == 1, must be at
* least two. When the object is saved the storage will
* be set back to DS_MEMORY.
*
* - What happens when keys are deleted?
*
* We simply schedule a key flush operation as usual, but when the
* IO thread will be created the object pointer will be set to NULL
* so the IO thread will know that the work to do is to delete the key
* from the disk store.
*
* - What happens with MULTI/EXEC?
*
* Good question.
*/
/* Virtual Memory is composed mainly of two subsystems:

View File

@ -167,7 +167,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
void addReply(redisClient *c, robj *obj) {
if (_installWriteEvent(c) != REDIS_OK) return;
redisAssert(!server.ds_enabled || obj->storage == REDIS_VM_MEMORY);
redisAssert(!server.ds_enabled || obj->storage == REDIS_DS_MEMORY);
/* This is an important place where we can avoid copy-on-write
* when there is a saving child running, avoiding touching the

View File

@ -21,7 +21,7 @@ robj *createObject(int type, void *ptr) {
/* The following is only needed if VM is active, but since the conditional
* is probably more costly than initializing the field it's better to
* have every field properly initialized anyway. */
o->storage = REDIS_VM_MEMORY;
o->storage = REDIS_DS_MEMORY;
return o;
}
@ -160,31 +160,11 @@ void incrRefCount(robj *o) {
void decrRefCount(void *obj) {
robj *o = obj;
/* Object is a swapped out value, or in the process of being loaded. */
if (server.vm_enabled &&
(o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
{
vmpointer *vp = obj;
if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(o);
vmMarkPagesFree(vp->page,vp->usedpages);
server.vm_stats_swapped_objects--;
zfree(vp);
return;
}
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
/* Object is in memory, or in the process of being swapped out.
*
* If the object is being swapped out, abort the operation on
* decrRefCount even if the refcount does not drop to 0: the object
* is referenced at least two times, as value of the key AND as
* job->val in the iojob. So if we don't invalidate the iojob, when it is
* done but the relevant key was removed in the meantime, the
* complete jobs handler will not find the key about the job and the
* assert will fail. */
if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING)
vmCancelThreadedIOJob(o);
if (--(o->refcount) == 0) {
/* DS_SAVING objects should always have a reference in the
* IO Job structure. So we should never reach this state. */
redisAssert(o->storage != REDIS_DS_SAVING);
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;

View File

@ -395,12 +395,6 @@ off_t rdbSavedObjectLen(robj *o) {
return len;
}
/* Return the number of pages required to save this object in the swap file */
off_t rdbSavedObjectPages(robj *o) {
off_t bytes = rdbSavedObjectLen(o);
return (bytes+(server.vm_page_size-1))/server.vm_page_size;
}
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
int rdbSave(char *filename) {
dictIterator *di = NULL;
@ -410,11 +404,8 @@ int rdbSave(char *filename) {
int j;
time_t now = time(NULL);
/* Wait for I/O threads to terminate, just in case this is a
* foreground-saving, to avoid seeking the swap file descriptor at the
* same time. */
if (server.vm_enabled)
waitEmptyIOJobsQueue();
/* FIXME: implement .rdb save for disk store properly */
redisAssert(server.ds_enabled == 0);
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
@ -453,26 +444,10 @@ int rdbSave(char *filename) {
if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
if (rdbSaveTime(fp,expiretime) == -1) goto werr;
}
/* Save the key and associated value. This requires special
* handling if the value is swapped out. */
if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
o->storage == REDIS_VM_SWAPPING) {
/* Save type, key, value */
if (rdbSaveType(fp,o->type) == -1) goto werr;
if (rdbSaveStringObject(fp,&key) == -1) goto werr;
if (rdbSaveObject(fp,o) == -1) goto werr;
} else {
/* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
robj *po;
/* Get a preview of the object in memory */
po = vmPreviewObject(o);
/* Save type, key, value */
if (rdbSaveType(fp,po->type) == -1) goto werr;
if (rdbSaveStringObject(fp,&key) == -1) goto werr;
if (rdbSaveObject(fp,po) == -1) goto werr;
/* Remove the loaded object from memory */
decrRefCount(po);
}
/* Save type, key, value */
if (rdbSaveType(fp,o->type) == -1) goto werr;
if (rdbSaveStringObject(fp,&key) == -1) goto werr;
if (rdbSaveObject(fp,o) == -1) goto werr;
}
dictReleaseIterator(di);
}
@ -508,11 +483,10 @@ int rdbSaveBackground(char *filename) {
pid_t childpid;
if (server.bgsavechildpid != -1) return REDIS_ERR;
if (server.vm_enabled) waitEmptyIOJobsQueue();
redisAssert(server.ds_enabled == 0);
server.dirty_before_bgsave = server.dirty;
if ((childpid = fork()) == 0) {
/* Child */
if (server.vm_enabled) vmReopenSwapFile();
if (server.ipfd > 0) close(server.ipfd);
if (server.sofd > 0) close(server.sofd);
if (rdbSave(filename) == REDIS_OK) {
@ -899,8 +873,6 @@ int rdbLoad(char *filename) {
startLoading(fp);
while(1) {
robj *key, *val;
int force_swapout;
expiretime = -1;
/* Serve the clients from time to time */
@ -970,21 +942,6 @@ int rdbLoad(char *filename) {
continue;
}
decrRefCount(key);
/* Flush data on disk once 32 MB of additional RAM are used... */
force_swapout = 0;
if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
force_swapout = 1;
/* If we have still some hope of having some value fitting memory
* then we try random sampling. */
if (!swap_all_values && server.vm_enabled && force_swapout) {
while (zmalloc_used_memory() > server.vm_max_memory) {
if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
}
if (zmalloc_used_memory() > server.vm_max_memory)
swap_all_values = 1; /* We are already using too much mem */
}
}
fclose(fp);
stopLoading();

View File

@ -119,22 +119,11 @@
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
/* Virtual memory object->where field. */
#define REDIS_VM_MEMORY 0 /* The object is on memory */
#define REDIS_VM_SWAPPED 1 /* The object is on disk */
#define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
#define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
/* Disk store cache object->storage values */
#define REDIS_DS_MEMORY 0 /* The object is on memory */
#define REDIS_DS_DIRTY 1 /* The object was modified */
#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */
/* Virtual memory static configuration stuff.
* Check vmFindContiguousPages() to know more about these magic numbers. */
#define REDIS_VM_MAX_NEAR_PAGES 65536
#define REDIS_VM_MAX_RANDOM_JUMP 4096
#define REDIS_VM_MAX_THREADS 32
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
/* The following is the *percentage* of completed I/O jobs to process when the
* handler is called. While Virtual Memory I/O operations are performed by
* threads, these operations must be processed by the main thread when completed
* in order to take effect. */
#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
/* Client flags */
@ -271,7 +260,7 @@ typedef struct vmPointer {
_var.type = REDIS_STRING; \
_var.encoding = REDIS_ENCODING_RAW; \
_var.ptr = _ptr; \
_var.storage = REDIS_VM_MEMORY; \
_var.storage = REDIS_DS_MEMORY; \
} while(0);
typedef struct redisDb {