From b3d3be7e76d085530dd5fe795af0b09bc693ab92 Mon Sep 17 00:00:00 2001 From: tidwall Date: Mon, 13 Aug 2018 20:27:22 -0700 Subject: [PATCH] objects with roam matches --- pkg/controller/aofshrink.go | 6 ++++ pkg/controller/controller.go | 2 +- pkg/controller/fence.go | 60 ++++++++++++++++++------------------ pkg/controller/hooks.go | 7 +++-- pkg/controller/live.go | 8 ++++- pkg/expire/expire.go | 7 ++--- 6 files changed, 52 insertions(+), 38 deletions(-) diff --git a/pkg/controller/aofshrink.go b/pkg/controller/aofshrink.go index 09e1f13e..cd661c60 100644 --- a/pkg/controller/aofshrink.go +++ b/pkg/controller/aofshrink.go @@ -204,6 +204,12 @@ func (c *Controller) aofshrink() { for _, meta := range hook.Metas { values = append(values, "meta", meta.Name, meta.Value) } + if !hook.expires.IsZero() { + ex := float64(hook.expires.Sub(time.Now())) / + float64(time.Second) + values = append(values, "ex", + strconv.FormatFloat(ex, 'f', 1, 64)) + } for _, value := range hook.Message.Values { values = append(values, value.String()) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ca95bf4f..4bdcc92f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -148,7 +148,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http c.hookex.Expired = func(item expire.Item) { switch v := item.(type) { case *Hook: - c.possiblyExpireHook(v) + c.possiblyExpireHook(v.Name) } } c.epc = endpoint.NewManager(c) diff --git a/pkg/controller/fence.go b/pkg/controller/fence.go index 31ffa247..f659e309 100644 --- a/pkg/controller/fence.go +++ b/pkg/controller/fence.go @@ -79,11 +79,14 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, metas } var roamkeys, roamids []string var roammeters []float64 + var roamobjs []geojson.Object var detect = "outside" if fence != nil { if fence.roam.on { if details.command == "set" { - roamkeys, roamids, roammeters = fenceMatchRoam(sw.c, fence, details.key, details.id, details.obj) + roamkeys, roamids, roammeters, roamobjs = + fenceMatchRoam(sw.c, fence, details.key, + details.id, details.obj) } if len(roamids) == 0 || len(roamids) != len(roamkeys) { return nil @@ -223,46 +226,41 @@ func fenceMatch(hookName string, sw *scanWriter, fence *liveFenceSwitches, metas var nmsgs []string msg := msgs[0][:len(msgs[0])-1] for i, id := range roamids { - var nmsg []byte nmsg = append(nmsg, msg...) nmsg = append(nmsg, `,"nearby":{"key":`...) nmsg = appendJSONString(nmsg, roamkeys[i]) nmsg = append(nmsg, `,"id":`...) nmsg = appendJSONString(nmsg, id) + nmsg = append(nmsg, `,"object":`...) + nmsg = append(nmsg, roamobjs[i].JSON()...) nmsg = append(nmsg, `,"meters":`...) nmsg = append(nmsg, strconv.FormatFloat(roammeters[i], 'f', -1, 64)...) - if fence.roam.scan != "" { nmsg = append(nmsg, `,"scan":[`...) - - func() { - sw.c.mu.Lock() - defer sw.c.mu.Unlock() - col := sw.c.getCol(roamkeys[i]) - if col != nil { - obj, _, ok := col.Get(id) - if ok { - nmsg = append(nmsg, `{"id":`+jsonString(id)+`,"self":true,"object":`+obj.JSON()+`}`...) - } - pattern := id + fence.roam.scan - iterator := func(oid string, o geojson.Object, fields []float64) bool { - if oid == id { - return true - } - if matched, _ := glob.Match(pattern, oid); matched { - nmsg = append(nmsg, `,{"id":`+jsonString(oid)+`,"object":`+o.JSON()+`}`...) - } + col := sw.c.getCol(roamkeys[i]) + if col != nil { + obj, _, ok := col.Get(id) + if ok { + nmsg = append(nmsg, `{"id":`+jsonString(id)+`,"self":true,"object":`+obj.JSON()+`}`...) + } + pattern := id + fence.roam.scan + iterator := func(oid string, o geojson.Object, fields []float64) bool { + if oid == id { return true } - g := glob.Parse(pattern, false) - if g.Limits[0] == "" && g.Limits[1] == "" { - col.Scan(false, iterator) - } else { - col.ScanRange(g.Limits[0], g.Limits[1], false, iterator) + if matched, _ := glob.Match(pattern, oid); matched { + nmsg = append(nmsg, `,{"id":`+jsonString(oid)+`,"object":`+o.JSON()+`}`...) } + return true } - }() + g := glob.Parse(pattern, false) + if g.Limits[0] == "" && g.Limits[1] == "" { + col.Scan(false, iterator) + } else { + col.ScanRange(g.Limits[0], g.Limits[1], false, iterator) + } + } nmsg = append(nmsg, ']') } @@ -325,9 +323,10 @@ func fenceMatchObject(fence *liveFenceSwitches, obj geojson.Object) bool { return false } -func fenceMatchRoam(c *Controller, fence *liveFenceSwitches, tkey, tid string, obj geojson.Object) (keys, ids []string, meterss []float64) { - c.mu.RLock() - defer c.mu.RUnlock() +func fenceMatchRoam( + c *Controller, fence *liveFenceSwitches, + tkey, tid string, obj geojson.Object, +) (keys, ids []string, meterss []float64, objs []geojson.Object) { col := c.getCol(fence.roam.key) if col == nil { return @@ -348,6 +347,7 @@ func fenceMatchRoam(c *Controller, fence *liveFenceSwitches, tkey, tid string, o keys = append(keys, fence.roam.key) ids = append(ids, id) meterss = append(meterss, obj.CalculatedPoint().DistanceTo(p)) + objs = append(objs, obj) } return true }, diff --git a/pkg/controller/hooks.go b/pkg/controller/hooks.go index 52febce6..94e671fa 100644 --- a/pkg/controller/hooks.go +++ b/pkg/controller/hooks.go @@ -285,9 +285,12 @@ func (c *Controller) cmdPDelHook(msg *server.Message, channel bool) ( return } -func (c *Controller) possiblyExpireHook(h *Hook) { +// possiblyExpireHook will evaluate a hook by it's name for expiration and +// purge it from the database if needed. This operation is called from an +// independent goroutine +func (c *Controller) possiblyExpireHook(name string) { c.mu.Lock() - if h, ok := c.hooks[h.Name]; ok { + if h, ok := c.hooks[name]; ok { if !h.expires.IsZero() && time.Now().After(h.expires) { // purge from database msg := &server.Message{} diff --git a/pkg/controller/live.go b/pkg/controller/live.go index 80d08d68..2843b093 100644 --- a/pkg/controller/live.go +++ b/pkg/controller/live.go @@ -178,7 +178,13 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *server.PipelineReade } fence := lb.fence lb.cond.L.Unlock() - msgs := FenceMatch("", sw, fence, nil, details) + var msgs []string + func() { + // safely lock the fence because we are outside the main loop + c.mu.RLock() + defer c.mu.RUnlock() + msgs = FenceMatch("", sw, fence, nil, details) + }() for _, msg := range msgs { if err := writeLiveMessage(conn, []byte(msg), true, connType, websocket); err != nil { return nil // nil return is fine here diff --git a/pkg/expire/expire.go b/pkg/expire/expire.go index ae5577e3..899df507 100644 --- a/pkg/expire/expire.go +++ b/pkg/expire/expire.go @@ -15,7 +15,6 @@ type List struct { mu sync.Mutex queue queue bgrun bool - arr []Item Expired func(item Item) } @@ -32,7 +31,7 @@ func (list *List) Push(item Item) { } func (list *List) bg() { - unix := time.Now().UnixNano() + now := time.Now().UnixNano() for { list.mu.Lock() if list.queue.len == 0 { @@ -40,7 +39,7 @@ func (list *List) bg() { list.mu.Unlock() break } - if unix > list.queue.peek().unix { + if now > list.queue.peek().unix { // now.After(list.queue.peek().unix) n := list.queue.pop() list.mu.Unlock() if list.Expired != nil { @@ -49,7 +48,7 @@ func (list *List) bg() { } else { list.mu.Unlock() time.Sleep(time.Second / 10) - unix = time.Now().UnixNano() + now = time.Now().UnixNano() } } }