158 lines
3.0 KiB
Go
158 lines
3.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
"git.quimbo.fr/nicolas/mqtt"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
var (
|
|
gazparCount = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "gazpar",
|
|
Help: "increment every 10L",
|
|
})
|
|
)
|
|
|
|
// Config carries the MQTT connection settings that NewApp forwards to the
// mqtt client. All values are plain strings taken as-is from the caller.
type Config struct {
	// Host and Port locate the MQTT broker.
	Host, Port string
	// Username and Password are the broker credentials.
	Username, Password string
	// Clientid is the identifier this client presents to the broker.
	Clientid string
}
|
|
|
|
type App struct {
|
|
mqtt *mqtt.Client
|
|
ctx context.Context
|
|
}
|
|
|
|
func NewApp(conf Config) *App {
|
|
client := mqtt.New(mqtt.Config{
|
|
Host: conf.Host,
|
|
Port: conf.Port,
|
|
Username: conf.Username,
|
|
Password: conf.Password,
|
|
ClientID: conf.Clientid,
|
|
CleanSession: true,
|
|
AutoReconnect: true,
|
|
Retained: false,
|
|
KeepAlive: 15 * time.Second,
|
|
MsgChanDept: 100,
|
|
})
|
|
|
|
app := &App{
|
|
mqtt: client,
|
|
}
|
|
return app
|
|
}
|
|
|
|
func (app *App) subscribe(
|
|
topic string,
|
|
qos int,
|
|
callback func(context.Context, mqtt.Message),
|
|
) context.CancelFunc {
|
|
|
|
ctx, cancelFunc := context.WithCancel(app.ctx)
|
|
|
|
log := zerolog.Ctx(ctx)
|
|
|
|
go func(ctx context.Context, sub *mqtt.Subscriber) {
|
|
for {
|
|
select {
|
|
case msg, ok := <-sub.OnMessage:
|
|
if !ok {
|
|
return
|
|
}
|
|
callback(ctx, msg)
|
|
case err, ok := <-sub.OnError:
|
|
if !ok {
|
|
return
|
|
}
|
|
log.Error().Err(err)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}(ctx, app.mqtt.Subscribe(ctx, topic, qos))
|
|
log.Info().Str("topic", topic).Msg("subscribe")
|
|
return cancelFunc
|
|
}
|
|
|
|
func (app *App) Run() {
|
|
ctx, ctxCancel := context.WithCancel(context.Background())
|
|
output := zerolog.ConsoleWriter{Out: os.Stderr}
|
|
log := zerolog.New(output).
|
|
With().
|
|
Logger()
|
|
|
|
ctx = log.WithContext(ctx)
|
|
|
|
defer ctxCancel()
|
|
|
|
if err := app.mqtt.Connect(ctx); err != nil {
|
|
log.Fatal().Err(err)
|
|
}
|
|
|
|
app.ctx = ctx
|
|
|
|
app.subscribe("zigbee2mqtt/gazpar", 2, app.onMessage)
|
|
|
|
<-ctx.Done()
|
|
}
|
|
|
|
func (app *App) onMessage(ctx context.Context, msg mqtt.Message) {
|
|
|
|
logger := zerolog.Ctx(ctx).With().
|
|
Str("action", "onMessage").
|
|
Logger()
|
|
|
|
var r struct {
|
|
Action string `json:"action"`
|
|
}
|
|
if err := json.Unmarshal(msg.Payload(), &r); err != nil {
|
|
logger.Error().Err(err).Msg("Error while parsing payload")
|
|
return
|
|
}
|
|
if r.Action == "on" {
|
|
logger.Info().Msg("increment")
|
|
gazparCount.Inc()
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
|
|
host := flag.String("host", "localhost", "mqtt host")
|
|
port := flag.String("port", "1883", "mqtt port")
|
|
username := flag.String("username", "", "mqtt username")
|
|
password := flag.String("password", "", "mqtt password")
|
|
clientid := flag.String("clientid", "gazpar", "mqtt client id")
|
|
|
|
promPort := flag.String("promport", "9641", "prometheus metrics http port")
|
|
|
|
flag.Parse()
|
|
|
|
config := Config{
|
|
Host: *host,
|
|
Port: *port,
|
|
Username: *username,
|
|
Password: *password,
|
|
Clientid: *clientid,
|
|
}
|
|
|
|
app := NewApp(config)
|
|
go app.Run()
|
|
|
|
http.Handle("/metrics", promhttp.Handler())
|
|
http.ListenAndServe(fmt.Sprintf(":%s", *promPort), nil)
|
|
}
|