// Command heater bridges MQTT topics under "heater/" to a device.DeviceManager:
// it forwards settings and state requests received over MQTT to the manager
// and publishes the messages the manager emits on its publish channel.
package main

import (
	"context"
	"encoding/json"
	"flag"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"git.quimbo.fr/nicolas/mqtt"
	"github.com/rs/zerolog"

	"citadel/heater/pkg/device"
)

// Config holds the MQTT connection parameters read from the command line.
type Config struct {
	Host     string
	Port     string
	Username string
	Password string
	Clientid string
}

// App wires the MQTT client, the device manager and the outgoing message
// channel together.
type App struct {
	mqtt *mqtt.Client
	// ctx is assigned by Run; subscribe derives its contexts from it, so
	// subscribe must not be called before Run has set it.
	ctx           context.Context
	DeviceManager *device.DeviceManager
	PubChan       chan device.Message
}

// NewApp builds an App from conf. The MQTT client is only configured here;
// the connection itself is established by Run.
func NewApp(conf Config) *App {
	client := mqtt.New(mqtt.Config{
		Host:          conf.Host,
		Port:          conf.Port,
		Username:      conf.Username,
		Password:      conf.Password,
		ClientID:      conf.Clientid,
		CleanSession:  true,
		AutoReconnect: true,
		Retained:      false,
		KeepAlive:     15 * time.Second,
		MsgChanDept:   100,
	})

	pubchan := make(chan device.Message, 10)

	app := &App{
		mqtt:    client,
		PubChan: pubchan,
	}
	// The manager receives app.subscribe so that it can register topic
	// callbacks through this App.
	app.DeviceManager = device.NewDeviceManager(pubchan, app.subscribe)
	return app
}

// subscribe subscribes to topic with the given qos and starts a goroutine that
// invokes callback for every received message. The goroutine stops when one of
// the subscriber channels is closed or when the returned CancelFunc (or the
// application context) is cancelled.
func (app *App) subscribe(
	topic string,
	qos int,
	callback func(context.Context, mqtt.Message),
) context.CancelFunc {
	ctx, cancelFunc := context.WithCancel(app.ctx)
	log := zerolog.Ctx(ctx)

	go func(ctx context.Context, sub *mqtt.Subscriber) {
		for {
			select {
			case msg, ok := <-sub.OnMessage:
				if !ok {
					return
				}
				callback(ctx, msg)
			case err, ok := <-sub.OnError:
				if !ok {
					return
				}
				// A zerolog event is only written once Msg is called;
				// without it the error was silently discarded.
				log.Error().Err(err).Str("topic", topic).Msg("subscribe OnError")
			case <-ctx.Done():
				return
			}
		}
	}(ctx, app.mqtt.Subscribe(ctx, topic, qos))

	log.Info().Str("topic", topic).Msg("subscribe")
	return cancelFunc
}

// runTicker starts a goroutine that calls DeviceManager.CheckAll every
// 30 seconds until the application context is done.
func (app *App) runTicker() {
	ticker := time.NewTicker(30 * time.Second)

	logger := zerolog.Ctx(app.ctx).With().Str("action", "ticker").Logger()
	logger.Info().Msg("Start ticker")
	ctx := logger.WithContext(app.ctx)

	go func() {
		// Release the ticker's resources once the loop exits.
		defer ticker.Stop()
		for {
			select {
			case <-app.ctx.Done():
				return
			case t := <-ticker.C:
				logger.Info().Time("at", t).Msg("Tick")
				app.DeviceManager.CheckAll(ctx)
			}
		}
	}()
}

// deviceNameFromTopic returns the second "/"-separated segment of topic, which
// for the "heater/+/..." subscriptions is the segment matched by the wildcard.
// It reports false when the topic has fewer than two segments.
func deviceNameFromTopic(topic string) (string, bool) {
	parts := strings.Split(topic, "/")
	if len(parts) < 2 {
		return "", false
	}
	return parts[1], true
}

