initial changes needed to turn the current VM code into a cache system. Tons of work to do still.

This commit is contained in:
antirez 2010-12-28 15:20:20 +01:00
parent 33388d4304
commit 697af434fb
4 changed files with 43 additions and 75 deletions

View File

@ -16,8 +16,17 @@
* directory will contain in the average 15258 entires, that is ok with * directory will contain in the average 15258 entires, that is ok with
* most filesystems implementation. * most filesystems implementation.
* *
* The actaul implementation of this disk store is highly related to the * Note that since Redis supports multiple databases, the actual key name
* filesystem implementation. This implementation may be replaced by * is:
*
* /0b/ee/<dbid>_0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
*
* so for instance if the key is inside DB 0:
*
* /0b/ee/0_0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
*
* The actaul implementation of this disk store is highly dependant to the
* filesystem implementation itself. This implementation may be replaced by
* a B+TREE implementation in future implementations. * a B+TREE implementation in future implementations.
* *
* Data ok every key is serialized using the same format used for .rdb * Data ok every key is serialized using the same format used for .rdb
@ -68,7 +77,7 @@
int dsOpen(void) { int dsOpen(void) {
struct stat sb; struct stat sb;
int retval; int retval;
char *path = server.diskstore_path; char *path = server.ds_path;
if ((retval = stat(path,&sb) == -1) && errno != ENOENT) { if ((retval = stat(path,&sb) == -1) && errno != ENOENT) {
redisLog(REDIS_WARNING, "Error opening disk store at %s: %s", redisLog(REDIS_WARNING, "Error opening disk store at %s: %s",

View File

@ -167,7 +167,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
void addReply(redisClient *c, robj *obj) { void addReply(redisClient *c, robj *obj) {
if (_installWriteEvent(c) != REDIS_OK) return; if (_installWriteEvent(c) != REDIS_OK) return;
redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY); redisAssert(!server.ds_enabled || obj->storage == REDIS_VM_MEMORY);
/* This is an important place where we can avoid copy-on-write /* This is an important place where we can avoid copy-on-write
* when there is a saving child running, avoiding touching the * when there is a saving child running, avoiding touching the
@ -460,7 +460,7 @@ void freeClient(redisClient *c) {
/* Remove from the list of clients waiting for swapped keys, or ready /* Remove from the list of clients waiting for swapped keys, or ready
* to be restarted, but not yet woken up again. */ * to be restarted, but not yet woken up again. */
if (c->flags & REDIS_IO_WAIT) { if (c->flags & REDIS_IO_WAIT) {
redisAssert(server.vm_enabled); redisAssert(server.ds_enabled);
if (listLength(c->io_keys) == 0) { if (listLength(c->io_keys) == 0) {
ln = listSearchKey(server.io_ready_clients,c); ln = listSearchKey(server.io_ready_clients,c);
@ -474,7 +474,7 @@ void freeClient(redisClient *c) {
dontWaitForSwappedKey(c,ln->value); dontWaitForSwappedKey(c,ln->value);
} }
} }
server.vm_blocked_clients--; server.cache_blocked_clients--;
} }
listRelease(c->io_keys); listRelease(c->io_keys);
/* Master/slave cleanup. /* Master/slave cleanup.

View File

@ -618,27 +618,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* in order to guarantee a strict consistency. */ * in order to guarantee a strict consistency. */
if (server.masterhost == NULL) activeExpireCycle(); if (server.masterhost == NULL) activeExpireCycle();
/* Swap a few keys on disk if we are over the memory limit and VM /* Remove a few cached objects from memory if we are over the
* is enbled. Try to free objects from the free list first. */ * configured memory limit */
if (vmCanSwapOut()) { while (server.ds_enabled && zmalloc_used_memory() >
while (server.vm_enabled && zmalloc_used_memory() > server.cache_max_memory)
server.vm_max_memory) {
{ cacheFreeOneEntry();
int retval = (server.vm_max_threads == 0) ?
vmSwapOneObjectBlocking() :
vmSwapOneObjectThreaded();
if (retval == REDIS_ERR && !(loops % 300) &&
zmalloc_used_memory() >
(server.vm_max_memory+server.vm_max_memory/10))
{
redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
}
/* Note that when using threade I/O we free just one object,
* because anyway when the I/O thread in charge to swap this
* object out will finish, the handler of completed jobs
* will try to swap more objects if we are still out of memory. */
if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
}
} }
/* Replication cron function -- used to reconnect to master and /* Replication cron function -- used to reconnect to master and
@ -656,8 +641,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
listNode *ln; listNode *ln;
redisClient *c; redisClient *c;
/* Awake clients that got all the swapped keys they requested */ /* Awake clients that got all the on disk keys they requested */
if (server.vm_enabled && listLength(server.io_ready_clients)) { if (server.ds_enabled && listLength(server.io_ready_clients)) {
listIter li; listIter li;
listRewind(server.io_ready_clients,&li); listRewind(server.io_ready_clients,&li);
@ -668,7 +653,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Resume the client. */ /* Resume the client. */
listDelNode(server.io_ready_clients,ln); listDelNode(server.io_ready_clients,ln);
c->flags &= (~REDIS_IO_WAIT); c->flags &= (~REDIS_IO_WAIT);
server.vm_blocked_clients--; server.cache_blocked_clients--;
aeCreateFileEvent(server.el, c->fd, AE_READABLE, aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c); readQueryFromClient, c);
cmd = lookupCommand(c->argv[0]->ptr); cmd = lookupCommand(c->argv[0]->ptr);
@ -787,13 +772,10 @@ void initServerConfig() {
server.maxmemory = 0; server.maxmemory = 0;
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU; server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
server.maxmemory_samples = 3; server.maxmemory_samples = 3;
server.vm_enabled = 0; server.ds_enabled = 0;
server.vm_swap_file = zstrdup("/tmp/redis-%p.vm"); server.ds_path = zstrdup("/tmp/redis.ds");
server.vm_page_size = 256; /* 256 bytes per page */ server.cache_max_memory = 64LL*1024*1024; /* 64 MB of RAM */
server.vm_pages = 1024*1024*100; /* 104 millions of pages */ server.cache_blocked_clients = 0;
server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
server.vm_max_threads = 4;
server.vm_blocked_clients = 0;
server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES; server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE; server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES; server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
@ -873,7 +855,7 @@ void initServer() {
server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
if (server.vm_enabled) if (server.ds_enabled)
server.db[j].io_keys = dictCreate(&keylistDictType,NULL); server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j; server.db[j].id = j;
} }
@ -911,7 +893,7 @@ void initServer() {
} }
} }
if (server.vm_enabled) vmInit(); if (server.ds_enabled) dsInit();
} }
/* Populates the Redis Command Table starting from the hard coded list /* Populates the Redis Command Table starting from the hard coded list
@ -1050,8 +1032,8 @@ int processCommand(redisClient *c) {
queueMultiCommand(c,cmd); queueMultiCommand(c,cmd);
addReply(c,shared.queued); addReply(c,shared.queued);
} else { } else {
if (server.vm_enabled && server.vm_max_threads > 0 && if (server.ds_enabled && blockClientOnSwappedKeys(c,cmd))
blockClientOnSwappedKeys(c,cmd)) return REDIS_ERR; return REDIS_ERR;
call(c,cmd); call(c,cmd);
} }
return REDIS_OK; return REDIS_OK;
@ -1072,7 +1054,6 @@ int prepareForShutdown() {
if (server.appendonly) { if (server.appendonly) {
/* Append only file: fsync() the AOF and exit */ /* Append only file: fsync() the AOF and exit */
aof_fsync(server.appendfd); aof_fsync(server.appendfd);
if (server.vm_enabled) unlink(server.vm_swap_file);
} else if (server.saveparamslen > 0) { } else if (server.saveparamslen > 0) {
/* Snapshotting. Perform a SYNC SAVE and exit */ /* Snapshotting. Perform a SYNC SAVE and exit */
if (rdbSave(server.dbfilename) != REDIS_OK) { if (rdbSave(server.dbfilename) != REDIS_OK) {
@ -1185,7 +1166,7 @@ sds genRedisInfoString(void) {
"hash_max_zipmap_value:%zu\r\n" "hash_max_zipmap_value:%zu\r\n"
"pubsub_channels:%ld\r\n" "pubsub_channels:%ld\r\n"
"pubsub_patterns:%u\r\n" "pubsub_patterns:%u\r\n"
"vm_enabled:%d\r\n" "ds_enabled:%d\r\n"
"role:%s\r\n" "role:%s\r\n"
,REDIS_VERSION, ,REDIS_VERSION,
redisGitSHA1(), redisGitSHA1(),
@ -1228,7 +1209,7 @@ sds genRedisInfoString(void) {
server.hash_max_zipmap_value, server.hash_max_zipmap_value,
dictSize(server.pubsub_channels), dictSize(server.pubsub_channels),
listLength(server.pubsub_patterns), listLength(server.pubsub_patterns),
server.vm_enabled != 0, server.ds_enabled != 0,
server.masterhost == NULL ? "master" : "slave" server.masterhost == NULL ? "master" : "slave"
); );
if (server.masterhost) { if (server.masterhost) {
@ -1255,33 +1236,13 @@ sds genRedisInfoString(void) {
); );
} }
} }
if (server.vm_enabled) { if (server.ds_enabled) {
lockThreadedIO(); lockThreadedIO();
info = sdscatprintf(info, info = sdscatprintf(info,
"vm_conf_max_memory:%llu\r\n" "cache_max_memory:%llu\r\n"
"vm_conf_page_size:%llu\r\n" "cache_blocked_clients:%lu\r\n"
"vm_conf_pages:%llu\r\n" ,(unsigned long long) server.cache_max_memory,
"vm_stats_used_pages:%llu\r\n" (unsigned long) server.cache_blocked_clients
"vm_stats_swapped_objects:%llu\r\n"
"vm_stats_swappin_count:%llu\r\n"
"vm_stats_swappout_count:%llu\r\n"
"vm_stats_io_newjobs_len:%lu\r\n"
"vm_stats_io_processing_len:%lu\r\n"
"vm_stats_io_processed_len:%lu\r\n"
"vm_stats_io_active_threads:%lu\r\n"
"vm_stats_blocked_clients:%lu\r\n"
,(unsigned long long) server.vm_max_memory,
(unsigned long long) server.vm_page_size,
(unsigned long long) server.vm_pages,
(unsigned long long) server.vm_stats_used_pages,
(unsigned long long) server.vm_stats_swapped_objects,
(unsigned long long) server.vm_stats_swapins,
(unsigned long long) server.vm_stats_swapouts,
(unsigned long) listLength(server.io_newjobs),
(unsigned long) listLength(server.io_processing),
(unsigned long) listLength(server.io_processed),
(unsigned long) server.io_active_threads,
(unsigned long) server.vm_blocked_clients
); );
unlockThreadedIO(); unlockThreadedIO();
} }

View File

@ -440,7 +440,7 @@ struct redisServer {
int maxmemory_samples; int maxmemory_samples;
/* Blocked clients */ /* Blocked clients */
unsigned int bpop_blocked_clients; unsigned int bpop_blocked_clients;
unsigned int vm_blocked_clients; unsigned int cache_blocked_clients;
list *unblocked_clients; list *unblocked_clients;
/* Sort parameters - qsort_r() is only available under BSD so we /* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */ * have to take this state global, in order to pass it to sortCompare() */
@ -448,11 +448,9 @@ struct redisServer {
int sort_alpha; int sort_alpha;
int sort_bypattern; int sort_bypattern;
/* Virtual memory configuration */ /* Virtual memory configuration */
int vm_enabled; int ds_enabled; /* backend disk in redis.conf */
char *vm_swap_file; char *ds_path; /* location of the disk store on disk */
off_t vm_page_size; unsigned long long cache_max_memory;
off_t vm_pages;
unsigned long long vm_max_memory;
/* Zip structure config */ /* Zip structure config */
size_t hash_max_zipmap_entries; size_t hash_max_zipmap_entries;
size_t hash_max_zipmap_value; size_t hash_max_zipmap_value;