aof migration
This commit is contained in:
parent
083f1f1ba1
commit
8d75b20617
@ -1,11 +1,16 @@
|
|||||||
package controller
|
package controller
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/tidwall/resp"
|
||||||
|
"github.com/tidwall/tile38/controller/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errCorruptedAOF = errors.New("corrupted aof file")
|
var errCorruptedAOF = errors.New("corrupted aof file")
|
||||||
@ -90,7 +95,7 @@ func (c *Controller) migrateAOF() error {
|
|||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
log.Warn("Migrating aof to new format")
|
||||||
newf, err := os.Create(path.Join(c.dir, "migrate.aof"))
|
newf, err := os.Create(path.Join(c.dir, "migrate.aof"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -102,18 +107,53 @@ func (c *Controller) migrateAOF() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer oldf.Close()
|
defer oldf.Close()
|
||||||
|
start := time.Now()
|
||||||
rd := NewLegacyAOFReader(newf)
|
count := 0
|
||||||
|
wr := bufio.NewWriter(newf)
|
||||||
|
rd := NewLegacyAOFReader(oldf)
|
||||||
for {
|
for {
|
||||||
cmd, err := rd.ReadCommand()
|
cmdb, err := rd.ReadCommand()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
println(string(cmd))
|
line := string(cmdb)
|
||||||
|
var tok string
|
||||||
|
values := make([]resp.Value, 0, 64)
|
||||||
|
for line != "" {
|
||||||
|
line, tok = token(line)
|
||||||
|
if len(tok) > 0 && tok[0] == '{' {
|
||||||
|
if line != "" {
|
||||||
|
tok = tok + " " + line
|
||||||
|
line = ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
values = append(values, resp.StringValue(tok))
|
||||||
|
}
|
||||||
|
data, err := resp.ArrayValue(values).MarshalRESP()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := wr.Write(data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if wr.Buffered() > 1024*1024 {
|
||||||
|
if err := wr.Flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
if err := wr.Flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
oldf.Close()
|
||||||
|
newf.Close()
|
||||||
|
log.Debugf("%d items: %.0f/sec", count, float64(count)/(float64(time.Now().Sub(start))/float64(time.Second)))
|
||||||
|
if err := os.Rename(path.Join(c.dir, "migrate.aof"), path.Join(c.dir, "appendonly.aof")); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return errors.New("unsupported")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user