First working version

This commit is contained in:
Nicolas Duhamel 2022-10-23 09:22:20 +02:00
commit 6821828033
9 changed files with 1167 additions and 0 deletions

18
go.mod Normal file
View File

@ -0,0 +1,18 @@
module citadel/heater
go 1.19
require (
github.com/eclipse/paho.mqtt.golang v1.4.1 // indirect
github.com/gobackpack/mqtt v0.0.0-20220830204110-1047a4ce1fe2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/ohler55/ojg v1.14.5 // indirect
github.com/rs/zerolog v1.28.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b // indirect
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde // indirect
golang.org/x/sys v0.1.0 // indirect
)

54
go.sum Normal file
View File

@ -0,0 +1,54 @@
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI=
github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/gobackpack/mqtt v0.0.0-20220830204110-1047a4ce1fe2 h1:I5vl2HHSkUSZrtZtof5nRg2a1n5Arih1GqoGwGCwb7A=
github.com/gobackpack/mqtt v0.0.0-20220830204110-1047a4ce1fe2/go.mod h1:2geROsfVUlQLJCmrUgpgDzlf+OSrHp4X18Bj4kTTRk0=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/ohler55/ojg v1.14.5 h1:xCX2oyh/ZaoesbLH6fwVHStSJpk4o4eJs8ttXutzdg0=
github.com/ohler55/ojg v1.14.5/go.mod h1:7Ghirupn8NC8hSSDpI0gcjorPxj+vSVIONDWfliHR1k=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.28.0 h1:MirSo27VyNi7RJYP3078AA1+Cyzd2GB66qy3aUHvsWY=
github.com/rs/zerolog v1.28.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b h1:ZmngSVLe/wycRns9MKikG9OWIEjGcGAkacif7oYQaUY=
golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc=
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

283
main.go Normal file
View File

