diff --git a/controller/aof.go b/controller/aof.go index 7381e874..efbb4731 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -219,7 +219,7 @@ func (c *Controller) queueHooks(d *commandDetailsT) error { for _, msg := range hmsgs { c.qidx++ // increment the log id key := hookLogPrefix + uint64ToString(c.qidx) - _, _, err := tx.Set(key, string(msg), hookLogSetDefaults()) + _, _, err := tx.Set(key, string(msg), hookLogSetDefaults) if err != nil { return err } diff --git a/controller/hooks.go b/controller/hooks.go index 17ce3eac..3b291940 100644 --- a/controller/hooks.go +++ b/controller/hooks.go @@ -17,16 +17,9 @@ import ( "github.com/tidwall/tile38/controller/server" ) -const hookLogTTL = time.Second * 30 - -func hookLogSetDefaults() *buntdb.SetOptions { - if hookLogTTL > 0 { - return &buntdb.SetOptions{ - Expires: true, // automatically delete after 30 seconds - TTL: hookLogTTL, - } - } - return nil +var hookLogSetDefaults = &buntdb.SetOptions{ + Expires: true, // automatically delete after 30 seconds + TTL: time.Second * 30, } type hooksByName []*Hook @@ -424,8 +417,8 @@ func (h *Hook) Close() { // notify the manager that there may be something new in the queue. func (h *Hook) Signal() { h.mu.Lock() - defer h.mu.Unlock() h.cond.Broadcast() + h.mu.Unlock() } // the manager is a forever loop that calls proc whenever there's a signal. @@ -456,8 +449,8 @@ func (h *Hook) manager() { func (h *Hook) proc() (ok bool) { var keys, vals []string var ttls []time.Duration + start := time.Now() err := h.db.Update(func(tx *buntdb.Tx) error { - // get keys and vals err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool { if strings.HasPrefix(key, hookLogPrefix) { @@ -472,15 +465,13 @@ func (h *Hook) proc() (ok bool) { // delete the keys for _, key := range keys { - if hookLogTTL > 0 { - ttl, err := tx.TTL(key) - if err != nil { - if err != buntdb.ErrNotFound { - return err - } + ttl, err := tx.TTL(key) + if err != nil { + if err != buntdb.ErrNotFound { + return err } - ttls = append(ttls, ttl) } + ttls = append(ttls, ttl) _, err = tx.Delete(key) if err != nil { if err != buntdb.ErrNotFound { @@ -514,22 +505,20 @@ func (h *Hook) proc() (ok bool) { // failed to send. try to reinsert the remaining. if this fails we lose log entries. keys = keys[i:] vals = vals[i:] - if hookLogTTL > 0 { - ttls = ttls[i:] - } + ttls = ttls[i:] h.db.Update(func(tx *buntdb.Tx) error { for i, key := range keys { val := vals[i] - var opts *buntdb.SetOptions - if hookLogTTL > 0 { - opts = &buntdb.SetOptions{ + ttl := ttls[i] - time.Since(start) + if ttl > 0 { + opts := &buntdb.SetOptions{ Expires: true, - TTL: ttls[i], + TTL: ttl, + } + _, _, err := tx.Set(key, val, opts) + if err != nil { + return err } - } - _, _, err := tx.Set(key, val, opts) - if err != nil { - return err } } return nil