
This addresses an issue #73 that @huangpeizhi discovered while using Tile38 in production where AOFSHRINK sometimes corrupts the database causing the server to not start the next time.
300 lines
8.0 KiB
Go
300 lines
8.0 KiB
Go
package controller
|
|
|
|
import (
|
|
"math"
|
|
"os"
|
|
"path"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/tidwall/tile38/controller/collection"
|
|
"github.com/tidwall/tile38/controller/log"
|
|
"github.com/tidwall/tile38/geojson"
|
|
)
|
|
|
|
const maxkeys = 8
|
|
const maxids = 32
|
|
const maxchunk = 4 * 1024 * 1024
|
|
|
|
func (c *Controller) aofshrink() {
|
|
start := time.Now()
|
|
c.mu.Lock()
|
|
if c.shrinking {
|
|
c.mu.Unlock()
|
|
return
|
|
}
|
|
c.shrinking = true
|
|
c.shrinklog = nil
|
|
c.mu.Unlock()
|
|
|
|
defer func() {
|
|
c.mu.Lock()
|
|
c.shrinking = false
|
|
c.shrinklog = nil
|
|
c.mu.Unlock()
|
|
log.Infof("aof shrink ended %v", time.Now().Sub(start))
|
|
return
|
|
}()
|
|
|
|
err := func() error {
|
|
f, err := os.Create(path.Join(c.dir, "shrink"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
var aofbuf []byte
|
|
var values []string
|
|
var keys []string
|
|
var nextkey string
|
|
var keysdone bool
|
|
for {
|
|
if len(keys) == 0 {
|
|
// load more keys
|
|
if keysdone {
|
|
break
|
|
}
|
|
keysdone = true
|
|
func() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.scanGreaterOrEqual(nextkey, func(key string, col *collection.Collection) bool {
|
|
if len(keys) == maxkeys {
|
|
keysdone = false
|
|
nextkey = key
|
|
return false
|
|
}
|
|
keys = append(keys, key)
|
|
return true
|
|
})
|
|
}()
|
|
continue
|
|
}
|
|
|
|
var idsdone bool
|
|
var nextid string
|
|
for {
|
|
if idsdone {
|
|
keys = keys[1:]
|
|
break
|
|
}
|
|
|
|
// load more objects
|
|
func() {
|
|
idsdone = true
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
col := c.getCol(keys[0])
|
|
if col == nil {
|
|
return
|
|
}
|
|
var fnames = col.FieldArr() // reload an array of field names to match each object
|
|
var exm = c.expires[keys[0]] // the expiration map
|
|
var now = time.Now() // used for expiration
|
|
var count = 0 // the object count
|
|
col.ScanGreaterOrEqual(nextid, 0, false,
|
|
func(id string, obj geojson.Object, fields []float64) bool {
|
|
if count == maxids {
|
|
// we reached the max number of ids for one batch
|
|
nextid = id
|
|
idsdone = false
|
|
return false
|
|
}
|
|
|
|
// here we fill the values array with a new command
|
|
values = values[:0]
|
|
values = append(values, "set")
|
|
values = append(values, keys[0])
|
|
values = append(values, id)
|
|
for i, fvalue := range fields {
|
|
if fvalue != 0 {
|
|
values = append(values, "field")
|
|
values = append(values, fnames[i])
|
|
values = append(values, strconv.FormatFloat(fvalue, 'f', -1, 64))
|
|
}
|
|
}
|
|
if exm != nil {
|
|
at, ok := exm[id]
|
|
if ok {
|
|
expires := at.Sub(now)
|
|
if expires > 0 {
|
|
values = append(values, "ex")
|
|
values = append(values, strconv.FormatFloat(math.Floor(float64(expires)/float64(time.Second)*10)/10, 'f', -1, 64))
|
|
}
|
|
}
|
|
}
|
|
switch obj := obj.(type) {
|
|
default:
|
|
if obj.IsGeometry() {
|
|
values = append(values, "object")
|
|
values = append(values, obj.JSON())
|
|
} else {
|
|
values = append(values, "string")
|
|
values = append(values, obj.String())
|
|
}
|
|
case geojson.SimplePoint:
|
|
values = append(values, "point")
|
|
values = append(values, strconv.FormatFloat(obj.Y, 'f', -1, 64))
|
|
values = append(values, strconv.FormatFloat(obj.X, 'f', -1, 64))
|
|
case geojson.Point:
|
|
if obj.Coordinates.Z == 0 {
|
|
values = append(values, "point")
|
|
values = append(values, strconv.FormatFloat(obj.Coordinates.Y, 'f', -1, 64))
|
|
values = append(values, strconv.FormatFloat(obj.Coordinates.X, 'f', -1, 64))
|
|
values = append(values, strconv.FormatFloat(obj.Coordinates.Z, 'f', -1, 64))
|
|
} else {
|
|
values = append(values, "point")
|
|
values = append(values, strconv.FormatFloat(obj.Coordinates.Y, 'f', -1, 64))
|
|
values = append(values, strconv.FormatFloat(obj.Coordinates.X, 'f', -1, 64))
|
|
}
|
|
}
|
|
|
|
// append the values to the aof buffer
|
|
aofbuf = append(aofbuf, '*')
|
|
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
|
|
aofbuf = append(aofbuf, '\r', '\n')
|
|
for _, value := range values {
|
|
aofbuf = append(aofbuf, '$')
|
|
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
|
|
aofbuf = append(aofbuf, '\r', '\n')
|
|
aofbuf = append(aofbuf, value...)
|
|
aofbuf = append(aofbuf, '\r', '\n')
|
|
}
|
|
|
|
// increment the object count
|
|
count++
|
|
return true
|
|
},
|
|
)
|
|
|
|
}()
|
|
}
|
|
if len(aofbuf) > maxchunk {
|
|
if _, err := f.Write(aofbuf); err != nil {
|
|
return err
|
|
}
|
|
aofbuf = aofbuf[:0]
|
|
}
|
|
}
|
|
|
|
// load hooks
|
|
// first load the names of the hooks
|
|
var hnames []string
|
|
func() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
for name := range c.hooks {
|
|
hnames = append(hnames, name)
|
|
}
|
|
}()
|
|
// sort the names for consistency
|
|
sort.Strings(hnames)
|
|
for _, name := range hnames {
|
|
func() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
hook := c.hooks[name]
|
|
if hook == nil {
|
|
return
|
|
}
|
|
hook.mu.Lock()
|
|
defer hook.mu.Unlock()
|
|
|
|
var values []string
|
|
values = append(values, "sethook")
|
|
values = append(values, name)
|
|
values = append(values, strings.Join(hook.Endpoints, ","))
|
|
for _, value := range hook.Message.Values {
|
|
values = append(values, value.String())
|
|
}
|
|
|
|
// append the values to the aof buffer
|
|
aofbuf = append(aofbuf, '*')
|
|
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
|
|
aofbuf = append(aofbuf, '\r', '\n')
|
|
for _, value := range values {
|
|
aofbuf = append(aofbuf, '$')
|
|
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
|
|
aofbuf = append(aofbuf, '\r', '\n')
|
|
aofbuf = append(aofbuf, value...)
|
|
aofbuf = append(aofbuf, '\r', '\n')
|
|
}
|
|
}()
|
|
}
|
|
if len(aofbuf) > 0 {
|
|
if _, err := f.Write(aofbuf); err != nil {
|
|
return err
|
|
}
|
|
aofbuf = aofbuf[:0]
|
|
}
|
|
if err := f.Sync(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// finally grab any new data that may have been written since
|
|
// the aofshrink has started and swap out the files.
|
|
return func() error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
aofbuf = aofbuf[:0]
|
|
for _, values := range c.shrinklog {
|
|
// append the values to the aof buffer
|
|
aofbuf = append(aofbuf, '*')
|
|
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(values)), 10)...)
|
|
aofbuf = append(aofbuf, '\r', '\n')
|
|
for _, value := range values {
|
|
aofbuf = append(aofbuf, '$')
|
|
aofbuf = append(aofbuf, strconv.FormatInt(int64(len(value)), 10)...)
|
|
aofbuf = append(aofbuf, '\r', '\n')
|
|
aofbuf = append(aofbuf, value...)
|
|
aofbuf = append(aofbuf, '\r', '\n')
|
|
}
|
|
}
|
|
if _, err := f.Write(aofbuf); err != nil {
|
|
return err
|
|
}
|
|
if err := f.Sync(); err != nil {
|
|
return err
|
|
}
|
|
// we now have a shrunken aof file that is fully in-sync with
|
|
// the current dataset. let's swap out the on disk files and
|
|
// point to the new file.
|
|
|
|
// anything below this point is unrecoverable. just log and exit process
|
|
// back up the live aof, just in case of fatal error
|
|
if err := os.Rename(path.Join(c.dir, "appendonly.aof"), path.Join(c.dir, "appendonly.bak")); err != nil {
|
|
log.Fatalf("shink backup fatal operation: %v", err)
|
|
}
|
|
if err := os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "appendonly.aof")); err != nil {
|
|
log.Fatalf("shink rename fatal operation: %v", err)
|
|
}
|
|
if err := c.f.Close(); err != nil {
|
|
log.Fatalf("shink live aof close fatal operation: %v", err)
|
|
}
|
|
c.f, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600)
|
|
if err != nil {
|
|
log.Fatalf("shink openfile fatal operation: %v", err)
|
|
}
|
|
var n int64
|
|
n, err = c.f.Seek(0, 2)
|
|
if err != nil {
|
|
log.Fatalf("shink seek end fatal operation: %v", err)
|
|
}
|
|
c.aofsz = int(n)
|
|
|
|
os.Remove(path.Join(c.dir, "appendonly.bak")) // ignore error
|
|
|
|
// kill all followers connections
|
|
for conn := range c.aofconnM {
|
|
conn.Close()
|
|
}
|
|
return nil
|
|
}()
|
|
}()
|
|
if err != nil {
|
|
log.Errorf("aof shrink failed: %v", err)
|
|
return
|
|
}
|
|
}
|