commit 682182803360d3f3032c0a990ad54e6d1cfc3499 Author: Nicolas Duhamel Date: Sun Oct 23 09:22:20 2022 +0200 First working version diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..132a163 --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module citadel/heater + +go 1.19 + +require ( + github.com/eclipse/paho.mqtt.golang v1.4.1 // indirect + github.com/gobackpack/mqtt v0.0.0-20220830204110-1047a4ce1fe2 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect + github.com/ohler55/ojg v1.14.5 // indirect + github.com/rs/zerolog v1.28.0 // indirect + github.com/sirupsen/logrus v1.9.0 // indirect + golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b // indirect + golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde // indirect + golang.org/x/sys v0.1.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..91c759d --- /dev/null +++ b/go.sum @@ -0,0 +1,54 @@ +github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI= +github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/gobackpack/mqtt v0.0.0-20220830204110-1047a4ce1fe2 h1:I5vl2HHSkUSZrtZtof5nRg2a1n5Arih1GqoGwGCwb7A= +github.com/gobackpack/mqtt v0.0.0-20220830204110-1047a4ce1fe2/go.mod h1:2geROsfVUlQLJCmrUgpgDzlf+OSrHp4X18Bj4kTTRk0= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/ohler55/ojg v1.14.5 h1:xCX2oyh/ZaoesbLH6fwVHStSJpk4o4eJs8ttXutzdg0= +github.com/ohler55/ojg v1.14.5/go.mod h1:7Ghirupn8NC8hSSDpI0gcjorPxj+vSVIONDWfliHR1k= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.28.0 h1:MirSo27VyNi7RJYP3078AA1+Cyzd2GB66qy3aUHvsWY= +github.com/rs/zerolog v1.28.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b h1:ZmngSVLe/wycRns9MKikG9OWIEjGcGAkacif7oYQaUY= +golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc= +golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..0dc01a7 --- /dev/null +++ b/main.go @@ -0,0 +1,283 @@ +package main + +import ( + "citadel/heater/pkg/device" + "citadel/heater/mqtt" + mqttpaho "github.com/eclipse/paho.mqtt.golang" + "context" + "encoding/json" + "os" + "os/signal" + "syscall" + "time" + "strings" + "github.com/rs/zerolog" + "flag" +) + +type Config struct { + Host string + Port string + Username string + Password string + Clientid string +} + +type App struct { + mqtt_hub *mqtt.Hub + ctx context.Context + + DeviceManager *device.DeviceManager + PubChan chan device.Message +} + +func NewApp(conf Config) *App { + hub := mqtt.NewHub(&mqtt.Config{ + Host: conf.Host, + Port: conf.Port, + Username: conf.Username, + Password: conf.Password, + ClientID: conf.Clientid, + CleanSession: true, + AutoReconnect: true, + Retained: false, + KeepAlive: 15 * time.Second, + MsgChanDept: 100, + }) + pubchan := make(chan device.Message, 10) + + app := &App{ + mqtt_hub: hub, + PubChan: pubchan, + } + app.DeviceManager = device.NewDeviceManager(pubchan, app.subscribe) + return app +} + +func (app *App) subscribe( + topic string, + qos int, + callback func(context.Context, mqttpaho.Message), + ) context.CancelFunc { + + ctx, cancelFunc := context.WithCancel(app.ctx) + + log := zerolog.Ctx(ctx) + + go func(ctx context.Context, sub *mqtt.Subscriber) { + for { + select { + case msg, ok := <-sub.OnMessage: + if !ok { + return + } + callback(ctx, msg) + case err, ok := <-sub.OnError: + if !ok { + return + } + log.Error().Err(err) + case <-ctx.Done(): + return + } + } + }(ctx, app.mqtt_hub.Subscribe(ctx, topic, qos)) + log.Info().Str("topic", topic).Msg("subscribe") + return cancelFunc +} + +func (app *App) runTicker() { + ticker := time.NewTicker(30 * time.Second) + logger := zerolog.Ctx(app.ctx).With().Str("action", "ticker").Logger() + logger.Info().Msg("Start ticker") + + ctx := logger.WithContext(app.ctx) + go func() { + for { + select { + case <- app.ctx.Done(): + return + case t := <-ticker.C: + logger.Info().Time("at", t).Msg("Tick") + app.DeviceManager.CheckAll(ctx) + } + } + }() + +} + +func (app *App) onSettingsMessage(ctx context.Context, msg mqttpaho.Message) { + // callback for topic /{prefix}/{tvr_id}/settings change's + device_name := strings.Split(msg.Topic(), "/")[1] + + logger := zerolog.Ctx(ctx).With(). + Str("action", "onSettingsMessage"). + Str("device", device_name). + Logger() + + logger.Debug().Str("topic", msg.Topic()).Str("payload", string(msg.Payload())).Msg("") + ctx = logger.WithContext(ctx) + + var device_settings device.DeviceSettings + if err := json.Unmarshal(msg.Payload(), &device_settings); err != nil { + logger.Error().Err(err).Msg("Parsing payload") + } + + tvr := device.Device{ + Name: device_name, + Settings: device_settings, + } + + if _, ok := app.DeviceManager.Get(device_name); !ok { + err := app.DeviceManager.Add(tvr) + if err != nil { + logger.Error().Err(err).Msg("Unexpected") + return + } + } else { + err := app.DeviceManager.SetSettings(device_name, device_settings) + if err != nil { + logger.Error().Err(err).Msg("Unexpected") + return + } + } + + err := app.DeviceManager.Check(ctx, device_name) + if err != nil { + logger.Error().Err(err).Msg("During device `Check`") + } +} + +func (app *App) onStateMessage(ctx context.Context, msg mqttpaho.Message) { + // callback for topic /{prefix}/{tvr_id}/state change's + device_name := strings.Split(msg.Topic(), "/")[1] + + logger := zerolog.Ctx(ctx).With(). + Str("action", "onStateMessage"). + Str("device", device_name). + Logger() + ctx = logger.WithContext(ctx) + + + var state device.DeviceState + if err := json.Unmarshal(msg.Payload(), &state); err != nil { + logger.Error().Err(err).Msg("Error while parsing payload") + } + + if _, ok := app.DeviceManager.Get(device_name); !ok { + tvr := device.Device{ + Name: device_name, + State: state, + } + app.DeviceManager.Add(tvr) + } + device, ok := app.DeviceManager.Get(device_name) + if !ok { + logger.Error().Msg("Device not found") + } + + logger.Warn().Interface("state", state).Interface("device_state", device.State).Msg("") + + if !device.SameState(state) { + device.State = state + err := app.DeviceManager.Check(ctx, device_name) + if err != nil { + logger.Error().Err(err).Msg("Error while checking device") + } + } else { + logger.Info().Msg("no state update, ignoring") + } + +} + + +func (app *App) Run() { + ctx, ctxCancel := context.WithCancel(context.Background()) + output := zerolog.ConsoleWriter{Out: os.Stderr} + log := zerolog.New(output). + With(). + Logger() + + ctx = log.WithContext(ctx) + + + defer ctxCancel() + + if err := app.mqtt_hub.Connect(ctx); err != nil { + log.Fatal().Err(err) + } + + app.ctx = ctx + + app.subscribe("heater/+/settings", 2, app.onSettingsMessage) + app.subscribe("heater/+/state", 2, app.onStateMessage) + + go func(ctx context.Context, pub *mqtt.Publisher) { + log := zerolog.Ctx(ctx).With(). + Str("action", "publisher"). + Logger() + defer log.Error().Msg("publisher stoped") + for { + select { + + case msg, ok := <-app.PubChan: + if !ok { + log.Error().Msg("publish PubChan Not OK") + return + } + log.Debug(). + Str("topic", msg.Topic). + Str("payload", string(msg.Payload)). + Msg("publish") + + pub.Publish(msg.Topic, 1 ,msg.Payload, msg.Retain) + + case err, ok := <-pub.OnError: + if !ok { + log.Error().Msg("publish OnError Not OK") + return + } + log.Error().Err(err).Msg("publish OnError") + + case <-ctx.Done(): + return + } + } + }(ctx, app.mqtt_hub.Publisher(ctx)) + + app.runTicker() + + for { + select { + case <-ctx.Done(): + return + } + } + +} + +func main() { + + host := flag.String("host", "localhost", "mqtt host") + port := flag.String("port", "1883", "mqtt port") + username := flag.String("username", "", "mqtt username") + password := flag.String("password", "", "mqtt password") + clientid := flag.String("clientid", "goheater", "mqtt client id") + + flag.Parse() + + config := Config{ + Host: *host, + Port: *port, + Username: *username, + Password: *password, + Clientid: *clientid, + } + + app := NewApp(config) + go app.Run() + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit +} + diff --git a/mqtt/config.go b/mqtt/config.go new file mode 100644 index 0000000..f01fe4f --- /dev/null +++ b/mqtt/config.go @@ -0,0 +1,61 @@ +package mqtt + +import ( + "github.com/google/uuid" + "os" + "strings" + "time" +) + +type Config struct { + Host string + Port string + Username string + Password string + ClientID string + CleanSession bool + AutoReconnect bool + Retained bool + KeepAlive time.Duration + MsgChanDept uint +} + +func NewConfig() *Config { + host := os.Getenv("MQTT_HOST") + if strings.TrimSpace(host) == "" { + host = "localhost" + } + + port := os.Getenv("MQTT_PORT") + if strings.TrimSpace(port) == "" { + port = "1883" + } + + username := os.Getenv("MQTT_USERNAME") + if strings.TrimSpace(username) == "" { + username = "guest" + } + + password := os.Getenv("MQTT_PASSWORD") + if strings.TrimSpace(password) == "" { + password = "guest" + } + + clientId := os.Getenv("MQTT_CLIENT_ID") + if strings.TrimSpace(clientId) == "" { + clientId = uuid.New().String() + } + + return &Config{ + Host: host, + Port: port, + Username: username, + Password: password, + ClientID: clientId, + CleanSession: true, + AutoReconnect: true, + Retained: false, + KeepAlive: 15 * time.Second, + MsgChanDept: 100, + } +} diff --git a/mqtt/connection.go b/mqtt/connection.go new file mode 100644 index 0000000..be98a6c --- /dev/null +++ b/mqtt/connection.go @@ -0,0 +1,52 @@ +package mqtt + +import ( + mqttLib "github.com/eclipse/paho.mqtt.golang" +) + +type connection struct { + conf *Config + client mqttLib.Client +} + +func newConnection(conf *Config) *connection { + conn := &connection{ + conf: conf, + } + + opts := mqttLib.NewClientOptions() + + broker := conn.conf.Host + ":" + conn.conf.Port + + opts.AddBroker(broker) + opts.SetClientID(conn.conf.ClientID) + opts.SetUsername(conn.conf.Username) + opts.SetPassword(conn.conf.Password) + opts.SetCleanSession(conn.conf.CleanSession) + opts.SetAutoReconnect(conn.conf.AutoReconnect) + opts.SetKeepAlive(conn.conf.KeepAlive) + opts.SetMessageChannelDepth(conn.conf.MsgChanDept) + if conn.conf.AutoReconnect { + opts.SetResumeSubs(true) + } + + conn.client = mqttLib.NewClient(opts) + + return conn +} + +func (conn *connection) connect() error { + if token := conn.client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + return nil +} + +func (conn *connection) publish(topic string, qos byte, payload []byte, retain bool) mqttLib.Token { + return conn.client.Publish(topic, qos, retain, payload) +} + +func (conn *connection) subscribe(topic string, qos byte, callback func(mqttClient mqttLib.Client, message mqttLib.Message)) mqttLib.Token { + return conn.client.Subscribe(topic, qos, callback) +} diff --git a/mqtt/hub.go b/mqtt/hub.go new file mode 100644 index 0000000..a66a438 --- /dev/null +++ b/mqtt/hub.go @@ -0,0 +1,124 @@ +package mqtt + +import ( + "context" + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/sirupsen/logrus" +) + +const ( + DefaultPubQoS = 0 + DefaultSubQoS = 0 +) + +type Hub struct { + conn *connection +} + +type Subscriber struct { + OnMessage chan mqtt.Message + OnError chan error +} + +type Publisher struct { + OnError chan error + publish chan *frame +} + +type frame struct { + topic string + qos int + payload []byte + retain bool +} + +func NewHub(conf *Config) *Hub { + return &Hub{ + conn: newConnection(conf), + } +} + +// Connect to MQTT server +func (hub *Hub) Connect(ctx context.Context) error { + if err := hub.conn.connect(); err != nil { + return err + } + + go func(ctx context.Context) { + defer logrus.Warn("hub closed MQTT connection") + + for { + select { + case <-ctx.Done(): + hub.conn.client.Disconnect(1000) + return + } + } + }(ctx) + + return nil +} + +// Subscribe will create MQTT subscriber and listen for messages. +// Messages and errors are sent to OnMessage and OnError channels. +func (hub *Hub) Subscribe(ctx context.Context, topic string, qos int) *Subscriber { + sub := &Subscriber{ + OnMessage: make(chan mqtt.Message), + OnError: make(chan error), + } + + go func(ctx context.Context, sub *Subscriber) { + defer func() { + close(sub.OnMessage) + close(sub.OnError) + }() + + if token := hub.conn.subscribe(topic, byte(qos), func(mqttClient mqtt.Client, message mqtt.Message) { + sub.OnMessage <- message + }); token.Wait() && token.Error() != nil { + sub.OnError <- token.Error() + } + + <-ctx.Done() + }(ctx, sub) + + return sub +} + +// Publisher will create MQTT publisher and private listener for messages to be published. +// All messages to be published are sent through private publish channel. +// Errors will be sent to OnError channel. +func (hub *Hub) Publisher(ctx context.Context) *Publisher { + pub := &Publisher{ + OnError: make(chan error), + publish: make(chan *frame), + } + + go func(ctx context.Context, pub *Publisher) { + defer close(pub.OnError) + + for { + select { + case fr := <-pub.publish: + if token := hub.conn.publish(fr.topic, byte(fr.qos), fr.payload, fr.retain); token.Wait() && token.Error() != nil { + pub.OnError <- token.Error() + } + case <-ctx.Done(): + return + } + } + }(ctx, pub) + + return pub +} + +// Publish message to topic through private pub.publish channel. +// Thread-safe. +func (pub *Publisher) Publish(topic string, qos int, message []byte, retain bool) { + pub.publish <- &frame{ + topic: topic, + qos: qos, + payload: message, + retain: retain, + } +} diff --git a/pkg/device/device.go b/pkg/device/device.go new file mode 100644 index 0000000..afdd603 --- /dev/null +++ b/pkg/device/device.go @@ -0,0 +1,270 @@ +package device + +import ( + "github.com/eclipse/paho.mqtt.golang" + "github.com/ohler55/ojg/jp" + "github.com/ohler55/ojg/oj" + "fmt" + "time" + "encoding/json" + "context" + "github.com/rs/zerolog" + // "reflect" +) + +type DeviceState struct { + Mode string `json:"mode"` + Setpoint int `json:"setpoint"` + Time time.Time `json:"time"` + Program_name string `json:"program_name"` + Until_time time.Time `json:"until_time"` +} + +// Internal state +type Device struct { + Name string + Settings DeviceSettings + CurrentSetpoint int + State DeviceState +} + +func (d *Device) SameState(state DeviceState) bool { + if state.Mode != d.State.Mode { + return false + } + if state.Setpoint != d.State.Setpoint { + return false + } + if state.Program_name != d.State.Program_name { + return false + } + if !state.Time.Equal(d.State.Time) { + return false + } + if !state.Until_time.Equal(d.State.Until_time) { + return false + } + return true +} + +func (d *Device) StateTopic() string { + return fmt.Sprintf("heater/%s/state", d.Name) +} + +func (d *Device) CheckSetpoint(logger *zerolog.Logger, pubchan chan Message) error { + // Handle all the update setpoint logic + // push in pubChan a Message if setpoint need to be update + log := logger.With(). + Str("device", d.Name). + Int("current_setpoint", d.CurrentSetpoint). + Str("State.Mode", d.State.Mode). + Int("State.Setpoint", d.State.Setpoint). + Str("State.Program_name", d.State.Program_name). + Logger() + log.Info().Msg("Check if setpoint need an update") + + switch d.State.Mode { + case "always": + log.Info().Msg("Use always") + + case "until_time": + log.Info().Msg("Use until_time") + if d.State.Until_time.Before(time.Now()) { + log.Info().Time("until_time", d.State.Until_time). + Msg("until_time passed, reset") + return d.setProgAndReset(&log, pubchan) + } + + case "until_next": + log.Info().Msg("Use until_next") + + var prog string = "default" + if d.State.Program_name != "" { + prog = d.State.Program_name + } + program, ok := d.Settings.Programs[prog] + if !ok { + return fmt.Errorf("program not found") + } + next, err := program.NextTime(d.State.Time) + if err!= nil { + return err + } + if time.Now().After(next) { + // force current program + // reset state + log.Info().Time("now", time.Now()).Time("next", next).Msg("until_next expired") + return d.setProgAndReset(&log, pubchan) + } + case "program": + log.Info().Msg("Use program mode") + if d.State.Setpoint == 0 { + //in case of new program mode + var prog string = "default" + if d.State.Program_name != "" { + prog = d.State.Program_name + } + value, err := d.setpointValueProg(prog) + if err != nil { + return err + } + d.State.Setpoint = value + d.PublishState(d.State, pubchan) + } + + default: + log.Info().Msg("Use default mode") + return d.setProgAndReset(&log, pubchan) + } + + if d.State.Setpoint != d.CurrentSetpoint { + log.Warn().Msg("Need setpoint update") + return d.publishSetpoint(d.State.Setpoint, pubchan) + } else { + log.Warn().Msg("No setpoint update") + } + + return nil +} + +func (d *Device) setProgAndReset(logger *zerolog.Logger, pubchan chan Message) error { + prog_name := "default" + if d.State.Program_name != "" { + prog_name = d.State.Program_name + } + log := logger.With().Str("program", prog_name).Logger() + + value, err := d.setpointValueProg(prog_name) + if err != nil { + return err + } + log = log.With().Int("futur_setpoint", value).Logger() + + if d.CurrentSetpoint != value { + log.Warn().Msg("publish setpoint update") + err = d.publishSetpoint(value, pubchan) + if err != nil { + return err + } + } else { + log.Warn().Msg("no setpoint update") + } + + state := DeviceState{ + Setpoint: value, + Mode: "program", + Program_name: prog_name, + Time: time.Now(), + } + + d.State = state + + payload, err := json.Marshal(state) + if err != nil { + return err + } + + pubchan <- Message { + Topic: d.StateTopic(), + Payload: payload, + Retain: true, + } + return nil +} + +func (d *Device) publishSetpoint(value int, pubchan chan Message) error { + topic, err := d.Settings.TVR.FormatTopic(d.Name) + if err != nil { + return err + } + + payload, err := d.Settings.TVR.FormatPayload(value) + if err != nil { + return err + } + + pubchan <- Message{ + Topic: topic, + Payload: []byte(payload), + Retain: false, + } + return nil +} + +func (d *Device) PublishState(state DeviceState, pubchan chan Message) error { + payload, err := json.Marshal(state) + if err != nil { + return err + } + pubchan <- Message{ + Topic: d.StateTopic(), + Payload: payload, + Retain: true, + } + return nil +} + +func (d Device) ListenTopic() (string, error) { + return d.Settings.TVR.FormatTopicState(d.Name) +} + +func (d *Device) onMessage(ctx context.Context, msg mqtt.Message) { + log := zerolog.Ctx(ctx).With(). + Str("action", "device state receive"). + Str("device", d.Name). + Logger() + + log.Debug(). + Str("topic", msg.Topic()). + Str("payload", string(msg.Payload())). + Msg("Message get") + + obj, err := oj.ParseString(string(msg.Payload())) + if err != nil { + log.Error().Err(err).Msg("during payload parse") + return + } + + x, err := jp.ParseString(d.Settings.TVR.Setpoint_state_jp) + if err != nil { + log.Error().Err(err).Msg("while parsing payload") + return + } + + r := x.First(obj) + + if v, ok := r.(int64); ok{ + d.CurrentSetpoint = int(v) + } else { + log.Error().Err(err).Interface("parsing payload", r).Msg("while parsing payload") + } + +} + +func (d *Device) programSetpoint(prog_name string) (Setpoint, error) { + program, ok := d.Settings.Programs[prog_name] + if !ok { + return Setpoint{}, fmt.Errorf("device %s don't have %s program", d.Name, prog_name) + } + setpoint := program.Current() + return setpoint, nil +} + +func (d *Device) setpointValue(setpoint Setpoint) (int, error) { + if len(d.Settings.Presets) < setpoint.Preset_id + 1 { + return 0, fmt.Errorf("Preset id %d didn't found", setpoint.Preset_id) + } + return d.Settings.Presets[setpoint.Preset_id].Value, nil +} + +func (d *Device) setpointValueProg(prog_name string) (int, error) { + setpoint, err := d.programSetpoint(prog_name) + if err != nil{ + return 0, err + } + val, err := d.setpointValue(setpoint) + if err != nil{ + return 0, err + } + return val, nil +} diff --git a/pkg/device/device_manager.go b/pkg/device/device_manager.go new file mode 100644 index 0000000..0cea6da --- /dev/null +++ b/pkg/device/device_manager.go @@ -0,0 +1,99 @@ +package device + +import ( + "fmt" + "context" + "github.com/eclipse/paho.mqtt.golang" + "github.com/rs/zerolog" +) + +type Subscriber func(string, int, func(context.Context, mqtt.Message)) context.CancelFunc + +type DeviceManager struct { + devices map[string]*Device + subscriber Subscriber + sub_cancel map[string]context.CancelFunc + PubChan chan Message +} + +func NewDeviceManager(pubchan chan Message, subscriber Subscriber) *DeviceManager { + return &DeviceManager{ + devices: make(map[string]*Device), + sub_cancel: make(map[string]context.CancelFunc), + subscriber: subscriber, + PubChan: pubchan, + } +} + +func (m *DeviceManager) Get(name string) (*Device, bool) { + device, ok := m.devices[name] + return device, ok +} + +func (m *DeviceManager) Add(device Device) error { + if _, prs := m.devices[device.Name]; prs { + return fmt.Errorf("device %s already exist", device.Name) + } + m.devices[device.Name] = &device + + // subscribe to state topic + topic, err := device.ListenTopic() + if err != nil { + return err + } + if topic != "" { + cancel := m.subscriber(topic, 2, device.onMessage) + m.sub_cancel[device.Name] = cancel + } + return nil +} + +func (m *DeviceManager) Delete(name string) error { + return nil +} + +func (m *DeviceManager) SetSettings(name string, settings DeviceSettings) error { + device, ok := m.devices[name] + if !ok { + return fmt.Errorf("Not existings device %s", name) + } + device.Settings = settings + return nil +} + +func (m *DeviceManager) SetState(name string, state DeviceState) error { + device, ok := m.devices[name] + if !ok { + return fmt.Errorf("Not existings device %s", name) + } + device.State = state + return nil +} + +func (m *DeviceManager) Check(ctx context.Context, name string) error { + logger := zerolog.Ctx(ctx) + + device, ok := m.devices[name] + if !ok { + return fmt.Errorf("Device %s don't exist", name) + } + err := device.CheckSetpoint(logger, m.PubChan) + if err != nil { + logger.Error().Err(err).Msg("") + return err + } + return nil +} + +func (m *DeviceManager) CheckAll(ctx context.Context) error { + logger := zerolog.Ctx(ctx) + + for _, device := range m.devices { + err := device.CheckSetpoint(logger, m.PubChan) + if err != nil { + logger.Error().Err(err).Msg("") + return err + } + } + return nil +} diff --git a/pkg/device/settings.go b/pkg/device/settings.go new file mode 100644 index 0000000..e736e99 --- /dev/null +++ b/pkg/device/settings.go @@ -0,0 +1,206 @@ +package device + +import ( + "time" + "bytes" + "text/template" + "fmt" +) + +type DayOfWeek int + +func (d DayOfWeek) Next() DayOfWeek { + if d == Sunday { + return Monday + } + return d+1 +} + +func (d DayOfWeek) Previous() DayOfWeek { + if d == Monday { + return Sunday + } + return d-1 +} + +func (d DayOfWeek) DurationBetween(n DayOfWeek) time.Duration { + // return duration between two day of week + var duration time.Duration + if (d-n == 0) { + duration, _ = time.ParseDuration("168h") + } else { + duration, _ = time.ParseDuration(fmt.Sprintf("%dh", (d-n)*24 )) + } + return duration +} + + +const ( + Monday DayOfWeek = 0 + Thuesday = 1 + Wednesday = 2 + Thursday = 3 + Friday = 4 + Saturday = 5 + Sunday = 6 +) + +func WeekDayEnToFr(weekday time.Weekday) DayOfWeek { + // translate weekday to french week, start by Monday + return map[time.Weekday]DayOfWeek { + time.Monday : Monday, + time.Tuesday : Thuesday, + time.Wednesday: Wednesday, + time.Thursday : Thursday, + time.Friday: Friday, + time.Saturday: Saturday, + time.Sunday: Sunday, + }[weekday] +} + +func daytime(t time.Time) int { + return t.Hour()*60 + t.Minute() +} + +func weekday(t time.Time) DayOfWeek { + return WeekDayEnToFr(t.Weekday()) +} + +type Message struct { + Payload []byte + Topic string + Retain bool +} + +type Setpoint struct { + Start int `json:"start"` + Preset_id int `json:"preset_id"` +} + +type WeekProgram map[DayOfWeek][]Setpoint + +func (p WeekProgram) Current() Setpoint { + // return current Setpoint + now := time.Now() + weekday := weekday(now) + daytime := daytime(now) + setpoint := Setpoint{} + for _, sp := range p[weekday] { + if daytime < sp.Start { + break + } + setpoint = sp + } + return setpoint +} + +func (p WeekProgram) NextTime(t time.Time) (time.Time, error) { + // return next program change + setpoint := Setpoint{} + var next time.Time + + weekday := weekday(t) + daytime := daytime(t) + + // Recursive func to find setpoint on weekday + get := func (weekday DayOfWeek, daytime int) Setpoint { + setpoint := Setpoint{} + for _, sp := range p[weekday] { + setpoint = sp + if daytime < sp.Start { + return setpoint + } + } + return Setpoint{} + } + + startweekday := weekday + for (setpoint == Setpoint{}) { + setpoint = get(weekday, daytime) + if (setpoint != Setpoint{}) { + // setpoint found, compute time + next := t.Add( startweekday.DurationBetween(weekday) ) + next = t.Add( time.Duration(setpoint.Start - daytime ) * time.Minute ) + return next, nil + } + weekday = weekday.Next() + daytime = 0 + if weekday == startweekday { + return next, fmt.Errorf("Shouldn't happen no setpoint found over the week") + } + } + + return next, nil +} + +type Programs map[string]WeekProgram + +type Preset struct { + Label string `json:"label"` + Value int `json:"value"` + Color string `json:"color"` +} + +type TVRSettings struct { + Setpoint_topic string `json:"setpoint_topic"` + Setpoint_payload string `json:"setpoint_payload"` + Setpoint_state_topic string `json:"setpoint_state_topic"` + Setpoint_state_jp string `json:"setpoint_state_jp"` +} + +func (s TVRSettings) FormatTopicState(device_name string) (string, error) { + type Variable struct { + Device string + } + variables := Variable{device_name} + t, err := template.New("topic").Parse(s.Setpoint_state_topic) + if err != nil { + return "", err + } + buf := new(bytes.Buffer) + err = t.Execute(buf, variables) + if err != nil { + return "", err + } + return buf.String(), nil +} + +func (s TVRSettings) FormatTopic(device_name string) (string, error) { + type Variable struct { + Device string + } + variables := Variable{device_name} + t, err := template.New("topic").Parse(s.Setpoint_topic) + if err != nil { + return "", err + } + buf := new(bytes.Buffer) + err = t.Execute(buf, variables) + if err != nil { + return "", err + } + return buf.String(), nil +} + +func (s TVRSettings) FormatPayload(setpoint int) (string, error) { + type Variable struct { + Setpoint int + } + variables := Variable{setpoint} + t, err := template.New("payload").Parse(s.Setpoint_payload) + if err != nil { + return "", err + } + buf := new(bytes.Buffer) + err = t.Execute(buf, variables) + if err != nil { + return "", err + } + return buf.String(), nil +} + +type DeviceSettings struct { + Programs Programs `json:"programs"` + Presets []Preset `json:"presets"` + TVR TVRSettings `json:"TVR"` +}