Sync hook msg ttl with server time
This commit is contained in:
parent
cd08d7fa7d
commit
b99cd397d6
@ -219,7 +219,7 @@ func (c *Controller) queueHooks(d *commandDetailsT) error {
|
|||||||
for _, msg := range hmsgs {
|
for _, msg := range hmsgs {
|
||||||
c.qidx++ // increment the log id
|
c.qidx++ // increment the log id
|
||||||
key := hookLogPrefix + uint64ToString(c.qidx)
|
key := hookLogPrefix + uint64ToString(c.qidx)
|
||||||
_, _, err := tx.Set(key, string(msg), hookLogSetDefaults())
|
_, _, err := tx.Set(key, string(msg), hookLogSetDefaults)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -17,16 +17,9 @@ import (
|
|||||||
"github.com/tidwall/tile38/controller/server"
|
"github.com/tidwall/tile38/controller/server"
|
||||||
)
|
)
|
||||||
|
|
||||||
const hookLogTTL = time.Second * 30
|
var hookLogSetDefaults = &buntdb.SetOptions{
|
||||||
|
Expires: true, // automatically delete after 30 seconds
|
||||||
func hookLogSetDefaults() *buntdb.SetOptions {
|
TTL: time.Second * 30,
|
||||||
if hookLogTTL > 0 {
|
|
||||||
return &buntdb.SetOptions{
|
|
||||||
Expires: true, // automatically delete after 30 seconds
|
|
||||||
TTL: hookLogTTL,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type hooksByName []*Hook
|
type hooksByName []*Hook
|
||||||
@ -424,8 +417,8 @@ func (h *Hook) Close() {
|
|||||||
// notify the manager that there may be something new in the queue.
|
// notify the manager that there may be something new in the queue.
|
||||||
func (h *Hook) Signal() {
|
func (h *Hook) Signal() {
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
defer h.mu.Unlock()
|
|
||||||
h.cond.Broadcast()
|
h.cond.Broadcast()
|
||||||
|
h.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// the manager is a forever loop that calls proc whenever there's a signal.
|
// the manager is a forever loop that calls proc whenever there's a signal.
|
||||||
@ -456,8 +449,8 @@ func (h *Hook) manager() {
|
|||||||
func (h *Hook) proc() (ok bool) {
|
func (h *Hook) proc() (ok bool) {
|
||||||
var keys, vals []string
|
var keys, vals []string
|
||||||
var ttls []time.Duration
|
var ttls []time.Duration
|
||||||
|
start := time.Now()
|
||||||
err := h.db.Update(func(tx *buntdb.Tx) error {
|
err := h.db.Update(func(tx *buntdb.Tx) error {
|
||||||
|
|
||||||
// get keys and vals
|
// get keys and vals
|
||||||
err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool {
|
err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool {
|
||||||
if strings.HasPrefix(key, hookLogPrefix) {
|
if strings.HasPrefix(key, hookLogPrefix) {
|
||||||
@ -472,15 +465,13 @@ func (h *Hook) proc() (ok bool) {
|
|||||||
|
|
||||||
// delete the keys
|
// delete the keys
|
||||||
for _, key := range keys {
|
for _, key := range keys {
|
||||||
if hookLogTTL > 0 {
|
ttl, err := tx.TTL(key)
|
||||||
ttl, err := tx.TTL(key)
|
if err != nil {
|
||||||
if err != nil {
|
if err != buntdb.ErrNotFound {
|
||||||
if err != buntdb.ErrNotFound {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
ttls = append(ttls, ttl)
|
|
||||||
}
|
}
|
||||||
|
ttls = append(ttls, ttl)
|
||||||
_, err = tx.Delete(key)
|
_, err = tx.Delete(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != buntdb.ErrNotFound {
|
if err != buntdb.ErrNotFound {
|
||||||
@ -514,22 +505,20 @@ func (h *Hook) proc() (ok bool) {
|
|||||||
// failed to send. try to reinsert the remaining. if this fails we lose log entries.
|
// failed to send. try to reinsert the remaining. if this fails we lose log entries.
|
||||||
keys = keys[i:]
|
keys = keys[i:]
|
||||||
vals = vals[i:]
|
vals = vals[i:]
|
||||||
if hookLogTTL > 0 {
|
ttls = ttls[i:]
|
||||||
ttls = ttls[i:]
|
|
||||||
}
|
|
||||||
h.db.Update(func(tx *buntdb.Tx) error {
|
h.db.Update(func(tx *buntdb.Tx) error {
|
||||||
for i, key := range keys {
|
for i, key := range keys {
|
||||||
val := vals[i]
|
val := vals[i]
|
||||||
var opts *buntdb.SetOptions
|
ttl := ttls[i] - time.Since(start)
|
||||||
if hookLogTTL > 0 {
|
if ttl > 0 {
|
||||||
opts = &buntdb.SetOptions{
|
opts := &buntdb.SetOptions{
|
||||||
Expires: true,
|
Expires: true,
|
||||||
TTL: ttls[i],
|
TTL: ttl,
|
||||||
|
}
|
||||||
|
_, _, err := tx.Set(key, val, opts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
_, _, err := tx.Set(key, val, opts)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user