package device

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/ohler55/ojg/jp"
	"github.com/ohler55/ojg/oj"
	"github.com/rs/zerolog"
	// "reflect"
)

// timeNow returns the current time. It is a package variable wrapping
// time.Now (presumably so tests can substitute a fixed clock; confirm
// against the test files).
var timeNow = func() time.Time { return time.Now() }

// Error is a string-backed error type used for errors raised by this package.
type Error string

// Error implements the error interface.
func (e Error) Error() string { return string(e) }

// DeviceState is the requested operating state of a device. It is the
// payload serialized to JSON and published on the device state topic.
type DeviceState struct {
	// Mode selects the handler used by update: "always", "until_time",
	// "until_next" or "program". Any other value makes update fall back to
	// the program mode.
	Mode string `json:"mode"`
	// Setpoint is the setpoint requested by this state.
	Setpoint int `json:"setpoint"`
	// Time is the instant attached to the state; the "until_next" mode uses
	// it as the reference from which the next program change is computed.
	Time time.Time `json:"time"`
	// Program_name names the program to follow; empty means "default".
	Program_name string `json:"program_name"`
	// Until_time is the expiry instant of the "until_time" mode.
	Until_time time.Time `json:"until_time"`
}

// Equivalent reports whether state describes the same request as s.
//
// Mode and Time must always match. Beyond that, only the fields relevant to
// the mode are compared: Setpoint for "always" and "until_next", Setpoint and
// Until_time for "until_time", Program_name for "program". Unknown modes are
// equivalent as soon as Mode and Time match.
func (s *DeviceState) Equivalent(state DeviceState) bool {
	if state.Mode != s.Mode || !state.Time.Equal(s.Time) {
		return false
	}
	switch state.Mode {
	case "always", "until_next":
		return state.Setpoint == s.Setpoint
	case "until_time":
		return state.Setpoint == s.Setpoint && state.Until_time.Equal(s.Until_time)
	case "program":
		return state.Program_name == s.Program_name
	}
	return true
}

// Device is the internal state kept for one device.
type Device struct {
	// Name identifies the device; it is used to build topics and in logs.
	Name string
	// Settings holds the device configuration (TVR formatting, programs,
	// presets).
	Settings DeviceSettings
	// CurrentSetpoint is the last setpoint decoded by onMessage.
	CurrentSetpoint int
	// State is the currently requested state.
	State DeviceState
}

// StateTopic returns the topic on which the device state is published.
func (d *Device) StateTopic() string {
	return fmt.Sprintf("heater/%s/state", d.Name)
}

// ListenTopic returns the topic built by the TVR settings for this device
// name (FormatTopicState), or the error it reports.
func (d Device) ListenTopic() (string, error) {
	return d.Settings.TVR.FormatTopicState(d.Name)
}

// Program returns the program named by ProgramName, looked up in
// Settings.Programs. It returns an Error when the device has no such program.
func (d *Device) Program() (WeekProgram, error) {
	name := d.ProgramName()
	program, ok := d.Settings.Programs[name]
	if !ok {
		return WeekProgram{}, Error(fmt.Sprintf("device %s don't have %s program", d.Name, name))
	}
	return program, nil
}

// ProgramName returns State.Program_name, or "default" when it is empty.
func (d *Device) ProgramName() string {
	if d.State.Program_name != "" {
		return d.State.Program_name
	}
	return "default"
}

// publishState sends the JSON-encoded State on pubchan as a retained message
// addressed to StateTopic. The channel send blocks until it is accepted.
func (d *Device) publishState(pubchan chan Message) error {
	payload, err := json.Marshal(d.State)
	if err != nil {
		return err
	}
	pubchan <- Message{
		Topic:   d.StateTopic(),
		Payload: payload,
		Retain:  true,
	}
	return nil
}

