diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..231ddba --- /dev/null +++ b/__init__.py @@ -0,0 +1,4 @@ +from .error import * +from .senario import Senario +from .app import Yamaha + diff --git a/__main__.py b/__main__.py index 6a4d00f..88e1bcb 100644 --- a/__main__.py +++ b/__main__.py @@ -1,158 +1,13 @@ import logging import time -import json +from pathlib import Path +from typing import Optional import typer -import requests - -from pyamaha import Device, System, Zone - from citadel.mqtt import Client -class Error(Exception): - """Base class for exceptions in this module.""" - pass - -class ApiError(Error): - def __init__(self, request_url, response_code): - self.request = request_url - self.response_code = response_code - -class ConnectionTimeout(Error): - pass - -class Senario: - activate = [] - deactivate = [] - - @classmethod - def name(cls): - if getattr(cls, 'NAME', None): - return cls.NAME - else: - return cls.__name__.lower() - -class TV(Senario): - - activate = [ - Zone.set_power('main', 'on'), - System.set_speaker_pattern(1), - Zone.set_input('main', 'av1', ''), - Zone.set_dialogue_level('main', 3) - ] - - deactivate = [ - Zone.set_dialogue_level('main', 0) - ] - -class Spotify(Senario): - - activate = [ - Zone.set_power('main', 'on'), - System.set_speaker_pattern(2), - Zone.set_input('main', 'spotify', ''), - System.set_partymode(True) - ] - - deactivate = [ - System.set_partymode(False), - System.set_speaker_pattern(1) - ] - -class Yamaha: - - def __init__(self, ip: str, mqtt_client: Client, logger: logging.Logger=None, timeout=None): - self.timeout = timeout if timeout else 10 - self._device = Device(ip, self.timeout) - self._client = mqtt_client - - self.logger = logger - if not logger: - self.logger = logging.getLogger('yamaha') - - self._cur_senario = None - - self._client.subscribe_callback('yamaha/cmnd/senario', self.handle_senario) - self._client.subscribe_callback('yamaha/cmnd/volume', self.handle_volume) - self._client.subscribe_callback('yamaha/cmnd/volume/params', self.handle_volume_params) - self._client.subscribe_callback('yamaha/cmnd/power', self.handle_power) - - @property - def available_senarios(self): - return [TV, Spotify] - - def senario_activate(self, senario: Senario): - for action in senario.activate: - self.request(action) - - def senario_deactivate(self, senario: Senario): - for action in senario.deactivate: - self.request(action) - - def request(self, request): - try: - r = self._device.request(request) - except requests.exceptions.ReadTimeout: - raise ConnectionTimeout() - r_json = r.json() - if r_json['response_code'] != 0: - raise ApiResponse(r.url, r_json['response_code']) - return r_json - - def search_senario(self, name: str): - for s in self.available_senarios: - if s.name() == name.lower(): - return s - else: - return None - - def handle_senario(self, client, userdata, msg): - senario_name =msg.payload.decode().lower() - - #2/ check if exists - if (matched_senario := self.search_senario(senario_name)): - # if not the current switch - if matched_senario != self._cur_senario: - if self._cur_senario != None: - self.senario_deactivate(self._cur_senario) - self._cur_senario = matched_senario - self.senario_activate(matched_senario) - # else return current name or '' - else: - senario_name = self._cur_senario.name() if self._cur_senario else '' - - self._client.publish('yamaha/stat/senario', senario_name) - - def handle_volume_params(self, client, userdata, msg): - r = self.request(Zone().get_status('main')) - msg = { - 'maximum': r['max_volume'] - } - self._client.publish('yamaha/stat/volume/params', json.dumps(msg)) - - def handle_volume(self, client, userdata, msg): - param =msg.payload.decode().lower() - - if param == 'up': - self.request(Zone().set_volume('main', 'up', '')) - elif param == 'down': - self.request(Zone().set_volume('main', 'down', '')) - elif param.isdigit(): - self.request(Zone().set_volume('main', param, '')) - - r = self.request(Zone().get_status('main')) - self._client.publish('yamaha/stat/volume', r['volume']) - - def handle_power(self, client, userdata, msg): - param =msg.payload.decode().lower() - - if param == 'on': - self.request(Zone().set_power('main', 'on')) - elif param == 'off': - self.request(Zone().set_power('main', 'standby')) - - r = self.request(Zone().get_status('main')) - self._client.publish('yamaha/stat/power', r['power']) +from .app import Yamaha +from .senario import load_from_file, SENARIOS def main(\ mqtt_user: str = typer.Option(... , envvar="YAMAHA_MQTT_USER"),\ @@ -160,8 +15,13 @@ def main(\ mqtt_host: str = typer.Option(... , envvar="YAMAHA_MQTT_HOST"),\ mqtt_port: int = typer.Option(... , envvar="YAMAHA_MQTT_PORT"),\ yamaha_host: str = typer.Option(... , envvar="YAMAHA_HOST"),\ + senarios_file: Optional[Path] = typer.Option(None, exists=True, + file_okay=True, dir_okay=False, readable=True, resolve_path=True)\ ): + if senarios_file: + load_from_file(senarios_file) + client = Client() client.setup(mqtt_host, mqtt_port, mqtt_user, mqtt_pwd) yamaha = Yamaha(yamaha_host, client) @@ -192,3 +52,4 @@ if __name__ == "__main__": typer.run(main) + diff --git a/app.py b/app.py new file mode 100644 index 0000000..1abc6b2 --- /dev/null +++ b/app.py @@ -0,0 +1,105 @@ +import logging +import json + +import requests +from pyamaha import Device, System, Zone + +from citadel.mqtt import Client + +from . import * +from .senario import SENARIOS + +class Yamaha: + + def __init__(self, ip: str, mqtt_client: Client, logger: logging.Logger=None, timeout=None): + self.timeout = timeout if timeout else 10 + self._device = Device(ip, self.timeout) + self._client = mqtt_client + + self.logger = logger + if not logger: + self.logger = logging.getLogger('yamaha') + + self._cur_senario = None + + self._client.subscribe_callback('yamaha/cmnd/senario', self.handle_senario) + self._client.subscribe_callback('yamaha/cmnd/volume', self.handle_volume) + self._client.subscribe_callback('yamaha/cmnd/volume/params', self.handle_volume_params) + self._client.subscribe_callback('yamaha/cmnd/power', self.handle_power) + + @property + def available_senarios(self): + return SENARIOS + + def senario_activate(self, senario: Senario): + for action in senario.activate: + self.request(action) + + def senario_deactivate(self, senario: Senario): + for action in senario.deactivate: + self.request(action) + + def request(self, request): + try: + r = self._device.request(request) + except requests.exceptions.ReadTimeout: + raise ConnectionTimeoutError() + r_json = r.json() + if r_json['response_code'] != 0: + raise ApiResponse(r.url, r_json['response_code']) + return r_json + + def search_senario(self, name: str): + for s in self.available_senarios: + if s.name() == name.lower(): + return s + else: + return None + + def handle_senario(self, client, userdata, msg): + senario_name =msg.payload.decode().lower() + + #2/ check if exists + if (matched_senario := self.search_senario(senario_name)): + # if not the current switch + if matched_senario != self._cur_senario: + if self._cur_senario != None: + self.senario_deactivate(self._cur_senario) + self._cur_senario = matched_senario + self.senario_activate(matched_senario) + # else return current name or '' + else: + senario_name = self._cur_senario.name() if self._cur_senario else '' + + self._client.publish('yamaha/stat/senario', senario_name) + + def handle_volume_params(self, client, userdata, msg): + r = self.request(Zone().get_status('main')) + msg = { + 'maximum': r['max_volume'] + } + self._client.publish('yamaha/stat/volume/params', json.dumps(msg)) + + def handle_volume(self, client, userdata, msg): + param =msg.payload.decode().lower() + + if param == 'up': + self.request(Zone().set_volume('main', 'up', '')) + elif param == 'down': + self.request(Zone().set_volume('main', 'down', '')) + elif param.isdigit(): + self.request(Zone().set_volume('main', param, '')) + + r = self.request(Zone().get_status('main')) + self._client.publish('yamaha/stat/volume', r['volume']) + + def handle_power(self, client, userdata, msg): + param =msg.payload.decode().lower() + + if param == 'on': + self.request(Zone().set_power('main', 'on')) + elif param == 'off': + self.request(Zone().set_power('main', 'standby')) + + r = self.request(Zone().get_status('main')) + self._client.publish('yamaha/stat/power', r['power']) diff --git a/error.py b/error.py new file mode 100644 index 0000000..bd0e6d1 --- /dev/null +++ b/error.py @@ -0,0 +1,12 @@ + +class Error(Exception): + """Base class for exceptions in this module.""" + pass + +class ApiError(Error): + def __init__(self, request_url, response_code): + self.request = request_url + self.response_code = response_code + +class ConnectionTimeoutError(Error): + pass diff --git a/senario.py b/senario.py new file mode 100644 index 0000000..f50550f --- /dev/null +++ b/senario.py @@ -0,0 +1,29 @@ +import importlib.util +import sys +import inspect + +from pyamaha import Device, System, Zone + +SENARIOS = [] + +class Senario: + activate = [] + deactivate = [] + + @classmethod + def name(cls): + if getattr(cls, 'NAME', None): + return cls.NAME + else: + return cls.__name__.lower() + +def load_from_file(file_path: str): + module_name = 'senarios' + spec = importlib.util.spec_from_file_location(module_name, file_path) + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + + for name, obj in inspect.getmembers(module): + if inspect.isclass(obj) and issubclass(obj, Senario) and obj != Senario: + SENARIOS.append(obj)