heater/main.go

279 lines
5.9 KiB
Go

package main
import (
"citadel/heater/pkg/device"
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"strings"
"time"
"git.quimbo.fr/nicolas/mqtt"
_ "net/http/pprof"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
)
type Config struct {
Host string
Port string
Username string
Password string
ClientId string
}
type App struct {
mqtt *mqtt.Client
ctx context.Context
DeviceManager *device.DeviceManager
PubChan chan device.Message
}
func NewApp(conf Config) *App {
client := mqtt.New(mqtt.Config{
Host: conf.Host,
Port: conf.Port,
Username: conf.Username,
Password: conf.Password,
ClientID: conf.ClientId,
CleanSession: true,
AutoReconnect: true,
Retained: false,
KeepAlive: 15 * time.Second,
MsgChanDept: 100,
})
pubchan := make(chan device.Message, 10)
app := &App{
mqtt: client,
PubChan: pubchan,
}
app.DeviceManager = device.NewDeviceManager(pubchan, app.subscribe)
return app
}
func (app *App) subscribe(
topic string,
qos int,
callback func(context.Context, mqtt.Message),
) context.CancelFunc {
ctx, cancelFunc := context.WithCancel(app.ctx)
ctx = context.WithValue(ctx, "pubchan", app.PubChan)
log := zerolog.Ctx(ctx)
go func(ctx context.Context, sub *mqtt.Subscriber) {
for {
select {
case msg, ok := <-sub.OnMessage:
if !ok {
return
}
callback(ctx, msg)
case err, ok := <-sub.OnError:
if !ok {
return
}
log.Error().Err(err)
case <-ctx.Done():
return
}
}
}(ctx, app.mqtt.Subscribe(ctx, topic, qos))
log.Info().Str("topic", topic).Msg("subscribe")
return cancelFunc
}
func (app *App) runTicker() {
ticker := time.NewTicker(60 * time.Second)
logger := zerolog.Ctx(app.ctx).With().Str("action", "ticker").Logger()
logger.Info().Msg("Start ticker")
go func() {
for {
select {
case <-app.ctx.Done():
return
case t := <-ticker.C:
// case <-ticker.C:
ctx, cancel := context.WithCancel(app.ctx)
ctx = logger.WithContext(app.ctx)
logger.Info().Time("at", t).Msg("Tick")
app.DeviceManager.CheckAll(ctx)
cancel()
}
}
}()
}
func (app *App) onSettingsMessage(ctx context.Context, msg mqtt.Message) {
// callback for topic /{prefix}/{tvr_id}/settings change's
device_name := strings.Split(msg.Topic(), "/")[1]
logger := zerolog.Ctx(ctx).With().
Str("action", "onSettingsMessage").
Str("device", device_name).
Logger()
ctx = logger.WithContext(ctx)
logger.Debug().
Str("topic", msg.Topic()).
Str("payload", string(msg.Payload())).
Msg("")
var device_settings device.DeviceSettings
if err := json.Unmarshal(msg.Payload(), &device_settings); err != nil {
logger.Error().Err(err).Msg("Parsing payload")
}
tvr := device.Device{
Name: device_name,
Settings: device_settings,
}
if _, ok := app.DeviceManager.Get(device_name); !ok {
if err := app.DeviceManager.Add(tvr); err != nil {
logger.Error().Err(err).Msg("unexpected, abord")
return
}
} else {
if err := app.DeviceManager.
SetSettings(device_name, device_settings); err != nil {
logger.Error().Err(err).Msg("unexpected, abord")
return
}
}
if err := app.DeviceManager.Check(ctx, device_name); err != nil {
logger.Error().Err(err).Msg("During device `Check`")
}
}
func (app *App) onSetStateMessage(ctx context.Context, msg mqtt.Message) {
// callback for topic /{prefix}/{tvr_id}/state/set change's
device_name := strings.Split(msg.Topic(), "/")[1]
logger := zerolog.Ctx(ctx).With().
Str("action", "onSetStateMessage").
Str("device", device_name).
Logger()
// ctx = logger.WithContext(ctx)
var state device.DeviceState
if err := json.Unmarshal(msg.Payload(), &state); err != nil {
logger.Error().Err(err).Msg("Error while parsing payload")
return
}
logger.Debug().Interface("state", state).Msg("new state")
if device, ok := app.DeviceManager.Get(device_name); ok {
if err := device.SetState(logger, state, app.PubChan); err != nil {
logger.Error().Err(err).Msg("unexpected")
return
}
} else {
logger.Error().Msg("Device not found, abord")
}
}
func (app *App) Run() {
ctx, ctxCancel := context.WithCancel(context.Background())
output := zerolog.ConsoleWriter{Out: os.Stderr}
log := zerolog.New(output).
With().
Logger()
ctx = log.WithContext(ctx)
defer ctxCancel()
if err := app.mqtt.Connect(ctx); err != nil {
log.Fatal().Err(err)
}
app.ctx = ctx
app.subscribe("heater/+/settings", 2, app.onSettingsMessage)
app.subscribe("heater/+/state/set", 2, app.onSetStateMessage)
go func(ctx context.Context, pub *mqtt.Publisher) {
log := zerolog.Ctx(ctx).With().
Str("action", "publisher").
Logger()
defer log.Error().Msg("publisher stoped")
for {
select {
case msg, ok := <-app.PubChan:
if !ok {
log.Error().Msg("publish PubChan Not OK")
return
}
log.Debug().
Str("topic", msg.Topic).
Str("payload", string(msg.Payload)).
Msg("publish")
pub.Publish(msg.Topic, 1, msg.Payload, msg.Retain)
case err, ok := <-pub.OnError:
if !ok {
log.Error().Msg("publish OnError Not OK")
return
}
log.Error().Err(err).Msg("publish OnError")
case <-ctx.Done():
return
}
}
}(ctx, app.mqtt.Publisher(ctx))
app.runTicker()
for {
select {
case <-ctx.Done():
return
}
}
}
func main() {
host := flag.String("host", "localhost", "mqtt host")
port := flag.String("port", "1883", "mqtt port")
username := flag.String("username", "", "mqtt username")
password := flag.String("password", "", "mqtt password")
clientid := flag.String("clientid", "goheater", "mqtt client id")
pPort := flag.String("pport", "9641", "prometheus metrics server port")
flag.Parse()
config := Config{
Host: *host,
Port: *port,
Username: *username,
Password: *password,
ClientId: *clientid,
}
app := NewApp(config)
go app.Run()
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(fmt.Sprintf(":%s", *pPort), nil)
}