Sync hook msg ttl with server time

This commit is contained in:
Josh Baker 2018-04-17 15:40:11 -07:00
parent cd08d7fa7d
commit b99cd397d6
2 changed files with 20 additions and 31 deletions

View File

@ -219,7 +219,7 @@ func (c *Controller) queueHooks(d *commandDetailsT) error {
for _, msg := range hmsgs { for _, msg := range hmsgs {
c.qidx++ // increment the log id c.qidx++ // increment the log id
key := hookLogPrefix + uint64ToString(c.qidx) key := hookLogPrefix + uint64ToString(c.qidx)
_, _, err := tx.Set(key, string(msg), hookLogSetDefaults()) _, _, err := tx.Set(key, string(msg), hookLogSetDefaults)
if err != nil { if err != nil {
return err return err
} }

View File

@ -17,16 +17,9 @@ import (
"github.com/tidwall/tile38/controller/server" "github.com/tidwall/tile38/controller/server"
) )
const hookLogTTL = time.Second * 30 var hookLogSetDefaults = &buntdb.SetOptions{
Expires: true, // automatically delete after 30 seconds
func hookLogSetDefaults() *buntdb.SetOptions { TTL: time.Second * 30,
if hookLogTTL > 0 {
return &buntdb.SetOptions{
Expires: true, // automatically delete after 30 seconds
TTL: hookLogTTL,
}
}
return nil
} }
type hooksByName []*Hook type hooksByName []*Hook
@ -424,8 +417,8 @@ func (h *Hook) Close() {
// notify the manager that there may be something new in the queue. // notify the manager that there may be something new in the queue.
func (h *Hook) Signal() { func (h *Hook) Signal() {
h.mu.Lock() h.mu.Lock()
defer h.mu.Unlock()
h.cond.Broadcast() h.cond.Broadcast()
h.mu.Unlock()
} }
// the manager is a forever loop that calls proc whenever there's a signal. // the manager is a forever loop that calls proc whenever there's a signal.
@ -456,8 +449,8 @@ func (h *Hook) manager() {
func (h *Hook) proc() (ok bool) { func (h *Hook) proc() (ok bool) {
var keys, vals []string var keys, vals []string
var ttls []time.Duration var ttls []time.Duration
start := time.Now()
err := h.db.Update(func(tx *buntdb.Tx) error { err := h.db.Update(func(tx *buntdb.Tx) error {
// get keys and vals // get keys and vals
err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool { err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool {
if strings.HasPrefix(key, hookLogPrefix) { if strings.HasPrefix(key, hookLogPrefix) {
@ -472,15 +465,13 @@ func (h *Hook) proc() (ok bool) {
// delete the keys // delete the keys
for _, key := range keys { for _, key := range keys {
if hookLogTTL > 0 { ttl, err := tx.TTL(key)
ttl, err := tx.TTL(key) if err != nil {
if err != nil { if err != buntdb.ErrNotFound {
if err != buntdb.ErrNotFound { return err
return err
}
} }
ttls = append(ttls, ttl)
} }
ttls = append(ttls, ttl)
_, err = tx.Delete(key) _, err = tx.Delete(key)
if err != nil { if err != nil {
if err != buntdb.ErrNotFound { if err != buntdb.ErrNotFound {
@ -514,22 +505,20 @@ func (h *Hook) proc() (ok bool) {
// failed to send. try to reinsert the remaining. if this fails we lose log entries. // failed to send. try to reinsert the remaining. if this fails we lose log entries.
keys = keys[i:] keys = keys[i:]
vals = vals[i:] vals = vals[i:]
if hookLogTTL > 0 { ttls = ttls[i:]
ttls = ttls[i:]
}
h.db.Update(func(tx *buntdb.Tx) error { h.db.Update(func(tx *buntdb.Tx) error {
for i, key := range keys { for i, key := range keys {
val := vals[i] val := vals[i]
var opts *buntdb.SetOptions ttl := ttls[i] - time.Since(start)
if hookLogTTL > 0 { if ttl > 0 {
opts = &buntdb.SetOptions{ opts := &buntdb.SetOptions{
Expires: true, Expires: true,
TTL: ttls[i], TTL: ttl,
}
_, _, err := tx.Set(key, val, opts)
if err != nil {
return err
} }
}
_, _, err := tx.Set(key, val, opts)
if err != nil {
return err
} }
} }
return nil return nil