heater/main.go

280 lines
7.1 KiB
Go

package main
import (
"citadel/heater/pkg/device"
"citadel/heater/mqtt"
mqttpaho "github.com/eclipse/paho.mqtt.golang"
"context"
"encoding/json"
"os"
"os/signal"
"syscall"
"time"
"strings"
"github.com/rs/zerolog"
"flag"
)
type Config struct {
Host string
Port string
Username string
Password string
Clientid string
}
type App struct {
mqtt_hub *mqtt.Hub
ctx context.Context
DeviceManager *device.DeviceManager
PubChan chan device.Message
}
func NewApp(conf Config) *App {
hub := mqtt.NewHub(&mqtt.Config{
Host: conf.Host,
Port: conf.Port,
Username: conf.Username,
Password: conf.Password,
ClientID: conf.Clientid,
CleanSession: true,
AutoReconnect: true,
Retained: false,
KeepAlive: 15 * time.Second,
MsgChanDept: 100,
})
pubchan := make(chan device.Message, 10)
app := &App{
mqtt_hub: hub,
PubChan: pubchan,
}
app.DeviceManager = device.NewDeviceManager(pubchan, app.subscribe)
return app
}
func (app *App) subscribe(
topic string,
qos int,
callback func(context.Context, mqttpaho.Message),
) context.CancelFunc {
ctx, cancelFunc := context.WithCancel(app.ctx)
log := zerolog.Ctx(ctx)
go func(ctx context.Context, sub *mqtt.Subscriber) {
for {
select {
case msg, ok := <-sub.OnMessage:
if !ok {
return
}
callback(ctx, msg)
case err, ok := <-sub.OnError:
if !ok {
return
}
log.Error().Err(err)
case <-ctx.Done():
return
}
}
}(ctx, app.mqtt_hub.Subscribe(ctx, topic, qos))
log.Info().Str("topic", topic).Msg("subscribe")
return cancelFunc
}
func (app *App) runTicker() {
ticker := time.NewTicker(30 * time.Second)
logger := zerolog.Ctx(app.ctx).With().Str("action", "ticker").Logger()
logger.Info().Msg("Start ticker")
ctx := logger.WithContext(app.ctx)
go func() {
for {
select {
case <- app.ctx.Done():
return
case t := <-ticker.C:
logger.Info().Time("at", t).Msg("Tick")
app.DeviceManager.CheckAll(ctx)
}
}
}()
}
func (app *App) onSettingsMessage(ctx context.Context, msg mqttpaho.Message) {
// callback for topic /{prefix}/{tvr_id}/settings change's
device_name := strings.Split(msg.Topic(), "/")[1]
logger := zerolog.Ctx(ctx).With().
Str("action", "onSettingsMessage").
Str("device", device_name).
Logger()
logger.Debug().Str("topic", msg.Topic()).Str("payload", string(msg.Payload())).Msg("")
ctx = logger.WithContext(ctx)
var device_settings device.DeviceSettings
if err := json.Unmarshal(msg.Payload(), &device_settings); err != nil {
logger.Error().Err(err).Msg("Parsing payload")
}
tvr := device.Device{
Name: device_name,
Settings: device_settings,
}
if _, ok := app.DeviceManager.Get(device_name); !ok {
err := app.DeviceManager.Add(tvr)
if err != nil {
logger.Error().Err(err).Msg("Unexpected")
return
}
} else {
err := app.DeviceManager.SetSettings(device_name, device_settings)
if err != nil {
logger.Error().Err(err).Msg("Unexpected")
return
}
}
err := app.DeviceManager.Check(ctx, device_name)
if err != nil {
logger.Error().Err(err).Msg("During device `Check`")
}
}
func (app *App) onSetStateMessage(ctx context.Context, msg mqttpaho.Message) {
// callback for topic /{prefix}/{tvr_id}/state/set change's
device_name := strings.Split(msg.Topic(), "/")[1]
logger := zerolog.Ctx(ctx).With().
Str("action", "onSetStateMessage").
Str("device", device_name).
Logger()
ctx = logger.WithContext(ctx)
var state device.DeviceState
if err := json.Unmarshal(msg.Payload(), &state); err != nil {
logger.Error().Err(err).Msg("Error while parsing payload")
}
logger.Info().Interface("state", state).Msg("new state")
device, ok := app.DeviceManager.Get(device_name)
if !ok {
logger.Error().Msg("Device not found, abord")
return
}
if !device.SameState(state) {
device.State = state
err := app.DeviceManager.Check(ctx, device_name)
if err != nil {
logger.Error().Err(err).Msg("error while checking device")
}
// device.SetState(log, state, pubchan)
} else {
logger.Info().Msg("no state update, ignoring")
}
}
func (app *App) Run() {
ctx, ctxCancel := context.WithCancel(context.Background())
output := zerolog.ConsoleWriter{Out: os.Stderr}
log := zerolog.New(output).
With().
Logger()
ctx = log.WithContext(ctx)
defer ctxCancel()
if err := app.mqtt_hub.Connect(ctx); err != nil {
log.Fatal().Err(err)
}
app.ctx = ctx
app.subscribe("heater/+/settings", 2, app.onSettingsMessage)
app.subscribe("heater/+/state/set", 2, app.onSetStateMessage)
go func(ctx context.Context, pub *mqtt.Publisher) {
log := zerolog.Ctx(ctx).With().
Str("action", "publisher").
Logger()
defer log.Error().Msg("publisher stoped")
for {
select {
case msg, ok := <-app.PubChan:
if !ok {
log.Error().Msg("publish PubChan Not OK")
return
}
log.Debug().
Str("topic", msg.Topic).
Str("payload", string(msg.Payload)).
Msg("publish")
pub.Publish(msg.Topic, 1 ,msg.Payload, msg.Retain)
case err, ok := <-pub.OnError:
if !ok {
log.Error().Msg("publish OnError Not OK")
return
}
log.Error().Err(err).Msg("publish OnError")
case <-ctx.Done():
return
}
}
}(ctx, app.mqtt_hub.Publisher(ctx))
app.runTicker()
for {
select {
case <-ctx.Done():
return
}
}
}
func main() {
host := flag.String("host", "localhost", "mqtt host")
port := flag.String("port", "1883", "mqtt port")
username := flag.String("username", "", "mqtt username")
password := flag.String("password", "", "mqtt password")
clientid := flag.String("clientid", "goheater", "mqtt client id")
flag.Parse()
config := Config{
Host: *host,
Port: *port,
Username: *username,
Password: *password,
Clientid: *clientid,
}
app := NewApp(config)
go app.Run()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}