From 1780badf1b6f1c1bc91d156d6154802dca9fc495 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sat, 30 Sep 2017 11:06:10 -0700 Subject: [PATCH] isolated expires list mutex --- controller/aof.go | 10 +++---- controller/aofshrink.go | 6 ++-- controller/checksum.go | 14 +++++----- controller/client.go | 8 +++--- controller/controller.go | 35 +++++++++++++---------- controller/crud.go | 5 +++- controller/expire.go | 60 ++++++++++++++++++++++------------------ controller/stats.go | 2 +- 8 files changed, 77 insertions(+), 63 deletions(-) diff --git a/controller/aof.go b/controller/aof.go index 813f719e..46eb2308 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -31,7 +31,7 @@ func (err errAOFHook) Error() string { var errInvalidAOF = errors.New("invalid aof file") func (c *Controller) loadAOF() error { - fi, err := c.f.Stat() + fi, err := c.aof.Stat() if err != nil { return err } @@ -54,7 +54,7 @@ func (c *Controller) loadAOF() error { count, float64(d)/float64(time.Second), ps, byteSpeed) }() var msg server.Message - rd := bufio.NewReader(c.f) + rd := bufio.NewReader(c.aof) for { var nn int ch, err := rd.ReadByte() @@ -194,7 +194,7 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { if err != nil { return err } - n, err := c.f.Write(data) + n, err := c.aof.Write(data) if err != nil { return err } @@ -342,7 +342,7 @@ func (c *Controller) cmdAOF(msg *server.Message) (res string, err error) { if err != nil || pos < 0 { return "", errInvalidArgument(spos) } - f, err := os.Open(c.f.Name()) + f, err := os.Open(c.aof.Name()) if err != nil { return "", err } @@ -375,7 +375,7 @@ func (c *Controller) liveAOF(pos int64, conn net.Conn, rd *server.AnyReaderWrite } c.mu.RLock() - f, err := os.Open(c.f.Name()) + f, err := os.Open(c.aof.Name()) c.mu.RUnlock() if err != nil { return err diff --git a/controller/aofshrink.go b/controller/aofshrink.go index 01e93834..05654c21 100644 --- a/controller/aofshrink.go +++ b/controller/aofshrink.go @@ -263,7 +263,7 @@ func (c *Controller) aofshrink() { // anything below this point is unrecoverable. just log and exit process // back up the live aof, just in case of fatal error - if err := c.f.Close(); err != nil { + if err := c.aof.Close(); err != nil { log.Fatalf("shink live aof close fatal operation: %v", err) } if err := os.Rename(path.Join(c.dir, "appendonly.aof"), path.Join(c.dir, "appendonly.bak")); err != nil { @@ -272,12 +272,12 @@ func (c *Controller) aofshrink() { if err := os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "appendonly.aof")); err != nil { log.Fatalf("shink rename fatal operation: %v", err) } - c.f, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600) + c.aof, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600) if err != nil { log.Fatalf("shink openfile fatal operation: %v", err) } var n int64 - n, err = c.f.Seek(0, 2) + n, err = c.aof.Seek(0, 2) if err != nil { log.Fatalf("shink seek end fatal operation: %v", err) } diff --git a/controller/checksum.go b/controller/checksum.go index bca5100e..1e2dcdf1 100644 --- a/controller/checksum.go +++ b/controller/checksum.go @@ -19,7 +19,7 @@ func (c *Controller) checksum(pos, size int64) (sum string, err error) { return "", io.EOF } var f *os.File - f, err = os.Open(c.f.Name()) + f, err = os.Open(c.aof.Name()) if err != nil { return } @@ -186,10 +186,10 @@ func (c *Controller) followCheckSome(addr string, followc int) (pos int64, err e } } fullpos := pos - fname := c.f.Name() + fname := c.aof.Name() if pos == 0 { - c.f.Close() - c.f, err = os.Create(fname) + c.aof.Close() + c.aof, err = os.Create(fname) if err != nil { log.Fatalf("could not recreate aof, possible data loss. %s", err.Error()) return 0, err @@ -199,7 +199,7 @@ func (c *Controller) followCheckSome(addr string, followc int) (pos int64, err e // we want to truncate at a command location // search for nearest command - pos, err = getEndOfLastValuePositionInFile(c.f.Name(), fullpos) + pos, err = getEndOfLastValuePositionInFile(c.aof.Name(), fullpos) if err != nil { return 0, err } @@ -211,12 +211,12 @@ func (c *Controller) followCheckSome(addr string, followc int) (pos int64, err e } log.Warnf("truncating aof to %d", pos) // any errror below are fatal. - c.f.Close() + c.aof.Close() if err := os.Truncate(fname, pos); err != nil { log.Fatalf("could not truncate aof, possible data loss. %s", err.Error()) return 0, err } - c.f, err = os.OpenFile(fname, os.O_CREATE|os.O_RDWR, 0600) + c.aof, err = os.OpenFile(fname, os.O_CREATE|os.O_RDWR, 0600) if err != nil { log.Fatalf("could not create aof, possible data loss. %s", err.Error()) return 0, err diff --git a/controller/client.go b/controller/client.go index 43a6e04d..a29c0925 100644 --- a/controller/client.go +++ b/controller/client.go @@ -82,7 +82,7 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, } var list []*clientConn c.connsmu.RLock() - for _, cc := range c.conns2 { + for _, cc := range c.conns { list = append(list, cc) } c.connsmu.RUnlock() @@ -115,7 +115,7 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, } name := "" c.connsmu.RLock() - if cc, ok := c.conns2[conn]; ok { + if cc, ok := c.conns[conn]; ok { name = cc.name.get() } c.connsmu.RUnlock() @@ -141,7 +141,7 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, } } c.connsmu.RLock() - if cc, ok := c.conns2[conn]; ok { + if cc, ok := c.conns[conn]; ok { cc.name.set(name) } c.connsmu.RUnlock() @@ -187,7 +187,7 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string, } var cclose *clientConn c.connsmu.RLock() - for _, cc := range c.conns2 { + for _, cc := range c.conns { if useID && fmt.Sprintf("%d", cc.id) == id { cclose = cc break diff --git a/controller/controller.go b/controller/controller.go index 0eb6e1d9..b9c5fc94 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -64,6 +64,7 @@ type Controller struct { host string port int http bool + dir string started time.Time config *Config epc *endpoint.EndpointManager @@ -81,15 +82,19 @@ type Controller struct { outOfMemory abool connsmu sync.RWMutex - conns2 map[*server.Conn]*clientConn + conns map[*server.Conn]*clientConn + + exlistmu sync.RWMutex + exlist []exitem + + mu sync.RWMutex + aof *os.File // active aof file + aofsz int // active size of the aof file + qdb *buntdb.DB // hook queue log + qidx uint64 // hook queue log last idx + cols *btree.BTree // data collections + expires map[string]map[string]time.Time // synced with cols - mu sync.RWMutex - f *os.File - qdb *buntdb.DB // hook queue log - qidx uint64 // hook queue log last idx - cols *btree.BTree - aofsz int - dir string follows map[*bytes.Buffer]bool fcond *sync.Cond lstack []*commandDetailsT @@ -102,8 +107,6 @@ type Controller struct { hooks map[string]*Hook // hook name hookcols map[string]map[string]*Hook // col key aofconnM map[net.Conn]bool - expires map[string]map[string]time.Time - exlist []exitem } // ListenAndServe starts a new tile38 server @@ -126,7 +129,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http aofconnM: make(map[net.Conn]bool), expires: make(map[string]map[string]time.Time), started: time.Now(), - conns2: make(map[*server.Conn]*clientConn), + conns: make(map[*server.Conn]*clientConn), epc: endpoint.NewEndpointManager(), http: http, } @@ -161,6 +164,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http if err != nil { return err } + c.qdb = qdb c.qidx = qidx if err := c.migrateAOF(); err != nil { @@ -170,7 +174,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http if err != nil { return err } - c.f = f + c.aof = f if err := c.loadAOF(); err != nil { return err } @@ -192,7 +196,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http }() handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error { c.connsmu.RLock() - if cc, ok := c.conns2[conn]; ok { + if cc, ok := c.conns[conn]; ok { cc.last.set(time.Now()) } c.connsmu.RUnlock() @@ -236,7 +240,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http cc.conn = conn c.connsmu.Lock() - c.conns2[conn] = cc + c.conns[conn] = cc c.connsmu.Unlock() c.statsTotalConns.add(1) @@ -244,9 +248,10 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http closed := func(conn *server.Conn) { c.connsmu.Lock() - delete(c.conns2, conn) + delete(c.conns, conn) c.connsmu.Unlock() } + return server.ListenAndServe(host, port, protected, handler, opened, closed, ln, http) } diff --git a/controller/crud.go b/controller/crud.go index ce028dc0..40d18fa7 100644 --- a/controller/crud.go +++ b/controller/crud.go @@ -465,7 +465,10 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai return } c.cols = btree.New(16, 0) - c.clearAllExpires() + c.exlistmu.Lock() + c.exlist = nil + c.exlistmu.Unlock() + c.expires = make(map[string]map[string]time.Time) c.hooks = make(map[string]*Hook) c.hookcols = make(map[string]map[string]*Hook) d.command = "flushdb" diff --git a/controller/expire.go b/controller/expire.go index e49538a1..32addcc3 100644 --- a/controller/expire.go +++ b/controller/expire.go @@ -32,9 +32,16 @@ func (a *exitem) Less(v btree.Item, ctx interface{}) bool { return a.id < b.id } -// clearAllExpires removes all items that are marked at expires. -func (c *Controller) clearAllExpires() { - c.expires = make(map[string]map[string]time.Time) +// fillExpiresList occurs once at startup +func (c *Controller) fillExpiresList() { + c.exlistmu.Lock() + c.exlist = c.exlist[:0] + for key, m := range c.expires { + for id, at := range m { + c.exlist = append(c.exlist, exitem{key, id, at}) + } + } + c.exlistmu.Unlock() } // clearIDExpires clears a single item from the expires list. @@ -67,9 +74,9 @@ func (c *Controller) expireAt(key, id string, at time.Time) { c.expires[key] = m } m[id] = at - if c.exlist != nil { - c.exlist = append(c.exlist, exitem{key, id, at}) - } + c.exlistmu.Lock() + c.exlist = append(c.exlist, exitem{key, id, at}) + c.exlistmu.Unlock() } // getExpires returns the when an item expires. @@ -94,33 +101,35 @@ func (c *Controller) hasExpired(key, id string) bool { return time.Now().After(at) } -func (c *Controller) fillExpiresList() { - c.exlist = make([]exitem, 0) - for key, m := range c.expires { - for id, at := range m { - c.exlist = append(c.exlist, exitem{key, id, at}) - } - } -} - // backgroundExpiring watches for when items that have expired must be purged // from the database. It's executes 10 times a seconds. func (c *Controller) backgroundExpiring() { rand.Seed(time.Now().UnixNano()) + var purgelist []exitem for { - c.mu.Lock() if c.stopBackgroundExpiring.on() { - c.mu.Unlock() return } now := time.Now() - var purged int + purgelist = purgelist[:0] + c.exlistmu.Lock() for i := 0; i < 20 && len(c.exlist) > 0; i++ { ix := rand.Int() % len(c.exlist) if now.After(c.exlist[ix].at) { - if c.hasExpired(c.exlist[ix].key, c.exlist[ix].id) { + // purge from exlist + purgelist = append(purgelist, c.exlist[ix]) + c.exlist[ix] = c.exlist[len(c.exlist)-1] + c.exlist = c.exlist[:len(c.exlist)-1] + } + } + c.exlistmu.Unlock() + if len(purgelist) > 0 { + c.mu.Lock() + for _, item := range purgelist { + if c.hasExpired(item.key, item.id) { + // purge from database msg := &server.Message{} - msg.Values = resp.MultiBulkValue("del", c.exlist[ix].key, c.exlist[ix].id).Array() + msg.Values = resp.MultiBulkValue("del", item.key, item.id).Array() msg.Command = "del" _, d, err := c.cmdDel(msg) if err != nil { @@ -133,15 +142,12 @@ func (c *Controller) backgroundExpiring() { log.Fatal(err) continue } - purged++ } - c.exlist[ix] = c.exlist[len(c.exlist)-1] - c.exlist = c.exlist[:len(c.exlist)-1] } - } - c.mu.Unlock() - if purged > 5 { - continue + c.mu.Unlock() + if len(purgelist) > 5 { + continue + } } time.Sleep(time.Second / 10) } diff --git a/controller/stats.go b/controller/stats.go index 5c7aea9b..5e83235b 100644 --- a/controller/stats.go +++ b/controller/stats.go @@ -146,7 +146,7 @@ func (c *Controller) writeInfoServer(w *bytes.Buffer) { } func (c *Controller) writeInfoClients(w *bytes.Buffer) { c.connsmu.RLock() - fmt.Fprintf(w, "connected_clients:%d\r\n", len(c.conns2)) // Number of client connections (excluding connections from slaves) + fmt.Fprintf(w, "connected_clients:%d\r\n", len(c.conns)) // Number of client connections (excluding connections from slaves) c.connsmu.RUnlock() } func (c *Controller) writeInfoMemory(w *bytes.Buffer) {