hook/channel expiration

This commit is contained in:
tidwall 2018-08-13 17:36:02 -07:00
parent eef5f3c7b6
commit 25a60a2497
2 changed files with 72 additions and 20 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/tidwall/tile38/pkg/collection"
"github.com/tidwall/tile38/pkg/core"
"github.com/tidwall/tile38/pkg/endpoint"
"github.com/tidwall/tile38/pkg/expire"
"github.com/tidwall/tile38/pkg/geojson"
"github.com/tidwall/tile38/pkg/log"
"github.com/tidwall/tile38/pkg/server"
@ -113,6 +114,7 @@ type Controller struct {
luapool *lStatePool
pubsub *pubsub
hookex expire.List
}
// ListenAndServe starts a new tile38 server
@ -143,6 +145,12 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
http: http,
pubsub: newPubsub(),
}
c.hookex.Expired = func(item expire.Item) {
switch v := item.(type) {
case *Hook:
c.possiblyExpireHook(v)
}
}
c.epc = endpoint.NewManager(c)
c.luascripts = c.NewScriptMap()
c.luapool = c.NewPool()

View File

@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"sort"
"strconv"
"strings"
"sync"
"time"
@ -65,8 +66,8 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) (
var commandvs []resp.Value
var cmdlc string
var types []string
// var expires float64
// var expiresSet bool
var expires float64
var expiresSet bool
metaMap := make(map[string]string)
for {
commandvs = vs
@ -88,18 +89,18 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) (
}
metaMap[metakey] = metaval
continue
// case "ex":
// var s string
// if vs, s, ok = tokenval(vs); !ok || s == "" {
// return server.NOMessage, d, errInvalidNumberOfArguments
// }
// v, err := strconv.ParseFloat(s, 64)
// if err != nil {
// return server.NOMessage, d, errInvalidArgument(s)
// }
// expires = v
// expiresSet = true
case "ex":
var s string
if vs, s, ok = tokenval(vs); !ok || s == "" {
return server.NOMessage, d, errInvalidNumberOfArguments
}
v, err := strconv.ParseFloat(s, 64)
if err != nil {
return server.NOMessage, d, errInvalidArgument(s)
}
expires = v
expiresSet = true
continue
case "nearby":
types = nearbyTypes
case "within", "intersects":
@ -116,7 +117,6 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) (
return server.NOMessage, d, errors.New("missing FENCE argument")
}
s.cmd = cmdlc
cmsg := &server.Message{}
*cmsg = *msg
cmsg.Values = make([]resp.Value, len(commandvs))
@ -142,10 +142,10 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) (
channel: chanCmd,
cond: sync.NewCond(&sync.Mutex{}),
}
// if expiresSet {
// hook.expires =
// time.Now().Add(time.Duration(expires * float64(time.Second)))
// }
if expiresSet {
hook.expires =
time.Now().Add(time.Duration(expires * float64(time.Second)))
}
if !chanCmd {
hook.db = c.qdb
}
@ -154,9 +154,9 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) (
&wr, cmsg, s.key, s.output, s.precision, s.glob, false,
s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields)
if err != nil {
return server.NOMessage, d, err
}
if h, ok := c.hooks[name]; ok {
if h.channel != chanCmd {
return server.NOMessage, d,
@ -166,6 +166,9 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) (
// it was a match so we do nothing. But let's signal just
// for good measure.
h.Signal()
if !hook.expires.IsZero() {
c.hookex.Push(hook)
}
switch msg.OutputType {
case server.JSON:
return server.OKMessage(msg, start), d, nil
@ -180,6 +183,7 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) (
}
delete(c.hooks, h.Name)
}
d.updated = true
d.timestamp = time.Now()
c.hooks[name] = hook
@ -190,6 +194,9 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) (
}
hm[name] = hook
hook.Open()
if !hook.expires.IsZero() {
c.hookex.Push(hook)
}
switch msg.OutputType {
case server.JSON:
return server.OKMessage(msg, start), d, nil
@ -278,6 +285,34 @@ func (c *Controller) cmdPDelHook(msg *server.Message, channel bool) (
return
}
func (c *Controller) possiblyExpireHook(h *Hook) {
c.mu.Lock()
if h, ok := c.hooks[h.Name]; ok {
if !h.expires.IsZero() && time.Now().After(h.expires) {
// purge from database
msg := &server.Message{}
if h.channel {
msg.Values = resp.MultiBulkValue("delchan", h.Name).Array()
msg.Command = "delchan"
} else {
msg.Values = resp.MultiBulkValue("delhook", h.Name).Array()
msg.Command = "delhook"
}
_, d, err := c.cmdDelHook(msg, h.channel)
if err != nil {
c.mu.Unlock()
panic(err)
}
if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil {
c.mu.Unlock()
panic(err)
}
log.Debugf("purged hook %v", h.Name)
}
}
c.mu.Unlock()
}
func (c *Controller) cmdHooks(msg *server.Message, channel bool) (
res resp.Value, err error,
) {
@ -396,6 +431,11 @@ type Hook struct {
expires time.Time
}
// Expires returns when the hook expires. Required by the expire.Item interface.
func (h *Hook) Expires() time.Time {
return h.expires
}
// Equals returns true if two hooks are equal
func (h *Hook) Equals(hook *Hook) bool {
if h.Key != hook.Key ||
@ -404,6 +444,9 @@ func (h *Hook) Equals(hook *Hook) bool {
len(h.Metas) != len(hook.Metas) {
return false
}
if !h.expires.Equal(hook.expires) {
return false
}
for i, endpoint := range h.Endpoints {
if endpoint != hook.Endpoints[i] {
return false
@ -415,6 +458,7 @@ func (h *Hook) Equals(hook *Hook) bool {
return false
}
}
return resp.ArrayValue(h.Message.Values).Equals(
resp.ArrayValue(hook.Message.Values))
}