import logging import time import json import typer from pyamaha import Device, System, Zone from citadel.mqtt import Client class Error(Exception): """Base class for exceptions in this module.""" pass class ApiError(Exception): def __init__(self, request_url, response_code): self.request = request_url self.response_code = response_code class Senario: activate = [] deactivate = [] @classmethod def name(cls): if getattr(cls, 'NAME', None): return cls.NAME else: return cls.__name__.lower() class TV(Senario): activate = [ Zone.set_power('main', 'on'), System.set_speaker_pattern(1), Zone.set_input('main', 'av1', ''), Zone.set_dialogue_level('main', 3) ] deactivate = [ Zone.set_dialogue_level('main', 0) ] class Spotify(Senario): activate = [ Zone.set_power('main', 'on'), System.set_speaker_pattern(2), Zone.set_input('main', 'spotify', ''), System.set_partymode(True) ] deactivate = [ System.set_partymode(False), System.set_speaker_pattern(1) ] class Yamaha: def __init__(self, device: Device, mqtt_client: Client, logger: logging.Logger=None): self._device = device self._client = mqtt_client self.logger = logger if not logger: self.logger = logging.getLogger('yamaha') self._cur_senario = None self._client.subscribe_callback('yamaha/cmnd/senario', self.handle_senario) self._client.subscribe_callback('yamaha/cmnd/volume', self.handle_volume) self._client.subscribe_callback('yamaha/cmnd/volume/params', self.handle_volume_params) self._client.subscribe_callback('yamaha/cmnd/power', self.handle_power) @property def available_senarios(self): return [TV, Spotify] def senario_activate(self, senario: Senario): for action in senario.activate: self.request(action) def senario_deactivate(self, senario: Senario): for action in senario.deactivate: self.request(action) def request(self, request): r = self._device.request(request) r_json = r.json() if r_json['response_code'] != 0: raise ApiResponse(r.url, r_json['response_code']) return r_json def search_senario(self, name: str): for s in self.available_senarios: if s.name() == name.lower(): return s else: return None def handle_senario(self, client, userdata, msg): senario_name =msg.payload.decode().lower() #2/ check if exists if (matched_senario := self.search_senario(senario_name)): # if not the current switch if matched_senario != self._cur_senario: if self._cur_senario != None: self.senario_deactivate(self._cur_senario) self._cur_senario = matched_senario self.senario_activate(matched_senario) # else return current name or '' else: senario_name = self._cur_senario.name() if self._cur_senario else '' self._client.publish('yamaha/stat/senario', senario_name) def handle_volume_params(self, client, userdata, msg): r = self.request(Zone().get_status('main')) msg = { 'maximum': r['max_volume'] } self._client.publish('yamaha/stat/volume/params', json.dumps(msg)) def handle_volume(self, client, userdata, msg): param =msg.payload.decode().lower() if param == 'up': self.request(Zone().set_volume('main', 'up', '')) elif param == 'down': self.request(Zone().set_volume('main', 'down', '')) elif param.isdigit(): self.request(Zone().set_volume('main', param, '')) r = self.request(Zone().get_status('main')) self._client.publish('yamaha/stat/volume', r['volume']) def handle_power(self, client, userdata, msg): param =msg.payload.decode().lower() if param == 'on': self.request(Zone().set_power('main', 'on')) elif param == 'off': self.request(Zone().set_power('main', 'standby')) r = self.request(Zone().get_status('main')) self._client.publish('yamaha/stat/power', r['power']) def main(\ mqtt_user: str = typer.Option(... , envvar="YAMAHA_MQTT_USER"),\ mqtt_pwd: str = typer.Option(... , envvar="YAMAHA_MQTT_PWD"),\ mqtt_host: str = typer.Option(... , envvar="YAMAHA_MQTT_HOST"),\ mqtt_port: int = typer.Option(... , envvar="YAMAHA_MQTT_PORT"),\ yamaha_host: str = typer.Option(... , envvar="YAMAHA_HOST"),\ ): client = Client() client.setup(mqtt_host, mqtt_port, mqtt_user, mqtt_pwd) yamaha = Yamaha(Device(yamaha_host), client) client.loop_forever() if __name__ == "__main__": # create logger logger = logging.getLogger('') logger.setLevel(logging.DEBUG) # create console handler and set level to debug ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) # create formatter formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # add formatter to ch ch.setFormatter(formatter) # add ch to logger logger.addHandler(ch) #deactive urllib3 logger too verbose logger = logging.getLogger('urllib3') logger.setLevel(logging.ERROR) typer.run(main)