279 lines
5.9 KiB
Go
279 lines
5.9 KiB
Go
package main
|
|
|
|
import (
|
|
"citadel/heater/pkg/device"
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.quimbo.fr/nicolas/mqtt"
|
|
|
|
_ "net/http/pprof"
|
|
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
// Config carries the MQTT connection parameters. main fills it from the
// command-line flags and NewApp copies each field into the MQTT client
// configuration.
type Config struct {
	// Host and Port locate the MQTT broker.
	Host, Port string
	// Username and Password are the broker credentials.
	Username, Password string
	// ClientId is the MQTT client identifier.
	ClientId string
}
|
|
|
|
type App struct {
|
|
mqtt *mqtt.Client
|
|
ctx context.Context
|
|
|
|
DeviceManager *device.DeviceManager
|
|
PubChan chan device.Message
|
|
}
|
|
|
|
func NewApp(conf Config) *App {
|
|
client := mqtt.New(mqtt.Config{
|
|
Host: conf.Host,
|
|
Port: conf.Port,
|
|
Username: conf.Username,
|
|
Password: conf.Password,
|
|
ClientID: conf.ClientId,
|
|
CleanSession: true,
|
|
AutoReconnect: true,
|
|
Retained: false,
|
|
KeepAlive: 15 * time.Second,
|
|
MsgChanDept: 100,
|
|
})
|
|
pubchan := make(chan device.Message, 10)
|
|
|
|
app := &App{
|
|
mqtt: client,
|
|
PubChan: pubchan,
|
|
}
|
|
app.DeviceManager = device.NewDeviceManager(pubchan, app.subscribe)
|
|
return app
|
|
}
|
|
|
|
func (app *App) subscribe(
|
|
topic string,
|
|
qos int,
|
|
callback func(context.Context, mqtt.Message),
|
|
) context.CancelFunc {
|
|
|
|
ctx, cancelFunc := context.WithCancel(app.ctx)
|
|
ctx = context.WithValue(ctx, "pubchan", app.PubChan)
|
|
|
|
log := zerolog.Ctx(ctx)
|
|
|
|
go func(ctx context.Context, sub *mqtt.Subscriber) {
|
|
for {
|
|
select {
|
|
case msg, ok := <-sub.OnMessage:
|
|
if !ok {
|
|
return
|
|
}
|
|
callback(ctx, msg)
|
|
case err, ok := <-sub.OnError:
|
|
if !ok {
|
|
return
|
|
}
|
|
log.Error().Err(err)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}(ctx, app.mqtt.Subscribe(ctx, topic, qos))
|
|
log.Info().Str("topic", topic).Msg("subscribe")
|
|
return cancelFunc
|
|
}
|
|
|
|
func (app *App) runTicker() {
|
|
ticker := time.NewTicker(60 * time.Second)
|
|
logger := zerolog.Ctx(app.ctx).With().Str("action", "ticker").Logger()
|
|
logger.Info().Msg("Start ticker")
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-app.ctx.Done():
|
|
return
|
|
case t := <-ticker.C:
|
|
// case <-ticker.C:
|
|
ctx, cancel := context.WithCancel(app.ctx)
|
|
ctx = logger.WithContext(app.ctx)
|
|
logger.Info().Time("at", t).Msg("Tick")
|
|
app.DeviceManager.CheckAll(ctx)
|
|
cancel()
|
|
}
|
|
}
|
|
}()
|
|
|
|
}
|
|
|
|
func (app *App) onSettingsMessage(ctx context.Context, msg mqtt.Message) {
|
|
// callback for topic /{prefix}/{tvr_id}/settings change's
|
|
device_name := strings.Split(msg.Topic(), "/")[1]
|
|
|
|
logger := zerolog.Ctx(ctx).With().
|
|
Str("action", "onSettingsMessage").
|
|
Str("device", device_name).
|
|
Logger()
|
|
|
|
ctx = logger.WithContext(ctx)
|
|
|
|
logger.Debug().
|
|
Str("topic", msg.Topic()).
|
|
Str("payload", string(msg.Payload())).
|
|
Msg("")
|
|
|
|
var device_settings device.DeviceSettings
|
|
if err := json.Unmarshal(msg.Payload(), &device_settings); err != nil {
|
|
logger.Error().Err(err).Msg("Parsing payload")
|
|
}
|
|
|
|
tvr := device.Device{
|
|
Name: device_name,
|
|
Settings: device_settings,
|
|
}
|
|
|
|
if _, ok := app.DeviceManager.Get(device_name); !ok {
|
|
if err := app.DeviceManager.Add(tvr); err != nil {
|
|
logger.Error().Err(err).Msg("unexpected, abord")
|
|
return
|
|
}
|
|
} else {
|
|
if err := app.DeviceManager.
|
|
SetSettings(device_name, device_settings); err != nil {
|
|
logger.Error().Err(err).Msg("unexpected, abord")
|
|
return
|
|
}
|
|
}
|
|
|
|
if err := app.DeviceManager.Check(ctx, device_name); err != nil {
|
|
logger.Error().Err(err).Msg("During device `Check`")
|
|
}
|
|
}
|
|
|
|
func (app *App) onSetStateMessage(ctx context.Context, msg mqtt.Message) {
|
|
// callback for topic /{prefix}/{tvr_id}/state/set change's
|
|
|
|
device_name := strings.Split(msg.Topic(), "/")[1]
|
|
|
|
logger := zerolog.Ctx(ctx).With().
|
|
Str("action", "onSetStateMessage").
|
|
Str("device", device_name).
|
|
Logger()
|
|
// ctx = logger.WithContext(ctx)
|
|
|
|
var state device.DeviceState
|
|
if err := json.Unmarshal(msg.Payload(), &state); err != nil {
|
|
logger.Error().Err(err).Msg("Error while parsing payload")
|
|
return
|
|
}
|
|
|
|
logger.Debug().Interface("state", state).Msg("new state")
|
|
|
|
if device, ok := app.DeviceManager.Get(device_name); ok {
|
|
if err := device.SetState(logger, state, app.PubChan); err != nil {
|
|
logger.Error().Err(err).Msg("unexpected")
|
|
return
|
|
}
|
|
} else {
|
|
logger.Error().Msg("Device not found, abord")
|
|
}
|
|
}
|
|
|
|
func (app *App) Run() {
|
|
ctx, ctxCancel := context.WithCancel(context.Background())
|
|
output := zerolog.ConsoleWriter{Out: os.Stderr}
|
|
log := zerolog.New(output).
|
|
With().
|
|
Logger()
|
|
|
|
ctx = log.WithContext(ctx)
|
|
|
|
defer ctxCancel()
|
|
|
|
if err := app.mqtt.Connect(ctx); err != nil {
|
|
log.Fatal().Err(err)
|
|
}
|
|
|
|
app.ctx = ctx
|
|
|
|
app.subscribe("heater/+/settings", 2, app.onSettingsMessage)
|
|
app.subscribe("heater/+/state/set", 2, app.onSetStateMessage)
|
|
|
|
go func(ctx context.Context, pub *mqtt.Publisher) {
|
|
log := zerolog.Ctx(ctx).With().
|
|
Str("action", "publisher").
|
|
Logger()
|
|
defer log.Error().Msg("publisher stoped")
|
|
for {
|
|
select {
|
|
|
|
case msg, ok := <-app.PubChan:
|
|
if !ok {
|
|
log.Error().Msg("publish PubChan Not OK")
|
|
return
|
|
}
|
|
log.Debug().
|
|
Str("topic", msg.Topic).
|
|
Str("payload", string(msg.Payload)).
|
|
Msg("publish")
|
|
|
|
pub.Publish(msg.Topic, 1, msg.Payload, msg.Retain)
|
|
|
|
case err, ok := <-pub.OnError:
|
|
if !ok {
|
|
log.Error().Msg("publish OnError Not OK")
|
|
return
|
|
}
|
|
log.Error().Err(err).Msg("publish OnError")
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}(ctx, app.mqtt.Publisher(ctx))
|
|
|
|
app.runTicker()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func main() {
|
|
host := flag.String("host", "localhost", "mqtt host")
|
|
port := flag.String("port", "1883", "mqtt port")
|
|
username := flag.String("username", "", "mqtt username")
|
|
password := flag.String("password", "", "mqtt password")
|
|
clientid := flag.String("clientid", "goheater", "mqtt client id")
|
|
pPort := flag.String("pport", "9641", "prometheus metrics server port")
|
|
|
|
flag.Parse()
|
|
|
|
config := Config{
|
|
Host: *host,
|
|
Port: *port,
|
|
Username: *username,
|
|
Password: *password,
|
|
ClientId: *clientid,
|
|
}
|
|
|
|
app := NewApp(config)
|
|
go app.Run()
|
|
|
|
http.Handle("/metrics", promhttp.Handler())
|
|
http.ListenAndServe(fmt.Sprintf(":%s", *pPort), nil)
|
|
|
|
}
|