// onSettingsMessage is the callback for topic {prefix}/{device}/settings.
// It decodes the payload as device.DeviceSettings, registers the device if it
// is not known yet (or updates its settings otherwise) and then triggers a
// check of that device.
func (app *App) onSettingsMessage(ctx context.Context, msg mqtt.Message) {
	deviceName, ok := deviceNameFromTopic(msg.Topic())
	if !ok {
		zerolog.Ctx(ctx).Error().
			Str("action", "onSettingsMessage").
			Str("topic", msg.Topic()).
			Msg("unexpected topic")
		return
	}

	logger := zerolog.Ctx(ctx).With().
		Str("action", "onSettingsMessage").
		Str("device", deviceName).
		Logger()
	ctx = logger.WithContext(ctx)

	logger.Debug().
		Str("topic", msg.Topic()).
		Str("payload", string(msg.Payload())).
		Msg("")

	var settings device.DeviceSettings
	if err := json.Unmarshal(msg.Payload(), &settings); err != nil {
		logger.Error().Err(err).Msg("Parsing payload")
		// Do not register or overwrite a device with zero-valued settings.
		return
	}

	if _, known := app.DeviceManager.Get(deviceName); !known {
		tvr := device.Device{
			Name:     deviceName,
			Settings: settings,
		}
		if err := app.DeviceManager.Add(tvr); err != nil {
			logger.Error().Err(err).Msg("unexpected, abord")
			return
		}
	} else if err := app.DeviceManager.SetSettings(deviceName, settings); err != nil {
		logger.Error().Err(err).Msg("unexpected, abord")
		return
	}

	if err := app.DeviceManager.Check(ctx, deviceName); err != nil {
		logger.Error().Err(err).Msg("During device `Check`")
	}
}

// onSetStateMessage is the callback for topic {prefix}/{device}/state/set.
// It decodes the payload as device.DeviceState and applies it to the matching
// device, if that device is known to the manager.
func (app *App) onSetStateMessage(ctx context.Context, msg mqtt.Message) {
	deviceName, ok := deviceNameFromTopic(msg.Topic())
	if !ok {
		zerolog.Ctx(ctx).Error().
			Str("action", "onSetStateMessage").
			Str("topic", msg.Topic()).
			Msg("unexpected topic")
		return
	}

	logger := zerolog.Ctx(ctx).With().
		Str("action", "onSetStateMessage").
		Str("device", deviceName).
		Logger()

	var state device.DeviceState
	if err := json.Unmarshal(msg.Payload(), &state); err != nil {
		logger.Error().Err(err).Msg("Error while parsing payload")
		return
	}
	logger.Debug().Interface("state", state).Msg("new state")

	// Named dev rather than device to avoid shadowing the imported package.
	dev, found := app.DeviceManager.Get(deviceName)
	if !found {
		logger.Error().Msg("Device not found, abord")
		return
	}
	if err := dev.SetState(&logger, state, app.PubChan); err != nil {
		logger.Error().Err(err).Msg("unexpected")
	}
}

// Run connects to the MQTT broker, subscribes to the settings and state
// topics, starts the publisher goroutine and the periodic ticker, then blocks
// until its context is done. A connection failure terminates the process.
func (app *App) Run() {
	ctx, ctxCancel := context.WithCancel(context.Background())
	defer ctxCancel()

	output := zerolog.ConsoleWriter{Out: os.Stderr}
	log := zerolog.New(output).
		With().
		Logger()
	ctx = log.WithContext(ctx)

	if err := app.mqtt.Connect(ctx); err != nil {
		// Fatal only logs and exits once Msg is called; without it a failed
		// connection was ignored and the app carried on.
		log.Fatal().Err(err).Msg("mqtt connect")
	}

	app.ctx = ctx

	app.subscribe("heater/+/settings", 2, app.onSettingsMessage)
	app.subscribe("heater/+/state/set", 2, app.onSetStateMessage)

	// Publisher: forwards every message from PubChan to the broker.
	go func(ctx context.Context, pub *mqtt.Publisher) {
		log := zerolog.Ctx(ctx).With().
			Str("action", "publisher").
			Logger()
		defer func() {
			log.Error().Msg("publisher stoped")
		}()

		for {
			select {
			case msg, ok := <-app.PubChan:
				if !ok {
					log.Error().Msg("publish PubChan Not OK")
					return
				}
				log.Debug().
					Str("topic", msg.Topic).
					Str("payload", string(msg.Payload)).
					Msg("publish")
				pub.Publish(msg.Topic, 1, msg.Payload, msg.Retain)
			case err, ok := <-pub.OnError:
				if !ok {
					log.Error().Msg("publish OnError Not OK")
					return
				}
				log.Error().Err(err).Msg("publish OnError")
			case <-ctx.Done():
				return
			}
		}
	}(ctx, app.mqtt.Publisher(ctx))

	app.runTicker()

	// Block until the context is cancelled.
	<-ctx.Done()
}

// main parses the MQTT flags, starts the application in the background and
// waits for SIGINT or SIGTERM before exiting.
func main() {
	host := flag.String("host", "localhost", "mqtt host")
	port := flag.String("port", "1883", "mqtt port")
	username := flag.String("username", "", "mqtt username")
	password := flag.String("password", "", "mqtt password")
	clientid := flag.String("clientid", "goheater", "mqtt client id")
	flag.Parse()

	config := Config{
		Host:     *host,
		Port:     *port,
		Username: *username,
		Password: *password,
		Clientid: *clientid,
	}

	app := NewApp(config)
	go app.Run()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
}