Added QoS and retained flags

This commit is contained in:
Pavel Makarenko 2017-03-08 00:39:49 +03:00
parent 9e0c8fc28e
commit cf8afa3617
2 changed files with 37 additions and 1 deletions

View File

@ -53,6 +53,8 @@ type Endpoint struct {
Host string Host string
Port int Port int
QueueName string QueueName string
Qos byte
Retained bool
} }
} }
@ -328,6 +330,40 @@ func parseEndpoint(s string) (Endpoint, error) {
} }
} }
// Parsing additional params
if len(sqp) > 1 {
m, err := url.ParseQuery(sqp[1])
if err != nil {
return endpoint, errors.New("invalid MQTT url")
}
for key, val := range m {
if len(val) == 0 {
continue
}
switch key {
case "qos":
n, err := strconv.ParseUint(val[0], 10, 8)
if err != nil {
return endpoint, errors.New("invalid MQTT qos value")
}
endpoint.MQTT.Qos = byte(n)
case "retained":
n, err := strconv.ParseUint(val[0], 10, 8)
if err != nil {
return endpoint, errors.New("invalid MQTT retained value")
}
if n != 1 && n != 0 {
return endpoint, errors.New("invalid MQTT retained, should be [0, 1]")
}
if n == 1 {
endpoint.MQTT.Retained = true
}
}
}
}
// Throw error if we not provide any queue name // Throw error if we not provide any queue name
if endpoint.MQTT.QueueName == "" { if endpoint.MQTT.QueueName == "" {
return endpoint, errors.New("missing MQTT topic name") return endpoint, errors.New("missing MQTT topic name")

View File

@ -63,7 +63,7 @@ func (conn *MQTTEndpointConn) Send(msg string) error {
conn.conn = c conn.conn = c
} }
t := conn.conn.Publish(conn.ep.MQTT.QueueName, 0, false, msg) t := conn.conn.Publish(conn.ep.MQTT.QueueName, conn.ep.MQTT.Qos, conn.ep.MQTT.Retained, msg)
t.Wait() t.Wait()
if t.Error() != nil { if t.Error() != nil {