// Package mqtt is a thin wrapper around the Eclipse Paho MQTT client. It
// exposes connect, subscribe and publish operations whose lifetime is bound
// to a context.Context.
package mqtt

import (
	"context"
	"sync"
	"time"

	paho "github.com/eclipse/paho.mqtt.golang"
)

// disconnectQuiesceMillis is the quiesce value passed to the paho client's
// Disconnect when the connection context is cancelled.
const disconnectQuiesceMillis = 1000

// Message is a copy of the paho library's Message interface, declared here so
// that dependants do not need to import the paho library themselves.
type Message interface {
	Duplicate() bool
	Qos() byte
	Retained() bool
	Topic() string
	MessageID() uint16
	Payload() []byte
	Ack()
}

// PubMessage describes an outgoing message: its topic, payload, QoS and
// retained flag.
type PubMessage interface {
	Topic() string
	Payload() []byte
	Qos() byte
	Retained() bool
}

// message is the package's PubMessage implementation.
type message struct {
	qos      int
	retained bool
	topic    string
	payload  []byte
}

// Compile-time check that *message satisfies PubMessage.
var _ PubMessage = (*message)(nil)

// Qos returns the stored QoS converted to a byte.
func (m *message) Qos() byte {
	return byte(m.qos)
}

// Retained reports the stored retained flag.
func (m *message) Retained() bool {
	return m.retained
}

// Topic returns the stored topic.
func (m *message) Topic() string {
	return m.topic
}

// Payload returns the stored payload. The slice is not copied.
func (m *message) Payload() []byte {
	return m.payload
}

// NewPubMessage builds a message from the given topic, payload, QoS and
// retained flag. The result satisfies PubMessage, so it can be used as
// Config.WillMessage.
func NewPubMessage(topic string, payload []byte, qos int, retained bool) *message {
	return &message{
		topic:    topic,
		payload:  payload,
		qos:      qos,
		retained: retained,
	}
}

// Config holds the settings New uses to build a Client.
//
// Host and Port are joined as Host + ":" + Port and registered as the broker
// address. MsgChanDept is forwarded to the paho message channel depth option.
// WillMessage, when non-nil, is registered as the client's will. Enabling
// AutoReconnect also enables resuming subscriptions. Retained is not read by
// New.
type Config struct {
	Host          string
	Port          string
	Username      string
	Password      string
	ClientID      string
	CleanSession  bool
	AutoReconnect bool
	Retained      bool
	KeepAlive     time.Duration
	MsgChanDept   uint
	WillMessage   PubMessage
}

// Client pairs the Config it was built from with the underlying paho client.
type Client struct {
	Config     Config
	pahoClient paho.Client
}

// New builds a Client from conf. It only configures the underlying paho
// client; call Connect to open the connection.
func New(conf Config) *Client {
	opts := paho.NewClientOptions()
	opts.AddBroker(conf.Host + ":" + conf.Port)
	opts.SetClientID(conf.ClientID)
	opts.SetUsername(conf.Username)
	opts.SetPassword(conf.Password)
	opts.SetCleanSession(conf.CleanSession)
	opts.SetAutoReconnect(conf.AutoReconnect)
	opts.SetKeepAlive(conf.KeepAlive)
	opts.SetMessageChannelDepth(conf.MsgChanDept)

	if will := conf.WillMessage; will != nil {
		opts.SetBinaryWill(will.Topic(), will.Payload(), will.Qos(), will.Retained())
	}
	if conf.AutoReconnect {
		opts.SetResumeSubs(true)
	}

	return &Client{
		Config:     conf,
		pahoClient: paho.NewClient(opts),
	}
}

// Connect opens the connection and waits for the attempt to finish, returning
// its error if there is one. On success it starts a goroutine that disconnects
// the client once ctx is done.
func (c *Client) Connect(ctx context.Context) error {
	if token := c.pahoClient.Connect(); token.Wait() && token.Error() != nil {
		return token.Error()
	}

	go func() {
		<-ctx.Done()
		c.pahoClient.Disconnect(disconnectQuiesceMillis)
	}()

	return nil
}

// Subscriber carries the results of a subscription: received messages on
// OnMessage and a subscription error, if any, on OnError. Both channels are
// unbuffered and are closed once the subscription's context is done.
type Subscriber struct {
	OnMessage chan Message
	OnError   chan error
}

// Subscribe subscribes to topic with the given QoS and returns a Subscriber
// immediately; the subscription itself is established in a goroutine.
//
// Received messages are delivered on OnMessage. A failed subscription is
// reported on OnError. Both channels are closed after ctx is done. Messages
// and errors that have no receiver by the time ctx is done are dropped. The
// subscription is not removed from the broker when ctx is done.
func (c *Client) Subscribe(ctx context.Context, topic string, qos int) *Subscriber {
	sub := &Subscriber{
		OnMessage: make(chan Message),
		OnError:   make(chan error),
	}

	// mu and closed keep the handler from sending on OnMessage after it has
	// been closed: handlers send under the read lock, closing takes the write
	// lock.
	var (
		mu     sync.RWMutex
		closed bool
	)

	handler := func(_ paho.Client, msg paho.Message) {
		mu.RLock()
		defer mu.RUnlock()
		if closed {
			return
		}
		select {
		case sub.OnMessage <- msg:
		case <-ctx.Done():
			// Shutting down; release the read lock so the channel can close.
		}
	}

	go func() {
		defer func() {
			// Every handler holding the read lock returns once ctx is done, so
			// this cannot deadlock.
			mu.Lock()
			closed = true
			close(sub.OnMessage)
			mu.Unlock()
			close(sub.OnError)
		}()

		if token := c.pahoClient.Subscribe(topic, byte(qos), handler); token.Wait() && token.Error() != nil {
			select {
			case sub.OnError <- token.Error():
			case <-ctx.Done():
				return
			}
		}

		<-ctx.Done()
	}()

	return sub
}

// frame is a single publish request queued on a Publisher.
type frame struct {
	topic   string
	qos     int
	payload []byte
	retain  bool
}

// Publisher queues messages for publishing and reports publish errors on
// OnError. Create one with Client.Publisher.
type Publisher struct {
	OnError chan error
	publish chan *frame
	// done is the Done channel of the context the Publisher was created with;
	// it lets Publish return instead of blocking after shutdown.
	done <-chan struct{}
}

// Publisher creates a Publisher and starts a goroutine that publishes every
// message queued through Publish, one at a time.
// Publish errors are sent to the unbuffered OnError channel. The goroutine
// exits and OnError is closed when ctx is done; an error with no receiver at
// that point is dropped.
func (c *Client) Publisher(ctx context.Context) *Publisher {
	pub := &Publisher{
		OnError: make(chan error),
		publish: make(chan *frame),
		done:    ctx.Done(),
	}

	go func() {
		defer close(pub.OnError)
		for {
			select {
			case fr := <-pub.publish:
				token := c.pahoClient.Publish(fr.topic, byte(fr.qos), fr.retain, fr.payload)
				if token.Wait() && token.Error() != nil {
					select {
					case pub.OnError <- token.Error():
					case <-ctx.Done():
						return
					}
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	return pub
}

// Publish queues message for topic on the private publish channel and blocks
// until the publishing goroutine accepts it. If the Publisher's context is
// done, Publish returns without queuing the message.
// Safe for concurrent use.
func (pub *Publisher) Publish(topic string, qos int, message []byte, retain bool) {
	fr := &frame{
		topic:   topic,
		qos:     qos,
		payload: message,
		retain:  retain,
	}
	select {
	case pub.publish <- fr:
	case <-pub.done:
	}
}