
The big change is that the GeoJSON package has been completely rewritten to fix a few geometry calculation bugs, increase performance, and better follow the GeoJSON spec, RFC 7946. GeoJSON updates - A LineString now requires at least two points. - All JSON members, even foreign ones, now persist with the object. - The bbox member persists too but is no longer used for geometry calculations. This is a change in behavior. Previously Tile38 would treat the bbox as the object's physical rectangle. - Corrections to geometry intersects and within calculations. Faster spatial queries - The performance of point-in-polygon and object intersect operations is greatly improved for complex polygons and line strings. It went from O(n) to roughly O(log n). - The same applies to all collection types with many children, including FeatureCollection, GeometryCollection, MultiPoint, MultiLineString, and MultiPolygon. Codebase changes - The pkg directory has been renamed to internal. - The GeoJSON internal package has been moved to a separate repo at https://github.com/tidwall/geojson. It's now vendored. Please look out for higher memory usage for datasets using complex shapes. A complex shape is one that has 64 or more points. For these shapes it's expected that there will be an increase of at least 54 bytes per point.
3478 lines
87 KiB
Go
3478 lines
87 KiB
Go
// Copyright 2012-2018 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// A Go client for the NATS messaging system (https://nats.io).
|
|
package nats
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"net"
|
|
"net/url"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/nats-io/go-nats/util"
|
|
"github.com/nats-io/nuid"
|
|
)
|
|
|
|
// Default constants used by the client when the caller does not override
// the corresponding setting.
const (
	Version                 = "1.6.0"
	DefaultURL              = "nats://localhost:4222"
	DefaultPort             = 4222
	DefaultMaxReconnect     = 60
	DefaultReconnectWait    = 2 * time.Second
	DefaultTimeout          = 2 * time.Second
	DefaultPingInterval     = 2 * time.Minute
	DefaultMaxPingOut       = 2
	DefaultMaxChanLen       = 8192            // 8k
	DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
	RequestChanLen          = 8
	DefaultDrainTimeout     = 30 * time.Second
	LangString              = "go"
)
|
|
|
|
// STALE_CONNECTION is for detection and proper handling of stale connections.
const STALE_CONNECTION = "stale connection"

// PERMISSIONS_ERR is for when nats server subject authorization has failed.
const PERMISSIONS_ERR = "permissions violation"

// AUTHORIZATION_ERR is for when nats server user authorization has failed.
const AUTHORIZATION_ERR = "authorization violation"
|
|
|
|
// Errors
|
|
var (
|
|
ErrConnectionClosed = errors.New("nats: connection closed")
|
|
ErrConnectionDraining = errors.New("nats: connection draining")
|
|
ErrDrainTimeout = errors.New("nats: draining connection timed out")
|
|
ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
|
|
ErrSecureConnRequired = errors.New("nats: secure connection required")
|
|
ErrSecureConnWanted = errors.New("nats: secure connection not available")
|
|
ErrBadSubscription = errors.New("nats: invalid subscription")
|
|
ErrTypeSubscription = errors.New("nats: invalid subscription type")
|
|
ErrBadSubject = errors.New("nats: invalid subject")
|
|
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
|
|
ErrTimeout = errors.New("nats: timeout")
|
|
ErrBadTimeout = errors.New("nats: timeout invalid")
|
|
ErrAuthorization = errors.New("nats: authorization violation")
|
|
ErrNoServers = errors.New("nats: no servers available for connection")
|
|
ErrJsonParse = errors.New("nats: connect message, json parse error")
|
|
ErrChanArg = errors.New("nats: argument needs to be a channel type")
|
|
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
|
|
ErrMaxMessages = errors.New("nats: maximum messages delivered")
|
|
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
|
|
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
|
|
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
|
|
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
|
|
ErrInvalidConnection = errors.New("nats: invalid connection")
|
|
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
|
|
ErrInvalidArg = errors.New("nats: invalid argument")
|
|
ErrInvalidContext = errors.New("nats: invalid context")
|
|
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
|
|
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
|
|
)
|
|
|
|
// GetDefaultOptions returns default configuration options for the client.
|
|
func GetDefaultOptions() Options {
|
|
return Options{
|
|
AllowReconnect: true,
|
|
MaxReconnect: DefaultMaxReconnect,
|
|
ReconnectWait: DefaultReconnectWait,
|
|
Timeout: DefaultTimeout,
|
|
PingInterval: DefaultPingInterval,
|
|
MaxPingsOut: DefaultMaxPingOut,
|
|
SubChanLen: DefaultMaxChanLen,
|
|
ReconnectBufSize: DefaultReconnectBufSize,
|
|
DrainTimeout: DefaultDrainTimeout,
|
|
}
|
|
}
|
|
|
|
// DEPRECATED: Use GetDefaultOptions() instead.
|
|
// DefaultOptions is not safe for use by multiple clients.
|
|
// For details see #308.
|
|
var DefaultOptions = GetDefaultOptions()
|
|
|
|
// Status represents the state of the connection.
type Status int

// Possible connection states. The numeric values start at zero and follow
// declaration order.
const (
	DISCONNECTED = Status(iota)
	CONNECTED
	CLOSED
	RECONNECTING
	CONNECTING
	DRAINING_SUBS
	DRAINING_PUBS
)
|
|
|
|
// ConnHandler is used for asynchronous events such as
|
|
// disconnected and closed connections.
|
|
type ConnHandler func(*Conn)
|
|
|
|
// ErrHandler is used to process asynchronous errors encountered
|
|
// while processing inbound messages.
|
|
type ErrHandler func(*Conn, *Subscription, error)
|
|
|
|
// asyncCB is used to preserve order for async callbacks.
// Each node carries one callback and a link to the next node.
type asyncCB struct {
	f    func()
	next *asyncCB
}

