Plamen Todorov fb2aef2ce6 MQTT clientId should be unique
Each mqtt hook establishes separate connection to the MQTT broker. If
their clientIds are all equal the MQTT broker will disconnect the clients - the
protocol does not allow 2 connected clients with the same name
2019-10-06 22:15:06 +03:00

116 lines
2.3 KiB
Go

package endpoint
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"sync"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
)
const (
mqttExpiresAfter = time.Second * 30
)
// MQTTConn is an endpoint connection
type MQTTConn struct {
mu sync.Mutex
ep Endpoint
conn paho.Client
ex bool
t time.Time
}
// Expired returns true if the connection has expired
func (conn *MQTTConn) Expired() bool {
conn.mu.Lock()
defer conn.mu.Unlock()
if !conn.ex {
if time.Now().Sub(conn.t) > mqttExpiresAfter {
conn.close()
conn.ex = true
}
}
return conn.ex
}
func (conn *MQTTConn) close() {
if conn.conn != nil {
if conn.conn.IsConnected() {
conn.conn.Disconnect(250)
}
conn.conn = nil
}
}
// Send sends a message
func (conn *MQTTConn) Send(msg string) error {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ex {
return errExpired
}
conn.t = time.Now()
if conn.conn == nil {
uri := fmt.Sprintf("tcp://%s:%d", conn.ep.MQTT.Host, conn.ep.MQTT.Port)
ops := paho.NewClientOptions()
if conn.ep.MQTT.CertFile != "" || conn.ep.MQTT.KeyFile != "" ||
conn.ep.MQTT.CACertFile != "" {
var config tls.Config
if conn.ep.MQTT.CertFile != "" || conn.ep.MQTT.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(conn.ep.MQTT.CertFile,
conn.ep.MQTT.KeyFile)
if err != nil {
return err
}
config.Certificates = append(config.Certificates, cert)
}
if conn.ep.MQTT.CACertFile != "" {
// Load CA cert
caCert, err := ioutil.ReadFile(conn.ep.MQTT.CACertFile)
if err != nil {
return err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
config.RootCAs = caCertPool
}
ops = ops.SetTLSConfig(&config)
}
nano := time.Now().UnixNano()
clientID := fmt.Sprintf("tile38_%x", nano) //the id of connected clients should be unique
ops = ops.SetClientID(clientID).AddBroker(uri)
c := paho.NewClient(ops)
if token := c.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
conn.conn = c
}
t := conn.conn.Publish(conn.ep.MQTT.QueueName, conn.ep.MQTT.Qos,
conn.ep.MQTT.Retained, msg)
t.Wait()
if t.Error() != nil {
conn.close()
return t.Error()
}
return nil
}
func newMQTTConn(ep Endpoint) *MQTTConn {
return &MQTTConn{
ep: ep,
t: time.Now(),
}
}