diff --git a/main.go b/main.go index f739eb4..30fce70 100644 --- a/main.go +++ b/main.go @@ -160,7 +160,7 @@ func (app *App) onSetStateMessage(ctx context.Context, msg mqttpaho.Message) { Str("action", "onSetStateMessage"). Str("device", device_name). Logger() - ctx = logger.WithContext(ctx) + // ctx = logger.WithContext(ctx) var state device.DeviceState if err := json.Unmarshal(msg.Payload(), &state); err != nil { diff --git a/mqtt/config.go b/mqtt/config.go index f01fe4f..0d76c09 100644 --- a/mqtt/config.go +++ b/mqtt/config.go @@ -1,10 +1,11 @@ package mqtt import ( - "github.com/google/uuid" "os" "strings" "time" + + "github.com/google/uuid" ) type Config struct { diff --git a/mqtt/connection.go b/mqtt/connection.go index be98a6c..b8c14d5 100644 --- a/mqtt/connection.go +++ b/mqtt/connection.go @@ -26,9 +26,9 @@ func newConnection(conf *Config) *connection { opts.SetAutoReconnect(conn.conf.AutoReconnect) opts.SetKeepAlive(conn.conf.KeepAlive) opts.SetMessageChannelDepth(conn.conf.MsgChanDept) - if conn.conf.AutoReconnect { - opts.SetResumeSubs(true) - } + if conn.conf.AutoReconnect { + opts.SetResumeSubs(true) + } conn.client = mqttLib.NewClient(opts) diff --git a/mqtt/hub.go b/mqtt/hub.go index a66a438..3bac8a3 100644 --- a/mqtt/hub.go +++ b/mqtt/hub.go @@ -2,6 +2,7 @@ package mqtt import ( "context" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/sirupsen/logrus" ) @@ -29,7 +30,7 @@ type frame struct { topic string qos int payload []byte - retain bool + retain bool } func NewHub(conf *Config) *Hub { @@ -73,13 +74,13 @@ func (hub *Hub) Subscribe(ctx context.Context, topic string, qos int) *Subscribe close(sub.OnError) }() - if token := hub.conn.subscribe(topic, byte(qos), func(mqttClient mqtt.Client, message mqtt.Message) { - sub.OnMessage <- message - }); token.Wait() && token.Error() != nil { - sub.OnError <- token.Error() - } + if token := hub.conn.subscribe(topic, byte(qos), func(mqttClient mqtt.Client, message mqtt.Message) { + sub.OnMessage <- message + }); token.Wait() && token.Error() != nil { + sub.OnError <- token.Error() + } - <-ctx.Done() + <-ctx.Done() }(ctx, sub) return sub @@ -119,6 +120,6 @@ func (pub *Publisher) Publish(topic string, qos int, message []byte, retain bool topic: topic, qos: qos, payload: message, - retain: retain, + retain: retain, } }