From 96283054fc97d9b544675a532e743da32ff9e6d7 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Wed, 30 Mar 2016 09:32:02 -0700 Subject: [PATCH] new shrink formula --- controller/aofshrink.go | 233 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 controller/aofshrink.go diff --git a/controller/aofshrink.go b/controller/aofshrink.go new file mode 100644 index 00000000..e5a2bedc --- /dev/null +++ b/controller/aofshrink.go @@ -0,0 +1,233 @@ +package controller + +import ( + "bytes" + "errors" + "io" + "os" + "path" + "sort" + "strings" + "time" + + "github.com/tidwall/resp" + "github.com/tidwall/tile38/controller/collection" + "github.com/tidwall/tile38/controller/log" + "github.com/tidwall/tile38/geojson" +) + +type objFields struct { + obj geojson.Object + fields []float64 +} + +const maxKeyGroup = 10 +const maxIDGroup = 10 + +// aofshrink shrinks the aof file to it's minimum size. +// There are some pauses but each pause should not take more that 100ms on a busy server. +func (c *Controller) aofshrink() { + start := time.Now() + c.mu.Lock() + c.f.Sync() + if c.shrinking { + c.mu.Unlock() + return + } + f, err := os.Create(path.Join(c.dir, "shrink")) + if err != nil { + log.Errorf("aof shrink failed: %s\n", err.Error()) + return + } + defer func() { + f.Close() + //os.RemoveAll(rewritePath) + }() + var ferr error // stores the final error + c.shrinking = true + endpos := int64(c.aofsz) // 1) Log the aofsize at start. Locked + c.mu.Unlock() + defer func() { + c.mu.Lock() + defer c.mu.Unlock() + c.shrinking = false + + defer func() { + if ferr != nil { + log.Errorf("aof shrink failed: %s\n", ferr.Error()) + } else { + log.Printf("aof shrink completed in %s", time.Now().Sub(start)) + } + }() + if ferr != nil { + return + } + + of, err := os.Open(c.f.Name()) + if err != nil { + ferr = err + return + } + defer of.Close() + if _, err := of.Seek(endpos, 0); err != nil { + ferr = err + return + } + rd := resp.NewReader(of) + for { + v, telnet, _, err := rd.ReadMultiBulk() + if err != nil { + if err == io.EOF { + break + } + ferr = err + return + } + if telnet { + ferr = errors.New("invalid RESP message") + return + } + data, err := v.MarshalRESP() + if err != nil { + ferr = err + return + } + if _, err := f.Write(data); err != nil { + ferr = err + return + } + break + } + of.Close() + // swap files + f.Close() + c.f.Close() + err = os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "aof")) + if err != nil { + log.Fatal("shink rename fatal operation") + } + c.f, err = os.OpenFile(path.Join(c.dir, "aof"), os.O_CREATE|os.O_RDWR, 0600) + if err != nil { + log.Fatal("shink openfile fatal operation") + } + var n int64 + n, err = c.f.Seek(0, 2) + if err != nil { + log.Fatal("shink seek end fatal operation") + } + c.aofsz = int(n) + + }() + log.Infof("aof shrink started at pos %d", endpos) + + // Ascend collections. Load maxKeyGroup at a time. + nextKey := "" + for { + cols := make(map[string]*collection.Collection) + c.mu.Lock() + c.scanGreaterOrEqual(nextKey, func(key string, col *collection.Collection) bool { + if key != nextKey { + cols[key] = col + nextKey = key + } + return len(cols) < maxKeyGroup + }) + c.mu.Unlock() + + keys := make([]string, 0, maxKeyGroup) + for key := range cols { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + col := cols[key] + // Ascend objects. Load maxIDGroup at a time. + nextID := "" + for { + objs := make(map[string]objFields) + c.mu.Lock() + fnames := col.FieldArr() // reload an array of field names to match each object + col.ScanGreaterOrEqual(nextID, 0, func(id string, obj geojson.Object, fields []float64) bool { + if id != nextID { + objs[id] = objFields{obj, fields} + nextID = id + } + return len(objs) < maxIDGroup + }) + c.mu.Unlock() + + ids := make([]string, 0, maxIDGroup) + for id := range objs { + ids = append(ids, id) + } + sort.Strings(ids) + + linebuf := &bytes.Buffer{} + for _, id := range ids { + obj := objs[id] + values := make([]resp.Value, 0, len(obj.fields)*3+16) + values = append(values, resp.StringValue("set"), resp.StringValue(key), resp.StringValue(id)) + for i, fvalue := range obj.fields { + if fvalue != 0 { + values = append(values, resp.StringValue("field"), resp.StringValue(fnames[i]), resp.FloatValue(fvalue)) + } + } + switch obj := obj.obj.(type) { + default: + values = append(values, resp.StringValue("object"), resp.StringValue(obj.JSON())) + case geojson.SimplePoint: + values = append(values, resp.StringValue("point"), resp.FloatValue(obj.Y), resp.FloatValue(obj.X)) + case geojson.Point: + if obj.Coordinates.Z == 0 { + values = append(values, resp.StringValue("point"), resp.FloatValue(obj.Coordinates.Y), resp.FloatValue(obj.Coordinates.X)) + } else { + values = append(values, resp.StringValue("point"), resp.FloatValue(obj.Coordinates.Y), resp.FloatValue(obj.Coordinates.X), resp.FloatValue(obj.Coordinates.Z)) + } + } + data, err := resp.ArrayValue(values).MarshalRESP() + if err != nil { + ferr = err + return + } + linebuf.Write(data) + } + if _, err := f.Write(linebuf.Bytes()); err != nil { + ferr = err + return + } + if len(objs) < maxIDGroup { + break + } + } + } + if len(cols) < maxKeyGroup { + break + } + } + + // load hooks + c.mu.Lock() + for name, hook := range c.hooks { + values := make([]resp.Value, 0, 3+len(hook.Message.Values)) + endpoints := make([]string, len(hook.Endpoints)) + for i, endpoint := range hook.Endpoints { + endpoints[i] = endpoint.Original + } + values = append(values, resp.StringValue("sethook"), resp.StringValue(name), resp.StringValue(strings.Join(endpoints, ","))) + values = append(values, hook.Message.Values...) + data, err := resp.ArrayValue(values).MarshalRESP() + if err != nil { + c.mu.Unlock() + ferr = err + return + } + if _, err := f.Write(data); err != nil { + c.mu.Unlock() + ferr = err + return + } + } + c.mu.Unlock() + +}