
The big change is that the GeoJSON package has been completely rewritten to fix a few of geometry calculation bugs, increase performance, and to better follow the GeoJSON spec RFC 7946. GeoJSON updates - A LineString now requires at least two points. - All json members, even foreign, now persist with the object. - The bbox member persists too but is no longer used for geometry calculations. This is change in behavior. Previously Tile38 would treat the bbox as the object's physical rectangle. - Corrections to geometry intersects and within calculations. Faster spatial queries - The performance of Point-in-polygon and object intersect operations are greatly improved for complex polygons and line strings. It went from O(n) to roughly O(log n). - The same for all collection types with many children, including FeatureCollection, GeometryCollection, MultiPoint, MultiLineString, and MultiPolygon. Codebase changes - The pkg directory has been renamed to internal - The GeoJSON internal package has been moved to a seperate repo at https://github.com/tidwall/geojson. It's now vendored. Please look out for higher memory usage for datasets using complex shapes. A complex shape is one that has 64 or more points. For these shapes it's expected that there will be increase of least 54 bytes per point.
1496 lines
34 KiB
Go
1496 lines
34 KiB
Go
// Copyright 2013-2018 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package test
|
|
|
|
import (
|
|
"fmt"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/go-nats"
|
|
)
|
|
|
|
// More advanced tests on subscriptions
|
|
|
|
func TestServerAutoUnsub(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
received := int32(0)
|
|
max := int32(10)
|
|
|
|
// Call this to make sure that we have everything setup connection wise
|
|
nc.Flush()
|
|
|
|
base := runtime.NumGoroutine()
|
|
|
|
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
atomic.AddInt32(&received, 1)
|
|
})
|
|
if err != nil {
|
|
t.Fatal("Failed to subscribe: ", err)
|
|
}
|
|
sub.AutoUnsubscribe(int(max))
|
|
total := 100
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
nc.Flush()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
if atomic.LoadInt32(&received) != max {
|
|
t.Fatalf("Received %d msgs, wanted only %d\n", received, max)
|
|
}
|
|
if sub.IsValid() {
|
|
t.Fatal("Expected subscription to be invalid after hitting max")
|
|
}
|
|
if err := sub.AutoUnsubscribe(10); err == nil {
|
|
t.Fatal("Calling AutoUnsubscribe() on closed subscription should fail")
|
|
}
|
|
waitFor(t, 2*time.Second, 100*time.Millisecond, func() error {
|
|
delta := (runtime.NumGoroutine() - base)
|
|
if delta > 0 {
|
|
return fmt.Errorf("%d Go routines still exist post max subscriptions hit", delta)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestClientSyncAutoUnsub(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
received := 0
|
|
max := 10
|
|
sub, _ := nc.SubscribeSync("foo")
|
|
sub.AutoUnsubscribe(max)
|
|
total := 100
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
nc.Flush()
|
|
for {
|
|
_, err := sub.NextMsg(10 * time.Millisecond)
|
|
if err != nil {
|
|
if err != nats.ErrMaxMessages {
|
|
t.Fatalf("Expected '%v', but got: '%v'\n", nats.ErrBadSubscription, err.Error())
|
|
}
|
|
break
|
|
}
|
|
received++
|
|
}
|
|
if received != max {
|
|
t.Fatalf("Received %d msgs, wanted only %d\n", received, max)
|
|
}
|
|
if sub.IsValid() {
|
|
t.Fatal("Expected subscription to be invalid after hitting max")
|
|
}
|
|
if err := sub.AutoUnsubscribe(10); err == nil {
|
|
t.Fatal("Calling AutoUnsubscribe() ob closed subscription should fail")
|
|
}
|
|
}
|
|
|
|
func TestClientASyncAutoUnsub(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
received := int32(0)
|
|
max := int32(10)
|
|
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
atomic.AddInt32(&received, 1)
|
|
})
|
|
if err != nil {
|
|
t.Fatal("Failed to subscribe: ", err)
|
|
}
|
|
sub.AutoUnsubscribe(int(max))
|
|
total := 100
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
nc.Flush()
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
if atomic.LoadInt32(&received) != max {
|
|
t.Fatalf("Received %d msgs, wanted only %d\n", received, max)
|
|
}
|
|
if err := sub.AutoUnsubscribe(10); err == nil {
|
|
t.Fatal("Calling AutoUnsubscribe() on closed subscription should fail")
|
|
}
|
|
}
|
|
|
|
func TestAutoUnsubAndReconnect(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
rch := make(chan bool)
|
|
|
|
nc, err := nats.Connect(nats.DefaultURL,
|
|
nats.ReconnectWait(50*time.Millisecond),
|
|
nats.ReconnectHandler(func(_ *nats.Conn) { rch <- true }))
|
|
if err != nil {
|
|
t.Fatalf("Unable to connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
received := int32(0)
|
|
max := int32(10)
|
|
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
atomic.AddInt32(&received, 1)
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Failed to subscribe: %v", err)
|
|
}
|
|
sub.AutoUnsubscribe(int(max))
|
|
|
|
// Send less than the max
|
|
total := int(max / 2)
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
nc.Flush()
|
|
|
|
// Restart the server
|
|
s.Shutdown()
|
|
s = RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
// and wait to reconnect
|
|
if err := Wait(rch); err != nil {
|
|
t.Fatal("Failed to get the reconnect cb")
|
|
}
|
|
|
|
// Now send more than the total max.
|
|
total = int(3 * max)
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
nc.Flush()
|
|
|
|
// Wait a bit before checking.
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
// We should have received only up-to-max messages.
|
|
if atomic.LoadInt32(&received) != max {
|
|
t.Fatalf("Received %d msgs, wanted only %d\n", received, max)
|
|
}
|
|
}
|
|
|
|
func TestAutoUnsubWithParallelNextMsgCalls(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
rch := make(chan bool, 1)
|
|
|
|
nc, err := nats.Connect(nats.DefaultURL,
|
|
nats.ReconnectWait(50*time.Millisecond),
|
|
nats.ReconnectHandler(func(_ *nats.Conn) { rch <- true }))
|
|
if err != nil {
|
|
t.Fatalf("Unable to connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
numRoutines := 3
|
|
max := 100
|
|
total := max * 2
|
|
received := int64(0)
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
sub, err := nc.SubscribeSync("foo")
|
|
if err != nil {
|
|
t.Fatalf("Failed to subscribe: %v", err)
|
|
}
|
|
sub.AutoUnsubscribe(int(max))
|
|
nc.Flush()
|
|
|
|
wg.Add(numRoutines)
|
|
|
|
for i := 0; i < numRoutines; i++ {
|
|
go func(s *nats.Subscription, idx int) {
|
|
for {
|
|
// The first to reach the max delivered will cause the
|
|
// subscription to be removed, which will kick out all
|
|
// other calls to NextMsg. So don't be afraid of the long
|
|
// timeout.
|
|
_, err := s.NextMsg(3 * time.Second)
|
|
if err != nil {
|
|
break
|
|
}
|
|
atomic.AddInt64(&received, 1)
|
|
}
|
|
wg.Done()
|
|
}(sub, i)
|
|
}
|
|
|
|
msg := []byte("Hello")
|
|
for i := 0; i < max/2; i++ {
|
|
nc.Publish("foo", msg)
|
|
}
|
|
nc.Flush()
|
|
|
|
s.Shutdown()
|
|
s = RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
// Make sure we got the reconnected cb
|
|
if err := Wait(rch); err != nil {
|
|
t.Fatal("Failed to get reconnected cb")
|
|
}
|
|
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", msg)
|
|
}
|
|
nc.Flush()
|
|
|
|
wg.Wait()
|
|
if atomic.LoadInt64(&received) != int64(max) {
|
|
t.Fatalf("Wrong number of received msg: %v instead of %v", atomic.LoadInt64(&received), max)
|
|
}
|
|
}
|
|
|
|
func TestAutoUnsubscribeFromCallback(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc, err := nats.Connect(nats.DefaultURL)
|
|
if err != nil {
|
|
t.Fatalf("Unable to connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
max := 10
|
|
resetUnsubMark := int64(max / 2)
|
|
limit := int64(100)
|
|
received := int64(0)
|
|
|
|
msg := []byte("Hello")
|
|
|
|
// Auto-unsubscribe within the callback with a value lower
|
|
// than what was already received.
|
|
|
|
sub, err := nc.Subscribe("foo", func(m *nats.Msg) {
|
|
r := atomic.AddInt64(&received, 1)
|
|
if r == resetUnsubMark {
|
|
m.Sub.AutoUnsubscribe(int(r - 1))
|
|
nc.Flush()
|
|
}
|
|
if r == limit {
|
|
// Something went wrong... fail now
|
|
t.Fatal("Got more messages than expected")
|
|
}
|
|
nc.Publish("foo", msg)
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Failed to subscribe: %v", err)
|
|
}
|
|
sub.AutoUnsubscribe(int(max))
|
|
nc.Flush()
|
|
|
|
// Trigger the first message, the other are sent from the callback.
|
|
nc.Publish("foo", msg)
|
|
nc.Flush()
|
|
|
|
waitFor(t, time.Second, 100*time.Millisecond, func() error {
|
|
recv := atomic.LoadInt64(&received)
|
|
if recv != resetUnsubMark {
|
|
return fmt.Errorf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
|
|
max, resetUnsubMark, recv)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Now check with AutoUnsubscribe with higher value than original
|
|
received = int64(0)
|
|
newMax := int64(2 * max)
|
|
|
|
sub, err = nc.Subscribe("foo", func(m *nats.Msg) {
|
|
r := atomic.AddInt64(&received, 1)
|
|
if r == resetUnsubMark {
|
|
m.Sub.AutoUnsubscribe(int(newMax))
|
|
nc.Flush()
|
|
}
|
|
if r == limit {
|
|
// Something went wrong... fail now
|
|
t.Fatal("Got more messages than expected")
|
|
}
|
|
nc.Publish("foo", msg)
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Failed to subscribe: %v", err)
|
|
}
|
|
sub.AutoUnsubscribe(int(max))
|
|
nc.Flush()
|
|
|
|
// Trigger the first message, the other are sent from the callback.
|
|
nc.Publish("foo", msg)
|
|
nc.Flush()
|
|
|
|
waitFor(t, time.Second, 100*time.Millisecond, func() error {
|
|
recv := atomic.LoadInt64(&received)
|
|
if recv != newMax {
|
|
return fmt.Errorf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
|
|
max, newMax, recv)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func TestCloseSubRelease(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
sub, _ := nc.SubscribeSync("foo")
|
|
start := time.Now()
|
|
go func() {
|
|
time.Sleep(5 * time.Millisecond)
|
|
nc.Close()
|
|
}()
|
|
_, err := sub.NextMsg(50 * time.Millisecond)
|
|
if err == nil {
|
|
t.Fatalf("Expected an error from NextMsg")
|
|
}
|
|
elapsed := time.Since(start)
|
|
|
|
// On Windows, the minimum waitTime is at least 15ms.
|
|
if elapsed > 20*time.Millisecond {
|
|
t.Fatalf("Too much time has elapsed to release NextMsg: %dms",
|
|
(elapsed / time.Millisecond))
|
|
}
|
|
}
|
|
|
|
func TestIsValidSubscriber(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
sub, err := nc.SubscribeSync("foo")
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
if !sub.IsValid() {
|
|
t.Fatalf("Subscription should be valid")
|
|
}
|
|
for i := 0; i < 10; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
nc.Flush()
|
|
_, err = sub.NextMsg(200 * time.Millisecond)
|
|
if err != nil {
|
|
t.Fatalf("NextMsg returned an error")
|
|
}
|
|
sub.Unsubscribe()
|
|
_, err = sub.NextMsg(200 * time.Millisecond)
|
|
if err == nil {
|
|
t.Fatalf("NextMsg should have returned an error")
|
|
}
|
|
}
|
|
|
|
func TestSlowSubscriber(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
sub, _ := nc.SubscribeSync("foo")
|
|
sub.SetPendingLimits(100, 1024)
|
|
|
|
for i := 0; i < 200; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
timeout := 5 * time.Second
|
|
start := time.Now()
|
|
nc.FlushTimeout(timeout)
|
|
elapsed := time.Since(start)
|
|
if elapsed >= timeout {
|
|
t.Fatalf("Flush did not return before timeout: %d > %d", elapsed, timeout)
|
|
}
|
|
// Make sure NextMsg returns an error to indicate slow consumer
|
|
_, err := sub.NextMsg(200 * time.Millisecond)
|
|
if err == nil {
|
|
t.Fatalf("NextMsg did not return an error")
|
|
}
|
|
}
|
|
|
|
func TestSlowChanSubscriber(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
ch := make(chan *nats.Msg, 64)
|
|
sub, _ := nc.ChanSubscribe("foo", ch)
|
|
sub.SetPendingLimits(100, 1024)
|
|
|
|
for i := 0; i < 200; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
timeout := 5 * time.Second
|
|
start := time.Now()
|
|
nc.FlushTimeout(timeout)
|
|
elapsed := time.Since(start)
|
|
if elapsed >= timeout {
|
|
t.Fatalf("Flush did not return before timeout: %d > %d", elapsed, timeout)
|
|
}
|
|
}
|
|
|
|
func TestSlowAsyncSubscriber(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
bch := make(chan bool)
|
|
|
|
sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
// block to back us up..
|
|
<-bch
|
|
})
|
|
// Make sure these are the defaults
|
|
pm, pb, _ := sub.PendingLimits()
|
|
if pm != nats.DefaultSubPendingMsgsLimit {
|
|
t.Fatalf("Pending limit for number of msgs incorrect, expected %d, got %d\n", nats.DefaultSubPendingMsgsLimit, pm)
|
|
}
|
|
if pb != nats.DefaultSubPendingBytesLimit {
|
|
t.Fatalf("Pending limit for number of bytes incorrect, expected %d, got %d\n", nats.DefaultSubPendingBytesLimit, pb)
|
|
}
|
|
|
|
// Set new limits
|
|
pml := 100
|
|
pbl := 1024 * 1024
|
|
|
|
sub.SetPendingLimits(pml, pbl)
|
|
|
|
// Make sure the set is correct
|
|
pm, pb, _ = sub.PendingLimits()
|
|
if pm != pml {
|
|
t.Fatalf("Pending limit for number of msgs incorrect, expected %d, got %d\n", pml, pm)
|
|
}
|
|
if pb != pbl {
|
|
t.Fatalf("Pending limit for number of bytes incorrect, expected %d, got %d\n", pbl, pb)
|
|
}
|
|
|
|
for i := 0; i < (int(pml) + 100); i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
|
|
timeout := 5 * time.Second
|
|
start := time.Now()
|
|
err := nc.FlushTimeout(timeout)
|
|
elapsed := time.Since(start)
|
|
if elapsed >= timeout {
|
|
t.Fatalf("Flush did not return before timeout")
|
|
}
|
|
// We want flush to work, so expect no error for it.
|
|
if err != nil {
|
|
t.Fatalf("Expected no error from Flush()\n")
|
|
}
|
|
if nc.LastError() != nats.ErrSlowConsumer {
|
|
t.Fatal("Expected LastError to indicate slow consumer")
|
|
}
|
|
// release the sub
|
|
bch <- true
|
|
}
|
|
|
|
func TestAsyncErrHandler(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
opts := nats.GetDefaultOptions()
|
|
|
|
nc, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Could not connect to server: %v\n", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
subj := "async_test"
|
|
bch := make(chan bool)
|
|
|
|
sub, err := nc.Subscribe(subj, func(_ *nats.Msg) {
|
|
// block to back us up..
|
|
<-bch
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Could not subscribe: %v\n", err)
|
|
}
|
|
|
|
limit := 10
|
|
toSend := 100
|
|
|
|
// Limit internal subchan length to trip condition easier.
|
|
sub.SetPendingLimits(limit, 1024)
|
|
|
|
ch := make(chan bool)
|
|
|
|
aeCalled := int64(0)
|
|
|
|
nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, e error) {
|
|
atomic.AddInt64(&aeCalled, 1)
|
|
|
|
if s != sub {
|
|
t.Fatal("Did not receive proper subscription")
|
|
}
|
|
if e != nats.ErrSlowConsumer {
|
|
t.Fatalf("Did not receive proper error: %v vs %v", e, nats.ErrSlowConsumer)
|
|
}
|
|
// Suppress additional calls
|
|
if atomic.LoadInt64(&aeCalled) == 1 {
|
|
// release the sub
|
|
defer close(bch)
|
|
// release the test
|
|
ch <- true
|
|
}
|
|
})
|
|
|
|
b := []byte("Hello World!")
|
|
// First one trips the ch wait in subscription callback.
|
|
nc.Publish(subj, b)
|
|
nc.Flush()
|
|
for i := 0; i < toSend; i++ {
|
|
nc.Publish(subj, b)
|
|
}
|
|
if err := nc.Flush(); err != nil {
|
|
t.Fatalf("Got an error on Flush:%v", err)
|
|
}
|
|
|
|
if e := Wait(ch); e != nil {
|
|
t.Fatal("Failed to call async err handler")
|
|
}
|
|
// Make sure dropped stats is correct.
|
|
if d, _ := sub.Dropped(); d != toSend-limit+1 {
|
|
t.Fatalf("Expected Dropped to be %d, got %d", toSend-limit+1, d)
|
|
}
|
|
if ae := atomic.LoadInt64(&aeCalled); ae != 1 {
|
|
t.Fatalf("Expected err handler to be called only once, got %d", ae)
|
|
}
|
|
|
|
sub.Unsubscribe()
|
|
if _, err := sub.Dropped(); err == nil {
|
|
t.Fatal("Calling Dropped() on closed subscription should fail")
|
|
}
|
|
}
|
|
|
|
func TestAsyncErrHandlerChanSubscription(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
opts := nats.GetDefaultOptions()
|
|
|
|
nc, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Could not connect to server: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
subj := "chan_test"
|
|
|
|
limit := 10
|
|
toSend := 100
|
|
|
|
// Create our own channel.
|
|
mch := make(chan *nats.Msg, limit)
|
|
sub, err := nc.ChanSubscribe(subj, mch)
|
|
if err != nil {
|
|
t.Fatalf("Could not subscribe: %v", err)
|
|
}
|
|
ch := make(chan bool)
|
|
aeCalled := int64(0)
|
|
|
|
nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, e error) {
|
|
atomic.AddInt64(&aeCalled, 1)
|
|
if e != nats.ErrSlowConsumer {
|
|
t.Fatalf("Did not receive proper error: %v vs %v",
|
|
e, nats.ErrSlowConsumer)
|
|
}
|
|
// Suppress additional calls
|
|
if atomic.LoadInt64(&aeCalled) == 1 {
|
|
// release the test
|
|
ch <- true
|
|
}
|
|
})
|
|
|
|
b := []byte("Hello World!")
|
|
for i := 0; i < toSend; i++ {
|
|
nc.Publish(subj, b)
|
|
}
|
|
nc.Flush()
|
|
|
|
if e := Wait(ch); e != nil {
|
|
t.Fatal("Failed to call async err handler")
|
|
}
|
|
// Make sure dropped stats is correct.
|
|
if d, _ := sub.Dropped(); d != toSend-limit {
|
|
t.Fatalf("Expected Dropped to be %d, go %d", toSend-limit, d)
|
|
}
|
|
if ae := atomic.LoadInt64(&aeCalled); ae != 1 {
|
|
t.Fatalf("Expected err handler to be called once, got %d", ae)
|
|
}
|
|
|
|
sub.Unsubscribe()
|
|
if _, err := sub.Dropped(); err == nil {
|
|
t.Fatal("Calling Dropped() on closed subscription should fail")
|
|
}
|
|
}
|
|
|
|
// Test to make sure that we can send and async receive messages on
|
|
// different subjects within a callback.
|
|
func TestAsyncSubscriberStarvation(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
// Helper
|
|
nc.Subscribe("helper", func(m *nats.Msg) {
|
|
nc.Publish(m.Reply, []byte("Hello"))
|
|
})
|
|
|
|
ch := make(chan bool)
|
|
|
|
// Kickoff
|
|
nc.Subscribe("start", func(m *nats.Msg) {
|
|
// Helper Response
|
|
response := nats.NewInbox()
|
|
nc.Subscribe(response, func(_ *nats.Msg) {
|
|
ch <- true
|
|
})
|
|
nc.PublishRequest("helper", response, []byte("Help Me!"))
|
|
})
|
|
|
|
nc.Publish("start", []byte("Begin"))
|
|
nc.Flush()
|
|
|
|
if e := Wait(ch); e != nil {
|
|
t.Fatal("Was stalled inside of callback waiting on another callback")
|
|
}
|
|
}
|
|
|
|
func TestAsyncSubscribersOnClose(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
toSend := 10
|
|
callbacks := int32(0)
|
|
ch := make(chan bool, toSend)
|
|
|
|
nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
atomic.AddInt32(&callbacks, 1)
|
|
<-ch
|
|
})
|
|
|
|
for i := 0; i < toSend; i++ {
|
|
nc.Publish("foo", []byte("Hello World!"))
|
|
}
|
|
nc.Flush()
|
|
time.Sleep(10 * time.Millisecond)
|
|
nc.Close()
|
|
|
|
// Release callbacks
|
|
for i := 1; i < toSend; i++ {
|
|
ch <- true
|
|
}
|
|
|
|
// Wait for some time.
|
|
time.Sleep(10 * time.Millisecond)
|
|
seen := atomic.LoadInt32(&callbacks)
|
|
if seen != 1 {
|
|
t.Fatalf("Expected only one callback, received %d callbacks", seen)
|
|
}
|
|
}
|
|
|
|
func TestNextMsgCallOnAsyncSub(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
})
|
|
if err != nil {
|
|
t.Fatal("Failed to subscribe: ", err)
|
|
}
|
|
_, err = sub.NextMsg(time.Second)
|
|
if err == nil {
|
|
t.Fatal("Expected an error call NextMsg() on AsyncSubscriber")
|
|
}
|
|
}
|
|
|
|
func TestNextMsgCallOnClosedSub(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
sub, err := nc.SubscribeSync("foo")
|
|
if err != nil {
|
|
t.Fatal("Failed to subscribe: ", err)
|
|
}
|
|
|
|
if err = sub.Unsubscribe(); err != nil {
|
|
t.Fatal("Unsubscribe failed with err:", err)
|
|
}
|
|
|
|
_, err = sub.NextMsg(time.Second)
|
|
if err == nil {
|
|
t.Fatal("Expected an error calling NextMsg() on closed subscription")
|
|
} else if err != nats.ErrBadSubscription {
|
|
t.Fatalf("Expected '%v', but got: '%v'", nats.ErrBadSubscription, err.Error())
|
|
}
|
|
}
|
|
|
|
func TestChanSubscriber(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
// Create our own channel.
|
|
ch := make(chan *nats.Msg, 128)
|
|
|
|
// Channel is mandatory
|
|
if _, err := nc.ChanSubscribe("foo", nil); err == nil {
|
|
t.Fatal("Creating subscription without channel should have failed")
|
|
}
|
|
|
|
_, err := nc.ChanSubscribe("foo", ch)
|
|
if err != nil {
|
|
t.Fatal("Failed to subscribe: ", err)
|
|
}
|
|
|
|
// Send some messages to ourselves.
|
|
total := 100
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
|
|
received := 0
|
|
tm := time.NewTimer(5 * time.Second)
|
|
defer tm.Stop()
|
|
|
|
// Go ahead and receive
|
|
for {
|
|
select {
|
|
case _, ok := <-ch:
|
|
if !ok {
|
|
t.Fatalf("Got an error reading from channel")
|
|
}
|
|
case <-tm.C:
|
|
t.Fatalf("Timed out waiting on messages")
|
|
}
|
|
received++
|
|
if received >= total {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestChanQueueSubscriber(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
// Create our own channel.
|
|
ch1 := make(chan *nats.Msg, 64)
|
|
ch2 := make(chan *nats.Msg, 64)
|
|
|
|
nc.ChanQueueSubscribe("foo", "bar", ch1)
|
|
nc.ChanQueueSubscribe("foo", "bar", ch2)
|
|
|
|
// Send some messages to ourselves.
|
|
total := 100
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
|
|
received := 0
|
|
tm := time.NewTimer(5 * time.Second)
|
|
defer tm.Stop()
|
|
|
|
chk := func(ok bool) {
|
|
if !ok {
|
|
t.Fatalf("Got an error reading from channel")
|
|
} else {
|
|
received++
|
|
}
|
|
}
|
|
|
|
// Go ahead and receive
|
|
for {
|
|
select {
|
|
case _, ok := <-ch1:
|
|
chk(ok)
|
|
case _, ok := <-ch2:
|
|
chk(ok)
|
|
case <-tm.C:
|
|
t.Fatalf("Timed out waiting on messages")
|
|
}
|
|
if received >= total {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestChanSubscriberPendingLimits(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
ncp := NewDefaultConnection(t)
|
|
defer ncp.Close()
|
|
|
|
// There was a defect that prevented to receive more than
|
|
// the default pending message limit. Trying to send more
|
|
// than this limit.
|
|
total := nats.DefaultSubPendingMsgsLimit + 100
|
|
|
|
for typeSubs := 0; typeSubs < 3; typeSubs++ {
|
|
|
|
func() {
|
|
// Create our own channel.
|
|
ch := make(chan *nats.Msg, total)
|
|
|
|
var err error
|
|
var sub *nats.Subscription
|
|
switch typeSubs {
|
|
case 0:
|
|
sub, err = nc.ChanSubscribe("foo", ch)
|
|
case 1:
|
|
sub, err = nc.ChanQueueSubscribe("foo", "bar", ch)
|
|
case 2:
|
|
sub, err = nc.QueueSubscribeSyncWithChan("foo", "bar", ch)
|
|
}
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on subscribe: %v", err)
|
|
}
|
|
defer sub.Unsubscribe()
|
|
nc.Flush()
|
|
|
|
// Send some messages
|
|
for i := 0; i < total; i++ {
|
|
if err := ncp.Publish("foo", []byte("Hello")); err != nil {
|
|
t.Fatalf("Unexpected error on publish: %v", err)
|
|
}
|
|
}
|
|
|
|
received := 0
|
|
tm := time.NewTimer(10 * time.Second)
|
|
defer tm.Stop()
|
|
|
|
chk := func(ok bool) {
|
|
if !ok {
|
|
t.Fatalf("Got an error reading from channel")
|
|
} else {
|
|
received++
|
|
}
|
|
}
|
|
|
|
// Go ahead and receive
|
|
for {
|
|
select {
|
|
case _, ok := <-ch:
|
|
chk(ok)
|
|
if received >= total {
|
|
return
|
|
}
|
|
case <-tm.C:
|
|
t.Fatalf("Timed out waiting on messages for test %d, received %d", typeSubs, received)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func TestQueueChanQueueSubscriber(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
// Create our own channel.
|
|
ch1 := make(chan *nats.Msg, 64)
|
|
ch2 := make(chan *nats.Msg, 64)
|
|
|
|
nc.QueueSubscribeSyncWithChan("foo", "bar", ch1)
|
|
nc.QueueSubscribeSyncWithChan("foo", "bar", ch2)
|
|
|
|
// Send some messages to ourselves.
|
|
total := 100
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
|
|
recv1 := 0
|
|
recv2 := 0
|
|
tm := time.NewTimer(5 * time.Second)
|
|
defer tm.Stop()
|
|
runTimer := time.NewTimer(500 * time.Millisecond)
|
|
defer runTimer.Stop()
|
|
|
|
chk := func(ok bool, which int) {
|
|
if !ok {
|
|
t.Fatalf("Got an error reading from channel")
|
|
} else {
|
|
if which == 1 {
|
|
recv1++
|
|
} else {
|
|
recv2++
|
|
}
|
|
}
|
|
}
|
|
|
|
// Go ahead and receive
|
|
recvLoop:
|
|
for {
|
|
select {
|
|
case _, ok := <-ch1:
|
|
chk(ok, 1)
|
|
case _, ok := <-ch2:
|
|
chk(ok, 2)
|
|
case <-tm.C:
|
|
t.Fatalf("Timed out waiting on messages")
|
|
case <-runTimer.C:
|
|
break recvLoop
|
|
}
|
|
}
|
|
|
|
if recv1+recv2 > total {
|
|
t.Fatalf("Received more messages than expected: %v vs %v", (recv1 + recv2), total)
|
|
}
|
|
}
|
|
|
|
func TestUnsubscribeChanOnSubscriber(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
// Create our own channel.
|
|
ch := make(chan *nats.Msg, 8)
|
|
sub, _ := nc.ChanSubscribe("foo", ch)
|
|
|
|
// Send some messages to ourselves.
|
|
total := 100
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
|
|
sub.Unsubscribe()
|
|
for len(ch) > 0 {
|
|
<-ch
|
|
}
|
|
// Make sure we can send to the channel still.
|
|
// Test that we do not close it.
|
|
ch <- &nats.Msg{}
|
|
}
|
|
|
|
func TestCloseChanOnSubscriber(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
// Create our own channel.
|
|
ch := make(chan *nats.Msg, 8)
|
|
nc.ChanSubscribe("foo", ch)
|
|
|
|
// Send some messages to ourselves.
|
|
total := 100
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", []byte("Hello"))
|
|
}
|
|
|
|
nc.Close()
|
|
for len(ch) > 0 {
|
|
<-ch
|
|
}
|
|
// Make sure we can send to the channel still.
|
|
// Test that we do not close it.
|
|
ch <- &nats.Msg{}
|
|
}
|
|
|
|
func TestAsyncSubscriptionPending(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
// Send some messages to ourselves.
|
|
total := 100
|
|
msg := []byte("0123456789")
|
|
|
|
inCb := make(chan bool)
|
|
block := make(chan bool)
|
|
|
|
sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
inCb <- true
|
|
<-block
|
|
})
|
|
defer sub.Unsubscribe()
|
|
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", msg)
|
|
}
|
|
nc.Flush()
|
|
|
|
// Wait that a message is received, so checks are safe
|
|
if err := Wait(inCb); err != nil {
|
|
t.Fatal("No message received")
|
|
}
|
|
|
|
// Test old way
|
|
q, _ := sub.QueuedMsgs()
|
|
if q != total && q != total-1 {
|
|
t.Fatalf("Expected %d or %d, got %d", total, total-1, q)
|
|
}
|
|
|
|
// New way, make sure the same and check bytes.
|
|
m, b, _ := sub.Pending()
|
|
mlen := len(msg)
|
|
totalSize := total * mlen
|
|
|
|
if m != total && m != total-1 {
|
|
t.Fatalf("Expected msgs of %d or %d, got %d", total, total-1, m)
|
|
}
|
|
if b != totalSize && b != totalSize-mlen {
|
|
t.Fatalf("Expected bytes of %d or %d, got %d",
|
|
totalSize, totalSize-mlen, b)
|
|
}
|
|
|
|
// Make sure max has been set. Since we block after the first message is
|
|
// received, MaxPending should be >= total - 1 and <= total
|
|
mm, bm, _ := sub.MaxPending()
|
|
if mm < total-1 || mm > total {
|
|
t.Fatalf("Expected max msgs (%d) to be between %d and %d",
|
|
mm, total-1, total)
|
|
}
|
|
if bm < totalSize-mlen || bm > totalSize {
|
|
t.Fatalf("Expected max bytes (%d) to be between %d and %d",
|
|
bm, totalSize, totalSize-mlen)
|
|
}
|
|
// Check that clear works.
|
|
sub.ClearMaxPending()
|
|
mm, bm, _ = sub.MaxPending()
|
|
if mm != 0 {
|
|
t.Fatalf("Expected max msgs to be 0 vs %d after clearing", mm)
|
|
}
|
|
if bm != 0 {
|
|
t.Fatalf("Expected max bytes to be 0 vs %d after clearing", bm)
|
|
}
|
|
|
|
close(block)
|
|
sub.Unsubscribe()
|
|
|
|
// These calls should fail once the subscription is closed.
|
|
if _, _, err := sub.Pending(); err == nil {
|
|
t.Fatal("Calling Pending() on closed subscription should fail")
|
|
}
|
|
if _, _, err := sub.MaxPending(); err == nil {
|
|
t.Fatal("Calling MaxPending() on closed subscription should fail")
|
|
}
|
|
if err := sub.ClearMaxPending(); err == nil {
|
|
t.Fatal("Calling ClearMaxPending() on closed subscription should fail")
|
|
}
|
|
}
|
|
|
|
func TestAsyncSubscriptionPendingDrain(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
// Send some messages to ourselves.
|
|
total := 100
|
|
msg := []byte("0123456789")
|
|
|
|
sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {})
|
|
defer sub.Unsubscribe()
|
|
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", msg)
|
|
}
|
|
nc.Flush()
|
|
|
|
// Wait for all delivered.
|
|
for d, _ := sub.Delivered(); d != int64(total); d, _ = sub.Delivered() {
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
m, b, _ := sub.Pending()
|
|
if m != 0 {
|
|
t.Fatalf("Expected msgs of 0, got %d", m)
|
|
}
|
|
if b != 0 {
|
|
t.Fatalf("Expected bytes of 0, got %d", b)
|
|
}
|
|
|
|
sub.Unsubscribe()
|
|
if _, err := sub.Delivered(); err == nil {
|
|
t.Fatal("Calling Delivered() on closed subscription should fail")
|
|
}
|
|
}
|
|
|
|
func TestSyncSubscriptionPendingDrain(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
// Send some messages to ourselves.
|
|
total := 100
|
|
msg := []byte("0123456789")
|
|
|
|
sub, _ := nc.SubscribeSync("foo")
|
|
defer sub.Unsubscribe()
|
|
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", msg)
|
|
}
|
|
nc.Flush()
|
|
|
|
// Wait for all delivered.
|
|
for d, _ := sub.Delivered(); d != int64(total); d, _ = sub.Delivered() {
|
|
sub.NextMsg(10 * time.Millisecond)
|
|
}
|
|
|
|
m, b, _ := sub.Pending()
|
|
if m != 0 {
|
|
t.Fatalf("Expected msgs of 0, got %d", m)
|
|
}
|
|
if b != 0 {
|
|
t.Fatalf("Expected bytes of 0, got %d", b)
|
|
}
|
|
|
|
sub.Unsubscribe()
|
|
if _, err := sub.Delivered(); err == nil {
|
|
t.Fatal("Calling Delivered() on closed subscription should fail")
|
|
}
|
|
}
|
|
|
|
func TestSyncSubscriptionPending(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
sub, _ := nc.SubscribeSync("foo")
|
|
defer sub.Unsubscribe()
|
|
|
|
// Send some messages to ourselves.
|
|
total := 100
|
|
msg := []byte("0123456789")
|
|
for i := 0; i < total; i++ {
|
|
nc.Publish("foo", msg)
|
|
}
|
|
nc.Flush()
|
|
|
|
// Test old way
|
|
q, _ := sub.QueuedMsgs()
|
|
if q != total && q != total-1 {
|
|
t.Fatalf("Expected %d or %d, got %d", total, total-1, q)
|
|
}
|
|
|
|
// New way, make sure the same and check bytes.
|
|
m, b, _ := sub.Pending()
|
|
mlen := len(msg)
|
|
|
|
if m != total {
|
|
t.Fatalf("Expected msgs of %d, got %d", total, m)
|
|
}
|
|
if b != total*mlen {
|
|
t.Fatalf("Expected bytes of %d, got %d", total*mlen, b)
|
|
}
|
|
|
|
// Now drain some down and make sure pending is correct
|
|
for i := 0; i < total-1; i++ {
|
|
sub.NextMsg(10 * time.Millisecond)
|
|
}
|
|
m, b, _ = sub.Pending()
|
|
if m != 1 {
|
|
t.Fatalf("Expected msgs of 1, got %d", m)
|
|
}
|
|
if b != mlen {
|
|
t.Fatalf("Expected bytes of %d, got %d", mlen, b)
|
|
}
|
|
}
|
|
|
|
func TestSetPendingLimits(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
payload := []byte("hello")
|
|
payloadLen := len(payload)
|
|
toSend := 100
|
|
|
|
var sub *nats.Subscription
|
|
|
|
// Check for invalid values
|
|
invalid := func() error {
|
|
if err := sub.SetPendingLimits(0, 1); err == nil {
|
|
return fmt.Errorf("Setting limit with 0 should fail")
|
|
}
|
|
if err := sub.SetPendingLimits(1, 0); err == nil {
|
|
return fmt.Errorf("Setting limit with 0 should fail")
|
|
}
|
|
return nil
|
|
}
|
|
// function to send messages
|
|
send := func(subject string, count int) {
|
|
for i := 0; i < count; i++ {
|
|
if err := nc.Publish(subject, payload); err != nil {
|
|
t.Fatalf("Unexpected error on publish: %v", err)
|
|
}
|
|
}
|
|
nc.Flush()
|
|
}
|
|
|
|
// Check pending vs expected values
|
|
var limitCount, limitBytes int
|
|
var expectedCount, expectedBytes int
|
|
checkPending := func() error {
|
|
lc, lb, err := sub.PendingLimits()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if lc != limitCount || lb != limitBytes {
|
|
return fmt.Errorf("Unexpected limits, expected %v msgs %v bytes, got %v msgs %v bytes",
|
|
limitCount, limitBytes, lc, lb)
|
|
}
|
|
msgs, bytes, err := sub.Pending()
|
|
if err != nil {
|
|
return fmt.Errorf("Unexpected error getting pending counts: %v", err)
|
|
}
|
|
if (msgs != expectedCount && msgs != expectedCount-1) ||
|
|
(bytes != expectedBytes && bytes != expectedBytes-payloadLen) {
|
|
return fmt.Errorf("Unexpected counts, expected %v msgs %v bytes, got %v msgs %v bytes",
|
|
expectedCount, expectedBytes, msgs, bytes)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
recv := make(chan bool)
|
|
block := make(chan bool)
|
|
cb := func(m *nats.Msg) {
|
|
recv <- true
|
|
<-block
|
|
m.Sub.Unsubscribe()
|
|
}
|
|
subj := "foo"
|
|
sub, err := nc.Subscribe(subj, cb)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on subscribe: %v", err)
|
|
}
|
|
defer sub.Unsubscribe()
|
|
if err := invalid(); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
// Check we apply limit only for size
|
|
limitCount = -1
|
|
limitBytes = (toSend / 2) * payloadLen
|
|
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
|
|
t.Fatalf("Unexpected error setting limits: %v", err)
|
|
}
|
|
// Send messages
|
|
send(subj, toSend)
|
|
// Wait for message to be received
|
|
if err := Wait(recv); err != nil {
|
|
t.Fatal("Did not get our message")
|
|
}
|
|
expectedBytes = limitBytes
|
|
expectedCount = limitBytes / payloadLen
|
|
if err := checkPending(); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
// Release callback
|
|
block <- true
|
|
|
|
subj = "bar"
|
|
sub, err = nc.Subscribe(subj, cb)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on subscribe: %v", err)
|
|
}
|
|
defer sub.Unsubscribe()
|
|
// Check we apply limit only for count
|
|
limitCount = toSend / 4
|
|
limitBytes = -1
|
|
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
|
|
t.Fatalf("Unexpected error setting limits: %v", err)
|
|
}
|
|
// Send messages
|
|
send(subj, toSend)
|
|
// Wait for message to be received
|
|
if err := Wait(recv); err != nil {
|
|
t.Fatal("Did not get our message")
|
|
}
|
|
expectedCount = limitCount
|
|
expectedBytes = limitCount * payloadLen
|
|
if err := checkPending(); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
// Release callback
|
|
block <- true
|
|
|
|
subj = "baz"
|
|
sub, err = nc.SubscribeSync(subj)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on subscribe: %v", err)
|
|
}
|
|
defer sub.Unsubscribe()
|
|
if err := invalid(); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
// Check we apply limit only for size
|
|
limitCount = -1
|
|
limitBytes = (toSend / 2) * payloadLen
|
|
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
|
|
t.Fatalf("Unexpected error setting limits: %v", err)
|
|
}
|
|
// Send messages
|
|
send(subj, toSend)
|
|
expectedBytes = limitBytes
|
|
expectedCount = limitBytes / payloadLen
|
|
if err := checkPending(); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
sub.Unsubscribe()
|
|
nc.Flush()
|
|
|
|
subj = "boz"
|
|
sub, err = nc.SubscribeSync(subj)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on subscribe: %v", err)
|
|
}
|
|
defer sub.Unsubscribe()
|
|
// Check we apply limit only for count
|
|
limitCount = toSend / 4
|
|
limitBytes = -1
|
|
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
|
|
t.Fatalf("Unexpected error setting limits: %v", err)
|
|
}
|
|
// Send messages
|
|
send(subj, toSend)
|
|
expectedCount = limitCount
|
|
expectedBytes = limitCount * payloadLen
|
|
if err := checkPending(); err != nil {
|
|
t.Fatalf("%v", err)
|
|
}
|
|
sub.Unsubscribe()
|
|
nc.Flush()
|
|
}
|
|
|
|
func TestSubscriptionTypes(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {})
|
|
defer sub.Unsubscribe()
|
|
if st := sub.Type(); st != nats.AsyncSubscription {
|
|
t.Fatalf("Expected AsyncSubscription, got %v", st)
|
|
}
|
|
// Check Pending
|
|
if err := sub.SetPendingLimits(1, 100); err != nil {
|
|
t.Fatalf("We should be able to SetPendingLimits()")
|
|
}
|
|
if _, _, err := sub.Pending(); err != nil {
|
|
t.Fatalf("We should be able to call Pending()")
|
|
}
|
|
sub.Unsubscribe()
|
|
if err := sub.SetPendingLimits(1, 100); err == nil {
|
|
t.Fatal("Calling SetPendingLimits() on closed subscription should fail")
|
|
}
|
|
if _, _, err := sub.PendingLimits(); err == nil {
|
|
t.Fatal("Calling PendingLimits() on closed subscription should fail")
|
|
}
|
|
|
|
sub, _ = nc.SubscribeSync("foo")
|
|
defer sub.Unsubscribe()
|
|
if st := sub.Type(); st != nats.SyncSubscription {
|
|
t.Fatalf("Expected SyncSubscription, got %v", st)
|
|
}
|
|
// Check Pending
|
|
if err := sub.SetPendingLimits(1, 100); err != nil {
|
|
t.Fatalf("We should be able to SetPendingLimits()")
|
|
}
|
|
if _, _, err := sub.Pending(); err != nil {
|
|
t.Fatalf("We should be able to call Pending()")
|
|
}
|
|
sub.Unsubscribe()
|
|
if err := sub.SetPendingLimits(1, 100); err == nil {
|
|
t.Fatal("Calling SetPendingLimits() on closed subscription should fail")
|
|
}
|
|
if _, _, err := sub.PendingLimits(); err == nil {
|
|
t.Fatal("Calling PendingLimits() on closed subscription should fail")
|
|
}
|
|
|
|
sub, _ = nc.ChanSubscribe("foo", make(chan *nats.Msg))
|
|
defer sub.Unsubscribe()
|
|
if st := sub.Type(); st != nats.ChanSubscription {
|
|
t.Fatalf("Expected ChanSubscription, got %v", st)
|
|
}
|
|
// Check Pending
|
|
if err := sub.SetPendingLimits(1, 100); err == nil {
|
|
t.Fatalf("We should NOT be able to SetPendingLimits() on ChanSubscriber")
|
|
}
|
|
if _, _, err := sub.Pending(); err == nil {
|
|
t.Fatalf("We should NOT be able to call Pending() on ChanSubscriber")
|
|
}
|
|
if _, _, err := sub.MaxPending(); err == nil {
|
|
t.Fatalf("We should NOT be able to call MaxPending() on ChanSubscriber")
|
|
}
|
|
if err := sub.ClearMaxPending(); err == nil {
|
|
t.Fatalf("We should NOT be able to call ClearMaxPending() on ChanSubscriber")
|
|
}
|
|
if _, _, err := sub.PendingLimits(); err == nil {
|
|
t.Fatalf("We should NOT be able to call PendingLimits() on ChanSubscriber")
|
|
}
|
|
|
|
}
|