import logging import json import requests from tenacity import retry, stop_after_attempt, wait_fixed from pyamaha import Device, System, Zone from citadel.mqtt import Client from . import * from .senario import SENARIOS class Yamaha: def __init__(self, ip: str, mqtt_client: Client, logger: logging.Logger=None, timeout=None): self.timeout = timeout if timeout else 10 self._device = Device(ip, timeout=self.timeout) self._client = mqtt_client self.logger = logger if not logger: self.logger = logging.getLogger('yamaha') self._cur_senario = None self._client.subscribe_callback('yamaha/cmnd/senario', self.handle_senario) self._client.subscribe_callback('yamaha/cmnd/senario/available', self.handle_senario_available) self._client.subscribe_callback('yamaha/cmnd/volume', self.handle_volume) self._client.subscribe_callback('yamaha/cmnd/power', self.handle_power) @property def available_senarios(self): return SENARIOS def senario_activate(self, senario: Senario): for action in senario.activate: self.request(action) def senario_deactivate(self, senario: Senario): for action in senario.deactivate: self.request(action) @retry(stop=stop_after_attempt(3), wait=wait_fixed(1), reraise=True) def request(self, request): try: r = self._device.request(request) except requests.exceptions.ReadTimeout: raise ConnectionTimeoutError() try: r_json = r.json() except json.decoder.JSONDecodeError: raise ApiInvalidResponseError() if r_json['response_code'] != 0: raise ApiError(r.url, r_json['response_code']) return r_json def search_senario(self, name: str): for s in self.available_senarios: if s.name() == name.lower(): return s else: return None @exception_handler def handle_senario(self, client, userdata, msg): senario_name =msg.payload.decode().lower() #2/ check if exists if (matched_senario := self.search_senario(senario_name)): # if not the current switch if matched_senario != self._cur_senario: if self._cur_senario != None: self.senario_deactivate(self._cur_senario) self._cur_senario = matched_senario self.senario_activate(matched_senario) # else return current name or '' else: senario_name = self._cur_senario.name() if self._cur_senario else '' self._client.publish('yamaha/stat/senario', senario_name) @exception_handler def handle_senario_available(self, client, userdata, msg): self._client.publish('yamaha/stat/senario/available', json.dumps([s.__name__.lower() for s in self.available_senarios])) @exception_handler def handle_volume(self, client, userdata, msg): param =msg.payload.decode().lower() if param == 'up': self.request(Zone().set_volume('main', 'up', '')) elif param == 'down': self.request(Zone().set_volume('main', 'down', '')) elif param.isdigit(): self.request(Zone().set_volume('main', param, '')) r = self.request(Zone().get_status('main')) self._client.publish('yamaha/stat/volume', r['volume']) self._client.publish('yamaha/stat/volume/max', r['max_volume']) @exception_handler def handle_power(self, client, userdata, msg): param =msg.payload.decode().lower() if param == 'on': self.request(Zone().set_power('main', 'on')) elif param == 'off': self.request(Zone().set_power('main', 'standby')) r = self.request(Zone().get_status('main')) state = r['power'] state = 'off' if state == 'standby' else state self._client.publish('yamaha/stat/power', state)