// SetSetpoint sends a non-retained message on pubchan asking the device to
// use value as its setpoint. Topic and payload are produced by the TVR
// settings; their errors are returned unchanged and nothing is sent.
func (d *Device) SetSetpoint(value int, pubchan chan Message) error {
	topic, err := d.Settings.TVR.FormatTopic(d.Name)
	if err != nil {
		return err
	}
	payload, err := d.Settings.TVR.FormatPayload(value)
	if err != nil {
		return err
	}
	pubchan <- Message{
		Topic:   topic,
		Payload: []byte(payload),
		Retain:  false,
	}
	return nil
}

// SetState replaces the device state with state, applies it through update
// and publishes the resulting state. When state is Equivalent to the current
// one nothing is done.
func (d *Device) SetState(log *zerolog.Logger, state DeviceState, pubchan chan Message) error {
	if d.State.Equivalent(state) {
		log.Debug().Msg("same state no change")
		return nil
	}
	d.State = state
	// The change flag is ignored: the state was just replaced, so it is
	// published unconditionally.
	if _, err := d.update(log, pubchan); err != nil {
		return err
	}
	return d.publishState(pubchan)
}

// CheckSetpoint runs update and publishes the state when update reports a
// change.
func (d *Device) CheckSetpoint(log *zerolog.Logger, pubchan chan Message) error {
	change, err := d.update(log, pubchan)
	if err != nil {
		return err
	}
	if !change {
		return nil
	}
	return d.publishState(pubchan)
}

// update holds the setpoint update logic: it dispatches on State.Mode to the
// matching handler, which pushes a Message on pubchan when the setpoint needs
// to be updated. It returns true when a change was made.
func (d *Device) update(log *zerolog.Logger, pubchan chan Message) (bool, error) {
	// Derive a local logger instead of overwriting *log: writing through the
	// pointer would leak these fields into the caller's logger and stack
	// duplicate keys on every call.
	scoped := log.With().
		Str("device", d.Name).
		Int("current_setpoint", d.CurrentSetpoint).
		Str("State.Mode", d.State.Mode).
		Int("State.Setpoint", d.State.Setpoint).
		Str("State.Program_name", d.State.Program_name).
		Logger()
	log = &scoped

	log.Info().Msg("Check if setpoint need an update")
	switch d.State.Mode {
	case "always":
		return d.handle_always(log, pubchan)
	case "until_time":
		return d.handle_until_time(log, pubchan)
	case "until_next":
		return d.handle_until_next(log, pubchan)
	case "program":
		return d.handle_program(log, pubchan)
	default:
		log.Info().Msg("Use default mode")
		return d.handle_reset_state(log, pubchan)
	}
}

// onMessage handles a message received for the device: it parses the payload
// as JSON, evaluates the Setpoint_state_jp JSONPath expression on it and
// stores the first match in CurrentSetpoint when that match is an int64.
// Failures are logged and leave CurrentSetpoint untouched.
func (d *Device) onMessage(ctx context.Context, msg mqtt.Message) {
	log := zerolog.Ctx(ctx).With().
		Str("action", "device state receive").
		Str("device", d.Name).
		Logger()

	raw := string(msg.Payload())
	log.Debug().
		Str("topic", msg.Topic()).
		Str("payload", raw).
		Msg("Message get")

	obj, err := oj.ParseString(raw)
	if err != nil {
		log.Error().Err(err).Msg("during payload parse")
		return
	}

	x, err := jp.ParseString(d.Settings.TVR.Setpoint_state_jp)
	if err != nil {
		// This is a configuration problem (invalid JSONPath), not a payload
		// problem, so it gets its own message.
		log.Error().Err(err).Msg("while parsing setpoint state JSONPath")
		return
	}

	r := x.First(obj)
	v, ok := r.(int64)
	if !ok {
		// No error value exists here; only the unexpected match is reported.
		log.Error().Interface("parsing payload", r).Msg("while parsing payload")
		return
	}
	d.CurrentSetpoint = int(v)
}