@ -0,0 +1,283 @@
package main
import (
"citadel/heater/pkg/device"
"citadel/heater/mqtt"
mqttpaho "github.com/eclipse/paho.mqtt.golang"
"context"
"encoding/json"
"os"
"os/signal"
"syscall"
"time"
"strings"
"github.com/rs/zerolog"
"flag"
)
type Config struct {
Host string
Port string
Username string
Password string
Clientid string
}
type App struct {
mqtt_hub *mqtt.Hub
ctx context.Context
DeviceManager *device.DeviceManager
PubChan chan device.Message
}
func NewApp(conf Config) *App {
hub := mqtt.NewHub(&mqtt.Config{
Host: conf.Host,
Port: conf.Port,
Username: conf.Username,
Password: conf.Password,
ClientID: conf.Clientid,
CleanSession: true,
AutoReconnect: true,
Retained: false,
KeepAlive: 15 * time.Second,
MsgChanDept: 100,
})
pubchan := make(chan device.Message, 10)
app := &App{
mqtt_hub: hub,
PubChan: pubchan,
}
app.DeviceManager = device.NewDeviceManager(pubchan, app.subscribe)
return app
}
func (app *App) subscribe(
topic string,
qos int,
callback func(context.Context, mqttpaho.Message),
) context.CancelFunc {
ctx, cancelFunc := context.WithCancel(app.ctx)
log := zerolog.Ctx(ctx)
go func(ctx context.Context, sub *mqtt.Subscriber) {
for {
select {
case msg, ok := <-sub.OnMessage:
if !ok {
return
}
callback(ctx, msg)
case err, ok := <-sub.OnError:
if !ok {
return
}
log.Error().Err(err)
case <-ctx.Done():
return
}
}
}(ctx, app.mqtt_hub.Subscribe(ctx, topic, qos))
log.Info().Str("topic", topic).Msg("subscribe")
return cancelFunc
}
func (app *App) runTicker() {
ticker := time.NewTicker(30 * time.Second)
logger := zerolog.Ctx(app.ctx).With().Str("action", "ticker").Logger()
logger.Info().Msg("Start ticker")
ctx := logger.WithContext(app.ctx)
go func() {
for {
select {
case <- app.ctx.Done():
return
case t := <-ticker.C:
logger.Info().Time("at", t).Msg("Tick")
app.DeviceManager.CheckAll(ctx)
}
}
}()
}
func (app *App) onSettingsMessage(ctx context.Context, msg mqttpaho.Message) {
// callback for topic /{prefix}/{tvr_id}/settings change's
device_name := strings.Split(msg.Topic(), "/")[1]
logger := zerolog.Ctx(ctx).With().
Str("action", "onSettingsMessage").
Str("device", device_name).
Logger()
logger.Debug().Str("topic", msg.Topic()).Str("payload", string(msg.Payload())).Msg("")
ctx = logger.WithContext(ctx)
var device_settings device.DeviceSettings
if err := json.Unmarshal(msg.Payload(), &device_settings); err != nil {
logger.Error().Err(err).Msg("Parsing payload")
}
tvr := device.Device{
Name: device_name,
Settings: device_settings,
}
if _, ok := app.DeviceManager.Get(device_name); !ok {
err := app.DeviceManager.Add(tvr)
if err != nil {
logger.Error().Err(err).Msg("Unexpected")
return
}
} else {
err := app.DeviceManager.SetSettings(device_name, device_settings)
if err != nil {
logger.Error().Err(err).Msg("Unexpected")
return
}
}
err := app.DeviceManager.Check(ctx, device_name)
if err != nil {
logger.Error().Err(err).Msg("During device `Check`")
}
}
func (app *App) onStateMessage(ctx context.Context, msg mqttpaho.Message) {
// callback for topic /{prefix}/{tvr_id}/state change's
device_name := strings.Split(msg.Topic(), "/")[1]
logger := zerolog.Ctx(ctx).With().
Str("action", "onStateMessage").
Str("device", device_name).
Logger()
ctx = logger.WithContext(ctx)
var state device.DeviceState
if err := json.Unmarshal(msg.Payload(), &state); err != nil {
logger.Error().Err(err).Msg("Error while parsing payload")
}
if _, ok := app.DeviceManager.Get(device_name); !ok {
tvr := device.Device{
Name: device_name,
State: state,
}
app.DeviceManager.Add(tvr)
}
device, ok := app.DeviceManager.Get(device_name)
if !ok {
logger.Error().Msg("Device not found")
}
logger.Warn().Interface("state", state).Interface("device_state", device.State).Msg("")
if !device.SameState(state) {
device.State = state
err := app.DeviceManager.Check(ctx, device_name)
if err != nil {
logger.Error().Err(err).Msg("Error while checking device")
}
} else {
logger.Info().Msg("no state update, ignoring")
}
}
func (app *App) Run() {
ctx, ctxCancel := context.WithCancel(context.Background())
output := zerolog.ConsoleWriter{Out: os.Stderr}
log := zerolog.New(output).
With().
Logger()
ctx = log.WithContext(ctx)
defer ctxCancel()
if err := app.mqtt_hub.Connect(ctx); err != nil {
log.Fatal().Err(err)
}
app.ctx = ctx
app.subscribe("heater/+/settings", 2, app.onSettingsMessage)
app.subscribe("heater/+/state", 2, app.onStateMessage)
go func(ctx context.Context, pub *mqtt.Publisher) {
log := zerolog.Ctx(ctx).With().
Str("action", "publisher").
Logger()
defer log.Error().Msg("publisher stoped")
for {
select {
case msg, ok := <-app.PubChan:
if !ok {
log.Error().Msg("publish PubChan Not OK")
return
}
log.Debug().
Str("topic", msg.Topic).
Str("payload", string(msg.Payload)).
Msg("publish")
pub.Publish(msg.Topic, 1 ,msg.Payload, msg.Retain)
case err, ok := <-pub.OnError:
if !ok {
log.Error().Msg("publish OnError Not OK")
return
}
log.Error().Err(err).Msg("publish OnError")
case <-ctx.Done():
return
}
}
}(ctx, app.mqtt_hub.Publisher(ctx))
app.runTicker()
for {
select {
case <-ctx.Done():
return
}
}
}
func main() {
host := flag.String("host", "localhost", "mqtt host")
port := flag.String("port", "1883", "mqtt port")
username := flag.String("username", "", "mqtt username")
password := flag.String("password", "", "mqtt password")
clientid := flag.String("clientid", "goheater", "mqtt client id")
flag.Parse()
config := Config{
Host: *host,
Port: *port,
Username: *username,
Password: *password,
Clientid: *clientid,
}
app := NewApp(config)
go app.Run()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}

