Chunked loading of RDB to prevent redis from stalling reading very large keys.

This commit is contained in:
yoav 2012-12-12 15:59:22 +02:00 committed by antirez
parent 9d520a7f70
commit 63d15dfc87
5 changed files with 45 additions and 15 deletions

View File

@ -1057,21 +1057,32 @@ void stopLoading(void) {
server.loading = 0; server.loading = 0;
} }
/* Track loading progress in order to serve client's from time to time
and if needed calculate rdb checksum */
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.rdb_checksum)
rioGenericUpdateChecksum(r, buf, len);
if (server.loading_process_events_interval_bytes &&
(r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes) {
loadingProgress(r->processed_bytes);
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
}
}
int rdbLoad(char *filename) { int rdbLoad(char *filename) {
uint32_t dbid; uint32_t dbid;
int type, rdbver; int type, rdbver;
redisDb *db = server.db+0; redisDb *db = server.db+0;
char buf[1024]; char buf[1024];
long long expiretime, now = mstime(); long long expiretime, now = mstime();
long loops = 0;
FILE *fp; FILE *fp;
rio rdb; rio rdb;
if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR; if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;
rioInitWithFile(&rdb,fp); rioInitWithFile(&rdb,fp);
if (server.rdb_checksum) rdb.update_cksum = rdbLoadProgressCallback;
rdb.update_cksum = rioGenericUpdateChecksum; rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(&rdb,buf,9) == 0) goto eoferr; if (rioRead(&rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0'; buf[9] = '\0';
if (memcmp(buf,"REDIS",5) != 0) { if (memcmp(buf,"REDIS",5) != 0) {
@ -1093,12 +1104,6 @@ int rdbLoad(char *filename) {
robj *key, *val; robj *key, *val;
expiretime = -1; expiretime = -1;
/* Serve the clients from time to time */
if (!(loops++ % 1000)) {
loadingProgress(rioTell(&rdb));
aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
}
/* Read type. */ /* Read type. */
if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
if (type == REDIS_RDB_OPCODE_EXPIRETIME) { if (type == REDIS_RDB_OPCODE_EXPIRETIME) {

View File

@ -1290,6 +1290,7 @@ void initServerConfig() {
server.lua_client = NULL; server.lua_client = NULL;
server.lua_timedout = 0; server.lua_timedout = 0;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
server.loading_process_events_interval_bytes = (1024*1024*2);
updateLRUClock(); updateLRUClock();
resetServerSaveParams(); resetServerSaveParams();

View File

@ -753,6 +753,7 @@ struct redisServer {
off_t loading_total_bytes; off_t loading_total_bytes;
off_t loading_loaded_bytes; off_t loading_loaded_bytes;
time_t loading_start_time; time_t loading_start_time;
off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */ /* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand, struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand; *rpopCommand;

View File

@ -108,6 +108,8 @@ static const rio rioBufferIO = {
rioBufferTell, rioBufferTell,
NULL, /* update_checksum */ NULL, /* update_checksum */
0, /* current checksum */ 0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */ { { NULL, 0 } } /* union for io-specific vars */
}; };
@ -117,6 +119,8 @@ static const rio rioFileIO = {
rioFileTell, rioFileTell,
NULL, /* update_checksum */ NULL, /* update_checksum */
0, /* current checksum */ 0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */ { { NULL, 0 } } /* union for io-specific vars */
}; };

View File

@ -53,6 +53,12 @@ struct _rio {
/* The current checksum */ /* The current checksum */
uint64_t cksum; uint64_t cksum;
/* number of bytes read or written */
size_t processed_bytes;
/* maximum simgle read or write chunk size */
size_t max_processing_chunk;
/* Backend-specific vars. */ /* Backend-specific vars. */
union { union {
struct { struct {
@ -74,16 +80,29 @@ typedef struct _rio rio;
* if needed. */ * if needed. */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) { static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
if (r->update_cksum) r->update_cksum(r,buf,len); while (len) {
return r->write(r,buf,len); size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
if (r->write(r,buf,bytes_to_write) == 0)
return 0;
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
r->processed_bytes += bytes_to_write;
}
return 1;
} }
static inline size_t rioRead(rio *r, void *buf, size_t len) { static inline size_t rioRead(rio *r, void *buf, size_t len) {
if (r->read(r,buf,len) == 1) { while (len) {
if (r->update_cksum) r->update_cksum(r,buf,len); size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
return 1; if (r->read(r,buf,bytes_to_read) == 0)
return 0;
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
buf = (char*)buf + bytes_to_read;
len -= bytes_to_read;
r->processed_bytes += bytes_to_read;
} }
return 0; return 1;
} }
static inline off_t rioTell(rio *r) { static inline off_t rioTell(rio *r) {