From cf8afa3617a9e5c3f7800c1377c2553a966c377b Mon Sep 17 00:00:00 2001 From: Pavel Makarenko Date: Wed, 8 Mar 2017 00:39:49 +0300 Subject: [PATCH] Added QoS and retained flags --- controller/endpoint/endpoint.go | 36 +++++++++++++++++++++++++++++++++ controller/endpoint/mqtt.go | 2 +- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/controller/endpoint/endpoint.go b/controller/endpoint/endpoint.go index ec35e746..8ec4a240 100644 --- a/controller/endpoint/endpoint.go +++ b/controller/endpoint/endpoint.go @@ -53,6 +53,8 @@ type Endpoint struct { Host string Port int QueueName string + Qos byte + Retained bool } } @@ -328,6 +330,40 @@ func parseEndpoint(s string) (Endpoint, error) { } } + // Parsing additional params + if len(sqp) > 1 { + m, err := url.ParseQuery(sqp[1]) + if err != nil { + return endpoint, errors.New("invalid MQTT url") + } + for key, val := range m { + if len(val) == 0 { + continue + } + switch key { + case "qos": + n, err := strconv.ParseUint(val[0], 10, 8) + if err != nil { + return endpoint, errors.New("invalid MQTT qos value") + } + endpoint.MQTT.Qos = byte(n) + case "retained": + n, err := strconv.ParseUint(val[0], 10, 8) + if err != nil { + return endpoint, errors.New("invalid MQTT retained value") + } + + if n != 1 && n != 0 { + return endpoint, errors.New("invalid MQTT retained, should be [0, 1]") + } + + if n == 1 { + endpoint.MQTT.Retained = true + } + } + } + } + // Throw error if we not provide any queue name if endpoint.MQTT.QueueName == "" { return endpoint, errors.New("missing MQTT topic name") diff --git a/controller/endpoint/mqtt.go b/controller/endpoint/mqtt.go index 8b91e596..69a6b120 100644 --- a/controller/endpoint/mqtt.go +++ b/controller/endpoint/mqtt.go @@ -63,7 +63,7 @@ func (conn *MQTTEndpointConn) Send(msg string) error { conn.conn = c } - t := conn.conn.Publish(conn.ep.MQTT.QueueName, 0, false, msg) + t := conn.conn.Publish(conn.ep.MQTT.QueueName, conn.ep.MQTT.Qos, conn.ep.MQTT.Retained, msg) t.Wait() if t.Error() != nil {