Sync hook msg ttl with server time

This commit is contained in:
Josh Baker 2018-04-17 15:40:11 -07:00
parent cd08d7fa7d
commit b99cd397d6
2 changed files with 20 additions and 31 deletions

View File

@ -219,7 +219,7 @@ func (c *Controller) queueHooks(d *commandDetailsT) error {
for _, msg := range hmsgs { for _, msg := range hmsgs {
c.qidx++ // increment the log id c.qidx++ // increment the log id
key := hookLogPrefix + uint64ToString(c.qidx) key := hookLogPrefix + uint64ToString(c.qidx)
_, _, err := tx.Set(key, string(msg), hookLogSetDefaults()) _, _, err := tx.Set(key, string(msg), hookLogSetDefaults)
if err != nil { if err != nil {
return err return err
} }

View File

@ -17,16 +17,9 @@ import (
"github.com/tidwall/tile38/controller/server" "github.com/tidwall/tile38/controller/server"
) )
const hookLogTTL = time.Second * 30 var hookLogSetDefaults = &buntdb.SetOptions{
func hookLogSetDefaults() *buntdb.SetOptions {
if hookLogTTL > 0 {
return &buntdb.SetOptions{
Expires: true, // automatically delete after 30 seconds Expires: true, // automatically delete after 30 seconds
TTL: hookLogTTL, TTL: time.Second * 30,
}
}
return nil
} }
type hooksByName []*Hook type hooksByName []*Hook
@ -424,8 +417,8 @@ func (h *Hook) Close() {
// notify the manager that there may be something new in the queue. // notify the manager that there may be something new in the queue.
func (h *Hook) Signal() { func (h *Hook) Signal() {
h.mu.Lock() h.mu.Lock()
defer h.mu.Unlock()
h.cond.Broadcast() h.cond.Broadcast()
h.mu.Unlock()
} }
// the manager is a forever loop that calls proc whenever there's a signal. // the manager is a forever loop that calls proc whenever there's a signal.
@ -456,8 +449,8 @@ func (h *Hook) manager() {
func (h *Hook) proc() (ok bool) { func (h *Hook) proc() (ok bool) {
var keys, vals []string var keys, vals []string
var ttls []time.Duration var ttls []time.Duration
start := time.Now()
err := h.db.Update(func(tx *buntdb.Tx) error { err := h.db.Update(func(tx *buntdb.Tx) error {
// get keys and vals // get keys and vals
err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool { err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool {
if strings.HasPrefix(key, hookLogPrefix) { if strings.HasPrefix(key, hookLogPrefix) {
@ -472,7 +465,6 @@ func (h *Hook) proc() (ok bool) {
// delete the keys // delete the keys
for _, key := range keys { for _, key := range keys {
if hookLogTTL > 0 {
ttl, err := tx.TTL(key) ttl, err := tx.TTL(key)
if err != nil { if err != nil {
if err != buntdb.ErrNotFound { if err != buntdb.ErrNotFound {
@ -480,7 +472,6 @@ func (h *Hook) proc() (ok bool) {
} }
} }
ttls = append(ttls, ttl) ttls = append(ttls, ttl)
}
_, err = tx.Delete(key) _, err = tx.Delete(key)
if err != nil { if err != nil {
if err != buntdb.ErrNotFound { if err != buntdb.ErrNotFound {
@ -514,24 +505,22 @@ func (h *Hook) proc() (ok bool) {
// failed to send. try to reinsert the remaining. if this fails we lose log entries. // failed to send. try to reinsert the remaining. if this fails we lose log entries.
keys = keys[i:] keys = keys[i:]
vals = vals[i:] vals = vals[i:]
if hookLogTTL > 0 {
ttls = ttls[i:] ttls = ttls[i:]
}
h.db.Update(func(tx *buntdb.Tx) error { h.db.Update(func(tx *buntdb.Tx) error {
for i, key := range keys { for i, key := range keys {
val := vals[i] val := vals[i]
var opts *buntdb.SetOptions ttl := ttls[i] - time.Since(start)
if hookLogTTL > 0 { if ttl > 0 {
opts = &buntdb.SetOptions{ opts := &buntdb.SetOptions{
Expires: true, Expires: true,
TTL: ttls[i], TTL: ttl,
}
} }
_, _, err := tx.Set(key, val, opts) _, _, err := tx.Set(key, val, opts)
if err != nil { if err != nil {
return err return err
} }
} }
}
return nil return nil
}) })
return false return false