gazpar/main.go

158 lines
3.0 KiB
Go

package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"time"
"git.quimbo.fr/nicolas/mqtt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
)
// gazparCount is the Prometheus counter named "gazpar". It is created through
// promauto and incremented by onMessage for every payload whose action is "on".
var gazparCount = promauto.NewCounter(prometheus.CounterOpts{
	Name: "gazpar",
	Help: "increment every 10L",
})
// Config holds the MQTT connection settings that main collects from the
// command-line flags and that NewApp forwards to the MQTT client.
type Config struct {
	// Host and Port locate the broker; both are kept as strings.
	Host, Port string
	// Username and Password are the broker credentials.
	Username, Password string
	// Clientid is passed to the MQTT client as its ClientID.
	Clientid string
}
// App ties the MQTT client to the context used by its subscriptions.
type App struct {
	// mqtt is the client built by NewApp.
	mqtt *mqtt.Client

	// ctx is the parent context for subscriptions. It is nil until Run
	// assigns it, so subscribe must not be called before Run has done so.
	ctx context.Context
}
// NewApp builds an App whose MQTT client is configured from conf.
//
// Besides the connection settings copied from conf, the client is created
// with a clean session, auto-reconnect enabled, retained disabled, a 15 s
// keep-alive and a message channel depth of 100. The returned App has no
// context yet; Run sets it.
func NewApp(conf Config) *App {
	clientConf := mqtt.Config{
		// Connection settings supplied by the caller.
		Host:     conf.Host,
		Port:     conf.Port,
		Username: conf.Username,
		Password: conf.Password,
		ClientID: conf.Clientid,

		// Fixed client behaviour.
		CleanSession:  true,
		AutoReconnect: true,
		Retained:      false,
		KeepAlive:     15 * time.Second,
		MsgChanDept:   100,
	}

	return &App{mqtt: mqtt.New(clientConf)}
}
// subscribe subscribes to topic at the given qos and starts a goroutine that
// hands every received message to callback.
//
// The subscription runs under a child of app.ctx, so app.ctx must already be
// set (Run assigns it before subscribing). The goroutine stops when either
// subscriber channel is closed or when the child context is cancelled.
//
// It returns the cancel function of the child context; calling it stops the
// dispatch goroutine.
func (app *App) subscribe(
	topic string,
	qos int,
	callback func(context.Context, mqtt.Message),
) context.CancelFunc {
	ctx, cancelFunc := context.WithCancel(app.ctx)
	log := zerolog.Ctx(ctx)

	// The Subscribe call is evaluated here, before the goroutine starts.
	go func(ctx context.Context, sub *mqtt.Subscriber) {
		for {
			select {
			case msg, ok := <-sub.OnMessage:
				if !ok {
					return
				}
				callback(ctx, msg)
			case err, ok := <-sub.OnError:
				if !ok {
					return
				}
				// A zerolog event is only written once Msg/Send is called;
				// without it the error would be silently discarded.
				log.Error().Err(err).Str("topic", topic).Msg("subscription error")
			case <-ctx.Done():
				return
			}
		}
	}(ctx, app.mqtt.Subscribe(ctx, topic, qos))

	log.Info().Str("topic", topic).Msg("subscribe")
	return cancelFunc
}
// Run connects to the MQTT broker, subscribes to the gazpar topic and then
// blocks until its context is done.
//
// A console logger writing to stderr is attached to the context so that
// subscribe and onMessage can retrieve it with zerolog.Ctx. If the connection
// fails, the error is logged at fatal level, which terminates the process.
// Nothing in this function cancels the context before returning, so in
// practice Run blocks for as long as the process lives.
func (app *App) Run() {
	ctx, ctxCancel := context.WithCancel(context.Background())
	defer ctxCancel()

	output := zerolog.ConsoleWriter{Out: os.Stderr}
	log := zerolog.New(output).
		With().
		Logger()
	ctx = log.WithContext(ctx)

	if err := app.mqtt.Connect(ctx); err != nil {
		// Msg is what actually writes the event and triggers the exit; a bare
		// Fatal().Err(err) would do nothing and let execution continue with
		// an unconnected client.
		log.Fatal().Err(err).Msg("mqtt connect failed")
	}

	// subscribe derives its context from app.ctx, so set it first.
	app.ctx = ctx
	app.subscribe("zigbee2mqtt/gazpar", 2, app.onMessage)

	<-ctx.Done()
}
// onMessage handles one MQTT message: it decodes the JSON payload and bumps
// gazparCount when the payload's "action" field equals "on".
//
// Payloads that fail to decode are logged and dropped; any other action value
// is ignored without logging.
func (app *App) onMessage(ctx context.Context, msg mqtt.Message) {
	l := zerolog.Ctx(ctx).With().Str("action", "onMessage").Logger()

	// Only the "action" field of the payload is of interest.
	payload := struct {
		Action string `json:"action"`
	}{}

	err := json.Unmarshal(msg.Payload(), &payload)
	if err != nil {
		l.Error().Err(err).Msg("Error while parsing payload")
		return
	}

	if payload.Action != "on" {
		return
	}

	l.Info().Msg("increment")
	gazparCount.Inc()
}
// main parses the command-line flags, starts the MQTT application in the
// background and serves the Prometheus metrics on /metrics.
//
// Flags: -host, -port, -username, -password and -clientid configure the MQTT
// connection; -promport selects the HTTP port for the metrics endpoint.
// If the HTTP server stops (for example because the port cannot be bound),
// the error is printed to stderr and the process exits with status 1.
func main() {
	host := flag.String("host", "localhost", "mqtt host")
	port := flag.String("port", "1883", "mqtt port")
	username := flag.String("username", "", "mqtt username")
	password := flag.String("password", "", "mqtt password")
	clientid := flag.String("clientid", "gazpar", "mqtt client id")
	promPort := flag.String("promport", "9641", "prometheus metrics http port")
	flag.Parse()

	config := Config{
		Host:     *host,
		Port:     *port,
		Username: *username,
		Password: *password,
		Clientid: *clientid,
	}

	app := NewApp(config)
	go app.Run()

	http.Handle("/metrics", promhttp.Handler())

	// ListenAndServe only returns on failure and always with a non-nil
	// error; report it instead of exiting silently with status 0.
	addr := fmt.Sprintf(":%s", *promPort)
	if err := http.ListenAndServe(addr, nil); err != nil {
		fmt.Fprintf(os.Stderr, "metrics server on %s: %v\n", addr, err)
		os.Exit(1)
	}
}