From 30b3e4417b566feed387ca7eecadef47ef082d47 Mon Sep 17 00:00:00 2001 From: Nicolas Duhamel Date: Thu, 27 Oct 2022 07:45:09 +0200 Subject: [PATCH] Use set state topic, improve update logic and test --- go.mod | 3 +- go.sum | 4 + main.go | 26 ++-- pkg/device/device.go | 320 +++++++++++++++++++++++--------------- pkg/device/device_test.go | 277 +++++++++++++++++++++++++++++++-- 5 files changed, 478 insertions(+), 152 deletions(-) diff --git a/go.mod b/go.mod index 132a163..6d990ca 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,8 @@ require ( github.com/ohler55/ojg v1.14.5 // indirect github.com/rs/zerolog v1.28.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect - golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b // indirect + golang.org/x/net v0.1.0 // indirect golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde // indirect golang.org/x/sys v0.1.0 // indirect + golang.org/x/tools v0.2.0 // indirect ) diff --git a/go.sum b/go.sum index 91c759d..4d1e977 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbP golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b h1:ZmngSVLe/wycRns9MKikG9OWIEjGcGAkacif7oYQaUY= golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc= @@ -50,5 +52,7 @@ golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE= +golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 0dc01a7..76923d3 100644 --- a/main.go +++ b/main.go @@ -148,12 +148,12 @@ func (app *App) onSettingsMessage(ctx context.Context, msg mqttpaho.Message) { } } -func (app *App) onStateMessage(ctx context.Context, msg mqttpaho.Message) { - // callback for topic /{prefix}/{tvr_id}/state change's +func (app *App) onSetStateMessage(ctx context.Context, msg mqttpaho.Message) { + // callback for topic /{prefix}/{tvr_id}/state/set change's device_name := strings.Split(msg.Topic(), "/")[1] logger := zerolog.Ctx(ctx).With(). - Str("action", "onStateMessage"). + Str("action", "onSetStateMessage"). Str("device", device_name). Logger() ctx = logger.WithContext(ctx) @@ -164,26 +164,22 @@ func (app *App) onStateMessage(ctx context.Context, msg mqttpaho.Message) { logger.Error().Err(err).Msg("Error while parsing payload") } - if _, ok := app.DeviceManager.Get(device_name); !ok { - tvr := device.Device{ - Name: device_name, - State: state, - } - app.DeviceManager.Add(tvr) - } + logger.Info().Interface("state", state).Msg("new state") + device, ok := app.DeviceManager.Get(device_name) if !ok { - logger.Error().Msg("Device not found") + logger.Error().Msg("Device not found, abord") + return } - logger.Warn().Interface("state", state).Interface("device_state", device.State).Msg("") - if !device.SameState(state) { device.State = state + err := app.DeviceManager.Check(ctx, device_name) if err != nil { - logger.Error().Err(err).Msg("Error while checking device") + logger.Error().Err(err).Msg("error while checking device") } + // device.SetState(log, state, pubchan) } else { logger.Info().Msg("no state update, ignoring") } @@ -210,7 +206,7 @@ func (app *App) Run() { app.ctx = ctx app.subscribe("heater/+/settings", 2, app.onSettingsMessage) - app.subscribe("heater/+/state", 2, app.onStateMessage) + app.subscribe("heater/+/state/set", 2, app.onSetStateMessage) go func(ctx context.Context, pub *mqtt.Publisher) { log := zerolog.Ctx(ctx).With(). diff --git a/pkg/device/device.go b/pkg/device/device.go index 309949c..10f5b9a 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -17,10 +17,10 @@ var timeNow = func() time.Time { } - type Error string func (e Error) Error() string { return string(e) } + type DeviceState struct { Mode string `json:"mode"` Setpoint int `json:"setpoint"` @@ -29,6 +29,40 @@ type DeviceState struct { Until_time time.Time `json:"until_time"` } +func (s *DeviceState) Equivalent(state DeviceState) bool { + if state.Mode != s.Mode { + return false + } + if !state.Time.Equal(s.Time) { + return false + } + + switch state.Mode { + case "always": + if state.Setpoint != s.Setpoint { + return false + } + case "until_next": + if state.Setpoint != s.Setpoint { + return false + } + case "until_time": + if state.Setpoint != s.Setpoint { + return false + } + if !state.Until_time.Equal(s.Until_time) { + return false + } + case "program": + if state.Program_name != s.Program_name { + return false + } + } + + return true +} + + // Internal state type Device struct { Name string @@ -37,24 +71,6 @@ type Device struct { State DeviceState } -func (d *Device) SameState(state DeviceState) bool { - if state.Mode != d.State.Mode { - return false - } - if state.Setpoint != d.State.Setpoint { - return false - } - if state.Program_name != d.State.Program_name { - return false - } - if !state.Time.Equal(d.State.Time) { - return false - } - if !state.Until_time.Equal(d.State.Until_time) { - return false - } - return true -} func (d *Device) StateTopic() string { return fmt.Sprintf("heater/%s/state", d.Name) @@ -64,6 +80,7 @@ func (d Device) ListenTopic() (string, error) { return d.Settings.TVR.FormatTopicState(d.Name) } + func (d *Device) Program() (WeekProgram, error) { // return current device program if specified or default one prog_name := "default" @@ -79,6 +96,7 @@ func (d *Device) Program() (WeekProgram, error) { return program, nil } + func (d *Device) ProgramName() (string) { prog_name := "default" if d.State.Program_name != "" { @@ -87,9 +105,10 @@ func (d *Device) ProgramName() (string) { return prog_name } -func (d *Device) SetState(state DeviceState, pubchan chan Message) error { - payload, err := json.Marshal(state) +func (d *Device) publishState(pubchan chan Message) error { + + payload, err := json.Marshal(d.State) if err != nil { return err } @@ -100,11 +119,10 @@ func (d *Device) SetState(state DeviceState, pubchan chan Message) error { Retain: true, } - d.State = state - return nil } + func (d *Device) SetSetpoint(value int, pubchan chan Message) error { topic, err := d.Settings.TVR.FormatTopic(d.Name) if err != nil { @@ -125,10 +143,50 @@ func (d *Device) SetSetpoint(value int, pubchan chan Message) error { } -func (d *Device) CheckSetpoint(logger *zerolog.Logger, pubchan chan Message) error { +func (d *Device) SetState(log *zerolog.Logger, state DeviceState, pubchan chan Message) error { + // If same state do nothing + // else use checksetpoint for changing state + if d.State.Equivalent(state) { + log.Debug().Msg("same state no change") + return nil + } + + d.State = state + + // ignore change info, we already known + _, err := d.update(log, pubchan) + if err != nil { + return err + } + + if err := d.publishState(pubchan); err != nil { + return err + } + + return nil +} + + +func (d *Device) CheckSetpoint(log *zerolog.Logger, pubchan chan Message) error { + change, err := d.update(log, pubchan) + if err != nil { + return err + } + + if change { + if err := d.publishState(pubchan); err != nil { + return err + } + } + + return nil +} + + +func (d *Device) update(log *zerolog.Logger, pubchan chan Message) (bool, error) { // Handle all the update setpoint logic // push in pubChan a Message if setpoint need to be update - log := logger.With(). + *log = log.With(). Str("device", d.Name). Int("current_setpoint", d.CurrentSetpoint). Str("State.Mode", d.State.Mode). @@ -139,55 +197,22 @@ func (d *Device) CheckSetpoint(logger *zerolog.Logger, pubchan chan Message) err switch d.State.Mode { case "always": - return d.handle_always(&log, pubchan) + return d.handle_always(log, pubchan) case "until_time": - return d.handle_until_time(&log, pubchan) + return d.handle_until_time(log, pubchan) case "until_next": - return d.handle_until_next(&log, pubchan) + return d.handle_until_next(log, pubchan) case "program": - return d.handle_program(&log, pubchan) + return d.handle_program(log, pubchan) default: log.Info().Msg("Use default mode") - return d.setProgAndReset(&log, pubchan) + return d.handle_reset_state(log, pubchan) } } -func (d *Device) setProgAndReset(log *zerolog.Logger, pubchan chan Message) error { - program, err := d.Program() - if err != nil { - return err - } - - current_setpoint := program.Current() - - value, err := current_setpoint.Value(d.Settings.Presets) - if err != nil { - return err - } - - if d.CurrentSetpoint != value { - log.Info().Msg("publish setpoint update") - err = d.SetSetpoint(value, pubchan) - if err != nil { - return err - } - } else { - log.Info().Msg("no setpoint update") - } - - state := DeviceState{ - Setpoint: value, - Mode: "program", - Program_name: d.ProgramName(), - Time: timeNow(), - } - - return d.SetState(state, pubchan) -} - func (d *Device) onMessage(ctx context.Context, msg mqtt.Message) { log := zerolog.Ctx(ctx).With(). @@ -223,82 +248,129 @@ func (d *Device) onMessage(ctx context.Context, msg mqtt.Message) { } -func (d *Device) handle_always(log *zerolog.Logger, pubchan chan Message) error { - if d.State.Setpoint != d.CurrentSetpoint { - return d.SetSetpoint(d.State.Setpoint, pubchan) - } - - return nil -} - -func (d *Device) handle_until_time(log *zerolog.Logger, pubchan chan Message) error { - *log = log.With().Time("until_time", d.State.Until_time).Logger() - - if d.State.Until_time.Before(timeNow()) { - log.Info().Msg("until_time passed, reset") - return d.setProgAndReset(log, pubchan) - } - - if d.State.Setpoint != d.CurrentSetpoint { - log.Info().Msg("need setpoint update") - return d.SetSetpoint(d.State.Setpoint, pubchan) - } - - return nil -} - -func (d *Device) handle_until_next(log *zerolog.Logger, pubchan chan Message) error { - *log = log.With().Time("until_next", d.State.Time).Logger() - +func (d *Device) handle_reset_state(log *zerolog.Logger, pubchan chan Message) (bool, error) { + // called when need to fallback to program mode previous or default program, err := d.Program() if err != nil { - return err - } - - next, err := program.NextTime(d.State.Time) - if err!= nil { - return err - } - - if timeNow().After(next) { - // force current program - // reset state - log.Info().Time("now", timeNow()).Time("next", next).Msg("until_next expired") - return d.setProgAndReset(log, pubchan) - } - - if d.State.Setpoint != d.CurrentSetpoint { - log.Info().Msg("need setpoint update") - return d.SetSetpoint(d.State.Setpoint, pubchan) - } - - return nil -} - -func (d *Device) handle_program(log *zerolog.Logger, pubchan chan Message) error { - *log = log.With().Str("program", d.State.Program_name).Logger() - - program, err := d.Program() - if err != nil { - return err + return false, err } current_setpoint := program.Current() value, err := current_setpoint.Value(d.Settings.Presets) if err != nil { - return err + return false, err + } + + d.State = DeviceState{ + Setpoint: value, + Mode: "program", + Program_name: d.ProgramName(), + Time: timeNow(), } if d.CurrentSetpoint != value { log.Info().Msg("publish setpoint update") err = d.SetSetpoint(value, pubchan) if err != nil { - return err + return false, err } - } else { - log.Info().Msg("no setpoint update") } - return nil + return true, nil +} + + +func (d *Device) handle_always(log *zerolog.Logger, pubchan chan Message) (bool, error) { + // return true if change made + + if d.State.Setpoint != d.CurrentSetpoint { + if err:= d.SetSetpoint(d.State.Setpoint, pubchan); err != nil { + return false, err + } + return true, nil + } + + log.Info().Msg("no setpoint update") + return false, nil +} + +func (d *Device) handle_until_time(log *zerolog.Logger, pubchan chan Message) (bool, error) { + *log = log.With().Time("until_time", d.State.Until_time).Logger() + + if d.State.Until_time.Before(timeNow()) { + log.Info().Msg("until_time passed, reset") + return d.handle_reset_state(log, pubchan) + } + + if d.State.Setpoint != d.CurrentSetpoint { + log.Info().Msg("need setpoint update") + if err := d.SetSetpoint(d.State.Setpoint, pubchan); err != nil { + return false, err + } + return true, nil + } + + log.Info().Msg("no setpoint update") + return false, nil +} + +func (d *Device) handle_until_next(log *zerolog.Logger, pubchan chan Message) (bool, error) { + *log = log.With().Time("until_next", d.State.Time).Logger() + + program, err := d.Program() + if err != nil { + return false, err + } + + next, err := program.NextTime(d.State.Time) + if err!= nil { + return false, err + } + + if timeNow().After(next) { + // force current program + // reset state + log.Info().Time("now", timeNow()).Time("next", next).Msg("until_next expired") + return d.handle_reset_state(log, pubchan) + } + + if d.State.Setpoint != d.CurrentSetpoint { + log.Info().Msg("need setpoint update") + if err := d.SetSetpoint(d.State.Setpoint, pubchan); err != nil { + return false, err + } + return true, nil + } + + log.Info().Msg("no setpoint update") + return false, nil +} + +func (d *Device) handle_program(log *zerolog.Logger, pubchan chan Message) (bool, error) { + *log = log.With().Str("program", d.State.Program_name).Logger() + + program, err := d.Program() + if err != nil { + return false, err + } + + current_setpoint := program.Current() + + value, err := current_setpoint.Value(d.Settings.Presets) + if err != nil { + return false, err + } + + if d.CurrentSetpoint != value { + log.Info().Msg("publish setpoint update") + if err := d.SetSetpoint(value, pubchan); err != nil { + return false, err + } + d.State.Setpoint = value + return true, nil + } + + log.Info().Msg("no setpoint update") + return false, nil } diff --git a/pkg/device/device_test.go b/pkg/device/device_test.go index 2e21942..661ba89 100644 --- a/pkg/device/device_test.go +++ b/pkg/device/device_test.go @@ -13,8 +13,6 @@ import ( var test_time = time.Date(2022, time.October, 24, 0, 0, 0, 0, time.Local) - - var test_presets = []Preset{ {Label: "default", Value: 17, Color: "#012a36"}, {Label: "normal", Value: 19, Color: "#b6244f"}, @@ -57,6 +55,192 @@ var test_device = Device { }, } +func TestStateEquivalent(t *testing.T) { + + var tests = []struct{ + state1 DeviceState + state2 DeviceState + want bool + }{ + { + DeviceState{ + Mode: "program", + Setpoint: 14, + Time: test_time, + Program_name: "default", + }, + DeviceState{ + Mode: "always", + Setpoint: 13, + Time: test_time, + }, + false, + }, + { + DeviceState{ + Mode: "program", + Setpoint: 14, + Time: test_time, + Program_name: "default", + }, + DeviceState{ + Mode: "program", + Setpoint: 14, + Time: test_time.Add(1*time.Minute), + Program_name: "default", + }, + false, + }, + { + DeviceState{ + Mode: "program", + Setpoint: 14, + Time: test_time, + Program_name: "default", + }, + DeviceState{ + Mode: "program", + Setpoint: 13, + Time: test_time, + Program_name: "default", + }, + true, + }, + { + DeviceState{ + Mode: "program", + Setpoint: 14, + Time: test_time, + Program_name: "default", + }, + DeviceState{ + Mode: "program", + Setpoint: 13, + Time: test_time, + Program_name: "other", + }, + false, + }, + { + DeviceState{ + Mode: "always", + Setpoint: 14, + Time: test_time, + }, + DeviceState{ + Mode: "always", + Setpoint: 14, + Time: test_time, + Program_name: "other", + Until_time: test_time, + }, + true, + }, + { + DeviceState{ + Mode: "always", + Setpoint: 14, + Time: test_time, + }, + DeviceState{ + Mode: "always", + Setpoint: 15, + Time: test_time, + Program_name: "other", + Until_time: test_time, + }, + false, + }, + { + DeviceState{ + Mode: "until_next", + Setpoint: 14, + Time: test_time, + }, + DeviceState{ + Mode: "until_next", + Setpoint: 14, + Time: test_time, + Program_name: "other", + Until_time: test_time, + }, + true, + }, + { + DeviceState{ + Mode: "until_next", + Setpoint: 14, + Time: test_time, + }, + DeviceState{ + Mode: "until_next", + Setpoint: 13, + Time: test_time, + Program_name: "other", + Until_time: test_time, + }, + false, + }, + { + DeviceState{ + Mode: "until_time", + Setpoint: 14, + Time: test_time, + Until_time: test_time.Add(1*time.Hour), + }, + DeviceState{ + Mode: "until_time", + Setpoint: 14, + Time: test_time, + Program_name: "other", + Until_time: test_time.Add(1*time.Hour), + }, + true, + }, + { + DeviceState{ + Mode: "until_time", + Setpoint: 14, + Time: test_time, + Until_time: test_time.Add(1*time.Hour), + }, + DeviceState{ + Mode: "until_time", + Setpoint: 13, + Time: test_time, + Program_name: "other", + Until_time: test_time.Add(1*time.Hour), + }, + false, + }, + { + DeviceState{ + Mode: "until_time", + Setpoint: 14, + Time: test_time, + Until_time: test_time.Add(1*time.Hour), + }, + DeviceState{ + Mode: "until_time", + Setpoint: 14, + Time: test_time, + Program_name: "other", + Until_time: test_time.Add(2*time.Hour), + }, + false, + }, + } + for i, tt := range tests { + testname := fmt.Sprintf("%d", i ) + t.Run(testname, func(t *testing.T) { + r := tt.state1.Equivalent(tt.state2) + if r != tt.want { + t.Errorf("got %t, want %t", r, tt.want) + } + }) + } +} + func TestStateTopic(t *testing.T) { topic := test_device.StateTopic() @@ -65,6 +249,17 @@ func TestStateTopic(t *testing.T) { } } +func TestListenTopic(t *testing.T) { + topic, err := test_device.ListenTopic() + want := "zigbee2mqtt/TVR/valid" + if err != nil { + t.Errorf("Got %s", err.Error()) + } + if topic != want { + t.Errorf("Got %s; want %s", topic, want) + } +} + func TestProgram(t *testing.T) { //case 1: no program set in state return default case1_device := test_device @@ -116,7 +311,13 @@ func TestProgram(t *testing.T) { } -func TestCheckSetpoint(t *testing.T) { +func TestUpdate(t *testing.T) { + // device 1: currentSetpoint 0 program, not specified expect default + // device 2: currentSetpoint 0 program, unknown one, expect error + // device 3: currentSetpoint 0 always, 22 + // device 4: currentSetpoint 22 until_time, fixed at timeNow, for 2h 22 + // device 5: currentSetpoint 17 indem 4 but fixed timeNow-2h + timeNow = func() time.Time { return test_time } @@ -134,17 +335,56 @@ func TestCheckSetpoint(t *testing.T) { device2.Name = "2" device2.State.Program_name = "unknown" + device3 := test_device + device3.Name = "3" + device3.State = DeviceState{ + Mode: "always", + Setpoint: 22, + } + + device4 := test_device + device4.Name = "4" + device4.CurrentSetpoint = 22 + device4.State = DeviceState{ + Mode: "until_time", + Setpoint: 22, + Time: timeNow(), + Until_time: timeNow().Add(2*time.Hour), + } + + device5 := test_device + device5.Name = "5" + device5.CurrentSetpoint = 17 + device5.State = DeviceState{ + Mode: "until_time", + Setpoint: 22, + Time: timeNow().Add(-2*time.Hour), + Until_time: timeNow().Add(-1*time.Minute), + } + var tests = []struct{ device Device want []Message + change bool err error }{ - {device1, []Message{Message{ - Payload: []byte("{\"current_heating_setpoint\": 17}"), - Topic: "zigbee2mqtt/TVR/1/set", - Retain: false, - }}, nil} , - {device2, []Message{}, Error("device 2 don't have unknown program")}, + {device1, []Message{ + Message{ + Payload: []byte("{\"current_heating_setpoint\": 17}"), + Topic: "zigbee2mqtt/TVR/1/set", + Retain: false, + }, + }, true, nil} , + {device2, []Message{}, false, Error("device 2 don't have unknown program")}, + {device3, []Message{ + Message{ + Payload: []byte("{\"current_heating_setpoint\": 22}"), + Topic: "zigbee2mqtt/TVR/3/set", + Retain: false, + }, + }, true, nil} , + {device4, []Message{}, false, nil} , + {device5, []Message{}, true, nil} , } for _, tt := range tests { @@ -155,13 +395,17 @@ func TestCheckSetpoint(t *testing.T) { rchan := make(chan Message, 10) errchan := make(chan error, 1) + changechan := make(chan bool, 1) - go func(result chan Message, errchan chan error) { + go func(result chan Message, changechan chan bool, errchan chan error) { logger := zerolog.New(ioutil.Discard).With().Timestamp().Logger() - errchan <- tt.device.CheckSetpoint(&logger, result) + change, err := tt.device.update(&logger, result) + errchan <- err + changechan <- change close(rchan) + close(changechan) close(errchan) - }(rchan, errchan) + }(rchan, changechan, errchan) for msg := range rchan { result = append(result, msg) @@ -176,6 +420,15 @@ func TestCheckSetpoint(t *testing.T) { t.Errorf("got %s, want %s", err, tt.err) } + change, ok := <- changechan + if !ok { + t.Fatal(err) + } + + if change != tt.change { + t.Errorf("got %s, want %s", err, tt.err) + } + if !reflect.DeepEqual(result,tt.want) { t.Errorf("got %v, want %v", result, tt.want) }