// handle_reset_state falls back to the program mode, using the program named
// by ProgramName (previous program or "default"). It replaces State with a
// "program" state stamped with timeNow and carrying the program's current
// value, and sends a setpoint update when CurrentSetpoint differs. On success
// it always reports a change.
func (d *Device) handle_reset_state(log *zerolog.Logger, pubchan chan Message) (bool, error) {
	program, err := d.Program()
	if err != nil {
		return false, err
	}
	value, err := program.Current().Value(d.Settings.Presets)
	if err != nil {
		return false, err
	}

	d.State = DeviceState{
		Setpoint:     value,
		Mode:         "program",
		Program_name: d.ProgramName(),
		Time:         timeNow(),
	}

	if d.CurrentSetpoint != value {
		log.Info().Msg("publish setpoint update")
		if err := d.SetSetpoint(value, pubchan); err != nil {
			return false, err
		}
	}
	return true, nil
}

// handle_always sends State.Setpoint to the device when it differs from
// CurrentSetpoint. It returns true when an update was sent.
func (d *Device) handle_always(log *zerolog.Logger, pubchan chan Message) (bool, error) {
	if d.State.Setpoint != d.CurrentSetpoint {
		if err := d.SetSetpoint(d.State.Setpoint, pubchan); err != nil {
			return false, err
		}
		return true, nil
	}
	log.Info().Msg("no setpoint update")
	return false, nil
}

// handle_until_time keeps State.Setpoint applied until State.Until_time.
// Once that instant is before timeNow the state is reset through
// handle_reset_state. It returns true when a change was made.
func (d *Device) handle_until_time(log *zerolog.Logger, pubchan chan Message) (bool, error) {
	// Local logger: the caller's logger must not be modified.
	scoped := log.With().Time("until_time", d.State.Until_time).Logger()
	log = &scoped

	if d.State.Until_time.Before(timeNow()) {
		log.Info().Msg("until_time passed, reset")
		return d.handle_reset_state(log, pubchan)
	}

	if d.State.Setpoint != d.CurrentSetpoint {
		log.Info().Msg("need setpoint update")
		if err := d.SetSetpoint(d.State.Setpoint, pubchan); err != nil {
			return false, err
		}
		return true, nil
	}
	log.Info().Msg("no setpoint update")
	return false, nil
}

// handle_until_next keeps State.Setpoint applied until the instant returned
// by the program's NextTime for State.Time. Once timeNow is after that
// instant the state is reset through handle_reset_state. It returns true when
// a change was made.
func (d *Device) handle_until_next(log *zerolog.Logger, pubchan chan Message) (bool, error) {
	// Local logger: the caller's logger must not be modified.
	scoped := log.With().Time("until_next", d.State.Time).Logger()
	log = &scoped

	program, err := d.Program()
	if err != nil {
		return false, err
	}
	next, err := program.NextTime(d.State.Time)
	if err != nil {
		return false, err
	}

	// Read the clock once so the logged instant is the one that was compared.
	now := timeNow()
	if now.After(next) {
		log.Info().Time("now", now).Time("next", next).Msg("until_next expired")
		return d.handle_reset_state(log, pubchan)
	}

	if d.State.Setpoint != d.CurrentSetpoint {
		log.Info().Msg("need setpoint update")
		if err := d.SetSetpoint(d.State.Setpoint, pubchan); err != nil {
			return false, err
		}
		return true, nil
	}
	log.Info().Msg("no setpoint update")
	return false, nil
}

// handle_program applies the current value of the device program. When it
// differs from CurrentSetpoint, a setpoint update is sent, State.Setpoint is
// set to that value and true is returned.
func (d *Device) handle_program(log *zerolog.Logger, pubchan chan Message) (bool, error) {
	// Local logger: the caller's logger must not be modified.
	scoped := log.With().Str("program", d.State.Program_name).Logger()
	log = &scoped

	program, err := d.Program()
	if err != nil {
		return false, err
	}
	value, err := program.Current().Value(d.Settings.Presets)
	if err != nil {
		return false, err
	}

	if d.CurrentSetpoint != value {
		log.Info().Msg("publish setpoint update")
		if err := d.SetSetpoint(value, pubchan); err != nil {
			return false, err
		}
		d.State.Setpoint = value
		return true, nil
	}
	log.Info().Msg("no setpoint update")
	return false, nil
}