From 0d83d14329e60a4adf7b76d40b023dec0998cc9e Mon Sep 17 00:00:00 2001 From: Nicolas Duhamel Date: Fri, 28 Oct 2022 20:40:23 +0200 Subject: [PATCH] use git.quimbo.fr/nicolas/mqtt lib --- go.mod | 3 +- go.sum | 4 ++ main.go | 26 ++++---- mqtt/config.go | 62 ----------------- mqtt/connection.go | 52 --------------- mqtt/hub.go | 125 ----------------------------------- pkg/device/device.go | 2 +- pkg/device/device_manager.go | 2 +- 8 files changed, 21 insertions(+), 255 deletions(-) delete mode 100644 mqtt/config.go delete mode 100644 mqtt/connection.go delete mode 100644 mqtt/hub.go diff --git a/go.mod b/go.mod index 6d990ca..16a642a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,8 @@ module citadel/heater go 1.19 require ( - github.com/eclipse/paho.mqtt.golang v1.4.1 // indirect + git.quimbo.fr/nicolas/mqtt v0.0.0-20221028183316-e2ef561b6124 // indirect + github.com/eclipse/paho.mqtt.golang v1.4.2 // indirect github.com/gobackpack/mqtt v0.0.0-20220830204110-1047a4ce1fe2 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect diff --git a/go.sum b/go.sum index 4d1e977..a315492 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,12 @@ +git.quimbo.fr/nicolas/mqtt v0.0.0-20221028183316-e2ef561b6124 h1:GxY5G5Hxb+8dPQDFpStfyLvliSDucc7FhEjZ06muG/8= +git.quimbo.fr/nicolas/mqtt v0.0.0-20221028183316-e2ef561b6124/go.mod h1:Cn+S3mXDNIIcWH0xdvLChzcPZO50aZMfEH6jgl8+BhA= github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI= github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= +github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/gobackpack/mqtt v0.0.0-20220830204110-1047a4ce1fe2 h1:I5vl2HHSkUSZrtZtof5nRg2a1n5Arih1GqoGwGCwb7A= github.com/gobackpack/mqtt v0.0.0-20220830204110-1047a4ce1fe2/go.mod h1:2geROsfVUlQLJCmrUgpgDzlf+OSrHp4X18Bj4kTTRk0= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= diff --git a/main.go b/main.go index 30fce70..e689367 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,6 @@ package main import ( - "citadel/heater/mqtt" "citadel/heater/pkg/device" "context" "encoding/json" @@ -12,7 +11,8 @@ import ( "syscall" "time" - mqttpaho "github.com/eclipse/paho.mqtt.golang" + "git.quimbo.fr/nicolas/mqtt" + "github.com/rs/zerolog" ) @@ -25,15 +25,15 @@ type Config struct { } type App struct { - mqtt_hub *mqtt.Hub - ctx context.Context + mqtt *mqtt.Client + ctx context.Context DeviceManager *device.DeviceManager PubChan chan device.Message } func NewApp(conf Config) *App { - hub := mqtt.NewHub(&mqtt.Config{ + client := mqtt.New(mqtt.Config{ Host: conf.Host, Port: conf.Port, Username: conf.Username, @@ -48,8 +48,8 @@ func NewApp(conf Config) *App { pubchan := make(chan device.Message, 10) app := &App{ - mqtt_hub: hub, - PubChan: pubchan, + mqtt: client, + PubChan: pubchan, } app.DeviceManager = device.NewDeviceManager(pubchan, app.subscribe) return app @@ -58,7 +58,7 @@ func NewApp(conf Config) *App { func (app *App) subscribe( topic string, qos int, - callback func(context.Context, mqttpaho.Message), + callback func(context.Context, mqtt.Message), ) context.CancelFunc { ctx, cancelFunc := context.WithCancel(app.ctx) @@ -82,7 +82,7 @@ func (app *App) subscribe( return } } - }(ctx, app.mqtt_hub.Subscribe(ctx, topic, qos)) + }(ctx, app.mqtt.Subscribe(ctx, topic, qos)) log.Info().Str("topic", topic).Msg("subscribe") return cancelFunc } @@ -107,7 +107,7 @@ func (app *App) runTicker() { } -func (app *App) onSettingsMessage(ctx context.Context, msg mqttpaho.Message) { +func (app *App) onSettingsMessage(ctx context.Context, msg mqtt.Message) { // callback for topic /{prefix}/{tvr_id}/settings change's device_name := strings.Split(msg.Topic(), "/")[1] @@ -151,7 +151,7 @@ func (app *App) onSettingsMessage(ctx context.Context, msg mqttpaho.Message) { } } -func (app *App) onSetStateMessage(ctx context.Context, msg mqttpaho.Message) { +func (app *App) onSetStateMessage(ctx context.Context, msg mqtt.Message) { // callback for topic /{prefix}/{tvr_id}/state/set change's device_name := strings.Split(msg.Topic(), "/")[1] @@ -191,7 +191,7 @@ func (app *App) Run() { defer ctxCancel() - if err := app.mqtt_hub.Connect(ctx); err != nil { + if err := app.mqtt.Connect(ctx); err != nil { log.Fatal().Err(err) } @@ -231,7 +231,7 @@ func (app *App) Run() { return } } - }(ctx, app.mqtt_hub.Publisher(ctx)) + }(ctx, app.mqtt.Publisher(ctx)) app.runTicker() diff --git a/mqtt/config.go b/mqtt/config.go deleted file mode 100644 index 0d76c09..0000000 --- a/mqtt/config.go +++ /dev/null @@ -1,62 +0,0 @@ -package mqtt - -import ( - "os" - "strings" - "time" - - "github.com/google/uuid" -) - -type Config struct { - Host string - Port string - Username string - Password string - ClientID string - CleanSession bool - AutoReconnect bool - Retained bool - KeepAlive time.Duration - MsgChanDept uint -} - -func NewConfig() *Config { - host := os.Getenv("MQTT_HOST") - if strings.TrimSpace(host) == "" { - host = "localhost" - } - - port := os.Getenv("MQTT_PORT") - if strings.TrimSpace(port) == "" { - port = "1883" - } - - username := os.Getenv("MQTT_USERNAME") - if strings.TrimSpace(username) == "" { - username = "guest" - } - - password := os.Getenv("MQTT_PASSWORD") - if strings.TrimSpace(password) == "" { - password = "guest" - } - - clientId := os.Getenv("MQTT_CLIENT_ID") - if strings.TrimSpace(clientId) == "" { - clientId = uuid.New().String() - } - - return &Config{ - Host: host, - Port: port, - Username: username, - Password: password, - ClientID: clientId, - CleanSession: true, - AutoReconnect: true, - Retained: false, - KeepAlive: 15 * time.Second, - MsgChanDept: 100, - } -} diff --git a/mqtt/connection.go b/mqtt/connection.go deleted file mode 100644 index b8c14d5..0000000 --- a/mqtt/connection.go +++ /dev/null @@ -1,52 +0,0 @@ -package mqtt - -import ( - mqttLib "github.com/eclipse/paho.mqtt.golang" -) - -type connection struct { - conf *Config - client mqttLib.Client -} - -func newConnection(conf *Config) *connection { - conn := &connection{ - conf: conf, - } - - opts := mqttLib.NewClientOptions() - - broker := conn.conf.Host + ":" + conn.conf.Port - - opts.AddBroker(broker) - opts.SetClientID(conn.conf.ClientID) - opts.SetUsername(conn.conf.Username) - opts.SetPassword(conn.conf.Password) - opts.SetCleanSession(conn.conf.CleanSession) - opts.SetAutoReconnect(conn.conf.AutoReconnect) - opts.SetKeepAlive(conn.conf.KeepAlive) - opts.SetMessageChannelDepth(conn.conf.MsgChanDept) - if conn.conf.AutoReconnect { - opts.SetResumeSubs(true) - } - - conn.client = mqttLib.NewClient(opts) - - return conn -} - -func (conn *connection) connect() error { - if token := conn.client.Connect(); token.Wait() && token.Error() != nil { - return token.Error() - } - - return nil -} - -func (conn *connection) publish(topic string, qos byte, payload []byte, retain bool) mqttLib.Token { - return conn.client.Publish(topic, qos, retain, payload) -} - -func (conn *connection) subscribe(topic string, qos byte, callback func(mqttClient mqttLib.Client, message mqttLib.Message)) mqttLib.Token { - return conn.client.Subscribe(topic, qos, callback) -} diff --git a/mqtt/hub.go b/mqtt/hub.go deleted file mode 100644 index 3bac8a3..0000000 --- a/mqtt/hub.go +++ /dev/null @@ -1,125 +0,0 @@ -package mqtt - -import ( - "context" - - mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/sirupsen/logrus" -) - -const ( - DefaultPubQoS = 0 - DefaultSubQoS = 0 -) - -type Hub struct { - conn *connection -} - -type Subscriber struct { - OnMessage chan mqtt.Message - OnError chan error -} - -type Publisher struct { - OnError chan error - publish chan *frame -} - -type frame struct { - topic string - qos int - payload []byte - retain bool -} - -func NewHub(conf *Config) *Hub { - return &Hub{ - conn: newConnection(conf), - } -} - -// Connect to MQTT server -func (hub *Hub) Connect(ctx context.Context) error { - if err := hub.conn.connect(); err != nil { - return err - } - - go func(ctx context.Context) { - defer logrus.Warn("hub closed MQTT connection") - - for { - select { - case <-ctx.Done(): - hub.conn.client.Disconnect(1000) - return - } - } - }(ctx) - - return nil -} - -// Subscribe will create MQTT subscriber and listen for messages. -// Messages and errors are sent to OnMessage and OnError channels. -func (hub *Hub) Subscribe(ctx context.Context, topic string, qos int) *Subscriber { - sub := &Subscriber{ - OnMessage: make(chan mqtt.Message), - OnError: make(chan error), - } - - go func(ctx context.Context, sub *Subscriber) { - defer func() { - close(sub.OnMessage) - close(sub.OnError) - }() - - if token := hub.conn.subscribe(topic, byte(qos), func(mqttClient mqtt.Client, message mqtt.Message) { - sub.OnMessage <- message - }); token.Wait() && token.Error() != nil { - sub.OnError <- token.Error() - } - - <-ctx.Done() - }(ctx, sub) - - return sub -} - -// Publisher will create MQTT publisher and private listener for messages to be published. -// All messages to be published are sent through private publish channel. -// Errors will be sent to OnError channel. -func (hub *Hub) Publisher(ctx context.Context) *Publisher { - pub := &Publisher{ - OnError: make(chan error), - publish: make(chan *frame), - } - - go func(ctx context.Context, pub *Publisher) { - defer close(pub.OnError) - - for { - select { - case fr := <-pub.publish: - if token := hub.conn.publish(fr.topic, byte(fr.qos), fr.payload, fr.retain); token.Wait() && token.Error() != nil { - pub.OnError <- token.Error() - } - case <-ctx.Done(): - return - } - } - }(ctx, pub) - - return pub -} - -// Publish message to topic through private pub.publish channel. -// Thread-safe. -func (pub *Publisher) Publish(topic string, qos int, message []byte, retain bool) { - pub.publish <- &frame{ - topic: topic, - qos: qos, - payload: message, - retain: retain, - } -} diff --git a/pkg/device/device.go b/pkg/device/device.go index c30a7a0..9db1b22 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -6,7 +6,7 @@ import ( "fmt" "time" - mqtt "github.com/eclipse/paho.mqtt.golang" + "git.quimbo.fr/nicolas/mqtt" "github.com/ohler55/ojg/jp" "github.com/ohler55/ojg/oj" "github.com/rs/zerolog" diff --git a/pkg/device/device_manager.go b/pkg/device/device_manager.go index 44239f4..fd057d7 100644 --- a/pkg/device/device_manager.go +++ b/pkg/device/device_manager.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - mqtt "github.com/eclipse/paho.mqtt.golang" + "git.quimbo.fr/nicolas/mqtt" "github.com/rs/zerolog" )