package main import ( "context" "encoding/json" "flag" "fmt" "net/http" "os" "time" "git.quimbo.fr/nicolas/mqtt" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" ) var ( gazparCount = promauto.NewCounter(prometheus.CounterOpts{ Name: "gazpar", Help: "increment every 10L", }) ) type Config struct { Host string Port string Username string Password string Clientid string } type App struct { mqtt *mqtt.Client ctx context.Context } func NewApp(conf Config) *App { client := mqtt.New(mqtt.Config{ Host: conf.Host, Port: conf.Port, Username: conf.Username, Password: conf.Password, ClientID: conf.Clientid, CleanSession: true, AutoReconnect: true, Retained: false, KeepAlive: 15 * time.Second, MsgChanDept: 100, }) app := &App{ mqtt: client, } return app } func (app *App) subscribe( topic string, qos int, callback func(context.Context, mqtt.Message), ) context.CancelFunc { ctx, cancelFunc := context.WithCancel(app.ctx) log := zerolog.Ctx(ctx) go func(ctx context.Context, sub *mqtt.Subscriber) { for { select { case msg, ok := <-sub.OnMessage: if !ok { return } callback(ctx, msg) case err, ok := <-sub.OnError: if !ok { return } log.Error().Err(err) case <-ctx.Done(): return } } }(ctx, app.mqtt.Subscribe(ctx, topic, qos)) log.Info().Str("topic", topic).Msg("subscribe") return cancelFunc } func (app *App) Run() { ctx, ctxCancel := context.WithCancel(context.Background()) output := zerolog.ConsoleWriter{Out: os.Stderr} log := zerolog.New(output). With(). Logger() ctx = log.WithContext(ctx) defer ctxCancel() if err := app.mqtt.Connect(ctx); err != nil { log.Fatal().Err(err) } app.ctx = ctx app.subscribe("zigbee2mqtt/gazpar", 2, app.onMessage) <-ctx.Done() } func (app *App) onMessage(ctx context.Context, msg mqtt.Message) { logger := zerolog.Ctx(ctx).With(). Str("action", "onMessage"). Logger() var r struct { Action string `json:"action"` } if err := json.Unmarshal(msg.Payload(), &r); err != nil { logger.Error().Err(err).Msg("Error while parsing payload") return } if r.Action == "on" { logger.Info().Msg("increment") gazparCount.Inc() } } func main() { host := flag.String("host", "localhost", "mqtt host") port := flag.String("port", "1883", "mqtt port") username := flag.String("username", "", "mqtt username") password := flag.String("password", "", "mqtt password") clientid := flag.String("clientid", "gazpar", "mqtt client id") promPort := flag.String("promport", "9641", "prometheus metrics http port") flag.Parse() config := Config{ Host: *host, Port: *port, Username: *username, Password: *password, Clientid: *clientid, } app := NewApp(config) go app.Run() http.Handle("/metrics", promhttp.Handler()) http.ListenAndServe(fmt.Sprintf(":%s", *promPort), nil) }