
The big change is that the GeoJSON package has been completely rewritten to fix a few of geometry calculation bugs, increase performance, and to better follow the GeoJSON spec RFC 7946. GeoJSON updates - A LineString now requires at least two points. - All json members, even foreign, now persist with the object. - The bbox member persists too but is no longer used for geometry calculations. This is change in behavior. Previously Tile38 would treat the bbox as the object's physical rectangle. - Corrections to geometry intersects and within calculations. Faster spatial queries - The performance of Point-in-polygon and object intersect operations are greatly improved for complex polygons and line strings. It went from O(n) to roughly O(log n). - The same for all collection types with many children, including FeatureCollection, GeometryCollection, MultiPoint, MultiLineString, and MultiPolygon. Codebase changes - The pkg directory has been renamed to internal - The GeoJSON internal package has been moved to a seperate repo at https://github.com/tidwall/geojson. It's now vendored. Please look out for higher memory usage for datasets using complex shapes. A complex shape is one that has 64 or more points. For these shapes it's expected that there will be increase of least 54 bytes per point.
2054 lines
52 KiB
Go
2054 lines
52 KiB
Go
// Copyright 2012-2018 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package test
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/nats-io/gnatsd/server"
|
|
"github.com/nats-io/gnatsd/test"
|
|
"github.com/nats-io/go-nats"
|
|
)
|
|
|
|
func TestDefaultConnection(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
nc.Close()
|
|
}
|
|
|
|
func TestConnectionStatus(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
if nc.Status() != nats.CONNECTED {
|
|
t.Fatal("Should have status set to CONNECTED")
|
|
}
|
|
if !nc.IsConnected() {
|
|
t.Fatal("Should have status set to CONNECTED")
|
|
}
|
|
nc.Close()
|
|
if nc.Status() != nats.CLOSED {
|
|
t.Fatal("Should have status set to CLOSED")
|
|
}
|
|
if !nc.IsClosed() {
|
|
t.Fatal("Should have status set to CLOSED")
|
|
}
|
|
}
|
|
|
|
func TestConnClosedCB(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
ch := make(chan bool)
|
|
o := nats.GetDefaultOptions()
|
|
o.Url = nats.DefaultURL
|
|
o.ClosedCB = func(_ *nats.Conn) {
|
|
ch <- true
|
|
}
|
|
nc, err := o.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Should have connected ok: %v", err)
|
|
}
|
|
nc.Close()
|
|
if e := Wait(ch); e != nil {
|
|
t.Fatalf("Closed callback not triggered\n")
|
|
}
|
|
}
|
|
|
|
func TestCloseDisconnectedCB(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
ch := make(chan bool)
|
|
o := nats.GetDefaultOptions()
|
|
o.Url = nats.DefaultURL
|
|
o.AllowReconnect = false
|
|
o.DisconnectedCB = func(_ *nats.Conn) {
|
|
ch <- true
|
|
}
|
|
nc, err := o.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Should have connected ok: %v", err)
|
|
}
|
|
nc.Close()
|
|
if e := Wait(ch); e != nil {
|
|
t.Fatal("Disconnected callback not triggered")
|
|
}
|
|
}
|
|
|
|
func TestServerStopDisconnectedCB(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
ch := make(chan bool)
|
|
o := nats.GetDefaultOptions()
|
|
o.Url = nats.DefaultURL
|
|
o.AllowReconnect = false
|
|
o.DisconnectedCB = func(nc *nats.Conn) {
|
|
ch <- true
|
|
}
|
|
nc, err := o.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Should have connected ok: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
s.Shutdown()
|
|
if e := Wait(ch); e != nil {
|
|
t.Fatalf("Disconnected callback not triggered\n")
|
|
}
|
|
}
|
|
|
|
func TestServerSecureConnections(t *testing.T) {
|
|
s, opts := RunServerWithConfig("./configs/tls.conf")
|
|
defer s.Shutdown()
|
|
|
|
endpoint := fmt.Sprintf("%s:%d", opts.Host, opts.Port)
|
|
secureURL := fmt.Sprintf("nats://%s:%s@%s/", opts.Username, opts.Password, endpoint)
|
|
|
|
// Make sure this succeeds
|
|
nc, err := nats.Connect(secureURL, nats.Secure())
|
|
if err != nil {
|
|
t.Fatalf("Failed to create secure (TLS) connection: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
omsg := []byte("Hello World")
|
|
checkRecv := make(chan bool)
|
|
|
|
received := 0
|
|
nc.Subscribe("foo", func(m *nats.Msg) {
|
|
received++
|
|
if !bytes.Equal(m.Data, omsg) {
|
|
t.Fatal("Message received does not match")
|
|
}
|
|
checkRecv <- true
|
|
})
|
|
err = nc.Publish("foo", omsg)
|
|
if err != nil {
|
|
t.Fatalf("Failed to publish on secure (TLS) connection: %v", err)
|
|
}
|
|
nc.Flush()
|
|
|
|
if err := Wait(checkRecv); err != nil {
|
|
t.Fatal("Failed receiving message")
|
|
}
|
|
|
|
nc.Close()
|
|
|
|
// Server required, but not specified in Connect(), should switch automatically
|
|
nc, err = nats.Connect(secureURL)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create secure (TLS) connection: %v", err)
|
|
}
|
|
nc.Close()
|
|
|
|
// Test flag mismatch
|
|
// Wanted but not available..
|
|
ds := RunDefaultServer()
|
|
defer ds.Shutdown()
|
|
|
|
nc, err = nats.Connect(nats.DefaultURL, nats.Secure())
|
|
if err == nil || nc != nil || err != nats.ErrSecureConnWanted {
|
|
if nc != nil {
|
|
nc.Close()
|
|
}
|
|
t.Fatalf("Should have failed to create connection: %v", err)
|
|
}
|
|
|
|
// Let's be more TLS correct and verify servername, endpoint etc.
|
|
// Now do more advanced checking, verifying servername and using rootCA.
|
|
// Setup our own TLSConfig using RootCA from our self signed cert.
|
|
rootPEM, err := ioutil.ReadFile("./configs/certs/ca.pem")
|
|
if err != nil || rootPEM == nil {
|
|
t.Fatalf("failed to read root certificate")
|
|
}
|
|
pool := x509.NewCertPool()
|
|
ok := pool.AppendCertsFromPEM([]byte(rootPEM))
|
|
if !ok {
|
|
t.Fatal("failed to parse root certificate")
|
|
}
|
|
|
|
tls1 := &tls.Config{
|
|
ServerName: opts.Host,
|
|
RootCAs: pool,
|
|
MinVersion: tls.VersionTLS12,
|
|
}
|
|
|
|
nc, err = nats.Connect(secureURL, nats.Secure(tls1))
|
|
if err != nil {
|
|
t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
tls2 := &tls.Config{
|
|
ServerName: "OtherHostName",
|
|
RootCAs: pool,
|
|
MinVersion: tls.VersionTLS12,
|
|
}
|
|
|
|
nc2, err := nats.Connect(secureURL, nats.Secure(tls1, tls2))
|
|
if err == nil {
|
|
nc2.Close()
|
|
t.Fatal("Was expecting an error!")
|
|
}
|
|
}
|
|
|
|
func TestClientCertificate(t *testing.T) {
|
|
|
|
s, opts := RunServerWithConfig("./configs/tlsverify.conf")
|
|
defer s.Shutdown()
|
|
|
|
endpoint := fmt.Sprintf("%s:%d", opts.Host, opts.Port)
|
|
secureURL := fmt.Sprintf("nats://%s", endpoint)
|
|
|
|
// Make sure this fails
|
|
nc, err := nats.Connect(secureURL, nats.Secure())
|
|
if err == nil {
|
|
nc.Close()
|
|
t.Fatal("Should have failed (TLS) connection without client certificate")
|
|
}
|
|
|
|
// Check parameters validity
|
|
nc, err = nats.Connect(secureURL, nats.ClientCert("", ""))
|
|
if err == nil {
|
|
nc.Close()
|
|
t.Fatal("Should have failed due to invalid parameters")
|
|
}
|
|
|
|
// Should fail because wrong key
|
|
nc, err = nats.Connect(secureURL,
|
|
nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/key.pem"))
|
|
if err == nil {
|
|
nc.Close()
|
|
t.Fatal("Should have failed due to invalid key")
|
|
}
|
|
|
|
// Should fail because no CA
|
|
nc, err = nats.Connect(secureURL,
|
|
nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem"))
|
|
if err == nil {
|
|
nc.Close()
|
|
t.Fatal("Should have failed due to missing ca")
|
|
}
|
|
|
|
nc, err = nats.Connect(secureURL,
|
|
nats.RootCAs("./configs/certs/ca.pem"),
|
|
nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem"))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create (TLS) connection: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
omsg := []byte("Hello!")
|
|
checkRecv := make(chan bool)
|
|
|
|
received := 0
|
|
nc.Subscribe("foo", func(m *nats.Msg) {
|
|
received++
|
|
if !bytes.Equal(m.Data, omsg) {
|
|
t.Fatal("Message received does not match")
|
|
}
|
|
checkRecv <- true
|
|
})
|
|
err = nc.Publish("foo", omsg)
|
|
if err != nil {
|
|
t.Fatalf("Failed to publish on secure (TLS) connection: %v", err)
|
|
}
|
|
nc.Flush()
|
|
|
|
if err := Wait(checkRecv); err != nil {
|
|
t.Fatal("Failed to receive message")
|
|
}
|
|
}
|
|
|
|
func TestServerTLSHintConnections(t *testing.T) {
|
|
s, opts := RunServerWithConfig("./configs/tls.conf")
|
|
defer s.Shutdown()
|
|
|
|
endpoint := fmt.Sprintf("%s:%d", opts.Host, opts.Port)
|
|
secureURL := fmt.Sprintf("tls://%s:%s@%s/", opts.Username, opts.Password, endpoint)
|
|
|
|
nc, err := nats.Connect(secureURL, nats.RootCAs("./configs/certs/badca.pem"))
|
|
if err == nil {
|
|
nc.Close()
|
|
t.Fatal("Expected an error from bad RootCA file")
|
|
}
|
|
|
|
nc, err = nats.Connect(secureURL, nats.RootCAs("./configs/certs/ca.pem"))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create secure (TLS) connection: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
}
|
|
|
|
func TestClosedConnections(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
sub, _ := nc.SubscribeSync("foo")
|
|
if sub == nil {
|
|
t.Fatal("Failed to create valid subscription")
|
|
}
|
|
|
|
// Test all API endpoints do the right thing with a closed connection.
|
|
nc.Close()
|
|
if err := nc.Publish("foo", nil); err != nats.ErrConnectionClosed {
|
|
t.Fatalf("Publish on closed conn did not fail properly: %v\n", err)
|
|
}
|
|
if err := nc.PublishMsg(&nats.Msg{Subject: "foo"}); err != nats.ErrConnectionClosed {
|
|
t.Fatalf("PublishMsg on closed conn did not fail properly: %v\n", err)
|
|
}
|
|
if err := nc.Flush(); err != nats.ErrConnectionClosed {
|
|
t.Fatalf("Flush on closed conn did not fail properly: %v\n", err)
|
|
}
|
|
_, err := nc.Subscribe("foo", nil)
|
|
if err != nats.ErrConnectionClosed {
|
|
t.Fatalf("Subscribe on closed conn did not fail properly: %v\n", err)
|
|
}
|
|
_, err = nc.SubscribeSync("foo")
|
|
if err != nats.ErrConnectionClosed {
|
|
t.Fatalf("SubscribeSync on closed conn did not fail properly: %v\n", err)
|
|
}
|
|
_, err = nc.QueueSubscribe("foo", "bar", nil)
|
|
if err != nats.ErrConnectionClosed {
|
|
t.Fatalf("QueueSubscribe on closed conn did not fail properly: %v\n", err)
|
|
}
|
|
_, err = nc.Request("foo", []byte("help"), 10*time.Millisecond)
|
|
if err != nats.ErrConnectionClosed {
|
|
t.Fatalf("Request on closed conn did not fail properly: %v\n", err)
|
|
}
|
|
if _, err = sub.NextMsg(10); err != nats.ErrConnectionClosed {
|
|
t.Fatalf("NextMessage on closed conn did not fail properly: %v\n", err)
|
|
}
|
|
if err = sub.Unsubscribe(); err != nats.ErrConnectionClosed {
|
|
t.Fatalf("Unsubscribe on closed conn did not fail properly: %v\n", err)
|
|
}
|
|
}
|
|
|
|
func TestErrOnConnectAndDeadlock(t *testing.T) {
|
|
// We will hand run a fake server that will timeout and not return a proper
|
|
// INFO proto. This is to test that we do not deadlock. Issue #18
|
|
|
|
l, e := net.Listen("tcp", ":0")
|
|
if e != nil {
|
|
t.Fatal("Could not listen on an ephemeral port")
|
|
}
|
|
tl := l.(*net.TCPListener)
|
|
defer tl.Close()
|
|
|
|
addr := tl.Addr().(*net.TCPAddr)
|
|
|
|
go func() {
|
|
conn, err := l.Accept()
|
|
if err != nil {
|
|
t.Fatalf("Error accepting client connection: %v\n", err)
|
|
}
|
|
defer conn.Close()
|
|
// Send back a mal-formed INFO.
|
|
conn.Write([]byte("INFOZ \r\n"))
|
|
}()
|
|
|
|
// Used to synchronize
|
|
ch := make(chan bool)
|
|
|
|
go func() {
|
|
natsURL := fmt.Sprintf("nats://localhost:%d/", addr.Port)
|
|
nc, err := nats.Connect(natsURL)
|
|
if err == nil {
|
|
nc.Close()
|
|
t.Fatal("Expected bad INFO err, got none")
|
|
}
|
|
ch <- true
|
|
}()
|
|
|
|
// Setup a timer to watch for deadlock
|
|
select {
|
|
case <-ch:
|
|
break
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("Connect took too long, deadlock?")
|
|
}
|
|
}
|
|
|
|
func TestMoreErrOnConnect(t *testing.T) {
|
|
l, e := net.Listen("tcp", "127.0.0.1:0")
|
|
if e != nil {
|
|
t.Fatal("Could not listen on an ephemeral port")
|
|
}
|
|
tl := l.(*net.TCPListener)
|
|
defer tl.Close()
|
|
|
|
addr := tl.Addr().(*net.TCPAddr)
|
|
|
|
done := make(chan bool)
|
|
case1 := make(chan bool)
|
|
case2 := make(chan bool)
|
|
case3 := make(chan bool)
|
|
case4 := make(chan bool)
|
|
|
|
go func() {
|
|
for i := 0; i < 5; i++ {
|
|
conn, err := l.Accept()
|
|
if err != nil {
|
|
t.Fatalf("Error accepting client connection: %v\n", err)
|
|
}
|
|
switch i {
|
|
case 0:
|
|
// Send back a partial INFO and close the connection.
|
|
conn.Write([]byte("INFO"))
|
|
case 1:
|
|
// Send just INFO
|
|
conn.Write([]byte("INFO\r\n"))
|
|
// Stick around a bit
|
|
<-case1
|
|
case 2:
|
|
info := fmt.Sprintf("INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":1048576}\r\n", addr.IP, addr.Port)
|
|
// Send complete INFO
|
|
conn.Write([]byte(info))
|
|
// Read connect and ping commands sent from the client
|
|
br := bufio.NewReaderSize(conn, 1024)
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected CONNECT from client, got: %s", err)
|
|
}
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected PING from client, got: %s", err)
|
|
}
|
|
// Client expect +OK, send it but then something else than PONG
|
|
conn.Write([]byte("+OK\r\n"))
|
|
// Stick around a bit
|
|
<-case2
|
|
case 3:
|
|
info := fmt.Sprintf("INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":1048576}\r\n", addr.IP, addr.Port)
|
|
// Send complete INFO
|
|
conn.Write([]byte(info))
|
|
// Read connect and ping commands sent from the client
|
|
br := bufio.NewReaderSize(conn, 1024)
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected CONNECT from client, got: %s", err)
|
|
}
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected PING from client, got: %s", err)
|
|
}
|
|
// Client expect +OK, send it but then something else than PONG
|
|
conn.Write([]byte("+OK\r\nXXX\r\n"))
|
|
// Stick around a bit
|
|
<-case3
|
|
case 4:
|
|
info := fmt.Sprintf("INFO {'x'}\r\n")
|
|
// Send INFO with JSON marshall error
|
|
conn.Write([]byte(info))
|
|
// Stick around a bit
|
|
<-case4
|
|
}
|
|
|
|
conn.Close()
|
|
}
|
|
|
|
// Hang around until asked to quit
|
|
<-done
|
|
}()
|
|
|
|
natsURL := fmt.Sprintf("nats://localhost:%d", addr.Port)
|
|
|
|
if nc, err := nats.Connect(natsURL, nats.Timeout(20*time.Millisecond)); err == nil {
|
|
nc.Close()
|
|
t.Fatal("Expected error, got none")
|
|
}
|
|
|
|
if nc, err := nats.Connect(natsURL, nats.Timeout(20*time.Millisecond)); err == nil {
|
|
close(case1)
|
|
nc.Close()
|
|
t.Fatal("Expected error, got none")
|
|
}
|
|
|
|
close(case1)
|
|
|
|
opts := nats.GetDefaultOptions()
|
|
opts.Servers = []string{natsURL}
|
|
opts.Timeout = 20 * time.Millisecond
|
|
opts.Verbose = true
|
|
|
|
if nc, err := opts.Connect(); err == nil {
|
|
close(case2)
|
|
nc.Close()
|
|
t.Fatal("Expected error, got none")
|
|
}
|
|
|
|
close(case2)
|
|
|
|
if nc, err := opts.Connect(); err == nil {
|
|
close(case3)
|
|
nc.Close()
|
|
t.Fatal("Expected error, got none")
|
|
}
|
|
|
|
close(case3)
|
|
|
|
if nc, err := opts.Connect(); err == nil {
|
|
close(case4)
|
|
nc.Close()
|
|
t.Fatal("Expected error, got none")
|
|
}
|
|
|
|
close(case4)
|
|
|
|
close(done)
|
|
}
|
|
|
|
func TestErrOnMaxPayloadLimit(t *testing.T) {
|
|
expectedMaxPayload := int64(10)
|
|
serverInfo := "INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":%d}\r\n"
|
|
|
|
l, e := net.Listen("tcp", "127.0.0.1:0")
|
|
if e != nil {
|
|
t.Fatal("Could not listen on an ephemeral port")
|
|
}
|
|
tl := l.(*net.TCPListener)
|
|
defer tl.Close()
|
|
|
|
addr := tl.Addr().(*net.TCPAddr)
|
|
|
|
// Send back an INFO message with custom max payload size on connect.
|
|
var conn net.Conn
|
|
var err error
|
|
|
|
go func() {
|
|
conn, err = l.Accept()
|
|
if err != nil {
|
|
t.Fatalf("Error accepting client connection: %v\n", err)
|
|
}
|
|
defer conn.Close()
|
|
info := fmt.Sprintf(serverInfo, addr.IP, addr.Port, expectedMaxPayload)
|
|
conn.Write([]byte(info))
|
|
|
|
// Read connect and ping commands sent from the client
|
|
line := make([]byte, 111)
|
|
_, err := conn.Read(line)
|
|
if err != nil {
|
|
t.Fatalf("Expected CONNECT and PING from client, got: %s", err)
|
|
}
|
|
conn.Write([]byte("PONG\r\n"))
|
|
// Hang around a bit to not err on EOF in client.
|
|
time.Sleep(250 * time.Millisecond)
|
|
}()
|
|
|
|
// Wait for server mock to start
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
natsURL := fmt.Sprintf("nats://%s:%d", addr.IP, addr.Port)
|
|
opts := nats.GetDefaultOptions()
|
|
opts.Servers = []string{natsURL}
|
|
nc, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Expected INFO message with custom max payload, got: %s", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
got := nc.MaxPayload()
|
|
if got != expectedMaxPayload {
|
|
t.Fatalf("Expected MaxPayload to be %d, got: %d", expectedMaxPayload, got)
|
|
}
|
|
err = nc.Publish("hello", []byte("hello world"))
|
|
if err != nats.ErrMaxPayload {
|
|
t.Fatalf("Expected to fail trying to send more than max payload, got: %s", err)
|
|
}
|
|
err = nc.Publish("hello", []byte("a"))
|
|
if err != nil {
|
|
t.Fatalf("Expected to succeed trying to send less than max payload, got: %s", err)
|
|
}
|
|
}
|
|
|
|
func TestConnectVerbose(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
o := nats.GetDefaultOptions()
|
|
o.Verbose = true
|
|
|
|
nc, err := o.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Should have connected ok: %v", err)
|
|
}
|
|
nc.Close()
|
|
}
|
|
|
|
func getStacks(all bool) string {
|
|
var (
|
|
stacks []byte
|
|
stacksSize = 10000
|
|
n int
|
|
)
|
|
for {
|
|
stacks = make([]byte, stacksSize)
|
|
n = runtime.Stack(stacks, all)
|
|
if n == stacksSize {
|
|
stacksSize *= 2
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
return string(stacks[:n])
|
|
}
|
|
|
|
func isRunningInAsyncCBDispatcher() error {
|
|
strStacks := getStacks(false)
|
|
if strings.Contains(strStacks, "asyncCBDispatcher") {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("callback not executed from dispatcher:\n %s", strStacks)
|
|
}
|
|
|
|
func isAsyncDispatcherRunning() bool {
|
|
strStacks := getStacks(true)
|
|
return strings.Contains(strStacks, "asyncCBDispatcher")
|
|
}
|
|
|
|
func TestCallbacksOrder(t *testing.T) {
|
|
authS, authSOpts := RunServerWithConfig("./configs/tls.conf")
|
|
defer authS.Shutdown()
|
|
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
firstDisconnect := true
|
|
dtime1 := time.Time{}
|
|
dtime2 := time.Time{}
|
|
rtime := time.Time{}
|
|
atime1 := time.Time{}
|
|
atime2 := time.Time{}
|
|
ctime := time.Time{}
|
|
|
|
cbErrors := make(chan error, 20)
|
|
|
|
reconnected := make(chan bool)
|
|
closed := make(chan bool)
|
|
asyncErr := make(chan bool, 2)
|
|
recvCh := make(chan bool, 2)
|
|
recvCh1 := make(chan bool)
|
|
recvCh2 := make(chan bool)
|
|
|
|
dch := func(nc *nats.Conn) {
|
|
if err := isRunningInAsyncCBDispatcher(); err != nil {
|
|
cbErrors <- err
|
|
return
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
if firstDisconnect {
|
|
firstDisconnect = false
|
|
dtime1 = time.Now()
|
|
} else {
|
|
dtime2 = time.Now()
|
|
}
|
|
}
|
|
|
|
rch := func(nc *nats.Conn) {
|
|
if err := isRunningInAsyncCBDispatcher(); err != nil {
|
|
cbErrors <- err
|
|
reconnected <- true
|
|
return
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
rtime = time.Now()
|
|
reconnected <- true
|
|
}
|
|
|
|
ech := func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
|
if err := isRunningInAsyncCBDispatcher(); err != nil {
|
|
cbErrors <- err
|
|
asyncErr <- true
|
|
return
|
|
}
|
|
if sub.Subject == "foo" {
|
|
time.Sleep(20 * time.Millisecond)
|
|
atime1 = time.Now()
|
|
} else {
|
|
atime2 = time.Now()
|
|
}
|
|
asyncErr <- true
|
|
}
|
|
|
|
cch := func(nc *nats.Conn) {
|
|
if err := isRunningInAsyncCBDispatcher(); err != nil {
|
|
cbErrors <- err
|
|
closed <- true
|
|
return
|
|
}
|
|
ctime = time.Now()
|
|
closed <- true
|
|
}
|
|
|
|
url := net.JoinHostPort(authSOpts.Host, strconv.Itoa(authSOpts.Port))
|
|
url = "nats://" + url + "," + nats.DefaultURL
|
|
|
|
nc, err := nats.Connect(url,
|
|
nats.DisconnectHandler(dch),
|
|
nats.ReconnectHandler(rch),
|
|
nats.ClosedHandler(cch),
|
|
nats.ErrorHandler(ech),
|
|
nats.ReconnectWait(50*time.Millisecond),
|
|
nats.DontRandomize())
|
|
|
|
if err != nil {
|
|
t.Fatalf("Unable to connect: %v\n", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
ncp, err := nats.Connect(nats.DefaultURL,
|
|
nats.ReconnectWait(50*time.Millisecond))
|
|
if err != nil {
|
|
t.Fatalf("Unable to connect: %v\n", err)
|
|
}
|
|
defer ncp.Close()
|
|
|
|
// Wait to make sure that if we have closed (incorrectly) the
|
|
// asyncCBDispatcher during the connect process, this is caught here.
|
|
time.Sleep(time.Second)
|
|
|
|
s.Shutdown()
|
|
|
|
s = RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
if err := Wait(reconnected); err != nil {
|
|
t.Fatal("Did not get the reconnected callback")
|
|
}
|
|
|
|
var sub1 *nats.Subscription
|
|
var sub2 *nats.Subscription
|
|
|
|
recv := func(m *nats.Msg) {
|
|
// Signal that one message is received
|
|
recvCh <- true
|
|
|
|
// We will now block
|
|
if m.Subject == "foo" {
|
|
<-recvCh1
|
|
} else {
|
|
<-recvCh2
|
|
}
|
|
m.Sub.Unsubscribe()
|
|
}
|
|
|
|
sub1, err = nc.Subscribe("foo", recv)
|
|
if err != nil {
|
|
t.Fatalf("Unable to create subscription: %v\n", err)
|
|
}
|
|
sub1.SetPendingLimits(1, 100000)
|
|
|
|
sub2, err = nc.Subscribe("bar", recv)
|
|
if err != nil {
|
|
t.Fatalf("Unable to create subscription: %v\n", err)
|
|
}
|
|
sub2.SetPendingLimits(1, 100000)
|
|
|
|
nc.Flush()
|
|
|
|
ncp.Publish("foo", []byte("test"))
|
|
ncp.Publish("bar", []byte("test"))
|
|
ncp.Flush()
|
|
|
|
// Wait notification that message were received
|
|
err = Wait(recvCh)
|
|
if err == nil {
|
|
err = Wait(recvCh)
|
|
}
|
|
if err != nil {
|
|
t.Fatal("Did not receive message")
|
|
}
|
|
|
|
for i := 0; i < 2; i++ {
|
|
ncp.Publish("foo", []byte("test"))
|
|
ncp.Publish("bar", []byte("test"))
|
|
}
|
|
ncp.Flush()
|
|
|
|
if err := Wait(asyncErr); err != nil {
|
|
t.Fatal("Did not get the async callback")
|
|
}
|
|
if err := Wait(asyncErr); err != nil {
|
|
t.Fatal("Did not get the async callback")
|
|
}
|
|
|
|
close(recvCh1)
|
|
close(recvCh2)
|
|
|
|
nc.Close()
|
|
|
|
if err := Wait(closed); err != nil {
|
|
t.Fatal("Did not get the close callback")
|
|
}
|
|
|
|
if len(cbErrors) > 0 {
|
|
t.Fatalf("%v", <-cbErrors)
|
|
}
|
|
|
|
if (dtime1 == time.Time{}) || (dtime2 == time.Time{}) || (rtime == time.Time{}) || (atime1 == time.Time{}) || (atime2 == time.Time{}) || (ctime == time.Time{}) {
|
|
t.Fatalf("Some callbacks did not fire:\n%v\n%v\n%v\n%v\n%v\n%v", dtime1, rtime, atime1, atime2, dtime2, ctime)
|
|
}
|
|
|
|
if rtime.Before(dtime1) || dtime2.Before(rtime) || atime2.Before(atime1) || ctime.Before(atime2) {
|
|
t.Fatalf("Wrong callback order:\n%v\n%v\n%v\n%v\n%v\n%v", dtime1, rtime, atime1, atime2, dtime2, ctime)
|
|
}
|
|
|
|
// Close the other connection
|
|
ncp.Close()
|
|
|
|
// Check that the go routine is gone. Allow plenty of time
|
|
// to avoid flappers.
|
|
timeout := time.Now().Add(5 * time.Second)
|
|
for time.Now().Before(timeout) {
|
|
if !isAsyncDispatcherRunning() {
|
|
// Good, we are done!
|
|
return
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
t.Fatalf("The async callback dispatcher(s) should have stopped")
|
|
}
|
|
|
|
func TestFlushReleaseOnClose(t *testing.T) {
|
|
serverInfo := "INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":1048576}\r\n"
|
|
|
|
l, e := net.Listen("tcp", "127.0.0.1:0")
|
|
if e != nil {
|
|
t.Fatal("Could not listen on an ephemeral port")
|
|
}
|
|
tl := l.(*net.TCPListener)
|
|
defer tl.Close()
|
|
|
|
addr := tl.Addr().(*net.TCPAddr)
|
|
done := make(chan bool)
|
|
|
|
go func() {
|
|
conn, err := l.Accept()
|
|
if err != nil {
|
|
t.Fatalf("Error accepting client connection: %v\n", err)
|
|
}
|
|
defer conn.Close()
|
|
info := fmt.Sprintf(serverInfo, addr.IP, addr.Port)
|
|
conn.Write([]byte(info))
|
|
|
|
// Read connect and ping commands sent from the client
|
|
br := bufio.NewReaderSize(conn, 1024)
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected CONNECT from client, got: %s", err)
|
|
}
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected PING from client, got: %s", err)
|
|
}
|
|
conn.Write([]byte("PONG\r\n"))
|
|
|
|
// Hang around until asked to quit
|
|
<-done
|
|
}()
|
|
|
|
// Wait for server mock to start
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
natsURL := fmt.Sprintf("nats://%s:%d", addr.IP, addr.Port)
|
|
opts := nats.GetDefaultOptions()
|
|
opts.AllowReconnect = false
|
|
opts.Servers = []string{natsURL}
|
|
nc, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Expected INFO message with custom max payload, got: %s", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// First try a FlushTimeout() and make sure we timeout
|
|
if err := nc.FlushTimeout(50 * time.Millisecond); err == nil || err != nats.ErrTimeout {
|
|
t.Fatalf("Expected a timeout error, got: %v", err)
|
|
}
|
|
|
|
go func() {
|
|
time.Sleep(50 * time.Millisecond)
|
|
nc.Close()
|
|
}()
|
|
|
|
if err := nc.Flush(); err == nil {
|
|
t.Fatal("Expected error on Flush() released by Close()")
|
|
}
|
|
|
|
close(done)
|
|
}
|
|
|
|
func TestMaxPendingOut(t *testing.T) {
|
|
serverInfo := "INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":1048576}\r\n"
|
|
|
|
l, e := net.Listen("tcp", "127.0.0.1:0")
|
|
if e != nil {
|
|
t.Fatal("Could not listen on an ephemeral port")
|
|
}
|
|
tl := l.(*net.TCPListener)
|
|
defer tl.Close()
|
|
|
|
addr := tl.Addr().(*net.TCPAddr)
|
|
done := make(chan bool)
|
|
cch := make(chan bool)
|
|
|
|
go func() {
|
|
conn, err := l.Accept()
|
|
if err != nil {
|
|
t.Fatalf("Error accepting client connection: %v\n", err)
|
|
}
|
|
defer conn.Close()
|
|
info := fmt.Sprintf(serverInfo, addr.IP, addr.Port)
|
|
conn.Write([]byte(info))
|
|
|
|
// Read connect and ping commands sent from the client
|
|
br := bufio.NewReaderSize(conn, 1024)
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected CONNECT from client, got: %s", err)
|
|
}
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected PING from client, got: %s", err)
|
|
}
|
|
conn.Write([]byte("PONG\r\n"))
|
|
|
|
// Hang around until asked to quit
|
|
<-done
|
|
}()
|
|
|
|
// Wait for server mock to start
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
natsURL := fmt.Sprintf("nats://%s:%d", addr.IP, addr.Port)
|
|
opts := nats.GetDefaultOptions()
|
|
opts.PingInterval = 20 * time.Millisecond
|
|
opts.MaxPingsOut = 2
|
|
opts.AllowReconnect = false
|
|
opts.ClosedCB = func(_ *nats.Conn) { cch <- true }
|
|
opts.Servers = []string{natsURL}
|
|
nc, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Expected INFO message with custom max payload, got: %s", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// After 60 ms, we should have closed the connection
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
if err := Wait(cch); err != nil {
|
|
t.Fatal("Failed to get ClosedCB")
|
|
}
|
|
if nc.LastError() != nats.ErrStaleConnection {
|
|
t.Fatalf("Expected to get %v, got %v", nats.ErrStaleConnection, nc.LastError())
|
|
}
|
|
|
|
close(done)
|
|
}
|
|
|
|
func TestErrInReadLoop(t *testing.T) {
|
|
serverInfo := "INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":1048576}\r\n"
|
|
|
|
l, e := net.Listen("tcp", "127.0.0.1:0")
|
|
if e != nil {
|
|
t.Fatal("Could not listen on an ephemeral port")
|
|
}
|
|
tl := l.(*net.TCPListener)
|
|
defer tl.Close()
|
|
|
|
addr := tl.Addr().(*net.TCPAddr)
|
|
done := make(chan bool)
|
|
cch := make(chan bool)
|
|
|
|
go func() {
|
|
conn, err := l.Accept()
|
|
if err != nil {
|
|
t.Fatalf("Error accepting client connection: %v\n", err)
|
|
}
|
|
defer conn.Close()
|
|
info := fmt.Sprintf(serverInfo, addr.IP, addr.Port)
|
|
conn.Write([]byte(info))
|
|
|
|
// Read connect and ping commands sent from the client
|
|
br := bufio.NewReaderSize(conn, 1024)
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected CONNECT from client, got: %s", err)
|
|
}
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected PING from client, got: %s", err)
|
|
}
|
|
conn.Write([]byte("PONG\r\n"))
|
|
|
|
// Read (and ignore) the SUB from the client
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected SUB from client, got: %s", err)
|
|
}
|
|
|
|
// Send something that should make the subscriber fail.
|
|
conn.Write([]byte("Ivan"))
|
|
|
|
// Hang around until asked to quit
|
|
<-done
|
|
}()
|
|
|
|
// Wait for server mock to start
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
natsURL := fmt.Sprintf("nats://%s:%d", addr.IP, addr.Port)
|
|
opts := nats.GetDefaultOptions()
|
|
opts.AllowReconnect = false
|
|
opts.ClosedCB = func(_ *nats.Conn) { cch <- true }
|
|
opts.Servers = []string{natsURL}
|
|
nc, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Expected INFO message with custom max payload, got: %s", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
received := int64(0)
|
|
|
|
nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
atomic.AddInt64(&received, 1)
|
|
})
|
|
|
|
if err := Wait(cch); err != nil {
|
|
t.Fatal("Failed to get ClosedCB")
|
|
}
|
|
|
|
recv := int(atomic.LoadInt64(&received))
|
|
if recv != 0 {
|
|
t.Fatalf("Should not have received messages, got: %d", recv)
|
|
}
|
|
|
|
close(done)
|
|
}
|
|
|
|
func TestErrStaleConnection(t *testing.T) {
|
|
serverInfo := "INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":1048576}\r\n"
|
|
|
|
l, e := net.Listen("tcp", "127.0.0.1:0")
|
|
if e != nil {
|
|
t.Fatal("Could not listen on an ephemeral port")
|
|
}
|
|
tl := l.(*net.TCPListener)
|
|
defer tl.Close()
|
|
|
|
addr := tl.Addr().(*net.TCPAddr)
|
|
done := make(chan bool)
|
|
dch := make(chan bool)
|
|
rch := make(chan bool)
|
|
cch := make(chan bool)
|
|
sch := make(chan bool)
|
|
|
|
firstDisconnect := true
|
|
|
|
go func() {
|
|
for i := 0; i < 2; i++ {
|
|
conn, err := l.Accept()
|
|
if err != nil {
|
|
t.Fatalf("Error accepting client connection: %v\n", err)
|
|
}
|
|
defer conn.Close()
|
|
info := fmt.Sprintf(serverInfo, addr.IP, addr.Port)
|
|
conn.Write([]byte(info))
|
|
|
|
// Read connect and ping commands sent from the client
|
|
br := bufio.NewReaderSize(conn, 1024)
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected CONNECT from client, got: %s", err)
|
|
}
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected PING from client, got: %s", err)
|
|
}
|
|
conn.Write([]byte("PONG\r\n"))
|
|
|
|
if i == 0 {
|
|
// Wait a tiny, and simulate a Stale Connection
|
|
time.Sleep(50 * time.Millisecond)
|
|
conn.Write([]byte("-ERR 'Stale Connection'\r\n"))
|
|
|
|
// The client should try to reconnect. When getting the
|
|
// disconnected callback, it will close this channel.
|
|
<-sch
|
|
|
|
// Close the connection and go back to accept the new
|
|
// connection.
|
|
conn.Close()
|
|
} else {
|
|
// Hang around a bit
|
|
<-done
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Wait for server mock to start
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
natsURL := fmt.Sprintf("nats://%s:%d", addr.IP, addr.Port)
|
|
opts := nats.GetDefaultOptions()
|
|
opts.AllowReconnect = true
|
|
opts.DisconnectedCB = func(_ *nats.Conn) {
|
|
// Interested only in the first disconnect cb
|
|
if firstDisconnect {
|
|
firstDisconnect = false
|
|
close(sch)
|
|
dch <- true
|
|
}
|
|
}
|
|
opts.ReconnectedCB = func(_ *nats.Conn) { rch <- true }
|
|
opts.ClosedCB = func(_ *nats.Conn) { cch <- true }
|
|
opts.ReconnectWait = 20 * time.Millisecond
|
|
opts.MaxReconnect = 100
|
|
opts.Servers = []string{natsURL}
|
|
nc, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Expected INFO message with custom max payload, got: %s", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// We should first gets disconnected
|
|
if err := Wait(dch); err != nil {
|
|
t.Fatal("Failed to get DisconnectedCB")
|
|
}
|
|
|
|
// Then reconneted..
|
|
if err := Wait(rch); err != nil {
|
|
t.Fatal("Failed to get ReconnectedCB")
|
|
}
|
|
|
|
// Now close the connection
|
|
nc.Close()
|
|
|
|
// We should get the closed cb
|
|
if err := Wait(cch); err != nil {
|
|
t.Fatal("Failed to get ClosedCB")
|
|
}
|
|
|
|
close(done)
|
|
}
|
|
|
|
func TestServerErrorClosesConnection(t *testing.T) {
|
|
serverInfo := "INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":1048576}\r\n"
|
|
|
|
l, e := net.Listen("tcp", "127.0.0.1:0")
|
|
if e != nil {
|
|
t.Fatal("Could not listen on an ephemeral port")
|
|
}
|
|
tl := l.(*net.TCPListener)
|
|
defer tl.Close()
|
|
|
|
addr := tl.Addr().(*net.TCPAddr)
|
|
done := make(chan bool)
|
|
dch := make(chan bool)
|
|
cch := make(chan bool)
|
|
|
|
serverSentError := "Any Error"
|
|
reconnected := int64(0)
|
|
|
|
go func() {
|
|
conn, err := l.Accept()
|
|
if err != nil {
|
|
t.Fatalf("Error accepting client connection: %v\n", err)
|
|
}
|
|
defer conn.Close()
|
|
info := fmt.Sprintf(serverInfo, addr.IP, addr.Port)
|
|
conn.Write([]byte(info))
|
|
|
|
// Read connect and ping commands sent from the client
|
|
br := bufio.NewReaderSize(conn, 1024)
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected CONNECT from client, got: %s", err)
|
|
}
|
|
if _, err := br.ReadString('\n'); err != nil {
|
|
t.Fatalf("Expected PING from client, got: %s", err)
|
|
}
|
|
conn.Write([]byte("PONG\r\n"))
|
|
|
|
// Wait a tiny, and simulate a Stale Connection
|
|
time.Sleep(50 * time.Millisecond)
|
|
conn.Write([]byte("-ERR '" + serverSentError + "'\r\n"))
|
|
|
|
// Hang around a bit
|
|
<-done
|
|
}()
|
|
|
|
// Wait for server mock to start
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
natsURL := fmt.Sprintf("nats://%s:%d", addr.IP, addr.Port)
|
|
opts := nats.GetDefaultOptions()
|
|
opts.AllowReconnect = true
|
|
opts.DisconnectedCB = func(_ *nats.Conn) { dch <- true }
|
|
opts.ReconnectedCB = func(_ *nats.Conn) { atomic.AddInt64(&reconnected, 1) }
|
|
opts.ClosedCB = func(_ *nats.Conn) { cch <- true }
|
|
opts.ReconnectWait = 20 * time.Millisecond
|
|
opts.MaxReconnect = 100
|
|
opts.Servers = []string{natsURL}
|
|
nc, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Expected INFO message with custom max payload, got: %s", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// The server sends an error that should cause the client to simply close
|
|
// the connection.
|
|
|
|
// We should first gets disconnected
|
|
if err := Wait(dch); err != nil {
|
|
t.Fatal("Failed to get DisconnectedCB")
|
|
}
|
|
|
|
// We should get the closed cb
|
|
if err := Wait(cch); err != nil {
|
|
t.Fatal("Failed to get ClosedCB")
|
|
}
|
|
|
|
// We should not have been reconnected
|
|
if atomic.LoadInt64(&reconnected) != 0 {
|
|
t.Fatal("ReconnectedCB should not have been invoked")
|
|
}
|
|
|
|
// Check LastError(), it should be "nats: <server error in lower case>"
|
|
lastErr := nc.LastError().Error()
|
|
expectedErr := "nats: " + strings.ToLower(serverSentError)
|
|
if lastErr != expectedErr {
|
|
t.Fatalf("Expected error: '%v', got '%v'", expectedErr, lastErr)
|
|
}
|
|
|
|
close(done)
|
|
}
|
|
|
|
func TestUseDefaultTimeout(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
opts := &nats.Options{
|
|
Servers: []string{nats.DefaultURL},
|
|
}
|
|
nc, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
if nc.Opts.Timeout != nats.DefaultTimeout {
|
|
t.Fatalf("Expected Timeout to be set to %v, got %v", nats.DefaultTimeout, nc.Opts.Timeout)
|
|
}
|
|
}
|
|
|
|
func TestNoRaceOnLastError(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
// Access LastError in disconnection and closed handlers to make sure
|
|
// that there is no race. It is possible in some cases that
|
|
// nc.LastError() returns a non nil error. We don't care here about the
|
|
// returned value.
|
|
dch := func(c *nats.Conn) {
|
|
c.LastError()
|
|
}
|
|
closedCh := make(chan struct{})
|
|
cch := func(c *nats.Conn) {
|
|
c.LastError()
|
|
closedCh <- struct{}{}
|
|
}
|
|
nc, err := nats.Connect(nats.DefaultURL,
|
|
nats.DisconnectHandler(dch),
|
|
nats.ClosedHandler(cch),
|
|
nats.MaxReconnects(-1),
|
|
nats.ReconnectWait(5*time.Millisecond))
|
|
if err != nil {
|
|
t.Fatalf("Unable to connect: %v\n", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
// Restart the server several times to trigger a reconnection.
|
|
for i := 0; i < 10; i++ {
|
|
s.Shutdown()
|
|
time.Sleep(10 * time.Millisecond)
|
|
s = RunDefaultServer()
|
|
}
|
|
nc.Close()
|
|
s.Shutdown()
|
|
select {
|
|
case <-closedCh:
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("Timeout waiting for the closed callback")
|
|
}
|
|
}
|
|
|
|
type customDialer struct {
|
|
ch chan bool
|
|
}
|
|
|
|
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
|
|
cd.ch <- true
|
|
return nil, fmt.Errorf("on purpose")
|
|
}
|
|
|
|
func TestUseCustomDialer(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
dialer := &net.Dialer{
|
|
Timeout: 10 * time.Second,
|
|
DualStack: true,
|
|
}
|
|
opts := &nats.Options{
|
|
Servers: []string{nats.DefaultURL},
|
|
Dialer: dialer,
|
|
}
|
|
nc, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
if nc.Opts.Dialer != dialer {
|
|
t.Fatalf("Expected Dialer to be set to %v, got %v", dialer, nc.Opts.Dialer)
|
|
}
|
|
|
|
// Should be possible to set via variadic func based Option setter
|
|
dialer2 := &net.Dialer{
|
|
Timeout: 5 * time.Second,
|
|
DualStack: true,
|
|
}
|
|
nc2, err := nats.Connect(nats.DefaultURL, nats.Dialer(dialer2))
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
if !nc2.Opts.Dialer.DualStack {
|
|
t.Fatalf("Expected for dialer to be customized to use dual stack support")
|
|
}
|
|
|
|
// By default, dialer still uses the DefaultTimeout
|
|
nc3, err := nats.Connect(nats.DefaultURL)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on connect: %v", err)
|
|
}
|
|
defer nc3.Close()
|
|
if nc3.Opts.Dialer.Timeout != nats.DefaultTimeout {
|
|
t.Fatalf("Expected DialTimeout to be set to %v, got %v", nats.DefaultTimeout, nc.Opts.Dialer.Timeout)
|
|
}
|
|
|
|
// Create custom dialer that return error on Dial().
|
|
cdialer := &customDialer{ch: make(chan bool, 1)}
|
|
|
|
// When both Dialer and CustomDialer are set, CustomDialer
|
|
// should take precedence. That means that the connection
|
|
// should fail for these two set of options.
|
|
options := []*nats.Options{
|
|
{Dialer: dialer, CustomDialer: cdialer},
|
|
{CustomDialer: cdialer},
|
|
}
|
|
for _, o := range options {
|
|
o.Servers = []string{nats.DefaultURL}
|
|
nc, err := o.Connect()
|
|
// As of now, Connect() would not return the actual dialer error,
|
|
// instead it returns "no server available for connections".
|
|
// So use go channel to ensure that custom dialer's Dial() method
|
|
// was invoked.
|
|
if err == nil {
|
|
if nc != nil {
|
|
nc.Close()
|
|
}
|
|
t.Fatal("Expected error, got none")
|
|
}
|
|
if err := Wait(cdialer.ch); err != nil {
|
|
t.Fatal("Did not get our notification")
|
|
}
|
|
}
|
|
// Same with variadic
|
|
foptions := [][]nats.Option{
|
|
{nats.Dialer(dialer), nats.SetCustomDialer(cdialer)},
|
|
{nats.SetCustomDialer(cdialer)},
|
|
}
|
|
for _, fos := range foptions {
|
|
nc, err := nats.Connect(nats.DefaultURL, fos...)
|
|
if err == nil {
|
|
if nc != nil {
|
|
nc.Close()
|
|
}
|
|
t.Fatal("Expected error, got none")
|
|
}
|
|
if err := Wait(cdialer.ch); err != nil {
|
|
t.Fatal("Did not get our notification")
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestDefaultOptionsDialer(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
opts1 := nats.DefaultOptions
|
|
opts2 := nats.DefaultOptions
|
|
|
|
nc1, err := opts1.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on connect: %v", err)
|
|
}
|
|
defer nc1.Close()
|
|
|
|
nc2, err := opts2.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
if nc1.Opts.Dialer == nc2.Opts.Dialer {
|
|
t.Fatalf("Expected each connection to have its own dialer")
|
|
}
|
|
}
|
|
|
|
func TestCustomFlusherTimeout(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
opts := &nats.Options{
|
|
Servers: []string{nats.DefaultURL},
|
|
|
|
// Reasonably large flusher timeout will not induce errors
|
|
// when we can flush fast
|
|
FlusherTimeout: 10 * time.Second,
|
|
}
|
|
nc1, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Expected to be able to connect, got: %s", err)
|
|
}
|
|
doneCh := make(chan struct{})
|
|
payload := ""
|
|
for i := 0; i < 8192; i++ {
|
|
payload += "A"
|
|
}
|
|
payloadBytes := []byte(payload)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-time.After(200 * time.Millisecond):
|
|
err := nc1.Publish("hello", payloadBytes)
|
|
if err != nil {
|
|
t.Errorf("Error during publish: %s", err)
|
|
}
|
|
case <-time.After(5 * time.Second):
|
|
t.Errorf("Timeout publishing messages")
|
|
return
|
|
case <-doneCh:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
defer nc1.Close()
|
|
|
|
opts = &nats.Options{
|
|
Servers: []string{nats.DefaultURL},
|
|
|
|
// Use short flusher timeout to trigger the error
|
|
FlusherTimeout: 1 * time.Microsecond,
|
|
|
|
// Upon failure to be able to exercice ping pong interval
|
|
// then we will hit this timeout and disconnect
|
|
PingInterval: 500 * time.Millisecond,
|
|
}
|
|
|
|
opts.DisconnectedCB = func(nc *nats.Conn) {
|
|
// Ping loops that test is done
|
|
doneCh <- struct{}{}
|
|
}
|
|
|
|
nc2, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Expected to be able to connect, got: %s", err)
|
|
}
|
|
defer nc2.Close()
|
|
|
|
// Consume messages to make the reading loop work
|
|
_, err = nc2.Subscribe(">", func(_ *nats.Msg) {})
|
|
if err != nil {
|
|
t.Fatalf("Expected to be able to create subscription, got: %s", err)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-time.After(100 * time.Millisecond):
|
|
// Some of the publishes will succeed and others fail with i/o timeout error
|
|
// but eventually ping interval will fail and close the connection.
|
|
err = nc2.Publish("world", payloadBytes)
|
|
if err == nats.ErrConnectionClosed {
|
|
return
|
|
}
|
|
case <-time.After(5 * time.Second):
|
|
t.Errorf("Timeout publishing messages")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestNewServers(t *testing.T) {
|
|
s1Opts := test.DefaultTestOptions
|
|
s1Opts.Host = "127.0.0.1"
|
|
s1Opts.Port = 4222
|
|
s1Opts.Cluster.Host = "localhost"
|
|
s1Opts.Cluster.Port = 6222
|
|
s1 := test.RunServer(&s1Opts)
|
|
defer s1.Shutdown()
|
|
|
|
s2Opts := test.DefaultTestOptions
|
|
s2Opts.Host = "127.0.0.1"
|
|
s2Opts.Port = 4223
|
|
s2Opts.Port = s1Opts.Port + 1
|
|
s2Opts.Cluster.Host = "localhost"
|
|
s2Opts.Cluster.Port = 6223
|
|
s2Opts.Routes = server.RoutesFromStr("nats://localhost:6222")
|
|
s2 := test.RunServer(&s2Opts)
|
|
defer s2.Shutdown()
|
|
|
|
ch := make(chan bool)
|
|
cb := func(_ *nats.Conn) {
|
|
ch <- true
|
|
}
|
|
url := fmt.Sprintf("nats://%s:%d", s1Opts.Host, s1Opts.Port)
|
|
nc1, err := nats.Connect(url, nats.DiscoveredServersHandler(cb))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc1.Close()
|
|
|
|
nc2, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc2.Close()
|
|
nc2.SetDiscoveredServersHandler(cb)
|
|
|
|
opts := nats.GetDefaultOptions()
|
|
opts.Url = nats.DefaultURL
|
|
opts.DiscoveredServersCB = cb
|
|
nc3, err := opts.Connect()
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc3.Close()
|
|
|
|
// Make sure that handler is not invoked on initial connect.
|
|
select {
|
|
case <-ch:
|
|
t.Fatalf("Handler should not have been invoked")
|
|
case <-time.After(500 * time.Millisecond):
|
|
}
|
|
|
|
// Start a new server.
|
|
s3Opts := test.DefaultTestOptions
|
|
s1Opts.Host = "127.0.0.1"
|
|
s1Opts.Port = 4224
|
|
s3Opts.Port = s2Opts.Port + 1
|
|
s3Opts.Cluster.Host = "localhost"
|
|
s3Opts.Cluster.Port = 6224
|
|
s3Opts.Routes = server.RoutesFromStr("nats://localhost:6222")
|
|
s3 := test.RunServer(&s3Opts)
|
|
defer s3.Shutdown()
|
|
|
|
// The callbacks should have been invoked
|
|
if err := Wait(ch); err != nil {
|
|
t.Fatal("Did not get our callback")
|
|
}
|
|
if err := Wait(ch); err != nil {
|
|
t.Fatal("Did not get our callback")
|
|
}
|
|
if err := Wait(ch); err != nil {
|
|
t.Fatal("Did not get our callback")
|
|
}
|
|
}
|
|
|
|
func TestBarrier(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
nc := NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
pubMsgs := int32(0)
|
|
ch := make(chan bool, 1)
|
|
|
|
sub1, err := nc.Subscribe("pub", func(_ *nats.Msg) {
|
|
atomic.AddInt32(&pubMsgs, 1)
|
|
time.Sleep(250 * time.Millisecond)
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
|
|
sub2, err := nc.Subscribe("close", func(_ *nats.Msg) {
|
|
// The "close" message was sent/received lat, but
|
|
// because we are dealing with different subscriptions,
|
|
// which are dispatched by different dispatchers, and
|
|
// because the "pub" subscription is delayed, this
|
|
// callback is likely to be invoked before the sub1's
|
|
// second callback is invoked. Using the Barrier call
|
|
// here will ensure that the given function will be invoked
|
|
// after the preceding messages have been dispatched.
|
|
nc.Barrier(func() {
|
|
res := atomic.LoadInt32(&pubMsgs) == 2
|
|
ch <- res
|
|
})
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
|
|
// Send 2 "pub" messages followed by a "close" message
|
|
for i := 0; i < 2; i++ {
|
|
if err := nc.Publish("pub", []byte("pub msg")); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
}
|
|
if err := nc.Publish("close", []byte("closing")); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
|
|
select {
|
|
case ok := <-ch:
|
|
if !ok {
|
|
t.Fatal("The barrier function was invoked before the second message")
|
|
}
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("Waited for too long...")
|
|
}
|
|
|
|
// Remove all subs
|
|
sub1.Unsubscribe()
|
|
sub2.Unsubscribe()
|
|
|
|
// Barrier should be invoked in place. Since we use buffered channel
|
|
// we are ok.
|
|
nc.Barrier(func() { ch <- true })
|
|
if err := Wait(ch); err != nil {
|
|
t.Fatal("Barrier function was not invoked")
|
|
}
|
|
|
|
if _, err := nc.Subscribe("foo", func(m *nats.Msg) {
|
|
// To check that the Barrier() function works if the subscription
|
|
// is unsubscribed after the call was made, sleep a bit here.
|
|
time.Sleep(250 * time.Millisecond)
|
|
m.Sub.Unsubscribe()
|
|
}); err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
if err := nc.Publish("foo", []byte("hello")); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
// We need to Flush here to make sure that message has been received
|
|
// and posted to subscription's internal queue before calling Barrier.
|
|
if err := nc.Flush(); err != nil {
|
|
t.Fatalf("Error on flush: %v", err)
|
|
}
|
|
nc.Barrier(func() { ch <- true })
|
|
if err := Wait(ch); err != nil {
|
|
t.Fatal("Barrier function was not invoked")
|
|
}
|
|
|
|
// Test with AutoUnsubscribe now...
|
|
sub1, err = nc.Subscribe("foo", func(m *nats.Msg) {
|
|
// Since we auto-unsubscribe with 1, there should not be another
|
|
// invocation of this callback, but the Barrier should still be
|
|
// invoked.
|
|
nc.Barrier(func() { ch <- true })
|
|
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
sub1.AutoUnsubscribe(1)
|
|
// Send 2 messages and flush
|
|
for i := 0; i < 2; i++ {
|
|
if err := nc.Publish("foo", []byte("hello")); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
}
|
|
if err := nc.Flush(); err != nil {
|
|
t.Fatalf("Error on flush: %v", err)
|
|
}
|
|
// Check barrier was invoked
|
|
if err := Wait(ch); err != nil {
|
|
t.Fatal("Barrier function was not invoked")
|
|
}
|
|
|
|
// Check that Barrier only affects asynchronous subscriptions
|
|
sub1, err = nc.Subscribe("foo", func(m *nats.Msg) {
|
|
nc.Barrier(func() { ch <- true })
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
syncSub, err := nc.SubscribeSync("foo")
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
msgChan := make(chan *nats.Msg, 1)
|
|
chanSub, err := nc.ChanSubscribe("foo", msgChan)
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
if err := nc.Publish("foo", []byte("hello")); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
if err := nc.Flush(); err != nil {
|
|
t.Fatalf("Error on flush: %v", err)
|
|
}
|
|
// Check barrier was invoked even if we did not yet consume
|
|
// from the 2 other type of subscriptions
|
|
if err := Wait(ch); err != nil {
|
|
t.Fatal("Barrier function was not invoked")
|
|
}
|
|
if _, err := syncSub.NextMsg(time.Second); err != nil {
|
|
t.Fatalf("Sync sub did not receive the message")
|
|
}
|
|
select {
|
|
case <-msgChan:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("Chan sub did not receive the message")
|
|
}
|
|
chanSub.Unsubscribe()
|
|
syncSub.Unsubscribe()
|
|
sub1.Unsubscribe()
|
|
|
|
atomic.StoreInt32(&pubMsgs, 0)
|
|
// Check barrier does not prevent new messages to be delivered.
|
|
sub1, err = nc.Subscribe("foo", func(_ *nats.Msg) {
|
|
if pm := atomic.AddInt32(&pubMsgs, 1); pm == 1 {
|
|
nc.Barrier(func() {
|
|
nc.Publish("foo", []byte("second"))
|
|
nc.Flush()
|
|
})
|
|
} else if pm == 2 {
|
|
ch <- true
|
|
}
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
if err := nc.Publish("foo", []byte("first")); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
if err := Wait(ch); err != nil {
|
|
t.Fatal("Barrier function was not invoked")
|
|
}
|
|
sub1.Unsubscribe()
|
|
|
|
// Check that barrier works if called before connection
|
|
// is closed.
|
|
if _, err := nc.Subscribe("bar", func(_ *nats.Msg) {
|
|
nc.Barrier(func() { ch <- true })
|
|
nc.Close()
|
|
}); err != nil {
|
|
t.Fatalf("Error on subscribe: %v", err)
|
|
}
|
|
if err := nc.Publish("bar", []byte("hello")); err != nil {
|
|
t.Fatalf("Error on publish: %v", err)
|
|
}
|
|
// This could fail if the connection is closed before we get
|
|
// here.
|
|
nc.Flush()
|
|
if err := Wait(ch); err != nil {
|
|
t.Fatal("Barrier function was not invoked")
|
|
}
|
|
|
|
// Finally, check that if connection is closed, Barrier returns
|
|
// an error.
|
|
if err := nc.Barrier(func() { ch <- true }); err != nats.ErrConnectionClosed {
|
|
t.Fatalf("Expected error %v, got %v", nats.ErrConnectionClosed, err)
|
|
}
|
|
|
|
// Check that one can call connection methods from Barrier
|
|
// when there is no async subscriptions
|
|
nc = NewDefaultConnection(t)
|
|
defer nc.Close()
|
|
|
|
if err := nc.Barrier(func() {
|
|
ch <- nc.TLSRequired()
|
|
}); err != nil {
|
|
t.Fatalf("Error on Barrier: %v", err)
|
|
}
|
|
if err := Wait(ch); err != nil {
|
|
t.Fatal("Barrier was blocked")
|
|
}
|
|
}
|
|
|
|
func TestReceiveInfoRightAfterFirstPong(t *testing.T) {
|
|
l, err := net.Listen("tcp", "127.0.0.1:0")
|
|
if err != nil {
|
|
t.Fatalf("Error on listen: %v", err)
|
|
}
|
|
tl := l.(*net.TCPListener)
|
|
defer tl.Close()
|
|
addr := tl.Addr().(*net.TCPAddr)
|
|
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
c, err := tl.Accept()
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer c.Close()
|
|
// Send the initial INFO
|
|
c.Write([]byte("INFO {}\r\n"))
|
|
buf := make([]byte, 0, 100)
|
|
b := make([]byte, 100)
|
|
for {
|
|
n, err := c.Read(b)
|
|
if err != nil {
|
|
return
|
|
}
|
|
buf = append(buf, b[:n]...)
|
|
if bytes.Contains(buf, []byte("PING\r\n")) {
|
|
break
|
|
}
|
|
}
|
|
// Send PONG and following INFO in one go (or at least try).
|
|
// The processing of PONG in sendConnect() should leave the
|
|
// rest for the readLoop to process.
|
|
c.Write([]byte(fmt.Sprintf("PONG\r\nINFO {\"connect_urls\":[\"127.0.0.1:%d\", \"me:1\"]}\r\n", addr.Port)))
|
|
// Wait for client to disconnect
|
|
for {
|
|
if _, err := c.Read(buf); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
nc, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", addr.Port))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
var (
|
|
ds []string
|
|
timeout = time.Now().Add(2 * time.Second)
|
|
ok = false
|
|
)
|
|
for time.Now().Before(timeout) {
|
|
ds = nc.DiscoveredServers()
|
|
if len(ds) == 1 && ds[0] == "nats://me:1" {
|
|
ok = true
|
|
break
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
nc.Close()
|
|
wg.Wait()
|
|
if !ok {
|
|
t.Fatalf("Unexpected discovered servers: %v", ds)
|
|
}
|
|
}
|
|
|
|
func TestReceiveInfoWithEmptyConnectURLs(t *testing.T) {
|
|
ready := make(chan bool, 2)
|
|
ch := make(chan bool, 1)
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
ports := []int{4222, 4223}
|
|
for i := 0; i < 2; i++ {
|
|
l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", ports[i]))
|
|
if err != nil {
|
|
t.Fatalf("Error on listen: %v", err)
|
|
}
|
|
tl := l.(*net.TCPListener)
|
|
defer tl.Close()
|
|
|
|
ready <- true
|
|
|
|
c, err := tl.Accept()
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer c.Close()
|
|
|
|
// Send the initial INFO
|
|
c.Write([]byte(fmt.Sprintf("INFO {\"server_id\":\"server%d\"}\r\n", (i + 1))))
|
|
buf := make([]byte, 0, 100)
|
|
b := make([]byte, 100)
|
|
for {
|
|
n, err := c.Read(b)
|
|
if err != nil {
|
|
return
|
|
}
|
|
buf = append(buf, b[:n]...)
|
|
if bytes.Contains(buf, []byte("PING\r\n")) {
|
|
break
|
|
}
|
|
}
|
|
if i == 0 {
|
|
// Send PONG and following INFO in one go (or at least try).
|
|
// The processing of PONG in sendConnect() should leave the
|
|
// rest for the readLoop to process.
|
|
c.Write([]byte("PONG\r\nINFO {\"server_id\":\"server1\",\"connect_urls\":[\"127.0.0.1:4222\", \"127.0.0.1:4223\", \"127.0.0.1:4224\"]}\r\n"))
|
|
// Wait for the notication
|
|
<-ch
|
|
// Close the connection in our side and go back into accept
|
|
c.Close()
|
|
} else {
|
|
// Send no connect ULRs (as if this was an older server that could in some cases
|
|
// send an empty array)
|
|
c.Write([]byte(fmt.Sprintf("PONG\r\nINFO {\"server_id\":\"server2\"}\r\n")))
|
|
// Wait for client to disconnect
|
|
for {
|
|
if _, err := c.Read(buf); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Wait for listener to be up and running
|
|
if err := Wait(ready); err != nil {
|
|
t.Fatal("Listener not ready")
|
|
}
|
|
|
|
rch := make(chan bool)
|
|
nc, err := nats.Connect("nats://127.0.0.1:4222",
|
|
nats.ReconnectWait(50*time.Millisecond),
|
|
nats.ReconnectHandler(func(_ *nats.Conn) {
|
|
rch <- true
|
|
}))
|
|
if err != nil {
|
|
t.Fatalf("Error on connect: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
var (
|
|
ds []string
|
|
timeout = time.Now().Add(2 * time.Second)
|
|
ok = false
|
|
)
|
|
for time.Now().Before(timeout) {
|
|
ds = nc.DiscoveredServers()
|
|
if len(ds) == 2 {
|
|
if (ds[0] == "nats://127.0.0.1:4223" && ds[1] == "nats://127.0.0.1:4224") ||
|
|
(ds[0] == "nats://127.0.0.1:4224" && ds[1] == "nats://127.0.0.1:4223") {
|
|
ok = true
|
|
break
|
|
}
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
if !ok {
|
|
t.Fatalf("Unexpected discovered servers: %v", ds)
|
|
}
|
|
// Make the server close our connection
|
|
ch <- true
|
|
// Wait for the reconnect
|
|
if err := Wait(rch); err != nil {
|
|
t.Fatal("Did not reconnect")
|
|
}
|
|
// Discovered servers should still contain nats://me:1
|
|
ds = nc.DiscoveredServers()
|
|
if len(ds) != 2 ||
|
|
!((ds[0] == "nats://127.0.0.1:4223" && ds[1] == "nats://127.0.0.1:4224") ||
|
|
(ds[0] == "nats://127.0.0.1:4224" && ds[1] == "nats://127.0.0.1:4223")) {
|
|
t.Fatalf("Unexpected discovered servers list: %v", ds)
|
|
}
|
|
nc.Close()
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestConnectWithSimplifiedURLs(t *testing.T) {
|
|
urls := []string{
|
|
"nats://127.0.0.1:4222",
|
|
"nats://127.0.0.1:",
|
|
"nats://127.0.0.1",
|
|
"127.0.0.1:",
|
|
"127.0.0.1",
|
|
}
|
|
|
|
connect := func(t *testing.T, url string) {
|
|
t.Helper()
|
|
nc, err := nats.Connect(url)
|
|
if err != nil {
|
|
t.Fatalf("URL %q expected to connect, got %v", url, err)
|
|
}
|
|
nc.Close()
|
|
}
|
|
|
|
// Start a server that listens on default port 4222.
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
// Try for every connection in the urls array.
|
|
for _, u := range urls {
|
|
connect(t, u)
|
|
}
|
|
|
|
s.Shutdown()
|
|
|
|
// Use this to build the options for us...
|
|
s, opts := RunServerWithConfig("configs/tls.conf")
|
|
s.Shutdown()
|
|
// Now change listen port to 4222 and remove auth
|
|
opts.Port = 4222
|
|
opts.Username = ""
|
|
opts.Password = ""
|
|
// and restart the server
|
|
s = RunServerWithOptions(*opts)
|
|
defer s.Shutdown()
|
|
|
|
// Test again against a server that wants TLS and check
|
|
// that we automatically switch to Secure.
|
|
for _, u := range urls {
|
|
connect(t, u)
|
|
}
|
|
}
|
|
|
|
func TestNilOpts(t *testing.T) {
|
|
s := RunDefaultServer()
|
|
defer s.Shutdown()
|
|
|
|
// Test a single nil option
|
|
var o1, o2, o3 nats.Option
|
|
_, err := nats.Connect(nats.DefaultURL, o1)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error with one nil option: %v", err)
|
|
}
|
|
|
|
// Test nil, opt, nil
|
|
o2 = nats.ReconnectBufSize(2222)
|
|
nc, err := nats.Connect(nats.DefaultURL, o1, o2, o3)
|
|
if err != nil {
|
|
t.Fatalf("Unexpected error with multiple nil options: %v", err)
|
|
}
|
|
// check that the opt was set
|
|
if nc.Opts.ReconnectBufSize != 2222 {
|
|
t.Fatal("Unexpected error: option not set.")
|
|
}
|
|
}
|