heater/mqtt/hub.go
2022-10-28 19:38:23 +02:00

126 lines
2.5 KiB
Go

package mqtt
import (
"context"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/sirupsen/logrus"
)
// Default MQTT quality-of-service levels. Both are 0, the lowest MQTT QoS
// level (at-most-once delivery).
//
// NOTE(review): neither constant is referenced in the visible part of this
// file; presumably they are meant as default values for the qos arguments of
// Publisher.Publish and Hub.Subscribe — confirm against callers.
const (
// DefaultPubQoS is the default QoS level for publishing.
DefaultPubQoS = 0
// DefaultSubQoS is the default QoS level for subscribing.
DefaultSubQoS = 0
)
// Hub wraps a single MQTT connection and is the starting point for creating
// subscribers (Subscribe) and publishers (Publisher) on top of it.
type Hub struct {
// conn is the underlying connection; NewHub creates it via newConnection.
conn *connection
}
// Subscriber is the receiving side of a subscription created by Hub.Subscribe.
// Both channels are created unbuffered and are closed after the context
// passed to Hub.Subscribe is done.
type Subscriber struct {
// OnMessage delivers the messages received for the subscribed topic.
OnMessage chan mqtt.Message
// OnError delivers the error reported when the subscribe request fails.
OnError chan error
}
// Publisher is a handle for publishing messages, created by Hub.Publisher.
// Messages are submitted with Publish and sent by a goroutine that
// Hub.Publisher starts.
type Publisher struct {
// OnError delivers errors from failed publish attempts. It is created
// unbuffered and is closed once the context passed to Hub.Publisher is done.
OnError chan error
// publish carries frames from Publish to the goroutine started by
// Hub.Publisher. It is created unbuffered.
publish chan *frame
}
// frame is one publish request travelling over Publisher.publish. Its fields
// mirror the arguments of Publisher.Publish.
type frame struct {
// topic is the MQTT topic the payload is published to.
topic string
// qos is the requested QoS level; it is converted to a byte before being
// handed to the connection.
qos int
// payload is the raw message body.
payload []byte
// retain is passed through as the retain argument of the connection's
// publish call.
retain bool
}
// NewHub returns a Hub whose connection is built from conf by newConnection.
// Call Connect on the returned Hub to open the connection.
func NewHub(conf *Config) *Hub {
	hub := new(Hub)
	hub.conn = newConnection(conf)
	return hub
}
// Connect opens the MQTT connection and binds its lifetime to ctx.
//
// If the underlying connect call fails, its error is returned and nothing
// else happens. On success a background goroutine waits until ctx is done,
// then calls Disconnect(1000) on the client and logs a warning.
//
// NOTE(review): 1000 is presumably the quiesce period in milliseconds, as in
// paho's Client.Disconnect — confirm the type of conn.client.
func (hub *Hub) Connect(ctx context.Context) error {
	err := hub.conn.connect()
	if err != nil {
		return err
	}

	go func() {
		// Deferred so the warning is emitted after the disconnect call.
		defer logrus.Warn("hub closed MQTT connection")

		// There is nothing else to wait for, so block on cancellation directly.
		<-ctx.Done()
		hub.conn.client.Disconnect(1000)
	}()

	return nil
}
// Subscribe registers a subscription for topic at the given qos and returns a
// Subscriber whose channels report what happens on it.
//
// Received messages are delivered on OnMessage. If the subscribe request
// fails, its error is delivered on OnError. Both channels are unbuffered, so
// the caller is expected to keep reading from them. Both are closed after ctx
// is done; messages that arrive after that are dropped.
//
// The MQTT callback never touches OnMessage directly. It hands each message
// to a private channel that is never closed, and a forwarding goroutine is the
// only sender (and the only closer) of OnMessage. This prevents a callback
// that fires after cancellation from sending on a closed channel. Every
// blocking send is also guarded by ctx, so neither the callback nor the
// goroutines can be stuck forever once ctx is done.
func (hub *Hub) Subscribe(ctx context.Context, topic string, qos int) *Subscriber {
	sub := &Subscriber{
		OnMessage: make(chan mqtt.Message),
		OnError:   make(chan error),
	}

	// inbox decouples the MQTT callback from the public channel. It is
	// intentionally never closed.
	inbox := make(chan mqtt.Message)

	// Forwarder: started before subscribing so that messages delivered while
	// the subscribe request is still in flight are not held up.
	go func() {
		defer close(sub.OnMessage)
		for {
			select {
			case <-ctx.Done():
				return
			case message := <-inbox:
				select {
				case sub.OnMessage <- message:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	go func() {
		defer close(sub.OnError)

		token := hub.conn.subscribe(topic, byte(qos), func(_ mqtt.Client, message mqtt.Message) {
			select {
			case inbox <- message:
			case <-ctx.Done():
				// Subscriber is shutting down; drop the message.
			}
		})
		if token.Wait() && token.Error() != nil {
			select {
			case sub.OnError <- token.Error():
			case <-ctx.Done():
				return
			}
		}

		// Keep OnError open until the caller cancels, as before.
		<-ctx.Done()
	}()

	return sub
}
// Publisher creates a Publisher and starts the goroutine that sends its
// messages.
//
// Frames submitted through Publish arrive on the private publish channel and
// are published one at a time over the hub's connection. When a publish
// attempt reports an error, that error is delivered on OnError, which is
// unbuffered. The goroutine stops and closes OnError once ctx is done.
//
// The error send is guarded by ctx. Without that guard, an unread OnError
// would wedge the goroutine, block all later Publish calls, and keep it from
// ever noticing cancellation.
func (hub *Hub) Publisher(ctx context.Context) *Publisher {
	pub := &Publisher{
		OnError: make(chan error),
		publish: make(chan *frame),
	}

	go func() {
		defer close(pub.OnError)

		for {
			select {
			case <-ctx.Done():
				return
			case fr := <-pub.publish:
				token := hub.conn.publish(fr.topic, byte(fr.qos), fr.payload, fr.retain)
				if !token.Wait() || token.Error() == nil {
					continue
				}
				select {
				case pub.OnError <- token.Error():
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return pub
}
// Publish hands a message to the publisher goroutine by sending a frame on the
// private pub.publish channel. The only shared state it touches is that
// channel, so it may be called from multiple goroutines.
//
// The send is unconditional: the call blocks until something receives from
// pub.publish.
// NOTE(review): if nothing is receiving any more (for example because the
// goroutine started by Hub.Publisher has returned), the call blocks
// indefinitely.
func (pub *Publisher) Publish(topic string, qos int, message []byte, retain bool) {
	fr := &frame{
		topic:   topic,
		qos:     qos,
		payload: message,
		retain:  retain,
	}
	pub.publish <- fr
}