diff --git a/internal/server/aof.go b/internal/server/aof.go index 42a42946..5b7bf7bf 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -115,6 +115,8 @@ func commandErrIsFatal(err error) bool { return true } +// flushAOF flushes all aof buffer data to disk. Set sync to true to sync the +// fsync the file. func (server *Server) flushAOF(sync bool) { if len(server.aofbuf) > 0 { _, err := server.aof.Write(server.aofbuf) @@ -126,15 +128,30 @@ func (server *Server) flushAOF(sync bool) { panic(err) } } - server.aofbuf = server.aofbuf[:0] + if cap(server.aofbuf) > 1024*1024*32 { + server.aofbuf = make([]byte, 0, 1024*1024*32) + } else { + server.aofbuf = server.aofbuf[:0] + } } } +type writeAOFDetails struct { + appendBufferElapsed time.Duration + notifyLiveElapsed time.Duration + geofencesElapsed time.Duration +} + func (server *Server) writeAOF(args []string, d *commandDetails) error { + _, err := server.writeAOFDetails(args, d) + return err +} + +func (server *Server) writeAOFDetails(args []string, d *commandDetails) (details writeAOFDetails, err error) { if d != nil && !d.updated { // just ignore writes if the command did not update - return nil + return details, nil } if server.shrinking { @@ -144,6 +161,7 @@ func (server *Server) writeAOF(args []string, d *commandDetails) error { } if server.aof != nil { + start := time.Now() atomic.StoreInt32(&server.aofdirty, 1) // prewrite optimization flag n := len(server.aofbuf) server.aofbuf = redcon.AppendArray(server.aofbuf, len(args)) @@ -151,14 +169,21 @@ func (server *Server) writeAOF(args []string, d *commandDetails) error { server.aofbuf = redcon.AppendBulkString(server.aofbuf, arg) } server.aofsz += len(server.aofbuf) - n + details.appendBufferElapsed = time.Since(start) } // notify aof live connections that we have new data + start := time.Now() server.fcond.L.Lock() server.fcond.Broadcast() server.fcond.L.Unlock() + details.notifyLiveElapsed = time.Since(start) // process geofences + start = time.Now() + defer func() { + details.geofencesElapsed = time.Since(start) + }() if d != nil { // webhook geofences if server.config.followHost() == "" { @@ -167,13 +192,13 @@ func (server *Server) writeAOF(args []string, d *commandDetails) error { // queue children for _, d := range d.children { if err := server.queueHooks(d); err != nil { - return err + return details, err } } } else { // queue parent if err := server.queueHooks(d); err != nil { - return err + return details, err } } } @@ -194,7 +219,7 @@ func (server *Server) writeAOF(args []string, d *commandDetails) error { } server.lcond.L.Unlock() } - return nil + return details, nil } func (server *Server) getQueueCandidates(d *commandDetails) []*Hook { diff --git a/internal/server/dev.go b/internal/server/dev.go index c1a2562d..06a57317 100644 --- a/internal/server/dev.go +++ b/internal/server/dev.go @@ -74,18 +74,31 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { if err != nil { return NOMessage, errInvalidArgument(snumPoints) } - docmd := func(args []string) error { + + type docmdDetails struct { + writeAOFDetails writeAOFDetails + cmdElapsed time.Duration + aofElapsed time.Duration + } + + docmd := func(args []string) (docmdDetails docmdDetails, err error) { + c.mu.Lock() + defer c.mu.Unlock() var nmsg Message nmsg = *msg nmsg._command = "" nmsg.Args = args var d commandDetails + start := time.Now() _, d, err = c.command(&nmsg, nil) + docmdDetails.cmdElapsed = time.Since(start) if err != nil { - return err + return docmdDetails, err } - - return c.writeAOF(nmsg.Args, &d) + start = time.Now() + docmdDetails.writeAOFDetails, err = c.writeAOFDetails(nmsg.Args, &d) + docmdDetails.aofElapsed = time.Since(start) + return docmdDetails, err } rand.Seed(time.Now().UnixNano()) @@ -128,13 +141,31 @@ func (c *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { strconv.FormatFloat(lon, 'f', -1, 64), ) } - if err := docmd(values); err != nil { + start := time.Now() + docmdDetails, err := docmd(values) + if err != nil { log.Fatal(err) return } + elapsed := time.Since(start) + if elapsed > time.Millisecond*5 { + log.Infof("%d"+ + ", %6.1f cmd, %6.1f aof"+ + ", %6.1f buf, %6.1f not, %6.1f fence"+ + ", %6.1f tot", + len(values), + docmdDetails.cmdElapsed.Seconds()*1000, + docmdDetails.aofElapsed.Seconds()*1000, + docmdDetails.writeAOFDetails.appendBufferElapsed.Seconds()*1000, + docmdDetails.writeAOFDetails.notifyLiveElapsed.Seconds()*1000, + docmdDetails.writeAOFDetails.geofencesElapsed.Seconds()*1000, + elapsed.Seconds()*1000, + ) + } atomic.AddUint64(&k, 1) if j%1000 == 1000-1 { - log.Infof("massinsert: %s %d/%d", key, atomic.LoadUint64(&k), cols*objs) + log.Debugf("mass: %s %d/%d", + key, atomic.LoadUint64(&k), cols*objs) } } }(key) diff --git a/internal/server/server.go b/internal/server/server.go index bf6c8287..9e163766 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -600,10 +600,7 @@ func (server *Server) backgroundSyncAOF() { func() { server.mu.Lock() defer server.mu.Unlock() - if len(server.aofbuf) > 0 { - server.flushAOF(true) - } - server.aofbuf = nil + server.flushAOF(true) }() } } @@ -831,8 +828,6 @@ func (server *Server) handleInputCommand(client *Client, msg *Message) error { case "echo": case "massinsert": // dev operation - server.mu.Lock() - defer server.mu.Unlock() case "sleep": // dev operation server.mu.RLock() diff --git a/internal/server/stats.go b/internal/server/stats.go index 6b66b1f2..ac4f582b 100644 --- a/internal/server/stats.go +++ b/internal/server/stats.go @@ -9,6 +9,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/tidwall/resp" @@ -16,6 +17,30 @@ import ( "github.com/tidwall/tile38/internal/collection" ) +var memStats runtime.MemStats +var memStatsMu sync.Mutex +var memStatsBG bool + +// ReadMemStats returns the latest memstats. It provides an instant response. +func readMemStats() runtime.MemStats { + memStatsMu.Lock() + if !memStatsBG { + runtime.ReadMemStats(&memStats) + go func() { + for { + memStatsMu.Lock() + runtime.ReadMemStats(&memStats) + memStatsMu.Unlock() + time.Sleep(time.Second / 5) + } + }() + memStatsBG = true + } + ms := memStats + memStatsMu.Unlock() + return ms +} + func (c *Server) cmdStats(msg *Message) (res resp.Value, err error) { start := time.Now() vs := msg.Args[1:] @@ -133,8 +158,7 @@ func (c *Server) basicStats(m map[string]interface{}) { m["num_points"] = points m["num_objects"] = objects m["num_strings"] = strings - var mem runtime.MemStats - runtime.ReadMemStats(&mem) + mem := readMemStats() avgsz := 0 if points != 0 { avgsz = int(mem.HeapAlloc) / points @@ -154,9 +178,8 @@ func (c *Server) basicStats(m map[string]interface{}) { // extStats populates the passed map with extended system/go/tile38 statistics func (c *Server) extStats(m map[string]interface{}) { - var mem runtime.MemStats n, _ := runtime.ThreadCreateProfile(nil) - runtime.ReadMemStats(&mem) + mem := readMemStats() // Go/Memory Stats @@ -326,8 +349,7 @@ func (c *Server) writeInfoClients(w *bytes.Buffer) { c.connsmu.RUnlock() } func (c *Server) writeInfoMemory(w *bytes.Buffer) { - var mem runtime.MemStats - runtime.ReadMemStats(&mem) + mem := readMemStats() fmt.Fprintf(w, "used_memory:%d\r\n", mem.Alloc) // total number of bytes allocated by Redis using its allocator (either standard libc, jemalloc, or an alternative allocator such as tcmalloc } func boolInt(t bool) int {