diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index aebf4370..ca95bf4f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -22,6 +22,7 @@ import ( "github.com/tidwall/tile38/pkg/collection" "github.com/tidwall/tile38/pkg/core" "github.com/tidwall/tile38/pkg/endpoint" + "github.com/tidwall/tile38/pkg/expire" "github.com/tidwall/tile38/pkg/geojson" "github.com/tidwall/tile38/pkg/log" "github.com/tidwall/tile38/pkg/server" @@ -113,6 +114,7 @@ type Controller struct { luapool *lStatePool pubsub *pubsub + hookex expire.List } // ListenAndServe starts a new tile38 server @@ -143,6 +145,12 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http http: http, pubsub: newPubsub(), } + c.hookex.Expired = func(item expire.Item) { + switch v := item.(type) { + case *Hook: + c.possiblyExpireHook(v) + } + } c.epc = endpoint.NewManager(c) c.luascripts = c.NewScriptMap() c.luapool = c.NewPool() diff --git a/pkg/controller/hooks.go b/pkg/controller/hooks.go index 8ab4dced..52febce6 100644 --- a/pkg/controller/hooks.go +++ b/pkg/controller/hooks.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "sort" + "strconv" "strings" "sync" "time" @@ -65,8 +66,8 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) ( var commandvs []resp.Value var cmdlc string var types []string - // var expires float64 - // var expiresSet bool + var expires float64 + var expiresSet bool metaMap := make(map[string]string) for { commandvs = vs @@ -88,18 +89,18 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) ( } metaMap[metakey] = metaval continue - // case "ex": - // var s string - // if vs, s, ok = tokenval(vs); !ok || s == "" { - // return server.NOMessage, d, errInvalidNumberOfArguments - // } - // v, err := strconv.ParseFloat(s, 64) - - // if err != nil { - // return server.NOMessage, d, errInvalidArgument(s) - // } - // expires = v - // expiresSet = true + case "ex": + var s string + if vs, s, ok = tokenval(vs); !ok || s == "" { + return server.NOMessage, d, errInvalidNumberOfArguments + } + v, err := strconv.ParseFloat(s, 64) + if err != nil { + return server.NOMessage, d, errInvalidArgument(s) + } + expires = v + expiresSet = true + continue case "nearby": types = nearbyTypes case "within", "intersects": @@ -116,7 +117,6 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) ( return server.NOMessage, d, errors.New("missing FENCE argument") } s.cmd = cmdlc - cmsg := &server.Message{} *cmsg = *msg cmsg.Values = make([]resp.Value, len(commandvs)) @@ -142,10 +142,10 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) ( channel: chanCmd, cond: sync.NewCond(&sync.Mutex{}), } - // if expiresSet { - // hook.expires = - // time.Now().Add(time.Duration(expires * float64(time.Second))) - // } + if expiresSet { + hook.expires = + time.Now().Add(time.Duration(expires * float64(time.Second))) + } if !chanCmd { hook.db = c.qdb } @@ -154,9 +154,9 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) ( &wr, cmsg, s.key, s.output, s.precision, s.glob, false, s.cursor, s.limit, s.wheres, s.whereins, s.whereevals, s.nofields) if err != nil { + return server.NOMessage, d, err } - if h, ok := c.hooks[name]; ok { if h.channel != chanCmd { return server.NOMessage, d, @@ -166,6 +166,9 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) ( // it was a match so we do nothing. But let's signal just // for good measure. h.Signal() + if !hook.expires.IsZero() { + c.hookex.Push(hook) + } switch msg.OutputType { case server.JSON: return server.OKMessage(msg, start), d, nil @@ -180,6 +183,7 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) ( } delete(c.hooks, h.Name) } + d.updated = true d.timestamp = time.Now() c.hooks[name] = hook @@ -190,6 +194,9 @@ func (c *Controller) cmdSetHook(msg *server.Message, chanCmd bool) ( } hm[name] = hook hook.Open() + if !hook.expires.IsZero() { + c.hookex.Push(hook) + } switch msg.OutputType { case server.JSON: return server.OKMessage(msg, start), d, nil @@ -278,6 +285,34 @@ func (c *Controller) cmdPDelHook(msg *server.Message, channel bool) ( return } +func (c *Controller) possiblyExpireHook(h *Hook) { + c.mu.Lock() + if h, ok := c.hooks[h.Name]; ok { + if !h.expires.IsZero() && time.Now().After(h.expires) { + // purge from database + msg := &server.Message{} + if h.channel { + msg.Values = resp.MultiBulkValue("delchan", h.Name).Array() + msg.Command = "delchan" + } else { + msg.Values = resp.MultiBulkValue("delhook", h.Name).Array() + msg.Command = "delhook" + } + _, d, err := c.cmdDelHook(msg, h.channel) + if err != nil { + c.mu.Unlock() + panic(err) + } + if err := c.writeAOF(resp.ArrayValue(msg.Values), &d); err != nil { + c.mu.Unlock() + panic(err) + } + log.Debugf("purged hook %v", h.Name) + } + } + c.mu.Unlock() +} + func (c *Controller) cmdHooks(msg *server.Message, channel bool) ( res resp.Value, err error, ) { @@ -396,6 +431,11 @@ type Hook struct { expires time.Time } +// Expires returns when the hook expires. Required by the expire.Item interface. +func (h *Hook) Expires() time.Time { + return h.expires +} + // Equals returns true if two hooks are equal func (h *Hook) Equals(hook *Hook) bool { if h.Key != hook.Key || @@ -404,6 +444,9 @@ func (h *Hook) Equals(hook *Hook) bool { len(h.Metas) != len(hook.Metas) { return false } + if !h.expires.Equal(hook.expires) { + return false + } for i, endpoint := range h.Endpoints { if endpoint != hook.Endpoints[i] { return false @@ -415,6 +458,7 @@ func (h *Hook) Equals(hook *Hook) bool { return false } } + return resp.ArrayValue(h.Message.Values).Equals( resp.ArrayValue(hook.Message.Values)) }