// Package device tracks the state of a heater device and emits the MQTT
// messages (setpoint commands and retained state) needed to keep the device
// in line with its configured mode and program.
package device

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/ohler55/ojg/jp"
	"github.com/ohler55/ojg/oj"
	"github.com/rs/zerolog"
)

// timeNow returns the current time. It is a package-level variable rather
// than a direct call to time.Now, presumably so it can be swapped for a
// deterministic clock.
var timeNow = time.Now

// Error is a plain string error type used for errors raised by this package.
type Error string

// Error implements the error interface.
func (e Error) Error() string { return string(e) }

// DeviceState is the JSON-serialisable state of a device.
//
// Mode selects the behaviour applied by CheckSetpoint ("always",
// "until_time", "until_next" or "program"; anything else falls back to the
// program). Setpoint is the setpoint requested by the state. Time is the
// reference instant of the state (set to the current time when the state is
// reset to the program, and used as the starting point in "until_next" mode).
// Program_name names the program to use; empty means "default". Until_time is
// the expiry instant for "until_time" mode.
type DeviceState struct {
	Mode         string    `json:"mode"`
	Setpoint     int       `json:"setpoint"`
	Time         time.Time `json:"time"`
	Program_name string    `json:"program_name"`
	Until_time   time.Time `json:"until_time"`
}

// Device holds the internal state of one device: its name, its settings, the
// last setpoint reported by the device (CurrentSetpoint) and its requested
// state.
type Device struct {
	Name            string
	Settings        DeviceSettings
	CurrentSetpoint int
	State           DeviceState
}

// SameState reports whether state is equal to the device's current state.
// Time fields are compared with time.Time.Equal, so the same instant in
// different locations compares equal.
func (d *Device) SameState(state DeviceState) bool {
	cur := d.State
	return state.Mode == cur.Mode &&
		state.Setpoint == cur.Setpoint &&
		state.Program_name == cur.Program_name &&
		state.Time.Equal(cur.Time) &&
		state.Until_time.Equal(cur.Until_time)
}

// StateTopic returns the topic on which the device state is published.
func (d *Device) StateTopic() string {
	return fmt.Sprintf("heater/%s/state", d.Name)
}

// ListenTopic returns the topic produced by the TVR settings'
// FormatTopicState for this device name. Note that, unlike the other methods
// of Device, it is declared on a value receiver.
func (d Device) ListenTopic() (string, error) {
	return d.Settings.TVR.FormatTopicState(d.Name)
}

// Program returns the program named by ProgramName, or an Error if the device
// settings contain no program with that name.
func (d *Device) Program() (WeekProgram, error) {
	name := d.ProgramName()
	program, ok := d.Settings.Programs[name]
	if !ok {
		return WeekProgram{}, Error(fmt.Sprintf("device %s don't have %s program", d.Name, name))
	}
	return program, nil
}

// ProgramName returns the program name set in the device state, or "default"
// when the state does not name one.
func (d *Device) ProgramName() string {
	if d.State.Program_name != "" {
		return d.State.Program_name
	}
	return "default"
}

// SetState marshals state to JSON, sends it on pubchan as a retained message
// on StateTopic and then stores it as the device's state. It returns the
// marshalling error, if any; in that case nothing is sent and the stored
// state is left untouched. The send on pubchan is a plain channel send.
func (d *Device) SetState(state DeviceState, pubchan chan Message) error {
	payload, err := json.Marshal(state)
	if err != nil {
		return err
	}
	pubchan <- Message{
		Topic:   d.StateTopic(),
		Payload: payload,
		Retain:  true,
	}
	d.State = state
	return nil
}

// SetSetpoint sends a non-retained message on pubchan asking for the setpoint
// value. Topic and payload are produced by the TVR settings. CurrentSetpoint
// is not modified here; it is only updated by onMessage.
func (d *Device) SetSetpoint(value int, pubchan chan Message) error {
	topic, err := d.Settings.TVR.FormatTopic(d.Name)
	if err != nil {
		return err
	}
	payload, err := d.Settings.TVR.FormatPayload(value)
	if err != nil {
		return err
	}
	pubchan <- Message{
		Topic:   topic,
		Payload: []byte(payload),
		Retain:  false,
	}
	return nil
}

// CheckSetpoint applies the setpoint logic for the device's current mode and
// pushes a Message on pubchan whenever the setpoint (or the state) has to be
// updated. An unknown or empty mode falls back to the program and resets the
// state.
func (d *Device) CheckSetpoint(logger *zerolog.Logger, pubchan chan Message) error {
	log := logger.With().
		Str("device", d.Name).
		Int("current_setpoint", d.CurrentSetpoint).
		Str("State.Mode", d.State.Mode).
		Int("State.Setpoint", d.State.Setpoint).
		Str("State.Program_name", d.State.Program_name).
		Logger()
	log.Info().Msg("Check if setpoint need an update")

	switch d.State.Mode {
	case "always":
		return d.handle_always(&log, pubchan)
	case "until_time":
		return d.handle_until_time(&log, pubchan)
	case "until_next":
		return d.handle_until_next(&log, pubchan)
	case "program":
		return d.handle_program(&log, pubchan)
	default:
		log.Info().Msg("Use default mode")
		return d.setProgAndReset(&log, pubchan)
	}
}