// asyncCallbacksHandler holds a singly linked list of asyncCB nodes
// (head/tail) together with a mutex and a condition variable.
type asyncCallbacksHandler struct {
	mu   sync.Mutex
	cond *sync.Cond
	head *asyncCB
	tail *asyncCB
}
|
|
|
|
// Option is a function on the options for a connection.
|
|
type Option func(*Options) error
|
|
|
|
// CustomDialer can be used to specify any dialer, not necessarily
// a *net.Dialer.
type CustomDialer interface {
	Dial(network, address string) (net.Conn, error)
}
|
|
|
|
// Options can be used to create a customized connection.
|
|
type Options struct {
|
|
|
|
// Url represents a single NATS server url to which the client
|
|
// will be connecting. If the Servers option is also set, it
|
|
// then becomes the first server in the Servers array.
|
|
Url string
|
|
|
|
// Servers is a configured set of servers which this client
|
|
// will use when attempting to connect.
|
|
Servers []string
|
|
|
|
// NoRandomize configures whether we will randomize the
|
|
// server pool.
|
|
NoRandomize bool
|
|
|
|
// NoEcho configures whether the server will echo back messages
|
|
// that are sent on this connection if we also have matching subscriptions.
|
|
// Note this is supported on servers >= version 1.2. Proto 1 or greater.
|
|
NoEcho bool
|
|
|
|
// Name is an optional name label which will be sent to the server
|
|
// on CONNECT to identify the client.
|
|
Name string
|
|
|
|
// Verbose signals the server to send an OK ack for commands
|
|
// successfully processed by the server.
|
|
Verbose bool
|
|
|
|
// Pedantic signals the server whether it should be doing further
|
|
// validation of subjects.
|
|
Pedantic bool
|
|
|
|
// Secure enables TLS secure connections that skip server
|
|
// verification by default. NOT RECOMMENDED.
|
|
Secure bool
|
|
|
|
// TLSConfig is a custom TLS configuration to use for secure
|
|
// transports.
|
|
TLSConfig *tls.Config
|
|
|
|
// AllowReconnect enables reconnection logic to be used when we
|
|
// encounter a disconnect from the current server.
|
|
AllowReconnect bool
|
|
|
|
// MaxReconnect sets the number of reconnect attempts that will be
|
|
// tried before giving up. If negative, then it will never give up
|
|
// trying to reconnect.
|
|
MaxReconnect int
|
|
|
|
// ReconnectWait sets the time to backoff after attempting a reconnect
|
|
// to a server that we were already connected to previously.
|
|
ReconnectWait time.Duration
|
|
|
|
// Timeout sets the timeout for a Dial operation on a connection.
|
|
Timeout time.Duration
|
|
|
|
// DrainTimeout sets the timeout for a Drain Operation to complete.
|
|
DrainTimeout time.Duration
|
|
|
|
// FlusherTimeout is the maximum time to wait for the flusher loop
|
|
// to be able to finish writing to the underlying connection.
|
|
FlusherTimeout time.Duration
|
|
|
|
// PingInterval is the period at which the client will be sending ping
|
|
// commands to the server, disabled if 0 or negative.
|
|
PingInterval time.Duration
|
|
|
|
// MaxPingsOut is the maximum number of pending ping commands that can
|
|
// be awaiting a response before raising an ErrStaleConnection error.
|
|
MaxPingsOut int
|
|
|
|
// ClosedCB sets the closed handler that is called when a client will
|
|
// no longer be connected.
|
|
ClosedCB ConnHandler
|
|
|
|
// DisconnectedCB sets the disconnected handler that is called
|
|
// whenever the connection is disconnected.
|
|
DisconnectedCB ConnHandler
|
|
|
|
// ReconnectedCB sets the reconnected handler called whenever
|
|
// the connection is successfully reconnected.
|
|
ReconnectedCB ConnHandler
|
|
|
|
// DiscoveredServersCB sets the callback that is invoked whenever a new
|
|
// server has joined the cluster.
|
|
DiscoveredServersCB ConnHandler
|
|
|
|
// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
|
|
AsyncErrorCB ErrHandler
|
|
|
|
// ReconnectBufSize is the size of the backing bufio during reconnect.
|
|
// Once this has been exhausted publish operations will return an error.
|
|
ReconnectBufSize int
|
|
|
|
// SubChanLen is the size of the buffered channel used between the socket
|
|
// Go routine and the message delivery for SyncSubscriptions.
|
|
// NOTE: This does not affect AsyncSubscriptions which are
|
|
// dictated by PendingLimits()
|
|
SubChanLen int
|
|
|
|
// User sets the username to be used when connecting to the server.
|
|
User string
|
|
|
|
// Password sets the password to be used when connecting to a server.
|
|
Password string
|
|
|
|
// Token sets the token to be used when connecting to a server.
|
|
Token string
|
|
|
|
// Dialer allows a custom net.Dialer when forming connections.
|
|
// DEPRECATED: should use CustomDialer instead.
|
|
Dialer *net.Dialer
|
|
|
|
// CustomDialer allows to specify a custom dialer (not necessarily
|
|
// a *net.Dialer).
|
|
CustomDialer CustomDialer
|
|
|
|
// UseOldRequestStyle forces the old method of Requests that utilize
|
|
// a new Inbox and a new Subscription for each request.
|
|
UseOldRequestStyle bool
|
|
}
|
|
|
|
// Internal sizing constants.
const (
	// Scratch storage for assembling protocol headers
	scratchSize = 512

	// The size of the bufio reader/writer on top of the socket.
	defaultBufSize = 32768

	// The buffered size of the flush "kick" channel
	flushChanSize = 1024

	// Default server pool size
	srvPoolSize = 4

	// NUID size
	nuidSize = 22

	// Default port used if none is specified in given URL(s)
	defaultPortString = "4222"
)
|
|
|
|
// A Conn represents a bare connection to a nats-server.
|
|
// It can send and receive []byte payloads.
|
|
type Conn struct {
|
|
// Keep all members for which we use atomic at the beginning of the
|
|
// struct and make sure they are all 64bits (or use padding if necessary).
|
|
// atomic.* functions crash on 32bit machines if operand is not aligned
|
|
// at 64bit. See https://github.com/golang/go/issues/599
|
|
Statistics
|
|
mu sync.Mutex
|
|
// Opts holds the configuration of the Conn.
|
|
// Modifying the configuration of a running Conn is a race.
|
|
Opts Options
|
|
wg sync.WaitGroup
|
|
url *url.URL
|
|
conn net.Conn
|
|
srvPool []*srv
|
|
urls map[string]struct{} // Keep track of all known URLs (used by processInfo)
|
|
bw *bufio.Writer
|
|
pending *bytes.Buffer
|
|
fch chan struct{}
|
|
info serverInfo
|
|
ssid int64
|
|
subsMu sync.RWMutex
|
|
subs map[int64]*Subscription
|
|
ach *asyncCallbacksHandler
|
|
pongs []chan struct{}
|
|
scratch [scratchSize]byte
|
|
status Status
|
|
initc bool // true if the connection is performing the initial connect
|
|
err error
|
|
ps *parseState
|
|
ptmr *time.Timer
|
|
pout int
|
|
|
|
// New style response handler
|
|
respSub string // The wildcard subject
|
|
respMux *Subscription // A single response subscription
|
|
respMap map[string]chan *Msg // Request map for the response msg channels
|
|
respSetup sync.Once // Ensures response subscription occurs once
|
|
}
|
|
|
|
// A Subscription represents interest in a given subject.
|
|
type Subscription struct {
|
|
mu sync.Mutex
|
|
sid int64
|
|
|
|
// Subject that represents this subscription. This can be different
|
|
// than the received subject inside a Msg if this is a wildcard.
|
|
Subject string
|
|
|
|
// Optional queue group name. If present, all subscriptions with the
|
|
// same name will form a distributed queue, and each message will
|
|
// only be processed by one member of the group.
|
|
Queue string
|
|
|
|
delivered uint64
|
|
max uint64
|
|
conn *Conn
|
|
mcb MsgHandler
|
|
mch chan *Msg
|
|
closed bool
|
|
sc bool
|
|
connClosed bool
|
|
|
|
// Type of Subscription
|
|
typ SubscriptionType
|
|
|
|
// Async linked list
|
|
pHead *Msg
|
|
pTail *Msg
|
|
pCond *sync.Cond
|
|
|
|
// Pending stats, async subscriptions, high-speed etc.
|
|
pMsgs int
|
|
pBytes int
|
|
pMsgsMax int
|
|
pBytesMax int
|
|
pMsgsLimit int
|
|
pBytesLimit int
|
|
dropped int
|
|
}
|
|
|
|
// Msg is a structure used by Subscribers and PublishMsg().
|
|
type Msg struct {
|
|
Subject string
|
|
Reply string
|
|
Data []byte
|
|
Sub *Subscription
|
|
next *Msg
|
|
barrier *barrierInfo
|
|
}
|
|
|
|
// barrierInfo pairs a reference counter with a function.
// NOTE(review): presumably f runs once refs drops to zero; the code that
// uses this type is outside this chunk, so confirm there.
type barrierInfo struct {
	refs int64
	f    func()
}
|
|
|
|
// Statistics tracks various stats received and sent on this connection,
// including counts for messages and bytes.
type Statistics struct {
	InMsgs     uint64
	OutMsgs    uint64
	InBytes    uint64
	OutBytes   uint64
	Reconnects uint64
}
|
|
|
|
// srv tracks an individual backend server: its URL, whether a connection
// was ever made, the reconnect count, the time of the last attempt, and
// whether the entry was added implicitly.
type srv struct {
	url         *url.URL
	didConnect  bool
	reconnects  int
	lastAttempt time.Time
	isImplicit  bool
}
|
|
|
|
// serverInfo mirrors the JSON fields of the server description; the struct
// tags give the wire names.
type serverInfo struct {
	Id           string   `json:"server_id"`
	Host         string   `json:"host"`
	Port         uint     `json:"port"`
	Version      string   `json:"version"`
	AuthRequired bool     `json:"auth_required"`
	TLSRequired  bool     `json:"tls_required"`
	MaxPayload   int64    `json:"max_payload"`
	ConnectURLs  []string `json:"connect_urls,omitempty"`
	Proto        int      `json:"proto,omitempty"`
}
|
|
|
|
const (
	// clientProtoZero is the original client protocol from 2009.
	// http://nats.io/documentation/internals/nats-protocol/
	/* clientProtoZero */ _ = iota
	// clientProtoInfo signals a client can receive more than the original INFO block.
	// This can be used to update clients on other cluster members, etc.
	clientProtoInfo
)
|
|
|
|
// connectInfo carries the client-side connection settings; the struct tags
// give the JSON names used on the wire.
type connectInfo struct {
	Verbose  bool   `json:"verbose"`
	Pedantic bool   `json:"pedantic"`
	User     string `json:"user,omitempty"`
	Pass     string `json:"pass,omitempty"`
	Token    string `json:"auth_token,omitempty"`
	TLS      bool   `json:"tls_required"`
	Name     string `json:"name"`
	Lang     string `json:"lang"`
	Version  string `json:"version"`
	Protocol int    `json:"protocol"`
	Echo     bool   `json:"echo"`
}
|
|
|
|
// MsgHandler is a callback function that processes messages delivered to
|
|
// asynchronous subscribers.
|
|
type MsgHandler func(msg *Msg)
|
|
|
|
// Connect will attempt to connect to the NATS system.
|
|
// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
|
|
// Comma separated arrays are also supported, e.g. urlA, urlB.
|
|
// Options start with the defaults but can be overridden.
|
|
func Connect(url string, options ...Option) (*Conn, error) {
|
|
opts := GetDefaultOptions()
|
|
opts.Servers = processUrlString(url)
|
|
for _, opt := range options {
|
|
if opt != nil {
|
|
if err := opt(&opts); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
return opts.Connect()
|
|
}
|
|
|
|
// Options that can be passed to Connect.
|
|
|
|
// Name is an Option to set the client name.
|
|
func Name(name string) Option {
|
|
return func(o *Options) error {
|
|
o.Name = name
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Secure is an Option to enable TLS secure connections that skip server verification by default.
|
|
// Pass a TLS Configuration for proper TLS.
|
|
func Secure(tls ...*tls.Config) Option {
|
|
return func(o *Options) error {
|
|
o.Secure = true
|
|
// Use of variadic just simplifies testing scenarios. We only take the first one.
|
|
// fixme(DLC) - Could panic if more than one. Could also do TLS option.
|
|
if len(tls) > 1 {
|
|
return ErrMultipleTLSConfigs
|
|
}
|
|
if len(tls) == 1 {
|
|
o.TLSConfig = tls[0]
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// RootCAs is a helper option to provide the RootCAs pool from a list of filenames. If Secure is
|
|
// not already set this will set it as well.
|
|
func RootCAs(file ...string) Option {
|
|
return func(o *Options) error {
|
|
pool := x509.NewCertPool()
|
|
for _, f := range file {
|
|
rootPEM, err := ioutil.ReadFile(f)
|
|
if err != nil || rootPEM == nil {
|
|
return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
|
|
}
|
|
ok := pool.AppendCertsFromPEM(rootPEM)
|
|
if !ok {
|
|
return fmt.Errorf("nats: failed to parse root certificate from %q", f)
|
|
}
|
|
}
|
|
if o.TLSConfig == nil {
|
|
o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
|
|
}
|
|
o.TLSConfig.RootCAs = pool
|
|
o.Secure = true
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ClientCert is a helper option to provide the client certificate from a file. If Secure is
|
|
// not already set this will set it as well
|
|
func ClientCert(certFile, keyFile string) Option {
|
|
return func(o *Options) error {
|
|
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
|
|
if err != nil {
|
|
return fmt.Errorf("nats: error loading client certificate: %v", err)
|
|
}
|
|
cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
|
|
if err != nil {
|
|
return fmt.Errorf("nats: error parsing client certificate: %v", err)
|
|
}
|
|
if o.TLSConfig == nil {
|
|
o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
|
|
}
|
|
o.TLSConfig.Certificates = []tls.Certificate{cert}
|
|
o.Secure = true
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// NoReconnect is an Option to turn off reconnect behavior.
|
|
func NoReconnect() Option {
|
|
return func(o *Options) error {
|
|
o.AllowReconnect = false
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// DontRandomize is an Option to turn off randomizing the server pool.
|
|
func DontRandomize() Option {
|
|
return func(o *Options) error {
|
|
o.NoRandomize = true
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// NoEcho is an Option to turn off messages echoing back from a server.
|
|
// Note this is supported on servers >= version 1.2. Proto 1 or greater.
|
|
func NoEcho() Option {
|
|
return func(o *Options) error {
|
|
o.NoEcho = true
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ReconnectWait is an Option to set the wait time between reconnect attempts.
|
|
func ReconnectWait(t time.Duration) Option {
|
|
return func(o *Options) error {
|
|
o.ReconnectWait = t
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// MaxReconnects is an Option to set the maximum number of reconnect attempts.
|
|
func MaxReconnects(max int) Option {
|
|
return func(o *Options) error {
|
|
o.MaxReconnect = max
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// PingInterval is an Option to set the period for client ping commands
|
|
func PingInterval(t time.Duration) Option {
|
|
return func(o *Options) error {
|
|
o.PingInterval = t
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ReconnectBufSize sets the buffer size of messages kept while busy reconnecting
|
|
func ReconnectBufSize(size int) Option {
|
|
return func(o *Options) error {
|
|
o.ReconnectBufSize = size
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Timeout is an Option to set the timeout for Dial on a connection.
|
|
func Timeout(t time.Duration) Option {
|
|
return func(o *Options) error {
|
|
o.Timeout = t
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// DrainTimeout is an Option to set the timeout for draining a connection.
|
|
func DrainTimeout(t time.Duration) Option {
|
|
return func(o *Options) error {
|
|
o.DrainTimeout = t
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// DisconnectHandler is an Option to set the disconnected handler.
|
|
func DisconnectHandler(cb ConnHandler) Option {
|
|
return func(o *Options) error {
|
|
o.DisconnectedCB = cb
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ReconnectHandler is an Option to set the reconnected handler.
|
|
func ReconnectHandler(cb ConnHandler) Option {
|
|
return func(o *Options) error {
|
|
o.ReconnectedCB = cb
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ClosedHandler is an Option to set the closed handler.
|
|
func ClosedHandler(cb ConnHandler) Option {
|
|
return func(o *Options) error {
|
|
o.ClosedCB = cb
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// DiscoveredServersHandler is an Option to set the new servers handler.
|
|
func DiscoveredServersHandler(cb ConnHandler) Option {
|
|
return func(o *Options) error {
|
|
o.DiscoveredServersCB = cb
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ErrorHandler is an Option to set the async error handler.
|
|
func ErrorHandler(cb ErrHandler) Option {
|
|
return func(o *Options) error {
|
|
o.AsyncErrorCB = cb
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// UserInfo is an Option to set the username and password to
|
|
// use when not included directly in the URLs.
|
|
func UserInfo(user, password string) Option {
|
|
return func(o *Options) error {
|
|
o.User = user
|
|
o.Password = password
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Token is an Option to set the token to use when not included
|
|
// directly in the URLs.
|
|
func Token(token string) Option {
|
|
return func(o *Options) error {
|
|
o.Token = token
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Dialer is an Option to set the dialer which will be used when
|
|
// attempting to establish a connection.
|
|
// DEPRECATED: Should use CustomDialer instead.
|
|
func Dialer(dialer *net.Dialer) Option {
|
|
return func(o *Options) error {
|
|
o.Dialer = dialer
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// SetCustomDialer is an Option to set a custom dialer which will be
|
|
// used when attempting to establish a connection. If both Dialer
|
|
// and CustomDialer are specified, CustomDialer takes precedence.
|
|
func SetCustomDialer(dialer CustomDialer) Option {
|
|
return func(o *Options) error {
|
|
o.CustomDialer = dialer
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// UseOldRequestStyle is an Option to force usage of the old Request style.
|
|
func UseOldRequestStyle() Option {
|
|
return func(o *Options) error {
|
|
o.UseOldRequestStyle = true
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Handler processing
|
|
|
|
// SetDisconnectHandler will set the disconnect event handler.
|
|
func (nc *Conn) SetDisconnectHandler(dcb ConnHandler) {
|
|
if nc == nil {
|
|
return
|
|
}
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
nc.Opts.DisconnectedCB = dcb
|
|
}
|
|
|
|
// SetReconnectHandler will set the reconnect event handler.
|
|
func (nc *Conn) SetReconnectHandler(rcb ConnHandler) {
|
|
if nc == nil {
|
|
return
|
|
}
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
nc.Opts.ReconnectedCB = rcb
|
|
}
|
|
|
|
// SetDiscoveredServersHandler will set the discovered servers handler.
|
|
func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) {
|
|
if nc == nil {
|
|
return
|
|
}
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
nc.Opts.DiscoveredServersCB = dscb
|
|
}
|
|
|
|
// SetClosedHandler will set the reconnect event handler.
|
|
func (nc *Conn) SetClosedHandler(cb ConnHandler) {
|
|
if nc == nil {
|
|
return
|
|
}
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
nc.Opts.ClosedCB = cb
|
|
}
|
|
|
|
// SetErrorHandler will set the async error handler.
|
|
func (nc *Conn) SetErrorHandler(cb ErrHandler) {
|
|
if nc == nil {
|
|
return
|
|
}
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
nc.Opts.AsyncErrorCB = cb
|
|
}
|
|
|
|
// processUrlString processes the url string argument to Connect and returns
// a slice of urls, even if only one.
//
// The input is split on commas and each piece is trimmed of surrounding
// whitespace. Pieces that are empty after trimming (for example from a
// trailing or doubled comma, or from an empty input) are dropped so they do
// not turn into bogus server entries. The result is therefore empty, but
// non-nil, when the input holds no url at all. In that case Connect records
// no server from the url argument, instead of an entry built from the empty
// string as before.
func processUrlString(url string) []string {
	urls := strings.Split(url, ",")
	// Filter in place, reusing the slice returned by Split.
	kept := urls[:0]
	for _, s := range urls {
		if u := strings.TrimSpace(s); u != "" {
			kept = append(kept, u)
		}
	}
	return kept
}
|
|
|
|
// Connect will attempt to connect to a NATS server with multiple options.
|
|
func (o Options) Connect() (*Conn, error) {
|
|
nc := &Conn{Opts: o}
|
|
|
|
// Some default options processing.
|
|
if nc.Opts.MaxPingsOut == 0 {
|
|
nc.Opts.MaxPingsOut = DefaultMaxPingOut
|
|
}
|
|
// Allow old default for channel length to work correctly.
|
|
if nc.Opts.SubChanLen == 0 {
|
|
nc.Opts.SubChanLen = DefaultMaxChanLen
|
|
}
|
|
// Default ReconnectBufSize
|
|
if nc.Opts.ReconnectBufSize == 0 {
|
|
nc.Opts.ReconnectBufSize = DefaultReconnectBufSize
|
|
}
|
|
// Ensure that Timeout is not 0
|
|
if nc.Opts.Timeout == 0 {
|
|
nc.Opts.Timeout = DefaultTimeout
|
|
}
|
|
|
|
// Allow custom Dialer for connecting using DialTimeout by default
|
|
if nc.Opts.Dialer == nil {
|
|
nc.Opts.Dialer = &net.Dialer{
|
|
Timeout: nc.Opts.Timeout,
|
|
}
|
|
}
|
|
|
|
if err := nc.setupServerPool(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create the async callback handler.
|
|
nc.ach = &asyncCallbacksHandler{}
|
|
nc.ach.cond = sync.NewCond(&nc.ach.mu)
|
|
|
|
if err := nc.connect(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Spin up the async cb dispatcher on success
|
|
go nc.ach.asyncCBDispatcher()
|
|
|
|
return nc, nil
|
|
}
|
|
|
|
// Basic protocol tokens.
const (
	_CRLF_  = "\r\n"
	_EMPTY_ = ""
	_SPC_   = " "
	_PUB_P_ = "PUB "
)

// Operation names as they appear on the wire.
const (
	_OK_OP_   = "+OK"
	_ERR_OP_  = "-ERR"
	_PONG_OP_ = "PONG"
	_INFO_OP_ = "INFO"
)

// Protocol lines and format strings, each terminated by CRLF.
const (
	conProto   = "CONNECT %s" + _CRLF_
	pingProto  = "PING" + _CRLF_
	pongProto  = "PONG" + _CRLF_
	subProto   = "SUB %s %s %d" + _CRLF_
	unsubProto = "UNSUB %d %s" + _CRLF_
	okProto    = _OK_OP_ + _CRLF_
)
|
|
|
|
// Return the currently selected server
|
|
func (nc *Conn) currentServer() (int, *srv) {
|
|
for i, s := range nc.srvPool {
|
|
if s == nil {
|
|
continue
|
|
}
|
|
if s.url == nc.url {
|
|
return i, s
|
|
}
|
|
}
|
|
return -1, nil
|
|
}
|
|
|
|
// Pop the current server and put onto the end of the list. Select head of list as long
|
|
// as number of reconnect attempts under MaxReconnect.
|
|
func (nc *Conn) selectNextServer() (*srv, error) {
|
|
i, s := nc.currentServer()
|
|
if i < 0 {
|
|
return nil, ErrNoServers
|
|
}
|
|
sp := nc.srvPool
|
|
num := len(sp)
|
|
copy(sp[i:num-1], sp[i+1:num])
|
|
maxReconnect := nc.Opts.MaxReconnect
|
|
if maxReconnect < 0 || s.reconnects < maxReconnect {
|
|
nc.srvPool[num-1] = s
|
|
} else {
|
|
nc.srvPool = sp[0 : num-1]
|
|
}
|
|
if len(nc.srvPool) <= 0 {
|
|
nc.url = nil
|
|
return nil, ErrNoServers
|
|
}
|
|
nc.url = nc.srvPool[0].url
|
|
return nc.srvPool[0], nil
|
|
}
|
|
|
|
// Will assign the correct server to the nc.Url
|
|
func (nc *Conn) pickServer() error {
|
|
nc.url = nil
|
|
if len(nc.srvPool) <= 0 {
|
|
return ErrNoServers
|
|
}
|
|
for _, s := range nc.srvPool {
|
|
if s != nil {
|
|
nc.url = s.url
|
|
return nil
|
|
}
|
|
}
|
|
return ErrNoServers
|
|
}
|
|
|
|
// tlsScheme is the URL scheme that setupServerPool treats as a hint to
// switch the connection options to TLS mode.
const tlsScheme = "tls"
|
|
|
|
// Create the server pool using the options given.
|
|
// We will place a Url option first, followed by any
|
|
// Server Options. We will randomize the server pool unless
|
|
// the NoRandomize flag is set.
|
|
func (nc *Conn) setupServerPool() error {
|
|
nc.srvPool = make([]*srv, 0, srvPoolSize)
|
|
nc.urls = make(map[string]struct{}, srvPoolSize)
|
|
|
|
// Create srv objects from each url string in nc.Opts.Servers
|
|
// and add them to the pool
|
|
for _, urlString := range nc.Opts.Servers {
|
|
if err := nc.addURLToPool(urlString, false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Randomize if allowed to
|
|
if !nc.Opts.NoRandomize {
|
|
nc.shufflePool()
|
|
}
|
|
|
|
// Normally, if this one is set, Options.Servers should not be,
|
|
// but we always allowed that, so continue to do so.
|
|
if nc.Opts.Url != _EMPTY_ {
|
|
// Add to the end of the array
|
|
if err := nc.addURLToPool(nc.Opts.Url, false); err != nil {
|
|
return err
|
|
}
|
|
// Then swap it with first to guarantee that Options.Url is tried first.
|
|
last := len(nc.srvPool) - 1
|
|
if last > 0 {
|
|
nc.srvPool[0], nc.srvPool[last] = nc.srvPool[last], nc.srvPool[0]
|
|
}
|
|
} else if len(nc.srvPool) <= 0 {
|
|
// Place default URL if pool is empty.
|
|
if err := nc.addURLToPool(DefaultURL, false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Check for Scheme hint to move to TLS mode.
|
|
for _, srv := range nc.srvPool {
|
|
if srv.url.Scheme == tlsScheme {
|
|
// FIXME(dlc), this is for all in the pool, should be case by case.
|
|
nc.Opts.Secure = true
|
|
if nc.Opts.TLSConfig == nil {
|
|
nc.Opts.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nc.pickServer()
|
|
}
|
|
|
|
// addURLToPool adds an entry to the server pool
|
|
func (nc *Conn) addURLToPool(sURL string, implicit bool) error {
|
|
if !strings.Contains(sURL, "://") {
|
|
sURL = "nats://" + sURL
|
|
}
|
|
var (
|
|
u *url.URL
|
|
err error
|
|
)
|
|
for i := 0; i < 2; i++ {
|
|
u, err = url.Parse(sURL)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if u.Port() != "" {
|
|
break
|
|
}
|
|
// In case given URL is of the form "localhost:", just add
|
|
// the port number at the end, otherwise, add ":4222".
|
|
if sURL[len(sURL)-1] != ':' {
|
|
sURL += ":"
|
|
}
|
|
sURL += defaultPortString
|
|
}
|
|
s := &srv{url: u, isImplicit: implicit}
|
|
nc.srvPool = append(nc.srvPool, s)
|
|
nc.urls[u.Host] = struct{}{}
|
|
return nil
|
|
}
|
|
|
|
// shufflePool swaps randomly elements in the server pool
|
|
func (nc *Conn) shufflePool() {
|
|
if len(nc.srvPool) <= 1 {
|
|
return
|
|
}
|
|
source := rand.NewSource(time.Now().UnixNano())
|
|
r := rand.New(source)
|
|
for i := range nc.srvPool {
|
|
j := r.Intn(i + 1)
|
|
nc.srvPool[i], nc.srvPool[j] = nc.srvPool[j], nc.srvPool[i]
|
|
}
|
|
}
|
|
|
|
// createConn will connect to the server and wrap the appropriate
|
|
// bufio structures. It will do the right thing when an existing
|
|
// connection is in place.
|
|
func (nc *Conn) createConn() (err error) {
|
|
if nc.Opts.Timeout < 0 {
|
|
return ErrBadTimeout
|
|
}
|
|
if _, cur := nc.currentServer(); cur == nil {
|
|
return ErrNoServers
|
|
} else {
|
|
cur.lastAttempt = time.Now()
|
|
}
|
|
|
|
// CustomDialer takes precedence. If not set, use Opts.Dialer which
|
|
// is set to a default *net.Dialer (in Connect()) if not explicitly
|
|
// set by the user.
|
|
dialer := nc.Opts.CustomDialer
|
|
if dialer == nil {
|
|
dialer = nc.Opts.Dialer
|
|
}
|
|
nc.conn, err = dialer.Dial("tcp", nc.url.Host)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// No clue why, but this stalls and kills performance on Mac (Mavericks).
|
|
// https://code.google.com/p/go/issues/detail?id=6930
|
|
//if ip, ok := nc.conn.(*net.TCPConn); ok {
|
|
// ip.SetReadBuffer(defaultBufSize)
|
|
//}
|
|
|
|
if nc.pending != nil && nc.bw != nil {
|
|
// Move to pending buffer.
|
|
nc.bw.Flush()
|
|
}
|
|
nc.bw = bufio.NewWriterSize(nc.conn, defaultBufSize)
|
|
return nil
|
|
}
|
|
|
|
// makeTLSConn will wrap an existing Conn using TLS
|
|
func (nc *Conn) makeTLSConn() {
|
|
// Allow the user to configure their own tls.Config structure, otherwise
|
|
// default to InsecureSkipVerify.
|
|
// TODO(dlc) - We should make the more secure version the default.
|
|
if nc.Opts.TLSConfig != nil {
|
|
tlsCopy := util.CloneTLSConfig(nc.Opts.TLSConfig)
|
|
// If its blank we will override it with the current host
|
|
if tlsCopy.ServerName == _EMPTY_ {
|
|
h, _, _ := net.SplitHostPort(nc.url.Host)
|
|
tlsCopy.ServerName = h
|
|
}
|
|
nc.conn = tls.Client(nc.conn, tlsCopy)
|
|
} else {
|
|
nc.conn = tls.Client(nc.conn, &tls.Config{InsecureSkipVerify: true})
|
|
}
|
|
conn := nc.conn.(*tls.Conn)
|
|
conn.Handshake()
|
|
nc.bw = bufio.NewWriterSize(nc.conn, defaultBufSize)
|
|
}
|
|
|
|
// waitForExits will wait for all socket watcher Go routines to
|
|
// be shutdown before proceeding.
|
|
func (nc *Conn) waitForExits() {
|
|
// Kick old flusher forcefully.
|
|
select {
|
|
case nc.fch <- struct{}{}:
|
|
default:
|
|
}
|
|
|
|
// Wait for any previous go routines.
|
|
nc.wg.Wait()
|
|
}
|
|
|
|
// Report the connected server's Url
|
|
func (nc *Conn) ConnectedUrl() string {
|
|
if nc == nil {
|
|
return _EMPTY_
|
|
}
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
if nc.status != CONNECTED {
|
|
return _EMPTY_
|
|
}
|
|
return nc.url.String()
|
|
}
|
|
|
|
// Report the connected server's Id
|
|
func (nc *Conn) ConnectedServerId() string {
|
|
if nc == nil {
|
|
return _EMPTY_
|
|
}
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
if nc.status != CONNECTED {
|
|
return _EMPTY_
|
|
}
|
|
return nc.info.Id
|
|
}
|
|
|
|
// Low level setup for structs, etc
|
|
func (nc *Conn) setup() {
|
|
nc.subs = make(map[int64]*Subscription)
|
|
nc.pongs = make([]chan struct{}, 0, 8)
|
|
|
|
nc.fch = make(chan struct{}, flushChanSize)
|
|
|
|
// Setup scratch outbound buffer for PUB
|
|
pub := nc.scratch[:len(_PUB_P_)]
|
|
copy(pub, _PUB_P_)
|
|
}
|
|
|
|
// Process a connected connection and initialize properly.
|
|
func (nc *Conn) processConnectInit() error {
|
|
|
|
// Set our deadline for the whole connect process
|
|
nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout))
|
|
defer nc.conn.SetDeadline(time.Time{})
|
|
|
|
// Set our status to connecting.
|
|
nc.status = CONNECTING
|
|
|
|
// Process the INFO protocol received from the server
|
|
err := nc.processExpectedInfo()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Send the CONNECT protocol along with the initial PING protocol.
|
|
// Wait for the PONG response (or any error that we get from the server).
|
|
err = nc.sendConnect()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Reset the number of PING sent out
|
|
nc.pout = 0
|
|
|
|
// Start or reset Timer
|
|
if nc.Opts.PingInterval > 0 {
|
|
if nc.ptmr == nil {
|
|
nc.ptmr = time.AfterFunc(nc.Opts.PingInterval, nc.processPingTimer)
|
|
} else {
|
|
nc.ptmr.Reset(nc.Opts.PingInterval)
|
|
}
|
|
}
|
|
|
|
// Start the readLoop and flusher go routines, we will wait on both on a reconnect event.
|
|
nc.wg.Add(2)
|
|
go nc.readLoop()
|
|
go nc.flusher()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Main connect function. Will connect to the nats-server
|
|
func (nc *Conn) connect() error {
|
|
var returnedErr error
|
|
|
|
// Create actual socket connection
|
|
// For first connect we walk all servers in the pool and try
|
|
// to connect immediately.
|
|
nc.mu.Lock()
|
|
nc.initc = true
|
|
// The pool may change inside the loop iteration due to INFO protocol.
|
|
for i := 0; i < len(nc.srvPool); i++ {
|
|
nc.url = nc.srvPool[i].url
|
|
|
|
if err := nc.createConn(); err == nil {
|
|
// This was moved out of processConnectInit() because
|
|
// that function is now invoked from doReconnect() too.
|
|
nc.setup()
|
|
|
|
err = nc.processConnectInit()
|
|
|
|
if err == nil {
|
|
nc.srvPool[i].didConnect = true
|
|
nc.srvPool[i].reconnects = 0
|
|
returnedErr = nil
|
|
break
|
|
} else {
|
|
returnedErr = err
|
|
nc.mu.Unlock()
|
|
nc.close(DISCONNECTED, false)
|
|
nc.mu.Lock()
|
|
nc.url = nil
|
|
}
|
|
} else {
|
|
// Cancel out default connection refused, will trigger the
|
|
// No servers error conditional
|
|
if strings.Contains(err.Error(), "connection refused") {
|
|
returnedErr = nil
|
|
}
|
|
}
|
|
}
|
|
nc.initc = false
|
|
defer nc.mu.Unlock()
|
|
|
|
if returnedErr == nil && nc.status != CONNECTED {
|
|
returnedErr = ErrNoServers
|
|
}
|
|
return returnedErr
|
|
}
|
|
|
|
// This will check to see if the connection should be
|
|
// secure. This can be dictated from either end and should
|
|
// only be called after the INIT protocol has been received.
|
|
func (nc *Conn) checkForSecure() error {
|
|
// Check to see if we need to engage TLS
|
|
o := nc.Opts
|
|
|
|
// Check for mismatch in setups
|
|
if o.Secure && !nc.info.TLSRequired {
|
|
return ErrSecureConnWanted
|
|
} else if nc.info.TLSRequired && !o.Secure {
|
|
// Switch to Secure since server needs TLS.
|
|
o.Secure = true
|
|
}
|
|
|
|
// Need to rewrap with bufio
|
|
if o.Secure {
|
|
nc.makeTLSConn()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processExpectedInfo will look for the expected first INFO message
|
|
// sent when a connection is established. The lock should be held entering.
|
|
func (nc *Conn) processExpectedInfo() error {
|
|
|
|
c := &control{}
|
|
|
|
// Read the protocol
|
|
err := nc.readOp(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// The nats protocol should send INFO first always.
|
|
if c.op != _INFO_OP_ {
|
|
return ErrNoInfoReceived
|
|
}
|
|
|
|
// Parse the protocol
|
|
if err := nc.processInfo(c.args); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nc.checkForSecure()
|
|
}
|
|
|
|
// Sends a protocol control message by queuing into the bufio writer
|
|
// and kicking the flush Go routine. These writes are protected.
|
|
func (nc *Conn) sendProto(proto string) {
|
|
nc.mu.Lock()
|
|
nc.bw.WriteString(proto)
|
|
nc.kickFlusher()
|
|
nc.mu.Unlock()
|
|
}
|
|
|
|
// Generate a connect protocol message, issuing user/password if
|
|
// applicable. The lock is assumed to be held upon entering.
|
|
func (nc *Conn) connectProto() (string, error) {
|
|
o := nc.Opts
|
|
var user, pass, token string
|
|
u := nc.url.User
|
|
if u != nil {
|
|
// if no password, assume username is authToken
|
|
if _, ok := u.Password(); !ok {
|
|
token = u.Username()
|
|
} else {
|
|
user = u.Username()
|
|
pass, _ = u.Password()
|
|
}
|
|
} else {
|
|
// Take from options (possibly all empty strings)
|
|
user = nc.Opts.User
|
|
pass = nc.Opts.Password
|
|
token = nc.Opts.Token
|
|
}
|
|
|
|
cinfo := connectInfo{o.Verbose, o.Pedantic, user, pass, token,
|
|
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho}
|
|
|
|
b, err := json.Marshal(cinfo)
|
|
if err != nil {
|
|
return _EMPTY_, ErrJsonParse
|
|
}
|
|
|
|
// Check if NoEcho is set and we have a server that supports it.
|
|
if o.NoEcho && nc.info.Proto < 1 {
|
|
return _EMPTY_, ErrNoEchoNotSupported
|
|
}
|
|
|
|
return fmt.Sprintf(conProto, b), nil
|
|
}
|
|
|
|
// normalizeErr removes the prefix -ERR, trim spaces and remove the quotes.
|
|
func normalizeErr(line string) string {
|
|
s := strings.ToLower(strings.TrimSpace(strings.TrimPrefix(line, _ERR_OP_)))
|
|
s = strings.TrimLeft(strings.TrimRight(s, "'"), "'")
|
|
return s
|
|
}
|
|
|
|
// Send a connect protocol message to the server, issue user/password if
|
|
// applicable. Will wait for a flush to return from the server for error
|
|
// processing.
|
|
func (nc *Conn) sendConnect() error {
|
|
|
|
// Construct the CONNECT protocol string
|
|
cProto, err := nc.connectProto()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Write the protocol into the buffer
|
|
_, err = nc.bw.WriteString(cProto)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Add to the buffer the PING protocol
|
|
_, err = nc.bw.WriteString(pingProto)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Flush the buffer
|
|
err = nc.bw.Flush()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// We don't want to read more than we need here, otherwise
|
|
// we would need to transfer the excess read data to the readLoop.
|
|
// Since in normal situations we just are looking for a PONG\r\n,
|
|
// reading byte-by-byte here is ok.
|
|
proto, err := nc.readProto()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// If opts.Verbose is set, handle +OK
|
|
if nc.Opts.Verbose && proto == okProto {
|
|
// Read the rest now...
|
|
proto, err = nc.readProto()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// We expect a PONG
|
|
if proto != pongProto {
|
|
// But it could be something else, like -ERR
|
|
|
|
// Since we no longer use ReadLine(), trim the trailing "\r\n"
|
|
proto = strings.TrimRight(proto, "\r\n")
|
|
|
|
// If it's a server error...
|
|
if strings.HasPrefix(proto, _ERR_OP_) {
|
|
// Remove -ERR, trim spaces and quotes, and convert to lower case.
|
|
proto = normalizeErr(proto)
|
|
return errors.New("nats: " + proto)
|
|
}
|
|
|
|
// Notify that we got an unexpected protocol.
|
|
return fmt.Errorf("nats: expected '%s', got '%s'", _PONG_OP_, proto)
|
|
}
|
|
|
|
// This is where we are truly connected.
|
|
nc.status = CONNECTED
|
|
|
|
return nil
|
|
}
|
|
|
|
// reads a protocol one byte at a time.
|
|
func (nc *Conn) readProto() (string, error) {
|
|
var (
|
|
_buf = [10]byte{}
|
|
buf = _buf[:0]
|
|
b = [1]byte{}
|
|
protoEnd = byte('\n')
|
|
)
|
|
for {
|
|
if _, err := nc.conn.Read(b[:1]); err != nil {
|
|
// Do not report EOF error
|
|
if err == io.EOF {
|
|
return string(buf), nil
|
|
}
|
|
return "", err
|
|
}
|
|
buf = append(buf, b[0])
|
|
if b[0] == protoEnd {
|
|
return string(buf), nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// control is a parsed control protocol line: the operation name and the
// remainder of the line as its arguments.
type control struct {
	op   string
	args string
}
|
|
|
|
// Read a control line and process the intended op.
|
|
func (nc *Conn) readOp(c *control) error {
|
|
br := bufio.NewReaderSize(nc.conn, defaultBufSize)
|
|
line, err := br.ReadString('\n')
|
|
if err != nil {
|
|
return err
|
|
}
|
|
parseControl(line, c)
|
|
return nil
|
|
}
|
|
|
|
// Parse a control line from the server.
|
|
func parseControl(line string, c *control) {
|
|
toks := strings.SplitN(line, _SPC_, 2)
|
|
if len(toks) == 1 {
|
|
c.op = strings.TrimSpace(toks[0])
|
|
c.args = _EMPTY_
|
|
} else if len(toks) == 2 {
|
|
c.op, c.args = strings.TrimSpace(toks[0]), strings.TrimSpace(toks[1])
|
|
} else {
|
|
c.op = _EMPTY_
|
|
}
|
|
}
|
|
|
|
// flushReconnectPending will push the pending items that were
|
|
// gathered while we were in a RECONNECTING state to the socket.
|
|
func (nc *Conn) flushReconnectPendingItems() {
|
|
if nc.pending == nil {
|
|
return
|
|
}
|
|
if nc.pending.Len() > 0 {
|
|
nc.bw.Write(nc.pending.Bytes())
|
|
}
|
|
}
|
|
|
|
// Stops the ping timer if set.
|
|
// Connection lock is held on entry.
|
|
func (nc *Conn) stopPingTimer() {
|
|
if nc.ptmr != nil {
|
|
nc.ptmr.Stop()
|
|
}
|
|
}
|
|
|
|
// Try to reconnect using the option parameters.
|
|
// This function assumes we are allowed to reconnect.
|
|
func (nc *Conn) doReconnect() {
|
|
// We want to make sure we have the other watchers shutdown properly
|
|
// here before we proceed past this point.
|
|
nc.waitForExits()
|
|
|
|
// FIXME(dlc) - We have an issue here if we have
|
|
// outstanding flush points (pongs) and they were not
|
|
// sent out, but are still in the pipe.
|
|
|
|
// Hold the lock manually and release where needed below,
|
|
// can't do defer here.
|
|
nc.mu.Lock()
|
|
|
|
// Clear any queued pongs, e.g. pending flush calls.
|
|
nc.clearPendingFlushCalls()
|
|
|
|
// Clear any errors.
|
|
nc.err = nil
|
|
// Perform appropriate callback if needed for a disconnect.
|
|
if nc.Opts.DisconnectedCB != nil {
|
|
nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
|
|
}
|
|
|
|
// This is used to wait on go routines exit if we start them in the loop
|
|
// but an error occurs after that.
|
|
waitForGoRoutines := false
|
|
|
|
for len(nc.srvPool) > 0 {
|
|
cur, err := nc.selectNextServer()
|
|
if err != nil {
|
|
nc.err = err
|
|
break
|
|
}
|
|
|
|
sleepTime := int64(0)
|
|
|
|
// Sleep appropriate amount of time before the
|
|
// connection attempt if connecting to same server
|
|
// we just got disconnected from..
|
|
if time.Since(cur.lastAttempt) < nc.Opts.ReconnectWait {
|
|
sleepTime = int64(nc.Opts.ReconnectWait - time.Since(cur.lastAttempt))
|
|
}
|
|
|
|
// On Windows, createConn() will take more than a second when no
|
|
// server is running at that address. So it could be that the
|
|
// time elapsed between reconnect attempts is always > than
|
|
// the set option. Release the lock to give a chance to a parallel
|
|
// nc.Close() to break the loop.
|
|
nc.mu.Unlock()
|
|
if sleepTime <= 0 {
|
|
runtime.Gosched()
|
|
} else {
|
|
time.Sleep(time.Duration(sleepTime))
|
|
}
|
|
// If the readLoop, etc.. go routines were started, wait for them to complete.
|
|
if waitForGoRoutines {
|
|
nc.waitForExits()
|
|
waitForGoRoutines = false
|
|
}
|
|
nc.mu.Lock()
|
|
|
|
// Check if we have been closed first.
|
|
if nc.isClosed() {
|
|
break
|
|
}
|
|
|
|
// Mark that we tried a reconnect
|
|
cur.reconnects++
|
|
|
|
// Try to create a new connection
|
|
err = nc.createConn()
|
|
|
|
// Not yet connected, retry...
|
|
// Continue to hold the lock
|
|
if err != nil {
|
|
nc.err = nil
|
|
continue
|
|
}
|
|
|
|
// We are reconnected
|
|
nc.Reconnects++
|
|
|
|
// Process connect logic
|
|
if nc.err = nc.processConnectInit(); nc.err != nil {
|
|
nc.status = RECONNECTING
|
|
// Reset the buffered writer to the pending buffer
|
|
// (was set to a buffered writer on nc.conn in createConn)
|
|
nc.bw.Reset(nc.pending)
|
|
continue
|
|
}
|
|
|
|
// Clear out server stats for the server we connected to..
|
|
cur.didConnect = true
|
|
cur.reconnects = 0
|
|
|
|
// Send existing subscription state
|
|
nc.resendSubscriptions()
|
|
|
|
// Now send off and clear pending buffer
|
|
nc.flushReconnectPendingItems()
|
|
|
|
// Flush the buffer
|
|
nc.err = nc.bw.Flush()
|
|
if nc.err != nil {
|
|
nc.status = RECONNECTING
|
|
// Reset the buffered writer to the pending buffer (bytes.Buffer).
|
|
nc.bw.Reset(nc.pending)
|
|
// Stop the ping timer (if set)
|
|
nc.stopPingTimer()
|
|
// Since processConnectInit() returned without error, the
|
|
// go routines were started, so wait for them to return
|
|
// on the next iteration (after releasing the lock).
|
|
waitForGoRoutines = true
|
|
continue
|
|
}
|
|
|
|
// Done with the pending buffer
|
|
nc.pending = nil
|
|
|
|
// This is where we are truly connected.
|
|
nc.status = CONNECTED
|
|
|
|
// Queue up the reconnect callback.
|
|
if nc.Opts.ReconnectedCB != nil {
|
|
nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
|
|
}
|
|
// Release lock here, we will return below.
|
|
nc.mu.Unlock()
|
|
|
|
// Make sure to flush everything
|
|
nc.Flush()
|
|
|
|
return
|
|
}
|
|
|
|
// Call into close.. We have no servers left..
|
|
if nc.err == nil {
|
|
nc.err = ErrNoServers
|
|
}
|
|
nc.mu.Unlock()
|
|
nc.Close()
|
|
}
|
|
|
|
// processOpErr handles errors from reading or parsing the protocol.
|
|
// The lock should not be held entering this function.
|
|
func (nc *Conn) processOpErr(err error) {
|
|
nc.mu.Lock()
|
|
if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() {
|
|
nc.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
if nc.Opts.AllowReconnect && nc.status == CONNECTED {
|
|
// Set our new status
|
|
nc.status = RECONNECTING
|
|
// Stop ping timer if set
|
|
nc.stopPingTimer()
|
|
if nc.conn != nil {
|
|
nc.bw.Flush()
|
|
nc.conn.Close()
|
|
nc.conn = nil
|
|
}
|
|
|
|
// Create pending buffer before reconnecting.
|
|
nc.pending = new(bytes.Buffer)
|
|
nc.bw.Reset(nc.pending)
|
|
|
|
go nc.doReconnect()
|
|
nc.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
nc.status = DISCONNECTED
|
|
nc.err = err
|
|
nc.mu.Unlock()
|
|
nc.Close()
|
|
}
|
|
|
|
// dispatch is responsible for calling any async callbacks
|
|
func (ac *asyncCallbacksHandler) asyncCBDispatcher() {
|
|
for {
|
|
ac.mu.Lock()
|
|
// Protect for spurious wakeups. We should get out of the
|
|
// wait only if there is an element to pop from the list.
|
|
for ac.head == nil {
|
|
ac.cond.Wait()
|
|
}
|
|
cur := ac.head
|
|
ac.head = cur.next
|
|
if cur == ac.tail {
|
|
ac.tail = nil
|
|
}
|
|
ac.mu.Unlock()
|
|
|
|
// This signals that the dispatcher has been closed and all
|
|
// previous callbacks have been dispatched.
|
|
if cur.f == nil {
|
|
return
|
|
}
|
|
// Invoke callback outside of handler's lock
|
|
cur.f()
|
|
}
|
|
}
|
|
|
|
// Add the given function to the tail of the list and
|
|
// signals the dispatcher.
|
|
func (ac *asyncCallbacksHandler) push(f func()) {
|
|
ac.pushOrClose(f, false)
|
|
}
|
|
|
|
// Signals that we are closing...
|
|
func (ac *asyncCallbacksHandler) close() {
|
|
ac.pushOrClose(nil, true)
|
|
}
|
|
|
|
// Add the given function to the tail of the list and
|
|
// signals the dispatcher.
|
|
func (ac *asyncCallbacksHandler) pushOrClose(f func(), close bool) {
|
|
ac.mu.Lock()
|
|
defer ac.mu.Unlock()
|
|
// Make sure that library is not calling push with nil function,
|
|
// since this is used to notify the dispatcher that it should stop.
|
|
if !close && f == nil {
|
|
panic("pushing a nil callback")
|
|
}
|
|
cb := &asyncCB{f: f}
|
|
if ac.tail != nil {
|
|
ac.tail.next = cb
|
|
} else {
|
|
ac.head = cb
|
|
}
|
|
ac.tail = cb
|
|
if close {
|
|
ac.cond.Broadcast()
|
|
} else {
|
|
ac.cond.Signal()
|
|
}
|
|
}
|
|
|
|
// readLoop() will sit on the socket reading and processing the
|
|
// protocol from the server. It will dispatch appropriately based
|
|
// on the op type.
|
|
func (nc *Conn) readLoop() {
|
|
// Release the wait group on exit
|
|
defer nc.wg.Done()
|
|
|
|
// Create a parseState if needed.
|
|
nc.mu.Lock()
|
|
if nc.ps == nil {
|
|
nc.ps = &parseState{}
|
|
}
|
|
nc.mu.Unlock()
|
|
|
|
// Stack based buffer.
|
|
b := make([]byte, defaultBufSize)
|
|
|
|
for {
|
|
// FIXME(dlc): RWLock here?
|
|
nc.mu.Lock()
|
|
sb := nc.isClosed() || nc.isReconnecting()
|
|
if sb {
|
|
nc.ps = &parseState{}
|
|
}
|
|
conn := nc.conn
|
|
nc.mu.Unlock()
|
|
|
|
if sb || conn == nil {
|
|
break
|
|
}
|
|
|
|
n, err := conn.Read(b)
|
|
if err != nil {
|
|
nc.processOpErr(err)
|
|
break
|
|
}
|
|
|
|
if err := nc.parse(b[:n]); err != nil {
|
|
nc.processOpErr(err)
|
|
break
|
|
}
|
|
}
|
|
// Clear the parseState here..
|
|
nc.mu.Lock()
|
|
nc.ps = nil
|
|
nc.mu.Unlock()
|
|
}
|
|
|
|
// waitForMsgs waits on the conditional shared with readLoop and processMsg.
|
|
// It is used to deliver messages to asynchronous subscribers.
|
|
func (nc *Conn) waitForMsgs(s *Subscription) {
|
|
var closed bool
|
|
var delivered, max uint64
|
|
|
|
// Used to account for adjustments to sub.pBytes when we wrap back around.
|
|
msgLen := -1
|
|
|
|
for {
|
|
s.mu.Lock()
|
|
// Do accounting for last msg delivered here so we only lock once
|
|
// and drain state trips after callback has returned.
|
|
if msgLen >= 0 {
|
|
s.pMsgs--
|
|
s.pBytes -= msgLen
|
|
msgLen = -1
|
|
}
|
|
|
|
if s.pHead == nil && !s.closed {
|
|
s.pCond.Wait()
|
|
}
|
|
// Pop the msg off the list
|
|
m := s.pHead
|
|
if m != nil {
|
|
s.pHead = m.next
|
|
if s.pHead == nil {
|
|
s.pTail = nil
|
|
}
|
|
if m.barrier != nil {
|
|
s.mu.Unlock()
|
|
if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
|
|
m.barrier.f()
|
|
}
|
|
continue
|
|
}
|
|
msgLen = len(m.Data)
|
|
}
|
|
mcb := s.mcb
|
|
max = s.max
|
|
closed = s.closed
|
|
if !s.closed {
|
|
s.delivered++
|
|
delivered = s.delivered
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
if closed {
|
|
break
|
|
}
|
|
|
|
// Deliver the message.
|
|
if m != nil && (max == 0 || delivered <= max) {
|
|
mcb(m)
|
|
}
|
|
// If we have hit the max for delivered msgs, remove sub.
|
|
if max > 0 && delivered >= max {
|
|
nc.mu.Lock()
|
|
nc.removeSub(s)
|
|
nc.mu.Unlock()
|
|
break
|
|
}
|
|
}
|
|
// Check for barrier messages
|
|
s.mu.Lock()
|
|
for m := s.pHead; m != nil; m = s.pHead {
|
|
if m.barrier != nil {
|
|
s.mu.Unlock()
|
|
if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
|
|
m.barrier.f()
|
|
}
|
|
s.mu.Lock()
|
|
}
|
|
s.pHead = m.next
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// processMsg is called by parse and will place the msg on the
|
|
// appropriate channel/pending queue for processing. If the channel is full,
|
|
// or the pending queue is over the pending limits, the connection is
|
|
// considered a slow consumer.
|
|
func (nc *Conn) processMsg(data []byte) {
|
|
// Don't lock the connection to avoid server cutting us off if the
|
|
// flusher is holding the connection lock, trying to send to the server
|
|
// that is itself trying to send data to us.
|
|
nc.subsMu.RLock()
|
|
|
|
// Stats
|
|
nc.InMsgs++
|
|
nc.InBytes += uint64(len(data))
|
|
|
|
sub := nc.subs[nc.ps.ma.sid]
|
|
if sub == nil {
|
|
nc.subsMu.RUnlock()
|
|
return
|
|
}
|
|
|
|
// Copy them into string
|
|
subj := string(nc.ps.ma.subject)
|
|
reply := string(nc.ps.ma.reply)
|
|
|
|
// Doing message create outside of the sub's lock to reduce contention.
|
|
// It's possible that we end-up not using the message, but that's ok.
|
|
|
|
// FIXME(dlc): Need to copy, should/can do COW?
|
|
msgPayload := make([]byte, len(data))
|
|
copy(msgPayload, data)
|
|
|
|
// FIXME(dlc): Should we recycle these containers?
|
|
m := &Msg{Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
|
|
|
|
sub.mu.Lock()
|
|
|
|
// Subscription internal stats (applicable only for non ChanSubscription's)
|
|
if sub.typ != ChanSubscription {
|
|
sub.pMsgs++
|
|
if sub.pMsgs > sub.pMsgsMax {
|
|
sub.pMsgsMax = sub.pMsgs
|
|
}
|
|
sub.pBytes += len(m.Data)
|
|
if sub.pBytes > sub.pBytesMax {
|
|
sub.pBytesMax = sub.pBytes
|
|
}
|
|
|
|
// Check for a Slow Consumer
|
|
if (sub.pMsgsLimit > 0 && sub.pMsgs > sub.pMsgsLimit) ||
|
|
(sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) {
|
|
goto slowConsumer
|
|
}
|
|
}
|
|
|
|
// We have two modes of delivery. One is the channel, used by channel
|
|
// subscribers and syncSubscribers, the other is a linked list for async.
|
|
if sub.mch != nil {
|
|
select {
|
|
case sub.mch <- m:
|
|
default:
|
|
goto slowConsumer
|
|
}
|
|
} else {
|
|
// Push onto the async pList
|
|
if sub.pHead == nil {
|
|
sub.pHead = m
|
|
sub.pTail = m
|
|
sub.pCond.Signal()
|
|
} else {
|
|
sub.pTail.next = m
|
|
sub.pTail = m
|
|
}
|
|
}
|
|
|
|
// Clear SlowConsumer status.
|
|
sub.sc = false
|
|
|
|
sub.mu.Unlock()
|
|
nc.subsMu.RUnlock()
|
|
return
|
|
|
|
slowConsumer:
|
|
sub.dropped++
|
|
sc := !sub.sc
|
|
sub.sc = true
|
|
// Undo stats from above
|
|
if sub.typ != ChanSubscription {
|
|
sub.pMsgs--
|
|
sub.pBytes -= len(m.Data)
|
|
}
|
|
sub.mu.Unlock()
|
|
nc.subsMu.RUnlock()
|
|
if sc {
|
|
// Now we need connection's lock and we may end-up in the situation
|
|
// that we were trying to avoid, except that in this case, the client
|
|
// is already experiencing client-side slow consumer situation.
|
|
nc.mu.Lock()
|
|
nc.err = ErrSlowConsumer
|
|
if nc.Opts.AsyncErrorCB != nil {
|
|
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrSlowConsumer) })
|
|
}
|
|
nc.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// processPermissionsViolation is called when the server signals a subject
|
|
// permissions violation on either publish or subscribe.
|
|
func (nc *Conn) processPermissionsViolation(err string) {
|
|
nc.mu.Lock()
|
|
// create error here so we can pass it as a closure to the async cb dispatcher.
|
|
e := errors.New("nats: " + err)
|
|
nc.err = e
|
|
if nc.Opts.AsyncErrorCB != nil {
|
|
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, e) })
|
|
}
|
|
nc.mu.Unlock()
|
|
}
|
|
|
|
// processAuthorizationViolation is called when the server signals a user
|
|
// authorization violation.
|
|
func (nc *Conn) processAuthorizationViolation(err string) {
|
|
nc.mu.Lock()
|
|
nc.err = ErrAuthorization
|
|
if nc.Opts.AsyncErrorCB != nil {
|
|
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, ErrAuthorization) })
|
|
}
|
|
nc.mu.Unlock()
|
|
}
|
|
|
|
// flusher is a separate Go routine that will process flush requests for the write
|
|
// bufio. This allows coalescing of writes to the underlying socket.
|
|
func (nc *Conn) flusher() {
|
|
// Release the wait group
|
|
defer nc.wg.Done()
|
|
|
|
// snapshot the bw and conn since they can change from underneath of us.
|
|
nc.mu.Lock()
|
|
bw := nc.bw
|
|
conn := nc.conn
|
|
fch := nc.fch
|
|
flusherTimeout := nc.Opts.FlusherTimeout
|
|
nc.mu.Unlock()
|
|
|
|
if conn == nil || bw == nil {
|
|
return
|
|
}
|
|
|
|
for {
|
|
if _, ok := <-fch; !ok {
|
|
return
|
|
}
|
|
nc.mu.Lock()
|
|
|
|
// Check to see if we should bail out.
|
|
if !nc.isConnected() || nc.isConnecting() || bw != nc.bw || conn != nc.conn {
|
|
nc.mu.Unlock()
|
|
return
|
|
}
|
|
if bw.Buffered() > 0 {
|
|
// Allow customizing how long we should wait for a flush to be done
|
|
// to prevent unhealthy connections blocking the client for too long.
|
|
if flusherTimeout > 0 {
|
|
conn.SetWriteDeadline(time.Now().Add(flusherTimeout))
|
|
}
|
|
|
|
if err := bw.Flush(); err != nil {
|
|
if nc.err == nil {
|
|
nc.err = err
|
|
}
|
|
}
|
|
conn.SetWriteDeadline(time.Time{})
|
|
}
|
|
nc.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// processPing will send an immediate pong protocol response to the
|
|
// server. The server uses this mechanism to detect dead clients.
|
|
func (nc *Conn) processPing() {
|
|
nc.sendProto(pongProto)
|
|
}
|
|
|
|
// processPong is used to process responses to the client's ping
|
|
// messages. We use pings for the flush mechanism as well.
|
|
func (nc *Conn) processPong() {
|
|
var ch chan struct{}
|
|
|
|
nc.mu.Lock()
|
|
if len(nc.pongs) > 0 {
|
|
ch = nc.pongs[0]
|
|
nc.pongs = nc.pongs[1:]
|
|
}
|
|
nc.pout = 0
|
|
nc.mu.Unlock()
|
|
if ch != nil {
|
|
ch <- struct{}{}
|
|
}
|
|
}
|
|
|
|
// processOK is a placeholder for processing OK messages.
|
|
func (nc *Conn) processOK() {
|
|
// do nothing
|
|
}
|
|
|
|
// processInfo is used to parse the info messages sent
|
|
// from the server.
|
|
// This function may update the server pool.
|
|
func (nc *Conn) processInfo(info string) error {
|
|
if info == _EMPTY_ {
|
|
return nil
|
|
}
|
|
ncInfo := serverInfo{}
|
|
if err := json.Unmarshal([]byte(info), &ncInfo); err != nil {
|
|
return err
|
|
}
|
|
// Copy content into connection's info structure.
|
|
nc.info = ncInfo
|
|
// The array could be empty/not present on initial connect,
|
|
// if advertise is disabled on that server, or servers that
|
|
// did not include themselves in the async INFO protocol.
|
|
// If empty, do not remove the implicit servers from the pool.
|
|
if len(ncInfo.ConnectURLs) == 0 {
|
|
return nil
|
|
}
|
|
// Note about pool randomization: when the pool was first created,
|
|
// it was randomized (if allowed). We keep the order the same (removing
|
|
// implicit servers that are no longer sent to us). New URLs are sent
|
|
// to us in no specific order so don't need extra randomization.
|
|
hasNew := false
|
|
// This is what we got from the server we are connected to.
|
|
urls := nc.info.ConnectURLs
|
|
// Transform that to a map for easy lookups
|
|
tmp := make(map[string]struct{}, len(urls))
|
|
for _, curl := range urls {
|
|
tmp[curl] = struct{}{}
|
|
}
|
|
// Walk the pool and removed the implicit servers that are no longer in the
|
|
// given array/map
|
|
sp := nc.srvPool
|
|
for i := 0; i < len(sp); i++ {
|
|
srv := sp[i]
|
|
curl := srv.url.Host
|
|
// Check if this URL is in the INFO protocol
|
|
_, inInfo := tmp[curl]
|
|
// Remove from the temp map so that at the end we are left with only
|
|
// new (or restarted) servers that need to be added to the pool.
|
|
delete(tmp, curl)
|
|
// Keep servers that were set through Options, but also the one that
|
|
// we are currently connected to (even if it is a discovered server).
|
|
if !srv.isImplicit || srv.url == nc.url {
|
|
continue
|
|
}
|
|
if !inInfo {
|
|
// Remove from server pool. Keep current order.
|
|
copy(sp[i:], sp[i+1:])
|
|
nc.srvPool = sp[:len(sp)-1]
|
|
sp = nc.srvPool
|
|
i--
|
|
}
|
|
}
|
|
// If there are any left in the tmp map, these are new (or restarted) servers
|
|
// and need to be added to the pool.
|
|
for curl := range tmp {
|
|
// Before adding, check if this is a new (as in never seen) URL.
|
|
// This is used to figure out if we invoke the DiscoveredServersCB
|
|
if _, present := nc.urls[curl]; !present {
|
|
hasNew = true
|
|
}
|
|
nc.addURLToPool(fmt.Sprintf("nats://%s", curl), true)
|
|
}
|
|
if hasNew && !nc.initc && nc.Opts.DiscoveredServersCB != nil {
|
|
nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processAsyncInfo does the same than processInfo, but is called
|
|
// from the parser. Calls processInfo under connection's lock
|
|
// protection.
|
|
func (nc *Conn) processAsyncInfo(info []byte) {
|
|
nc.mu.Lock()
|
|
// Ignore errors, we will simply not update the server pool...
|
|
nc.processInfo(string(info))
|
|
nc.mu.Unlock()
|
|
}
|
|
|
|
// LastError reports the last error encountered via the connection.
|
|
// It can be used reliably within ClosedCB in order to find out reason
|
|
// why connection was closed for example.
|
|
func (nc *Conn) LastError() error {
|
|
if nc == nil {
|
|
return ErrInvalidConnection
|
|
}
|
|
nc.mu.Lock()
|
|
err := nc.err
|
|
nc.mu.Unlock()
|
|
return err
|
|
}
|
|
|
|
// processErr processes any error messages from the server and
|
|
// sets the connection's lastError.
|
|
func (nc *Conn) processErr(e string) {
|
|
// Trim, remove quotes, convert to lower case.
|
|
e = normalizeErr(e)
|
|
|
|
// FIXME(dlc) - process Slow Consumer signals special.
|
|
if e == STALE_CONNECTION {
|
|
nc.processOpErr(ErrStaleConnection)
|
|
} else if strings.HasPrefix(e, PERMISSIONS_ERR) {
|
|
nc.processPermissionsViolation(e)
|
|
} else if strings.HasPrefix(e, AUTHORIZATION_ERR) {
|
|
nc.processAuthorizationViolation(e)
|
|
} else {
|
|
nc.mu.Lock()
|
|
nc.err = errors.New("nats: " + e)
|
|
nc.mu.Unlock()
|
|
nc.Close()
|
|
}
|
|
}
|
|
|
|
// kickFlusher will send a bool on a channel to kick the
|
|
// flush Go routine to flush data to the server.
|
|
func (nc *Conn) kickFlusher() {
|
|
if nc.bw != nil {
|
|
select {
|
|
case nc.fch <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// Publish publishes the data argument to the given subject. The data
|
|
// argument is left untouched and needs to be correctly interpreted on
|
|
// the receiver.
|
|
func (nc *Conn) Publish(subj string, data []byte) error {
|
|
return nc.publish(subj, _EMPTY_, data)
|
|
}
|
|
|
|
// PublishMsg publishes the Msg structure, which includes the
|
|
// Subject, an optional Reply and an optional Data field.
|
|
func (nc *Conn) PublishMsg(m *Msg) error {
|
|
if m == nil {
|
|
return ErrInvalidMsg
|
|
}
|
|
return nc.publish(m.Subject, m.Reply, m.Data)
|
|
}
|
|
|
|
// PublishRequest will perform a Publish() excpecting a response on the
|
|
// reply subject. Use Request() for automatically waiting for a response
|
|
// inline.
|
|
func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
|
|
return nc.publish(subj, reply, data)
|
|
}
|
|
|
|
// digits is the lookup table for the hand-rolled integer-to-ASCII
// conversion: the character for a decimal digit d is digits[d].
const digits = "0123456789"
|
|
|
|
// publish is the internal function to publish messages to a nats-server.
|
|
// Sends a protocol data message by queuing into the bufio writer
|
|
// and kicking the flush go routine. These writes should be protected.
|
|
func (nc *Conn) publish(subj, reply string, data []byte) error {
|
|
if nc == nil {
|
|
return ErrInvalidConnection
|
|
}
|
|
if subj == "" {
|
|
return ErrBadSubject
|
|
}
|
|
nc.mu.Lock()
|
|
|
|
if nc.isClosed() {
|
|
nc.mu.Unlock()
|
|
return ErrConnectionClosed
|
|
}
|
|
|
|
if nc.isDrainingPubs() {
|
|
nc.mu.Unlock()
|
|
return ErrConnectionDraining
|
|
}
|
|
|
|
// Proactively reject payloads over the threshold set by server.
|
|
msgSize := int64(len(data))
|
|
if msgSize > nc.info.MaxPayload {
|
|
nc.mu.Unlock()
|
|
return ErrMaxPayload
|
|
}
|
|
|
|
// Check if we are reconnecting, and if so check if
|
|
// we have exceeded our reconnect outbound buffer limits.
|
|
if nc.isReconnecting() {
|
|
// Flush to underlying buffer.
|
|
nc.bw.Flush()
|
|
// Check if we are over
|
|
if nc.pending.Len() >= nc.Opts.ReconnectBufSize {
|
|
nc.mu.Unlock()
|
|
return ErrReconnectBufExceeded
|
|
}
|
|
}
|
|
|
|
msgh := nc.scratch[:len(_PUB_P_)]
|
|
msgh = append(msgh, subj...)
|
|
msgh = append(msgh, ' ')
|
|
if reply != "" {
|
|
msgh = append(msgh, reply...)
|
|
msgh = append(msgh, ' ')
|
|
}
|
|
|
|
// We could be smarter here, but simple loop is ok,
|
|
// just avoid strconv in fast path
|
|
// FIXME(dlc) - Find a better way here.
|
|
// msgh = strconv.AppendInt(msgh, int64(len(data)), 10)
|
|
|
|
var b [12]byte
|
|
var i = len(b)
|
|
if len(data) > 0 {
|
|
for l := len(data); l > 0; l /= 10 {
|
|
i -= 1
|
|
b[i] = digits[l%10]
|
|
}
|
|
} else {
|
|
i -= 1
|
|
b[i] = digits[0]
|
|
}
|
|
|
|
msgh = append(msgh, b[i:]...)
|
|
msgh = append(msgh, _CRLF_...)
|
|
|
|
_, err := nc.bw.Write(msgh)
|
|
if err == nil {
|
|
_, err = nc.bw.Write(data)
|
|
}
|
|
if err == nil {
|
|
_, err = nc.bw.WriteString(_CRLF_)
|
|
}
|
|
if err != nil {
|
|
nc.mu.Unlock()
|
|
return err
|
|
}
|
|
|
|
nc.OutMsgs++
|
|
nc.OutBytes += uint64(len(data))
|
|
|
|
if len(nc.fch) == 0 {
|
|
nc.kickFlusher()
|
|
}
|
|
nc.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// respHandler is the global response handler. It will look up
|
|
// the appropriate channel based on the last token and place
|
|
// the message on the channel if possible.
|
|
func (nc *Conn) respHandler(m *Msg) {
|
|
rt := respToken(m.Subject)
|
|
|
|
nc.mu.Lock()
|
|
// Just return if closed.
|
|
if nc.isClosed() {
|
|
nc.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
// Grab mch
|
|
mch := nc.respMap[rt]
|
|
// Delete the key regardless, one response only.
|
|
// FIXME(dlc) - should we track responses past 1
|
|
// just statistics wise?
|
|
delete(nc.respMap, rt)
|
|
nc.mu.Unlock()
|
|
|
|
// Don't block, let Request timeout instead, mch is
|
|
// buffered and we should delete the key before a
|
|
// second response is processed.
|
|
select {
|
|
case mch <- m:
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
|
|
// Create the response subscription we will use for all
|
|
// new style responses. This will be on an _INBOX with an
|
|
// additional terminal token. The subscription will be on
|
|
// a wildcard. Caller is responsible for ensuring this is
|
|
// only called once.
|
|
func (nc *Conn) createRespMux(respSub string) error {
|
|
s, err := nc.Subscribe(respSub, nc.respHandler)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
nc.mu.Lock()
|
|
nc.respMux = s
|
|
nc.mu.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Request will send a request payload and deliver the response message,
|
|
// or an error, including a timeout if no message was received properly.
|
|
func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
|
|
if nc == nil {
|
|
return nil, ErrInvalidConnection
|
|
}
|
|
|
|
nc.mu.Lock()
|
|
// If user wants the old style.
|
|
if nc.Opts.UseOldRequestStyle {
|
|
nc.mu.Unlock()
|
|
return nc.oldRequest(subj, data, timeout)
|
|
}
|
|
|
|
// Do setup for the new style.
|
|
if nc.respMap == nil {
|
|
// _INBOX wildcard
|
|
nc.respSub = fmt.Sprintf("%s.*", NewInbox())
|
|
nc.respMap = make(map[string]chan *Msg)
|
|
}
|
|
// Create literal Inbox and map to a chan msg.
|
|
mch := make(chan *Msg, RequestChanLen)
|
|
respInbox := nc.newRespInbox()
|
|
token := respToken(respInbox)
|
|
nc.respMap[token] = mch
|
|
createSub := nc.respMux == nil
|
|
ginbox := nc.respSub
|
|
nc.mu.Unlock()
|
|
|
|
if createSub {
|
|
// Make sure scoped subscription is setup only once.
|
|
var err error
|
|
nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if err := nc.PublishRequest(subj, respInbox, data); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
t := globalTimerPool.Get(timeout)
|
|
defer globalTimerPool.Put(t)
|
|
|
|
var ok bool
|
|
var msg *Msg
|
|
|
|
select {
|
|
case msg, ok = <-mch:
|
|
if !ok {
|
|
return nil, ErrConnectionClosed
|
|
}
|
|
case <-t.C:
|
|
nc.mu.Lock()
|
|
delete(nc.respMap, token)
|
|
nc.mu.Unlock()
|
|
return nil, ErrTimeout
|
|
}
|
|
|
|
return msg, nil
|
|
}
|
|
|
|
// oldRequest will create an Inbox and perform a Request() call
|
|
// with the Inbox reply and return the first reply received.
|
|
// This is optimized for the case of multiple responses.
|
|
func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Msg, error) {
|
|
inbox := NewInbox()
|
|
ch := make(chan *Msg, RequestChanLen)
|
|
|
|
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.AutoUnsubscribe(1)
|
|
defer s.Unsubscribe()
|
|
|
|
err = nc.PublishRequest(subj, inbox, data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.NextMsg(timeout)
|
|
}
|
|
|
|
// InboxPrefix is the prefix for all inbox subjects.
|
|
const InboxPrefix = "_INBOX."
|
|
const inboxPrefixLen = len(InboxPrefix)
|
|
const respInboxPrefixLen = inboxPrefixLen + nuidSize + 1
|
|
|
|
// NewInbox will return an inbox string which can be used for directed replies from
|
|
// subscribers. These are guaranteed to be unique, but can be shared and subscribed
|
|
// to by others.
|
|
func NewInbox() string {
|
|
var b [inboxPrefixLen + nuidSize]byte
|
|
pres := b[:inboxPrefixLen]
|
|
copy(pres, InboxPrefix)
|
|
ns := b[inboxPrefixLen:]
|
|
copy(ns, nuid.Next())
|
|
return string(b[:])
|
|
}
|
|
|
|
// Creates a new literal response subject that will trigger
|
|
// the global subscription handler.
|
|
func (nc *Conn) newRespInbox() string {
|
|
var b [inboxPrefixLen + (2 * nuidSize) + 1]byte
|
|
pres := b[:respInboxPrefixLen]
|
|
copy(pres, nc.respSub)
|
|
ns := b[respInboxPrefixLen:]
|
|
copy(ns, nuid.Next())
|
|
return string(b[:])
|
|
}
|
|
|
|
// respToken will return the last token of a literal response inbox
|
|
// which we use for the message channel lookup.
|
|
func respToken(respInbox string) string {
|
|
return respInbox[respInboxPrefixLen:]
|
|
}
|
|
|
|
// Subscribe will express interest in the given subject. The subject
|
|
// can have wildcards (partial:*, full:>). Messages will be delivered
|
|
// to the associated MsgHandler.
|
|
func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
|
|
return nc.subscribe(subj, _EMPTY_, cb, nil)
|
|
}
|
|
|
|
// ChanSubscribe will express interest in the given subject and place
|
|
// all messages received on the channel.
|
|
// You should not close the channel until sub.Unsubscribe() has been called.
|
|
func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
|
|
return nc.subscribe(subj, _EMPTY_, nil, ch)
|
|
}
|
|
|
|
// ChanQueueSubscribe will express interest in the given subject.
|
|
// All subscribers with the same queue name will form the queue group
|
|
// and only one member of the group will be selected to receive any given message,
|
|
// which will be placed on the channel.
|
|
// You should not close the channel until sub.Unsubscribe() has been called.
|
|
// Note: This is the same than QueueSubscribeSyncWithChan.
|
|
func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
|
|
return nc.subscribe(subj, group, nil, ch)
|
|
}
|
|
|
|
// SubscribeSync will express interest on the given subject. Messages will
|
|
// be received synchronously using Subscription.NextMsg().
|
|
func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
|
|
if nc == nil {
|
|
return nil, ErrInvalidConnection
|
|
}
|
|
mch := make(chan *Msg, nc.Opts.SubChanLen)
|
|
s, e := nc.subscribe(subj, _EMPTY_, nil, mch)
|
|
if s != nil {
|
|
s.typ = SyncSubscription
|
|
}
|
|
return s, e
|
|
}
|
|
|
|
// QueueSubscribe creates an asynchronous queue subscriber on the given subject.
|
|
// All subscribers with the same queue name will form the queue group and
|
|
// only one member of the group will be selected to receive any given
|
|
// message asynchronously.
|
|
func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
|
|
return nc.subscribe(subj, queue, cb, nil)
|
|
}
|
|
|
|
// QueueSubscribeSync creates a synchronous queue subscriber on the given
|
|
// subject. All subscribers with the same queue name will form the queue
|
|
// group and only one member of the group will be selected to receive any
|
|
// given message synchronously using Subscription.NextMsg().
|
|
func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
|
|
mch := make(chan *Msg, nc.Opts.SubChanLen)
|
|
s, e := nc.subscribe(subj, queue, nil, mch)
|
|
if s != nil {
|
|
s.typ = SyncSubscription
|
|
}
|
|
return s, e
|
|
}
|
|
|
|
// QueueSubscribeSyncWithChan will express interest in the given subject.
|
|
// All subscribers with the same queue name will form the queue group
|
|
// and only one member of the group will be selected to receive any given message,
|
|
// which will be placed on the channel.
|
|
// You should not close the channel until sub.Unsubscribe() has been called.
|
|
// Note: This is the same than ChanQueueSubscribe.
|
|
func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
|
|
return nc.subscribe(subj, queue, nil, ch)
|
|
}
|
|
|
|
// subscribe is the internal subscribe function that indicates interest in a subject.
|
|
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Subscription, error) {
|
|
if nc == nil {
|
|
return nil, ErrInvalidConnection
|
|
}
|
|
nc.mu.Lock()
|
|
// ok here, but defer is generally expensive
|
|
defer nc.mu.Unlock()
|
|
defer nc.kickFlusher()
|
|
|
|
// Check for some error conditions.
|
|
if nc.isClosed() {
|
|
return nil, ErrConnectionClosed
|
|
}
|
|
if nc.isDraining() {
|
|
return nil, ErrConnectionDraining
|
|
}
|
|
|
|
if cb == nil && ch == nil {
|
|
return nil, ErrBadSubscription
|
|
}
|
|
|
|
sub := &Subscription{Subject: subj, Queue: queue, mcb: cb, conn: nc}
|
|
// Set pending limits.
|
|
sub.pMsgsLimit = DefaultSubPendingMsgsLimit
|
|
sub.pBytesLimit = DefaultSubPendingBytesLimit
|
|
|
|
// If we have an async callback, start up a sub specific
|
|
// Go routine to deliver the messages.
|
|
if cb != nil {
|
|
sub.typ = AsyncSubscription
|
|
sub.pCond = sync.NewCond(&sub.mu)
|
|
go nc.waitForMsgs(sub)
|
|
} else {
|
|
sub.typ = ChanSubscription
|
|
sub.mch = ch
|
|
}
|
|
|
|
nc.subsMu.Lock()
|
|
nc.ssid++
|
|
sub.sid = nc.ssid
|
|
nc.subs[sub.sid] = sub
|
|
nc.subsMu.Unlock()
|
|
|
|
// We will send these for all subs when we reconnect
|
|
// so that we can suppress here.
|
|
if !nc.isReconnecting() {
|
|
fmt.Fprintf(nc.bw, subProto, subj, queue, sub.sid)
|
|
}
|
|
return sub, nil
|
|
}
|
|
|
|
// NumSubscriptions returns active number of subscriptions.
|
|
func (nc *Conn) NumSubscriptions() int {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return len(nc.subs)
|
|
}
|
|
|
|
// Lock for nc should be held here upon entry
|
|
func (nc *Conn) removeSub(s *Subscription) {
|
|
nc.subsMu.Lock()
|
|
delete(nc.subs, s.sid)
|
|
nc.subsMu.Unlock()
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
// Release callers on NextMsg for SyncSubscription only
|
|
if s.mch != nil && s.typ == SyncSubscription {
|
|
close(s.mch)
|
|
}
|
|
s.mch = nil
|
|
|
|
// Mark as invalid
|
|
s.conn = nil
|
|
s.closed = true
|
|
if s.pCond != nil {
|
|
s.pCond.Broadcast()
|
|
}
|
|
}
|
|
|
|
// SubscriptionType is the type of the Subscription.
type SubscriptionType int

// The different types of subscription types.
const (
	// AsyncSubscription is the zero value; the remaining constants
	// follow in increasing order.
	AsyncSubscription SubscriptionType = iota
	SyncSubscription
	ChanSubscription
	NilSubscription
)
|
|
|
|
// Type returns the type of Subscription.
|
|
func (s *Subscription) Type() SubscriptionType {
|
|
if s == nil {
|
|
return NilSubscription
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.typ
|
|
}
|
|
|
|
// IsValid returns a boolean indicating whether the subscription
|
|
// is still active. This will return false if the subscription has
|
|
// already been closed.
|
|
func (s *Subscription) IsValid() bool {
|
|
if s == nil {
|
|
return false
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.conn != nil
|
|
}
|
|
|
|
// Drain will remove interest but continue callbacks until all messages
|
|
// have been processed.
|
|
func (s *Subscription) Drain() error {
|
|
if s == nil {
|
|
return ErrBadSubscription
|
|
}
|
|
s.mu.Lock()
|
|
conn := s.conn
|
|
s.mu.Unlock()
|
|
if conn == nil {
|
|
return ErrBadSubscription
|
|
}
|
|
return conn.unsubscribe(s, 0, true)
|
|
}
|
|
|
|
// Unsubscribe will remove interest in the given subject.
|
|
func (s *Subscription) Unsubscribe() error {
|
|
if s == nil {
|
|
return ErrBadSubscription
|
|
}
|
|
s.mu.Lock()
|
|
conn := s.conn
|
|
s.mu.Unlock()
|
|
if conn == nil {
|
|
return ErrBadSubscription
|
|
}
|
|
if conn.IsDraining() {
|
|
return ErrConnectionDraining
|
|
}
|
|
return conn.unsubscribe(s, 0, false)
|
|
}
|
|
|
|
// checkDrained will watch for a subscription to be fully drained
|
|
// and then remove it.
|
|
func (nc *Conn) checkDrained(sub *Subscription) {
|
|
if nc == nil || sub == nil {
|
|
return
|
|
}
|
|
|
|
// This allows us to know that whatever we have in the client pending
|
|
// is correct and the server will not send additional information.
|
|
nc.Flush()
|
|
|
|
// Once we are here we just wait for Pending to reach 0 or
|
|
// any other state to exit this go routine.
|
|
for {
|
|
// check connection is still valid.
|
|
if nc.IsClosed() {
|
|
return
|
|
}
|
|
|
|
// Check subscription state
|
|
sub.mu.Lock()
|
|
conn := sub.conn
|
|
closed := sub.closed
|
|
pMsgs := sub.pMsgs
|
|
sub.mu.Unlock()
|
|
|
|
if conn == nil || closed || pMsgs == 0 {
|
|
nc.mu.Lock()
|
|
nc.removeSub(sub)
|
|
nc.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
// AutoUnsubscribe will issue an automatic Unsubscribe that is
|
|
// processed by the server when max messages have been received.
|
|
// This can be useful when sending a request to an unknown number
|
|
// of subscribers.
|
|
func (s *Subscription) AutoUnsubscribe(max int) error {
|
|
if s == nil {
|
|
return ErrBadSubscription
|
|
}
|
|
s.mu.Lock()
|
|
conn := s.conn
|
|
s.mu.Unlock()
|
|
if conn == nil {
|
|
return ErrBadSubscription
|
|
}
|
|
return conn.unsubscribe(s, max, false)
|
|
}
|
|
|
|
// unsubscribe performs the low level unsubscribe to the server.
|
|
// Use Subscription.Unsubscribe()
|
|
func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
|
|
nc.mu.Lock()
|
|
// ok here, but defer is expensive
|
|
defer nc.mu.Unlock()
|
|
defer nc.kickFlusher()
|
|
|
|
if nc.isClosed() {
|
|
return ErrConnectionClosed
|
|
}
|
|
|
|
nc.subsMu.RLock()
|
|
s := nc.subs[sub.sid]
|
|
nc.subsMu.RUnlock()
|
|
// Already unsubscribed
|
|
if s == nil {
|
|
return nil
|
|
}
|
|
|
|
maxStr := _EMPTY_
|
|
if max > 0 {
|
|
s.max = uint64(max)
|
|
maxStr = strconv.Itoa(max)
|
|
} else if !drainMode {
|
|
nc.removeSub(s)
|
|
}
|
|
|
|
if drainMode {
|
|
go nc.checkDrained(sub)
|
|
}
|
|
|
|
// We will send these for all subs when we reconnect
|
|
// so that we can suppress here.
|
|
if !nc.isReconnecting() {
|
|
fmt.Fprintf(nc.bw, unsubProto, s.sid, maxStr)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// NextMsg will return the next message available to a synchronous subscriber
|
|
// or block until one is available. A timeout can be used to return when no
|
|
// message has been delivered.
|
|
func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
|
|
if s == nil {
|
|
return nil, ErrBadSubscription
|
|
}
|
|
|
|
s.mu.Lock()
|
|
err := s.validateNextMsgState()
|
|
if err != nil {
|
|
s.mu.Unlock()
|
|
return nil, err
|
|
}
|
|
|
|
// snapshot
|
|
mch := s.mch
|
|
s.mu.Unlock()
|
|
|
|
var ok bool
|
|
var msg *Msg
|
|
|
|
t := globalTimerPool.Get(timeout)
|
|
defer globalTimerPool.Put(t)
|
|
|
|
select {
|
|
case msg, ok = <-mch:
|
|
if !ok {
|
|
return nil, ErrConnectionClosed
|
|
}
|
|
err := s.processNextMsgDelivered(msg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
case <-t.C:
|
|
return nil, ErrTimeout
|
|
}
|
|
|
|
return msg, nil
|
|
}
|
|
|
|
// validateNextMsgState checks whether the subscription is in a valid
|
|
// state to call NextMsg and be delivered another message synchronously.
|
|
// This should be called while holding the lock.
|
|
func (s *Subscription) validateNextMsgState() error {
|
|
if s.connClosed {
|
|
return ErrConnectionClosed
|
|
}
|
|
if s.mch == nil {
|
|
if s.max > 0 && s.delivered >= s.max {
|
|
return ErrMaxMessages
|
|
} else if s.closed {
|
|
return ErrBadSubscription
|
|
}
|
|
}
|
|
if s.mcb != nil {
|
|
return ErrSyncSubRequired
|
|
}
|
|
if s.sc {
|
|
s.sc = false
|
|
return ErrSlowConsumer
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processNextMsgDelivered takes a message and applies the needed
|
|
// accounting to the stats from the subscription, returning an
|
|
// error in case we have the maximum number of messages have been
|
|
// delivered already. It should not be called while holding the lock.
|
|
func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
|
|
s.mu.Lock()
|
|
nc := s.conn
|
|
max := s.max
|
|
|
|
// Update some stats.
|
|
s.delivered++
|
|
delivered := s.delivered
|
|
if s.typ == SyncSubscription {
|
|
s.pMsgs--
|
|
s.pBytes -= len(msg.Data)
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
if max > 0 {
|
|
if delivered > max {
|
|
return ErrMaxMessages
|
|
}
|
|
// Remove subscription if we have reached max.
|
|
if delivered == max {
|
|
nc.mu.Lock()
|
|
nc.removeSub(s)
|
|
nc.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Queued returns the number of queued messages in the client for this subscription.
|
|
// DEPRECATED: Use Pending()
|
|
func (s *Subscription) QueuedMsgs() (int, error) {
|
|
m, _, err := s.Pending()
|
|
return int(m), err
|
|
}
|
|
|
|
// Pending returns the number of queued messages and queued bytes in the client for this subscription.
|
|
func (s *Subscription) Pending() (int, int, error) {
|
|
if s == nil {
|
|
return -1, -1, ErrBadSubscription
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.conn == nil {
|
|
return -1, -1, ErrBadSubscription
|
|
}
|
|
if s.typ == ChanSubscription {
|
|
return -1, -1, ErrTypeSubscription
|
|
}
|
|
return s.pMsgs, s.pBytes, nil
|
|
}
|
|
|
|
// MaxPending returns the maximum number of queued messages and queued bytes seen so far.
|
|
func (s *Subscription) MaxPending() (int, int, error) {
|
|
if s == nil {
|
|
return -1, -1, ErrBadSubscription
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.conn == nil {
|
|
return -1, -1, ErrBadSubscription
|
|
}
|
|
if s.typ == ChanSubscription {
|
|
return -1, -1, ErrTypeSubscription
|
|
}
|
|
return s.pMsgsMax, s.pBytesMax, nil
|
|
}
|
|
|
|
// ClearMaxPending resets the maximums seen so far.
|
|
func (s *Subscription) ClearMaxPending() error {
|
|
if s == nil {
|
|
return ErrBadSubscription
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.conn == nil {
|
|
return ErrBadSubscription
|
|
}
|
|
if s.typ == ChanSubscription {
|
|
return ErrTypeSubscription
|
|
}
|
|
s.pMsgsMax, s.pBytesMax = 0, 0
|
|
return nil
|
|
}
|
|
|
|
// Pending Limits
const (
	// DefaultSubPendingMsgsLimit is the pending message limit assigned
	// to new subscriptions (65536).
	DefaultSubPendingMsgsLimit = 64 * 1024

	// DefaultSubPendingBytesLimit is the pending byte limit assigned to
	// new subscriptions (65536 * 1024).
	DefaultSubPendingBytesLimit = 64 * 1024 * 1024
)
|
|
|
|
// PendingLimits returns the current limits for this subscription.
|
|
// If no error is returned, a negative value indicates that the
|
|
// given metric is not limited.
|
|
func (s *Subscription) PendingLimits() (int, int, error) {
|
|
if s == nil {
|
|
return -1, -1, ErrBadSubscription
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.conn == nil {
|
|
return -1, -1, ErrBadSubscription
|
|
}
|
|
if s.typ == ChanSubscription {
|
|
return -1, -1, ErrTypeSubscription
|
|
}
|
|
return s.pMsgsLimit, s.pBytesLimit, nil
|
|
}
|
|
|
|
// SetPendingLimits sets the limits for pending msgs and bytes for this subscription.
|
|
// Zero is not allowed. Any negative value means that the given metric is not limited.
|
|
func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error {
|
|
if s == nil {
|
|
return ErrBadSubscription
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.conn == nil {
|
|
return ErrBadSubscription
|
|
}
|
|
if s.typ == ChanSubscription {
|
|
return ErrTypeSubscription
|
|
}
|
|
if msgLimit == 0 || bytesLimit == 0 {
|
|
return ErrInvalidArg
|
|
}
|
|
s.pMsgsLimit, s.pBytesLimit = msgLimit, bytesLimit
|
|
return nil
|
|
}
|
|
|
|
// Delivered returns the number of delivered messages for this subscription.
|
|
func (s *Subscription) Delivered() (int64, error) {
|
|
if s == nil {
|
|
return -1, ErrBadSubscription
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.conn == nil {
|
|
return -1, ErrBadSubscription
|
|
}
|
|
return int64(s.delivered), nil
|
|
}
|
|
|
|
// Dropped returns the number of known dropped messages for this subscription.
|
|
// This will correspond to messages dropped by violations of PendingLimits. If
|
|
// the server declares the connection a SlowConsumer, this number may not be
|
|
// valid.
|
|
func (s *Subscription) Dropped() (int, error) {
|
|
if s == nil {
|
|
return -1, ErrBadSubscription
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.conn == nil {
|
|
return -1, ErrBadSubscription
|
|
}
|
|
return s.dropped, nil
|
|
}
|
|
|
|
// FIXME: This is a hack
|
|
// removeFlushEntry is needed when we need to discard queued up responses
|
|
// for our pings as part of a flush call. This happens when we have a flush
|
|
// call outstanding and we call close.
|
|
func (nc *Conn) removeFlushEntry(ch chan struct{}) bool {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
if nc.pongs == nil {
|
|
return false
|
|
}
|
|
for i, c := range nc.pongs {
|
|
if c == ch {
|
|
nc.pongs[i] = nil
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// The lock must be held entering this function.
|
|
func (nc *Conn) sendPing(ch chan struct{}) {
|
|
nc.pongs = append(nc.pongs, ch)
|
|
nc.bw.WriteString(pingProto)
|
|
// Flush in place.
|
|
nc.bw.Flush()
|
|
}
|
|
|
|
// This will fire periodically and send a client origin
|
|
// ping to the server. Will also check that we have received
|
|
// responses from the server.
|
|
func (nc *Conn) processPingTimer() {
|
|
nc.mu.Lock()
|
|
|
|
if nc.status != CONNECTED {
|
|
nc.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
// Check for violation
|
|
nc.pout++
|
|
if nc.pout > nc.Opts.MaxPingsOut {
|
|
nc.mu.Unlock()
|
|
nc.processOpErr(ErrStaleConnection)
|
|
return
|
|
}
|
|
|
|
nc.sendPing(nil)
|
|
nc.ptmr.Reset(nc.Opts.PingInterval)
|
|
nc.mu.Unlock()
|
|
}
|
|
|
|
// FlushTimeout allows a Flush operation to have an associated timeout.
|
|
func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) {
|
|
if nc == nil {
|
|
return ErrInvalidConnection
|
|
}
|
|
if timeout <= 0 {
|
|
return ErrBadTimeout
|
|
}
|
|
|
|
nc.mu.Lock()
|
|
if nc.isClosed() {
|
|
nc.mu.Unlock()
|
|
return ErrConnectionClosed
|
|
}
|
|
t := globalTimerPool.Get(timeout)
|
|
defer globalTimerPool.Put(t)
|
|
|
|
// Create a buffered channel to prevent chan send to block
|
|
// in processPong() if this code here times out just when
|
|
// PONG was received.
|
|
ch := make(chan struct{}, 1)
|
|
nc.sendPing(ch)
|
|
nc.mu.Unlock()
|
|
|
|
select {
|
|
case _, ok := <-ch:
|
|
if !ok {
|
|
err = ErrConnectionClosed
|
|
} else {
|
|
close(ch)
|
|
}
|
|
case <-t.C:
|
|
err = ErrTimeout
|
|
}
|
|
|
|
if err != nil {
|
|
nc.removeFlushEntry(ch)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Flush will perform a round trip to the server and return when it
|
|
// receives the internal reply.
|
|
func (nc *Conn) Flush() error {
|
|
return nc.FlushTimeout(60 * time.Second)
|
|
}
|
|
|
|
// Buffered will return the number of bytes buffered to be sent to the server.
|
|
// FIXME(dlc) take into account disconnected state.
|
|
func (nc *Conn) Buffered() (int, error) {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
if nc.isClosed() || nc.bw == nil {
|
|
return -1, ErrConnectionClosed
|
|
}
|
|
return nc.bw.Buffered(), nil
|
|
}
|
|
|
|
// resendSubscriptions will send our subscription state back to the
|
|
// server. Used in reconnects
|
|
func (nc *Conn) resendSubscriptions() {
|
|
// Since we are going to send protocols to the server, we don't want to
|
|
// be holding the subsMu lock (which is used in processMsg). So copy
|
|
// the subscriptions in a temporary array.
|
|
nc.subsMu.RLock()
|
|
subs := make([]*Subscription, 0, len(nc.subs))
|
|
for _, s := range nc.subs {
|
|
subs = append(subs, s)
|
|
}
|
|
nc.subsMu.RUnlock()
|
|
for _, s := range subs {
|
|
adjustedMax := uint64(0)
|
|
s.mu.Lock()
|
|
if s.max > 0 {
|
|
if s.delivered < s.max {
|
|
adjustedMax = s.max - s.delivered
|
|
}
|
|
|
|
// adjustedMax could be 0 here if the number of delivered msgs
|
|
// reached the max, if so unsubscribe.
|
|
if adjustedMax == 0 {
|
|
s.mu.Unlock()
|
|
fmt.Fprintf(nc.bw, unsubProto, s.sid, _EMPTY_)
|
|
continue
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
fmt.Fprintf(nc.bw, subProto, s.Subject, s.Queue, s.sid)
|
|
if adjustedMax > 0 {
|
|
maxStr := strconv.Itoa(int(adjustedMax))
|
|
fmt.Fprintf(nc.bw, unsubProto, s.sid, maxStr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// This will clear any pending flush calls and release pending calls.
|
|
// Lock is assumed to be held by the caller.
|
|
func (nc *Conn) clearPendingFlushCalls() {
|
|
// Clear any queued pongs, e.g. pending flush calls.
|
|
for _, ch := range nc.pongs {
|
|
if ch != nil {
|
|
close(ch)
|
|
}
|
|
}
|
|
nc.pongs = nil
|
|
}
|
|
|
|
// This will clear any pending Request calls.
|
|
// Lock is assumed to be held by the caller.
|
|
func (nc *Conn) clearPendingRequestCalls() {
|
|
if nc.respMap == nil {
|
|
return
|
|
}
|
|
for key, ch := range nc.respMap {
|
|
if ch != nil {
|
|
close(ch)
|
|
delete(nc.respMap, key)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Low level close call that will do correct cleanup and set
|
|
// desired status. Also controls whether user defined callbacks
|
|
// will be triggered. The lock should not be held entering this
|
|
// function. This function will handle the locking manually.
|
|
func (nc *Conn) close(status Status, doCBs bool) {
|
|
nc.mu.Lock()
|
|
if nc.isClosed() {
|
|
nc.status = status
|
|
nc.mu.Unlock()
|
|
return
|
|
}
|
|
nc.status = CLOSED
|
|
|
|
// Kick the Go routines so they fall out.
|
|
nc.kickFlusher()
|
|
nc.mu.Unlock()
|
|
|
|
nc.mu.Lock()
|
|
|
|
// Clear any queued pongs, e.g. pending flush calls.
|
|
nc.clearPendingFlushCalls()
|
|
|
|
// Clear any queued and blocking Requests.
|
|
nc.clearPendingRequestCalls()
|
|
|
|
// Stop ping timer if set.
|
|
nc.stopPingTimer()
|
|
nc.ptmr = nil
|
|
|
|
// Go ahead and make sure we have flushed the outbound
|
|
if nc.conn != nil {
|
|
nc.bw.Flush()
|
|
defer nc.conn.Close()
|
|
}
|
|
|
|
// Close sync subscriber channels and release any
|
|
// pending NextMsg() calls.
|
|
nc.subsMu.Lock()
|
|
for _, s := range nc.subs {
|
|
s.mu.Lock()
|
|
|
|
// Release callers on NextMsg for SyncSubscription only
|
|
if s.mch != nil && s.typ == SyncSubscription {
|
|
close(s.mch)
|
|
}
|
|
s.mch = nil
|
|
// Mark as invalid, for signaling to deliverMsgs
|
|
s.closed = true
|
|
// Mark connection closed in subscription
|
|
s.connClosed = true
|
|
// If we have an async subscription, signals it to exit
|
|
if s.typ == AsyncSubscription && s.pCond != nil {
|
|
s.pCond.Signal()
|
|
}
|
|
|
|
s.mu.Unlock()
|
|
}
|
|
nc.subs = nil
|
|
nc.subsMu.Unlock()
|
|
|
|
nc.status = status
|
|
|
|
// Perform appropriate callback if needed for a disconnect.
|
|
if doCBs {
|
|
if nc.Opts.DisconnectedCB != nil && nc.conn != nil {
|
|
nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
|
|
}
|
|
if nc.Opts.ClosedCB != nil {
|
|
nc.ach.push(func() { nc.Opts.ClosedCB(nc) })
|
|
}
|
|
nc.ach.close()
|
|
}
|
|
nc.mu.Unlock()
|
|
}
|
|
|
|
// Close will close the connection to the server. This call will release
|
|
// all blocking calls, such as Flush() and NextMsg()
|
|
func (nc *Conn) Close() {
|
|
nc.close(CLOSED, true)
|
|
}
|
|
|
|
// IsClosed tests if a Conn has been closed.
|
|
func (nc *Conn) IsClosed() bool {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return nc.isClosed()
|
|
}
|
|
|
|
// IsReconnecting tests if a Conn is reconnecting.
|
|
func (nc *Conn) IsReconnecting() bool {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return nc.isReconnecting()
|
|
}
|
|
|
|
// IsConnected tests if a Conn is connected.
|
|
func (nc *Conn) IsConnected() bool {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return nc.isConnected()
|
|
}
|
|
|
|
// drainConnection will run in a separate Go routine and will
|
|
// flush all publishes and drain all active subscriptions.
|
|
func (nc *Conn) drainConnection() {
|
|
// Snapshot subs list.
|
|
nc.mu.Lock()
|
|
subs := make([]*Subscription, 0, len(nc.subs))
|
|
for _, s := range nc.subs {
|
|
subs = append(subs, s)
|
|
}
|
|
errCB := nc.Opts.AsyncErrorCB
|
|
drainWait := nc.Opts.DrainTimeout
|
|
nc.mu.Unlock()
|
|
|
|
// for pushing errors with context.
|
|
pushErr := func(err error) {
|
|
nc.mu.Lock()
|
|
if errCB != nil {
|
|
nc.ach.push(func() { errCB(nc, nil, err) })
|
|
}
|
|
nc.mu.Unlock()
|
|
}
|
|
|
|
// Do subs first
|
|
for _, s := range subs {
|
|
if err := s.Drain(); err != nil {
|
|
// We will notify about these but continue.
|
|
pushErr(err)
|
|
}
|
|
}
|
|
|
|
// Wait for the subscriptions to drop to zero.
|
|
timeout := time.Now().Add(drainWait)
|
|
for time.Now().Before(timeout) {
|
|
if nc.NumSubscriptions() == 0 {
|
|
break
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
// Check if we timed out.
|
|
if nc.NumSubscriptions() != 0 {
|
|
pushErr(ErrDrainTimeout)
|
|
}
|
|
|
|
// Flip State
|
|
nc.mu.Lock()
|
|
nc.status = DRAINING_PUBS
|
|
nc.mu.Unlock()
|
|
|
|
// Do publish drain via Flush() call.
|
|
err := nc.Flush()
|
|
if err != nil {
|
|
pushErr(err)
|
|
nc.Close()
|
|
return
|
|
}
|
|
|
|
// Move to closed state.
|
|
nc.Close()
|
|
}
|
|
|
|
// Drain will put a connection into a drain state. All subscriptions will
|
|
// immediately be put into a drain state. Upon completion, the publishers
|
|
// will be drained and can not publish any additional messages. Upon draining
|
|
// of the publishers, the connection will be closed. Use the ClosedCB()
|
|
// option to know when the connection has moved from draining to closed.
|
|
func (nc *Conn) Drain() error {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
|
|
if nc.isClosed() {
|
|
return ErrConnectionClosed
|
|
}
|
|
if nc.isConnecting() || nc.isReconnecting() {
|
|
return ErrConnectionReconnecting
|
|
}
|
|
if nc.isDraining() {
|
|
return nil
|
|
}
|
|
|
|
nc.status = DRAINING_SUBS
|
|
go nc.drainConnection()
|
|
return nil
|
|
}
|
|
|
|
// IsDraining tests if a Conn is in the draining state.
|
|
func (nc *Conn) IsDraining() bool {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return nc.isDraining()
|
|
}
|
|
|
|
// caller must lock
|
|
func (nc *Conn) getServers(implicitOnly bool) []string {
|
|
poolSize := len(nc.srvPool)
|
|
var servers = make([]string, 0)
|
|
for i := 0; i < poolSize; i++ {
|
|
if implicitOnly && !nc.srvPool[i].isImplicit {
|
|
continue
|
|
}
|
|
url := nc.srvPool[i].url
|
|
servers = append(servers, fmt.Sprintf("%s://%s", url.Scheme, url.Host))
|
|
}
|
|
return servers
|
|
}
|
|
|
|
// Servers returns the list of known server urls, including additional
|
|
// servers discovered after a connection has been established. If
|
|
// authentication is enabled, use UserInfo or Token when connecting with
|
|
// these urls.
|
|
func (nc *Conn) Servers() []string {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return nc.getServers(false)
|
|
}
|
|
|
|
// DiscoveredServers returns only the server urls that have been discovered
|
|
// after a connection has been established. If authentication is enabled,
|
|
// use UserInfo or Token when connecting with these urls.
|
|
func (nc *Conn) DiscoveredServers() []string {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return nc.getServers(true)
|
|
}
|
|
|
|
// Status returns the current state of the connection.
|
|
func (nc *Conn) Status() Status {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return nc.status
|
|
}
|
|
|
|
// Test if Conn has been closed Lock is assumed held.
|
|
func (nc *Conn) isClosed() bool {
|
|
return nc.status == CLOSED
|
|
}
|
|
|
|
// Test if Conn is in the process of connecting
|
|
func (nc *Conn) isConnecting() bool {
|
|
return nc.status == CONNECTING
|
|
}
|
|
|
|
// Test if Conn is being reconnected.
|
|
func (nc *Conn) isReconnecting() bool {
|
|
return nc.status == RECONNECTING
|
|
}
|
|
|
|
// Test if Conn is connected or connecting.
|
|
func (nc *Conn) isConnected() bool {
|
|
return nc.status == CONNECTED || nc.isDraining()
|
|
}
|
|
|
|
// Test if Conn is in the draining state.
|
|
func (nc *Conn) isDraining() bool {
|
|
return nc.status == DRAINING_SUBS || nc.status == DRAINING_PUBS
|
|
}
|
|
|
|
// Test if Conn is in the draining state for pubs.
|
|
func (nc *Conn) isDrainingPubs() bool {
|
|
return nc.status == DRAINING_PUBS
|
|
}
|
|
|
|
// Stats will return a race safe copy of the Statistics section for the connection.
|
|
func (nc *Conn) Stats() Statistics {
|
|
// Stats are updated either under connection's mu or subsMu mutexes.
|
|
// Lock both to safely get them.
|
|
nc.mu.Lock()
|
|
nc.subsMu.RLock()
|
|
stats := Statistics{
|
|
InMsgs: nc.InMsgs,
|
|
InBytes: nc.InBytes,
|
|
OutMsgs: nc.OutMsgs,
|
|
OutBytes: nc.OutBytes,
|
|
Reconnects: nc.Reconnects,
|
|
}
|
|
nc.subsMu.RUnlock()
|
|
nc.mu.Unlock()
|
|
return stats
|
|
}
|
|
|
|
// MaxPayload returns the size limit that a message payload can have.
|
|
// This is set by the server configuration and delivered to the client
|
|
// upon connect.
|
|
func (nc *Conn) MaxPayload() int64 {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return nc.info.MaxPayload
|
|
}
|
|
|
|
// AuthRequired will return if the connected server requires authorization.
|
|
func (nc *Conn) AuthRequired() bool {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return nc.info.AuthRequired
|
|
}
|
|
|
|
// TLSRequired will return if the connected server requires TLS connections.
|
|
func (nc *Conn) TLSRequired() bool {
|
|
nc.mu.Lock()
|
|
defer nc.mu.Unlock()
|
|
return nc.info.TLSRequired
|
|
}
|
|
|
|
// Barrier schedules the given function `f` to all registered asynchronous
|
|
// subscriptions.
|
|
// Only the last subscription to see this barrier will invoke the function.
|
|
// If no subscription is registered at the time of this call, `f()` is invoked
|
|
// right away.
|
|
// ErrConnectionClosed is returned if the connection is closed prior to
|
|
// the call.
|
|
func (nc *Conn) Barrier(f func()) error {
|
|
nc.mu.Lock()
|
|
if nc.isClosed() {
|
|
nc.mu.Unlock()
|
|
return ErrConnectionClosed
|
|
}
|
|
nc.subsMu.Lock()
|
|
// Need to figure out how many non chan subscriptions there are
|
|
numSubs := 0
|
|
for _, sub := range nc.subs {
|
|
if sub.typ == AsyncSubscription {
|
|
numSubs++
|
|
}
|
|
}
|
|
if numSubs == 0 {
|
|
nc.subsMu.Unlock()
|
|
nc.mu.Unlock()
|
|
f()
|
|
return nil
|
|
}
|
|
barrier := &barrierInfo{refs: int64(numSubs), f: f}
|
|
for _, sub := range nc.subs {
|
|
sub.mu.Lock()
|
|
if sub.mch == nil {
|
|
msg := &Msg{barrier: barrier}
|
|
// Push onto the async pList
|
|
if sub.pTail != nil {
|
|
sub.pTail.next = msg
|
|
} else {
|
|
sub.pHead = msg
|
|
sub.pCond.Signal()
|
|
}
|
|
sub.pTail = msg
|
|
}
|
|
sub.mu.Unlock()
|
|
}
|
|
nc.subsMu.Unlock()
|
|
nc.mu.Unlock()
|
|
return nil
|
|
}
|