Split files and add senarios file loading

This commit is contained in:
Nicolas Duhamel 2021-01-05 17:53:36 +01:00
parent 79d9fa8a3e
commit c444cb6c9b
5 changed files with 160 additions and 149 deletions

4
__init__.py Normal file
View File

@ -0,0 +1,4 @@
from .error import *
from .senario import Senario
from .app import Yamaha

View File

@ -1,158 +1,13 @@
import logging
import time
import json
from pathlib import Path
from typing import Optional
import typer
import requests
from pyamaha import Device, System, Zone
from citadel.mqtt import Client
class Error(Exception):
"""Base class for exceptions in this module."""
pass
class ApiError(Error):
def __init__(self, request_url, response_code):
self.request = request_url
self.response_code = response_code
class ConnectionTimeout(Error):
pass
class Senario:
activate = []
deactivate = []
@classmethod
def name(cls):
if getattr(cls, 'NAME', None):
return cls.NAME
else:
return cls.__name__.lower()
class TV(Senario):
activate = [
Zone.set_power('main', 'on'),
System.set_speaker_pattern(1),
Zone.set_input('main', 'av1', ''),
Zone.set_dialogue_level('main', 3)
]
deactivate = [
Zone.set_dialogue_level('main', 0)
]
class Spotify(Senario):
activate = [
Zone.set_power('main', 'on'),
System.set_speaker_pattern(2),
Zone.set_input('main', 'spotify', ''),
System.set_partymode(True)
]
deactivate = [
System.set_partymode(False),
System.set_speaker_pattern(1)
]
class Yamaha:
def __init__(self, ip: str, mqtt_client: Client, logger: logging.Logger=None, timeout=None):
self.timeout = timeout if timeout else 10
self._device = Device(ip, self.timeout)
self._client = mqtt_client
self.logger = logger
if not logger:
self.logger = logging.getLogger('yamaha')
self._cur_senario = None
self._client.subscribe_callback('yamaha/cmnd/senario', self.handle_senario)
self._client.subscribe_callback('yamaha/cmnd/volume', self.handle_volume)
self._client.subscribe_callback('yamaha/cmnd/volume/params', self.handle_volume_params)
self._client.subscribe_callback('yamaha/cmnd/power', self.handle_power)
@property
def available_senarios(self):
return [TV, Spotify]
def senario_activate(self, senario: Senario):
for action in senario.activate:
self.request(action)
def senario_deactivate(self, senario: Senario):
for action in senario.deactivate:
self.request(action)
def request(self, request):
try:
r = self._device.request(request)
except requests.exceptions.ReadTimeout:
raise ConnectionTimeout()
r_json = r.json()
if r_json['response_code'] != 0:
raise ApiResponse(r.url, r_json['response_code'])
return r_json
def search_senario(self, name: str):
for s in self.available_senarios:
if s.name() == name.lower():
return s
else:
return None
def handle_senario(self, client, userdata, msg):
senario_name =msg.payload.decode().lower()
#2/ check if exists
if (matched_senario := self.search_senario(senario_name)):
# if not the current switch
if matched_senario != self._cur_senario:
if self._cur_senario != None:
self.senario_deactivate(self._cur_senario)
self._cur_senario = matched_senario
self.senario_activate(matched_senario)
# else return current name or ''
else:
senario_name = self._cur_senario.name() if self._cur_senario else ''
self._client.publish('yamaha/stat/senario', senario_name)
def handle_volume_params(self, client, userdata, msg):
r = self.request(Zone().get_status('main'))
msg = {
'maximum': r['max_volume']
}
self._client.publish('yamaha/stat/volume/params', json.dumps(msg))
def handle_volume(self, client, userdata, msg):
param =msg.payload.decode().lower()
if param == 'up':
self.request(Zone().set_volume('main', 'up', ''))
elif param == 'down':
self.request(Zone().set_volume('main', 'down', ''))
elif param.isdigit():
self.request(Zone().set_volume('main', param, ''))
r = self.request(Zone().get_status('main'))
self._client.publish('yamaha/stat/volume', r['volume'])
def handle_power(self, client, userdata, msg):
param =msg.payload.decode().lower()
if param == 'on':
self.request(Zone().set_power('main', 'on'))
elif param == 'off':
self.request(Zone().set_power('main', 'standby'))
r = self.request(Zone().get_status('main'))
self._client.publish('yamaha/stat/power', r['power'])
from .app import Yamaha
from .senario import load_from_file, SENARIOS
def main(\
mqtt_user: str = typer.Option(... , envvar="YAMAHA_MQTT_USER"),\
@ -160,8 +15,13 @@ def main(\
mqtt_host: str = typer.Option(... , envvar="YAMAHA_MQTT_HOST"),\
mqtt_port: int = typer.Option(... , envvar="YAMAHA_MQTT_PORT"),\
yamaha_host: str = typer.Option(... , envvar="YAMAHA_HOST"),\
senarios_file: Optional[Path] = typer.Option(None, exists=True,
file_okay=True, dir_okay=False, readable=True, resolve_path=True)\
):
if senarios_file:
load_from_file(senarios_file)
client = Client()
client.setup(mqtt_host, mqtt_port, mqtt_user, mqtt_pwd)
yamaha = Yamaha(yamaha_host, client)
@ -192,3 +52,4 @@ if __name__ == "__main__":
typer.run(main)

105
app.py Normal file
View File

@ -0,0 +1,105 @@
import logging
import json
import requests
from pyamaha import Device, System, Zone
from citadel.mqtt import Client
from . import *
from .senario import SENARIOS
class Yamaha:
def __init__(self, ip: str, mqtt_client: Client, logger: logging.Logger=None, timeout=None):
self.timeout = timeout if timeout else 10
self._device = Device(ip, self.timeout)
self._client = mqtt_client
self.logger = logger
if not logger:
self.logger = logging.getLogger('yamaha')
self._cur_senario = None
self._client.subscribe_callback('yamaha/cmnd/senario', self.handle_senario)
self._client.subscribe_callback('yamaha/cmnd/volume', self.handle_volume)
self._client.subscribe_callback('yamaha/cmnd/volume/params', self.handle_volume_params)
self._client.subscribe_callback('yamaha/cmnd/power', self.handle_power)
@property
def available_senarios(self):
return SENARIOS
def senario_activate(self, senario: Senario):
for action in senario.activate:
self.request(action)
def senario_deactivate(self, senario: Senario):
for action in senario.deactivate:
self.request(action)
def request(self, request):
try:
r = self._device.request(request)
except requests.exceptions.ReadTimeout:
raise ConnectionTimeoutError()
r_json = r.json()
if r_json['response_code'] != 0:
raise ApiResponse(r.url, r_json['response_code'])
return r_json
def search_senario(self, name: str):
for s in self.available_senarios:
if s.name() == name.lower():
return s
else:
return None
def handle_senario(self, client, userdata, msg):
senario_name =msg.payload.decode().lower()
#2/ check if exists
if (matched_senario := self.search_senario(senario_name)):
# if not the current switch
if matched_senario != self._cur_senario:
if self._cur_senario != None:
self.senario_deactivate(self._cur_senario)
self._cur_senario = matched_senario
self.senario_activate(matched_senario)
# else return current name or ''
else:
senario_name = self._cur_senario.name() if self._cur_senario else ''
self._client.publish('yamaha/stat/senario', senario_name)
def handle_volume_params(self, client, userdata, msg):
r = self.request(Zone().get_status('main'))
msg = {
'maximum': r['max_volume']
}
self._client.publish('yamaha/stat/volume/params', json.dumps(msg))
def handle_volume(self, client, userdata, msg):
param =msg.payload.decode().lower()
if param == 'up':
self.request(Zone().set_volume('main', 'up', ''))
elif param == 'down':
self.request(Zone().set_volume('main', 'down', ''))
elif param.isdigit():
self.request(Zone().set_volume('main', param, ''))
r = self.request(Zone().get_status('main'))
self._client.publish('yamaha/stat/volume', r['volume'])
def handle_power(self, client, userdata, msg):
param =msg.payload.decode().lower()
if param == 'on':
self.request(Zone().set_power('main', 'on'))
elif param == 'off':
self.request(Zone().set_power('main', 'standby'))
r = self.request(Zone().get_status('main'))
self._client.publish('yamaha/stat/power', r['power'])

12
error.py Normal file
View File

@ -0,0 +1,12 @@
class Error(Exception):
"""Base class for exceptions in this module."""
pass
class ApiError(Error):
def __init__(self, request_url, response_code):
self.request = request_url
self.response_code = response_code
class ConnectionTimeoutError(Error):
pass

29
senario.py Normal file
View File

@ -0,0 +1,29 @@
import importlib.util
import sys
import inspect
from pyamaha import Device, System, Zone
SENARIOS = []
class Senario:
activate = []
deactivate = []
@classmethod
def name(cls):
if getattr(cls, 'NAME', None):
return cls.NAME
else:
return cls.__name__.lower()
def load_from_file(file_path: str):
module_name = 'senarios'
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, Senario) and obj != Senario:
SENARIOS.append(obj)