61
mqtt/config.go Normal file
View File

@ -0,0 +1,61 @@
package mqtt
import (
"github.com/google/uuid"
"os"
"strings"
"time"
)
type Config struct {
Host string
Port string
Username string
Password string
ClientID string
CleanSession bool
AutoReconnect bool
Retained bool
KeepAlive time.Duration
MsgChanDept uint
}
func NewConfig() *Config {
host := os.Getenv("MQTT_HOST")
if strings.TrimSpace(host) == "" {
host = "localhost"
}
port := os.Getenv("MQTT_PORT")
if strings.TrimSpace(port) == "" {
port = "1883"
}
username := os.Getenv("MQTT_USERNAME")
if strings.TrimSpace(username) == "" {
username = "guest"
}
password := os.Getenv("MQTT_PASSWORD")
if strings.TrimSpace(password) == "" {
password = "guest"
}
clientId := os.Getenv("MQTT_CLIENT_ID")
if strings.TrimSpace(clientId) == "" {
clientId = uuid.New().String()
}
return &Config{
Host: host,
Port: port,
Username: username,
Password: password,
ClientID: clientId,
CleanSession: true,
AutoReconnect: true,
Retained: false,
KeepAlive: 15 * time.Second,
MsgChanDept: 100,
}
}

52
mqtt/connection.go Normal file
View File