// syncProgramSetpoint resolves the value currently requested by the device's
// program and publishes it when it differs from CurrentSetpoint. It returns
// the resolved value.
func (d *Device) syncProgramSetpoint(log *zerolog.Logger, pubchan chan Message) (int, error) {
	program, err := d.Program()
	if err != nil {
		return 0, err
	}
	current := program.Current()
	value, err := current.Value(d.Settings.Presets)
	if err != nil {
		return 0, err
	}
	if d.CurrentSetpoint == value {
		log.Info().Msg("no setpoint update")
		return value, nil
	}
	log.Info().Msg("publish setpoint update")
	if err := d.SetSetpoint(value, pubchan); err != nil {
		return 0, err
	}
	return value, nil
}

// setProgAndReset applies the program's current setpoint and then resets the
// device state to "program" mode with that setpoint, the effective program
// name and the current time.
func (d *Device) setProgAndReset(log *zerolog.Logger, pubchan chan Message) error {
	value, err := d.syncProgramSetpoint(log, pubchan)
	if err != nil {
		return err
	}
	return d.SetState(DeviceState{
		Setpoint:     value,
		Mode:         "program",
		Program_name: d.ProgramName(),
		Time:         timeNow(),
	}, pubchan)
}

// onMessage handles a message received from the device: it parses the JSON
// payload, extracts the value selected by the TVR settings' Setpoint_state_jp
// JSONPath expression and, when that value is an int64, stores it as
// CurrentSetpoint. Failures are logged and otherwise ignored.
func (d *Device) onMessage(ctx context.Context, msg mqtt.Message) {
	log := zerolog.Ctx(ctx).With().
		Str("action", "device state receive").
		Str("device", d.Name).
		Logger()

	payload := string(msg.Payload())
	log.Debug().
		Str("topic", msg.Topic()).
		Str("payload", payload).
		Msg("Message get")

	obj, err := oj.ParseString(payload)
	if err != nil {
		log.Error().Err(err).Msg("during payload parse")
		return
	}
	path, err := jp.ParseString(d.Settings.TVR.Setpoint_state_jp)
	if err != nil {
		// This is a configuration problem (bad JSONPath), not a payload one.
		log.Error().Err(err).Msg("while parsing setpoint JSONPath")
		return
	}

	result := path.First(obj)
	value, ok := result.(int64)
	if !ok {
		// No error value exists here: the payload parsed fine but the selected
		// element is missing or is not an integer.
		log.Error().Interface("parsing payload", result).Msg("while parsing payload")
		return
	}
	d.CurrentSetpoint = int(value)
}

// handle_always implements the "always" mode: the state's setpoint is
// re-sent whenever the device reports a different one.
func (d *Device) handle_always(log *zerolog.Logger, pubchan chan Message) error {
	if d.State.Setpoint != d.CurrentSetpoint {
		return d.SetSetpoint(d.State.Setpoint, pubchan)
	}
	return nil
}

// handle_until_time implements the "until_time" mode: once Until_time is in
// the past the device is reset to its program; before that the state's
// setpoint is re-sent whenever the device reports a different one.
func (d *Device) handle_until_time(log *zerolog.Logger, pubchan chan Message) error {
	// Derive a local logger instead of overwriting the caller's through the pointer.
	l := log.With().Time("until_time", d.State.Until_time).Logger()

	if d.State.Until_time.Before(timeNow()) {
		l.Info().Msg("until_time passed, reset")
		return d.setProgAndReset(&l, pubchan)
	}
	if d.State.Setpoint != d.CurrentSetpoint {
		l.Info().Msg("need setpoint update")
		return d.SetSetpoint(d.State.Setpoint, pubchan)
	}
	return nil
}

// handle_until_next implements the "until_next" mode: once the current time
// is after the program's NextTime computed from the state's Time, the device
// is reset to its program; before that the state's setpoint is re-sent
// whenever the device reports a different one.
func (d *Device) handle_until_next(log *zerolog.Logger, pubchan chan Message) error {
	// Derive a local logger instead of overwriting the caller's through the pointer.
	l := log.With().Time("until_next", d.State.Time).Logger()

	program, err := d.Program()
	if err != nil {
		return err
	}
	next, err := program.NextTime(d.State.Time)
	if err != nil {
		return err
	}

	// Read the clock once so the logged instant is the one that was compared.
	now := timeNow()
	if now.After(next) {
		l.Info().Time("now", now).Time("next", next).Msg("until_next expired")
		return d.setProgAndReset(&l, pubchan)
	}
	if d.State.Setpoint != d.CurrentSetpoint {
		l.Info().Msg("need setpoint update")
		return d.SetSetpoint(d.State.Setpoint, pubchan)
	}
	return nil
}

// handle_program implements the "program" mode: the program's current
// setpoint is published whenever the device reports a different one. The
// device state is left unchanged.
func (d *Device) handle_program(log *zerolog.Logger, pubchan chan Message) error {
	// Derive a local logger instead of overwriting the caller's through the pointer.
	l := log.With().Str("program", d.State.Program_name).Logger()

	_, err := d.syncProgramSetpoint(&l, pubchan)
	return err
}