package mqtt

import (
	"context"
	"sync"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/sirupsen/logrus"
)

// Default QoS levels. NOTE(review): nothing in this part of the file reads
// them; presumably callers pass them to Publish / Subscribe — confirm.
const (
	DefaultPubQoS = 0
	DefaultSubQoS = 0
)

// Hub wraps a single MQTT connection and hands out subscribers and publishers
// that share it.
type Hub struct {
	conn *connection
}

// Subscriber exposes the result of Hub.Subscribe. Both channels are
// unbuffered and are closed once the context passed to Subscribe is done.
type Subscriber struct {
	// OnMessage delivers every message received for the subscribed topic.
	OnMessage chan mqtt.Message
	// OnError delivers the error of a failed subscribe request, if any.
	OnError chan error
}

// Publisher exposes the result of Hub.Publisher.
type Publisher struct {
	// OnError delivers errors of failed publish requests.
	OnError chan error
	// publish carries frames from Publish to the publishing goroutine.
	publish chan *frame
}

// frame is one pending publish request.
type frame struct {
	topic   string
	qos     int
	payload []byte
	retain  bool
}

// NewHub builds a Hub around a new, not yet connected, connection created
// from conf.
func NewHub(conf *Config) *Hub {
	return &Hub{
		conn: newConnection(conf),
	}
}

// Connect to MQTT server.
//
// On success a background goroutine is started that waits for ctx to be done
// and then disconnects the client. A connect error is returned as is and no
// goroutine is started.
func (hub *Hub) Connect(ctx context.Context) error {
	if err := hub.conn.connect(); err != nil {
		return err
	}

	go func() {
		defer logrus.Warn("hub closed MQTT connection")

		// Nothing else to wait for, so a plain receive replaces the former
		// single-case select loop.
		<-ctx.Done()
		// NOTE(review): assuming client is a paho mqtt.Client, 1000 is the
		// quiesce time in milliseconds — confirm against connection.
		hub.conn.client.Disconnect(1000)
	}()

	return nil
}

// Subscribe will create MQTT subscriber and listen for messages.
// Messages and errors are sent to OnMessage and OnError channels.
//
// Both channels are unbuffered, so the caller has to receive from them.
// When ctx is done the channels are closed; messages arriving after that
// point are dropped.
func (hub *Hub) Subscribe(ctx context.Context, topic string, qos int) *Subscriber {
	sub := &Subscriber{
		OnMessage: make(chan mqtt.Message),
		OnError:   make(chan error),
	}

	// The message callback runs on the MQTT client's goroutines and can still
	// fire after ctx is done (the subscription itself is not removed here).
	// mu/closed make sure it never sends on OnMessage once it has been
	// closed, which would panic.
	var (
		mu     sync.RWMutex
		closed bool
	)

	go func() {
		defer func() {
			// Every return path below is taken only after ctx is done, so any
			// callback still holding the read lock is about to release it.
			mu.Lock()
			closed = true
			mu.Unlock()

			close(sub.OnMessage)
			close(sub.OnError)
		}()

		token := hub.conn.subscribe(topic, byte(qos), func(_ mqtt.Client, message mqtt.Message) {
			mu.RLock()
			defer mu.RUnlock()

			if closed {
				return
			}
			select {
			case sub.OnMessage <- message:
			case <-ctx.Done():
				// Nobody is expected to read any more; drop the message
				// instead of blocking the client forever.
			}
		})

		if token.Wait() && token.Error() != nil {
			// Do not get stuck on an unread error once ctx is done.
			select {
			case sub.OnError <- token.Error():
			case <-ctx.Done():
				return
			}
		}

		<-ctx.Done()
	}()

	return sub
}

// Publisher will create MQTT publisher and private listener for messages to be published.
// All messages to be published are sent through private publish channel.
// Errors will be sent to OnError channel.
func (hub *Hub) Publisher(ctx context.Context) *Publisher { pub := &Publisher{ OnError: make(chan error), publish: make(chan *frame), } go func(ctx context.Context, pub *Publisher) { defer close(pub.OnError) for { select { case fr := <-pub.publish: if token := hub.conn.publish(fr.topic, byte(fr.qos), fr.payload, fr.retain); token.Wait() && token.Error() != nil { pub.OnError <- token.Error() } case <-ctx.Done(): return } } }(ctx, pub) return pub } // Publish message to topic through private pub.publish channel. // Thread-safe. func (pub *Publisher) Publish(topic string, qos int, message []byte, retain bool) { pub.publish <- &frame{ topic: topic, qos: qos, payload: message, retain: retain, } }