@ -0,0 +1,52 @@
package mqtt
import (
mqttLib "github.com/eclipse/paho.mqtt.golang"
)
type connection struct {
conf *Config
client mqttLib.Client
}
func newConnection(conf *Config) *connection {
conn := &connection{
conf: conf,
}
opts := mqttLib.NewClientOptions()
broker := conn.conf.Host + ":" + conn.conf.Port
opts.AddBroker(broker)
opts.SetClientID(conn.conf.ClientID)
opts.SetUsername(conn.conf.Username)
opts.SetPassword(conn.conf.Password)
opts.SetCleanSession(conn.conf.CleanSession)
opts.SetAutoReconnect(conn.conf.AutoReconnect)
opts.SetKeepAlive(conn.conf.KeepAlive)
opts.SetMessageChannelDepth(conn.conf.MsgChanDept)
if conn.conf.AutoReconnect {
opts.SetResumeSubs(true)
}
conn.client = mqttLib.NewClient(opts)
return conn
}
func (conn *connection) connect() error {
if token := conn.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
func (conn *connection) publish(topic string, qos byte, payload []byte, retain bool) mqttLib.Token {
return conn.client.Publish(topic, qos, retain, payload)
}
func (conn *connection) subscribe(topic string, qos byte, callback func(mqttClient mqttLib.Client, message mqttLib.Message)) mqttLib.Token {
return conn.client.Subscribe(topic, qos, callback)
}

124
mqtt/hub.go Normal file
View File

@ -0,0 +1,124 @@
package mqtt
import (
"context"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/sirupsen/logrus"
)
const (
DefaultPubQoS = 0
DefaultSubQoS = 0
)
type Hub struct {
conn *connection
}
type Subscriber struct {
OnMessage chan mqtt.Message
OnError chan error
}
type Publisher struct {
OnError chan error
publish chan *frame
}
type frame struct {
topic string
qos int
payload []byte
retain bool
}
func NewHub(conf *Config) *Hub {
return &Hub{
conn: newConnection(conf),
}
}
// Connect to MQTT server
func (hub *Hub) Connect(ctx context.Context) error {
if err := hub.conn.connect(); err != nil {
return err
}
go func(ctx context.Context) {
defer logrus.Warn("hub closed MQTT connection")
for {
select {
case <-ctx.Done():
hub.conn.client.Disconnect(1000)
return
}
}
}(ctx)
return nil
}
// Subscribe will create MQTT subscriber and listen for messages.
// Messages and errors are sent to OnMessage and OnError channels.
func (hub *Hub) Subscribe(ctx context.Context, topic string, qos int) *Subscriber {
sub := &Subscriber{
OnMessage: make(chan mqtt.Message),
OnError: make(chan error),
}
go func(ctx context.Context, sub *Subscriber) {
defer func() {
close(sub.OnMessage)
close(sub.OnError)
}()
if token := hub.conn.subscribe(topic, byte(qos), func(mqttClient mqtt.Client, message mqtt.Message) {
sub.OnMessage <- message
}); token.Wait() && token.Error() != nil {
sub.OnError <- token.Error()
}
<-ctx.Done()
}(ctx, sub)
return sub
}
// Publisher will create MQTT publisher and private listener for messages to be published.
// All messages to be published are sent through private publish channel.
// Errors will be sent to OnError channel.
func (hub *Hub) Publisher(ctx context.Context) *Publisher {
pub := &Publisher{
OnError: make(chan error),
publish: make(chan *frame),
}
go func(ctx context.Context, pub *Publisher) {
defer close(pub.OnError)
for {
select {
case fr := <-pub.publish:
if token := hub.conn.publish(fr.topic, byte(fr.qos), fr.payload, fr.retain); token.Wait() && token.Error() != nil {
pub.OnError <- token.Error()
}
case <-ctx.Done():
return
}
}
}(ctx, pub)
return pub
}
// Publish message to topic through private pub.publish channel.
// Thread-safe.
func (pub *Publisher) Publish(topic string, qos int, message []byte, retain bool) {
pub.publish <- &frame{
topic: topic,
qos: qos,
payload: message,
retain: retain,
}
}

270
pkg/device/device.go Normal file
View File

@ -0,0 +1,270 @@
package device
import (
"github.com/eclipse/paho.mqtt.golang"
"github.com/ohler55/ojg/jp"
"github.com/ohler55/ojg/oj"
"fmt"
"time"
"encoding/json"
"context"
"github.com/rs/zerolog"
// "reflect"
)
type DeviceState struct {
Mode string `json:"mode"`
Setpoint int `json:"setpoint"`
Time time.Time `json:"time"`
Program_name string `json:"program_name"`
Until_time time.Time `json:"until_time"`
}
// Internal state
type Device struct {
Name string
Settings DeviceSettings
CurrentSetpoint int
State DeviceState
}
func (d *Device) SameState(state DeviceState) bool {
if state.Mode != d.State.Mode {
return false
}
if state.Setpoint != d.State.Setpoint {
return false
}
if state.Program_name != d.State.Program_name {
return false
}
if !state.Time.Equal(d.State.Time) {
return false
}
if !state.Until_time.Equal(d.State.Until_time) {
return false
}
return true
}
func (d *Device) StateTopic() string {
return fmt.Sprintf("heater/%s/state", d.Name)
}
func (d *Device) CheckSetpoint(logger *zerolog.Logger, pubchan chan Message) error {
// Handle all the update setpoint logic
// push in pubChan a Message if setpoint need to be update
log := logger.With().
Str("device", d.Name).
Int("current_setpoint", d.CurrentSetpoint).
Str("State.Mode", d.State.Mode).
Int("State.Setpoint", d.State.Setpoint).
Str("State.Program_name", d.State.Program_name).
Logger()
log.Info().Msg("Check if setpoint need an update")
switch d.State.Mode {
case "always":
log.Info().Msg("Use always")
case "until_time":
log.Info().Msg("Use until_time")
if d.State.Until_time.Before(time.Now()) {
log.Info().Time("until_time", d.State.Until_time).
Msg("until_time passed, reset")
return d.setProgAndReset(&log, pubchan)
}
case "until_next":
log.Info().Msg("Use until_next")
var prog string = "default"
if d.State.Program_name != "" {
prog = d.State.Program_name
}
program, ok := d.Settings.Programs[prog]
if !ok {
return fmt.Errorf("program not found")
}
next, err := program.NextTime(d.State.Time)
if err!= nil {
return err
}
if time.Now().After(next) {
// force current program
// reset state
log.Info().Time("now", time.Now()).Time("next", next).Msg("until_next expired")
return d.setProgAndReset(&log, pubchan)
}
case "program":
log.Info().Msg("Use program mode")
if d.State.Setpoint == 0 {
//in case of new program mode
var prog string = "default"
if d.State.Program_name != "" {
prog = d.State.Program_name
}
value, err := d.setpointValueProg(prog)
if err != nil {
return err
}
d.State.Setpoint = value
d.PublishState(d.State, pubchan)
}
default:
log.Info().Msg("Use default mode")
return d.setProgAndReset(&log, pubchan)
}
if d.State.Setpoint != d.CurrentSetpoint {
log.Warn().Msg("Need setpoint update")
return d.publishSetpoint(d.State.Setpoint, pubchan)
} else {
log.Warn().Msg("No setpoint update")
}
return nil
}
func (d *Device) setProgAndReset(logger *zerolog.Logger, pubchan chan Message) error {
prog_name := "default"
if d.State.Program_name != "" {
prog_name = d.State.Program_name
}
log := logger.With().Str("program", prog_name).Logger()
value, err := d.setpointValueProg(prog_name)
if err != nil {
return err
}
log = log.With().Int("futur_setpoint", value).Logger()
if d.CurrentSetpoint != value {
log.Warn().Msg("publish setpoint update")
err = d.publishSetpoint(value, pubchan)
if err != nil {
return err
}
} else {
log.Warn().Msg("no setpoint update")
}
state := DeviceState{
Setpoint: value,
Mode: "program",
Program_name: prog_name,
Time: time.Now(),
}
d.State = state
payload, err := json.Marshal(state)
if err != nil {
return err
}
pubchan <- Message {
Topic: d.StateTopic(),
Payload: payload,
Retain: true,
}
return nil
}
func (d *Device) publishSetpoint(value int, pubchan chan Message) error {
topic, err := d.Settings.TVR.FormatTopic(d.Name)
if err != nil {
return err
}
payload, err := d.Settings.TVR.FormatPayload(value)
if err != nil {
return err
}
pubchan <- Message{
Topic: topic,
Payload: []byte(payload),
Retain: false,
}
return nil
}
func (d *Device) PublishState(state DeviceState, pubchan chan Message) error {
payload, err := json.Marshal(state)
if err != nil {
return err
}
pubchan <- Message{
Topic: d.StateTopic(),
Payload: payload,
Retain: true,
}
return nil
}
func (d Device) ListenTopic() (string, error) {
return d.Settings.TVR.FormatTopicState(d.Name)
}
func (d *Device) onMessage(ctx context.Context, msg mqtt.Message) {
log := zerolog.Ctx(ctx).With().
Str("action", "device state receive").
Str("device", d.Name).
Logger()
log.Debug().
Str("topic", msg.Topic()).
Str("payload", string(msg.Payload())).
Msg("Message get")
obj, err := oj.ParseString(string(msg.Payload()))
if err != nil {
log.Error().Err(err).Msg("during payload parse")
return
}
x, err := jp.ParseString(d.Settings.TVR.Setpoint_state_jp)
if err != nil {
log.Error().Err(err).Msg("while parsing payload")
return
}
r := x.First(obj)
if v, ok := r.(int64); ok{
d.CurrentSetpoint = int(v)
} else {
log.Error().Err(err).Interface("parsing payload", r).Msg("while parsing payload")
}
}
func (d *Device) programSetpoint(prog_name string) (Setpoint, error) {
program, ok := d.Settings.Programs[prog_name]
if !ok {
return Setpoint{}, fmt.Errorf("device %s don't have %s program", d.Name, prog_name)
}
setpoint := program.Current()
return setpoint, nil
}
func (d *Device) setpointValue(setpoint Setpoint) (int, error) {
if len(d.Settings.Presets) < setpoint.Preset_id + 1 {
return 0, fmt.Errorf("Preset id %d didn't found", setpoint.Preset_id)
}
return d.Settings.Presets[setpoint.Preset_id].Value, nil
}
func (d *Device) setpointValueProg(prog_name string) (int, error) {
setpoint, err := d.programSetpoint(prog_name)
if err != nil{
return 0, err
}
val, err := d.setpointValue(setpoint)
if err != nil{
return 0, err
}
return val, nil
}

View File

@ -0,0 +1,99 @@
package device
import (
"fmt"
"context"
"github.com/eclipse/paho.mqtt.golang"
"github.com/rs/zerolog"
)
type Subscriber func(string, int, func(context.Context, mqtt.Message)) context.CancelFunc
type DeviceManager struct {
devices map[string]*Device
subscriber Subscriber
sub_cancel map[string]context.CancelFunc
PubChan chan Message
}
func NewDeviceManager(pubchan chan Message, subscriber Subscriber) *DeviceManager {
return &DeviceManager{
devices: make(map[string]*Device),
sub_cancel: make(map[string]context.CancelFunc),
subscriber: subscriber,
PubChan: pubchan,
}
}
func (m *DeviceManager) Get(name string) (*Device, bool) {
device, ok := m.devices[name]
return device, ok
}
func (m *DeviceManager) Add(device Device) error {
if _, prs := m.devices[device.Name]; prs {
return fmt.Errorf("device %s already exist", device.Name)
}
m.devices[device.Name] = &device
// subscribe to state topic
topic, err := device.ListenTopic()
if err != nil {
return err
}
if topic != "" {
cancel := m.subscriber(topic, 2, device.onMessage)
m.sub_cancel[device.Name] = cancel
}
return nil
}
func (m *DeviceManager) Delete(name string) error {
return nil
}
func (m *DeviceManager) SetSettings(name string, settings DeviceSettings) error {
device, ok := m.devices[name]
if !ok {
return fmt.Errorf("Not existings device %s", name)
}
device.Settings = settings
return nil
}
func (m *DeviceManager) SetState(name string, state DeviceState) error {
device, ok := m.devices[name]
if !ok {
return fmt.Errorf("Not existings device %s", name)
}
device.State = state
return nil
}
func (m *DeviceManager) Check(ctx context.Context, name string) error {
logger := zerolog.Ctx(ctx)
device, ok := m.devices[name]
if !ok {
return fmt.Errorf("Device %s don't exist", name)
}
err := device.CheckSetpoint(logger, m.PubChan)
if err != nil {
logger.Error().Err(err).Msg("")
return err
}
return nil
}
func (m *DeviceManager) CheckAll(ctx context.Context) error {
logger := zerolog.Ctx(ctx)
for _, device := range m.devices {
err := device.CheckSetpoint(logger, m.PubChan)
if err != nil {
logger.Error().Err(err).Msg("")
return err
}
}
return nil
}

206
pkg/device/settings.go Normal file
View File

@ -0,0 +1,206 @@
package device
import (
"time"
"bytes"
"text/template"
"fmt"
)
type DayOfWeek int
func (d DayOfWeek) Next() DayOfWeek {
if d == Sunday {
return Monday
}
return d+1
}
func (d DayOfWeek) Previous() DayOfWeek {
if d == Monday {
return Sunday
}
return d-1
}
func (d DayOfWeek) DurationBetween(n DayOfWeek) time.Duration {
// return duration between two day of week
var duration time.Duration
if (d-n == 0) {
duration, _ = time.ParseDuration("168h")
} else {
duration, _ = time.ParseDuration(fmt.Sprintf("%dh", (d-n)*24 ))
}
return duration
}
const (
Monday DayOfWeek = 0
Thuesday = 1
Wednesday = 2
Thursday = 3
Friday = 4
Saturday = 5
Sunday = 6
)
func WeekDayEnToFr(weekday time.Weekday) DayOfWeek {
// translate weekday to french week, start by Monday
return map[time.Weekday]DayOfWeek {
time.Monday : Monday,
time.Tuesday : Thuesday,
time.Wednesday: Wednesday,
time.Thursday : Thursday,
time.Friday: Friday,
time.Saturday: Saturday,
time.Sunday: Sunday,
}[weekday]
}
func daytime(t time.Time) int {
return t.Hour()*60 + t.Minute()
}
func weekday(t time.Time) DayOfWeek {
return WeekDayEnToFr(t.Weekday())
}
type Message struct {
Payload []byte
Topic string
Retain bool
}
type Setpoint struct {
Start int `json:"start"`
Preset_id int `json:"preset_id"`
}
type WeekProgram map[DayOfWeek][]Setpoint
func (p WeekProgram) Current() Setpoint {
// return current Setpoint
now := time.Now()
weekday := weekday(now)
daytime := daytime(now)
setpoint := Setpoint{}
for _, sp := range p[weekday] {
if daytime < sp.Start {
break
}
setpoint = sp
}
return setpoint
}
func (p WeekProgram) NextTime(t time.Time) (time.Time, error) {
// return next program change
setpoint := Setpoint{}
var next time.Time
weekday := weekday(t)
daytime := daytime(t)
// Recursive func to find setpoint on weekday
get := func (weekday DayOfWeek, daytime int) Setpoint {
setpoint := Setpoint{}
for _, sp := range p[weekday] {
setpoint = sp
if daytime < sp.Start {
return setpoint
}
}
return Setpoint{}
}
startweekday := weekday
for (setpoint == Setpoint{}) {
setpoint = get(weekday, daytime)
if (setpoint != Setpoint{}) {
// setpoint found, compute time
next := t.Add( startweekday.DurationBetween(weekday) )
next = t.Add( time.Duration(setpoint.Start - daytime ) * time.Minute )
return next, nil
}
weekday = weekday.Next()
daytime = 0
if weekday == startweekday {
return next, fmt.Errorf("Shouldn't happen no setpoint found over the week")
}
}
return next, nil
}
type Programs map[string]WeekProgram
type Preset struct {
Label string `json:"label"`
Value int `json:"value"`
Color string `json:"color"`
}
type TVRSettings struct {
Setpoint_topic string `json:"setpoint_topic"`
Setpoint_payload string `json:"setpoint_payload"`
Setpoint_state_topic string `json:"setpoint_state_topic"`
Setpoint_state_jp string `json:"setpoint_state_jp"`
}
func (s TVRSettings) FormatTopicState(device_name string) (string, error) {
type Variable struct {
Device string
}
variables := Variable{device_name}
t, err := template.New("topic").Parse(s.Setpoint_state_topic)
if err != nil {
return "", err
}
buf := new(bytes.Buffer)
err = t.Execute(buf, variables)
if err != nil {
return "", err
}
return buf.String(), nil
}
func (s TVRSettings) FormatTopic(device_name string) (string, error) {
type Variable struct {
Device string
}
variables := Variable{device_name}
t, err := template.New("topic").Parse(s.Setpoint_topic)
if err != nil {
return "", err
}
buf := new(bytes.Buffer)
err = t.Execute(buf, variables)
if err != nil {
return "", err
}
return buf.String(), nil
}
func (s TVRSettings) FormatPayload(setpoint int) (string, error) {
type Variable struct {
Setpoint int
}
variables := Variable{setpoint}
t, err := template.New("payload").Parse(s.Setpoint_payload)
if err != nil {
return "", err
}
buf := new(bytes.Buffer)
err = t.Execute(buf, variables)
if err != nil {
return "", err
}
return buf.String(), nil
}
type DeviceSettings struct {
Programs Programs `json:"programs"`
Presets []Preset `json:"presets"`
TVR TVRSettings `json:"TVR"`
}