From fbeecaacd981f9e793a42fede72b14df0175ddd0 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Wed, 9 Nov 2016 13:43:26 -0700 Subject: [PATCH] faster aof loading About 30% faster loading of AOF file during server restart. --- controller/aof.go | 93 ++++++++++++++++++++++++++++++++++++++++----- controller/crud.go | 40 +++++++++---------- controller/token.go | 26 ++++++++++++- 3 files changed, 128 insertions(+), 31 deletions(-) diff --git a/controller/aof.go b/controller/aof.go index c588419f..7a337820 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -1,6 +1,7 @@ package controller import ( + "bufio" "errors" "fmt" "io" @@ -27,6 +28,8 @@ func (err errAOFHook) Error() string { return fmt.Sprintf("hook: %v", err.err) } +var errInvalidAOF = errors.New("invalid aof file") + func (c *Controller) loadAOF() error { fi, err := c.f.Stat() if err != nil { @@ -50,32 +53,102 @@ func (c *Controller) loadAOF() error { log.Infof("AOF loaded %d commands: %.2fs, %.0f/s, %s", count, float64(d)/float64(time.Second), ps, byteSpeed) }() - rd := resp.NewReader(c.f) + var msg server.Message + rd := bufio.NewReader(c.f) for { - v, _, n, err := rd.ReadMultiBulk() + var nn int + ch, err := rd.ReadByte() if err != nil { if err == io.EOF { return nil } return err } - values := v.Array() - if len(values) == 0 { - return errors.New("multibulk missing command component") + nn += 1 + if ch != '*' { + return errInvalidAOF } - msg := &server.Message{ - Command: strings.ToLower(values[0].String()), - Values: values, + ns, err := rd.ReadString('\n') + if err != nil { + return err } - if _, _, err := c.command(msg, nil); err != nil { + nn += len(ns) + if len(ns) < 2 || ns[len(ns)-2] != '\r' { + return errInvalidAOF + } + n, err := strconv.ParseUint(ns[:len(ns)-2], 10, 64) + if err != nil { + return err + } + if int(n) == 0 { + continue + } + msg.Values = msg.Values[:0] + for i := 0; i < int(n); i++ { + ch, err := rd.ReadByte() + if err != nil { + return err + } + if ch != '$' { + return errInvalidAOF + } + ns, err := rd.ReadString('\n') + if err != nil { + return err + } + if len(ns) < 2 || ns[len(ns)-2] != '\r' { + return errInvalidAOF + } + n, err := strconv.ParseUint(ns[:len(ns)-2], 10, 64) + if err != nil { + return err + } + b := make([]byte, int(n)) + _, err = io.ReadFull(rd, b) + if err != nil { + return err + } + if ch, err := rd.ReadByte(); err != nil { + return err + } else if ch != '\r' { + return errInvalidAOF + } + if ch, err := rd.ReadByte(); err != nil { + return err + } else if ch != '\n' { + return errInvalidAOF + } + msg.Values = append(msg.Values, resp.BytesValue(b)) + if i == 0 { + msg.Command = qlower(b) + } + nn += 1 + len(ns) + int(n) + 2 + } + if _, _, err := c.command(&msg, nil); err != nil { if commandErrIsFatal(err) { return err } } - c.aofsz += n + c.aofsz += nn count++ } } +func qlower(s []byte) string { + if len(s) == 3 { + if s[0] == 'S' && s[1] == 'E' && s[2] == 'T' { + return "set" + } + if s[0] == 'D' && s[1] == 'E' && s[2] == 'L' { + return "del" + } + } + for i := 0; i < len(s); i++ { + if s[i] >= 'A' || s[i] <= 'Z' { + return strings.ToLower(string(s)) + } + } + return string(s) +} func commandErrIsFatal(err error) bool { // FSET (and other writable commands) may return errors that we need diff --git a/controller/crud.go b/controller/crud.go index aecb7fa9..feaeef45 100644 --- a/controller/crud.go +++ b/controller/crud.go @@ -403,10 +403,10 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai func (c *Controller) parseSetArgs(vs []resp.Value) ( d commandDetailsT, fields []string, values []float64, xx, nx bool, - expires *float64, etype string, evs []resp.Value, err error, + expires *float64, etype []byte, evs []resp.Value, err error, ) { var ok bool - var typ string + var typ []byte if vs, d.key, ok = tokenval(vs); !ok || d.key == "" { err = errInvalidNumberOfArguments return @@ -415,16 +415,14 @@ func (c *Controller) parseSetArgs(vs []resp.Value) ( err = errInvalidNumberOfArguments return } - var arg string + var arg []byte var nvs []resp.Value - fields = make([]string, 0, 8) - values = make([]float64, 0, 8) for { - if nvs, arg, ok = tokenval(vs); !ok || arg == "" { + if nvs, arg, ok = tokenvalbytes(vs); !ok || len(arg) == 0 { err = errInvalidNumberOfArguments return } - if lc(arg, "field") { + if lcb(arg, "field") { vs = nvs var name string var svalue string @@ -450,10 +448,10 @@ func (c *Controller) parseSetArgs(vs []resp.Value) ( values = append(values, value) continue } - if lc(arg, "ex") { + if lcb(arg, "ex") { vs = nvs if expires != nil { - err = errInvalidArgument(arg) + err = errInvalidArgument(string(arg)) return } var s string @@ -470,19 +468,19 @@ func (c *Controller) parseSetArgs(vs []resp.Value) ( expires = &v continue } - if lc(arg, "xx") { + if lcb(arg, "xx") { vs = nvs if nx { - err = errInvalidArgument(arg) + err = errInvalidArgument(string(arg)) return } xx = true continue } - if lc(arg, "nx") { + if lcb(arg, "nx") { vs = nvs if xx { - err = errInvalidArgument(arg) + err = errInvalidArgument(string(arg)) return } nx = true @@ -490,7 +488,7 @@ func (c *Controller) parseSetArgs(vs []resp.Value) ( } break } - if vs, typ, ok = tokenval(vs); !ok || typ == "" { + if vs, typ, ok = tokenvalbytes(vs); !ok || len(typ) == 0 { err = errInvalidNumberOfArguments return } @@ -502,16 +500,16 @@ func (c *Controller) parseSetArgs(vs []resp.Value) ( evs = vs switch { default: - err = errInvalidArgument(typ) + err = errInvalidArgument(string(typ)) return - case lc(typ, "string"): + case lcb(typ, "string"): var str string if vs, str, ok = tokenval(vs); !ok { err = errInvalidNumberOfArguments return } d.obj = geojson.String(str) - case lc(typ, "point"): + case lcb(typ, "point"): var slat, slon, sz string if vs, slat, ok = tokenval(vs); !ok || slat == "" { err = errInvalidNumberOfArguments @@ -554,7 +552,7 @@ func (c *Controller) parseSetArgs(vs []resp.Value) ( } d.obj = sp } - case lc(typ, "bounds"): + case lcb(typ, "bounds"): var sminlat, sminlon, smaxlat, smaxlon string if vs, sminlat, ok = tokenval(vs); !ok || sminlat == "" { err = errInvalidNumberOfArguments @@ -605,7 +603,7 @@ func (c *Controller) parseSetArgs(vs []resp.Value) ( }, } d.obj = g - case lc(typ, "hash"): + case lcb(typ, "hash"): var sp geojson.SimplePoint var shash string if vs, shash, ok = tokenval(vs); !ok || shash == "" { @@ -620,7 +618,7 @@ func (c *Controller) parseSetArgs(vs []resp.Value) ( sp.X = lon sp.Y = lat d.obj = sp - case lc(typ, "object"): + case lcb(typ, "object"): var object string if vs, object, ok = tokenval(vs); !ok || object == "" { err = errInvalidNumberOfArguments @@ -682,6 +680,7 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT, c.expireAt(d.key, d.id, d.timestamp.Add(time.Duration(float64(time.Second)*(*ex)))) } switch msg.OutputType { + default: case server.JSON: res = `{"ok":true,"elapsed":"` + time.Now().Sub(start).String() + "\"}" case server.RESP: @@ -690,6 +689,7 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT, return notok: switch msg.OutputType { + default: case server.JSON: res = `{"ok":false,"elapsed":"` + time.Now().Sub(start).String() + "\"}" case server.RESP: diff --git a/controller/token.go b/controller/token.go index f24201fa..39cfa58a 100644 --- a/controller/token.go +++ b/controller/token.go @@ -40,6 +40,15 @@ func tokenval(vs []resp.Value) (nvs []resp.Value, token string, ok bool) { return } +func tokenvalbytes(vs []resp.Value) (nvs []resp.Value, token []byte, ok bool) { + if len(vs) > 0 { + token = vs[0].Bytes() + nvs = vs[1:] + ok = true + } + return +} + func tokenlc(line string) (newLine, token string) { for i := 0; i < len(line); i++ { ch := line[i] @@ -69,7 +78,22 @@ func tokenlc(line string) (newLine, token string) { } return "", line } - +func lcb(s1 []byte, s2 string) bool { + if len(s1) != len(s2) { + return false + } + for i := 0; i < len(s1); i++ { + ch := s1[i] + if ch >= 'A' && ch <= 'Z' { + if ch+32 != s2[i] { + return false + } + } else if ch != s2[i] { + return false + } + } + return true +} func lc(s1, s2 string) bool { if len(s1) != len(s2) { return false