diff --git a/src/citadel/scene/__init__.py b/src/citadel/scene/__init__.py index 72f8f16..d806c9b 100644 --- a/src/citadel/scene/__init__.py +++ b/src/citadel/scene/__init__.py @@ -1,332 +1,5 @@ -import time -import threading -import queue -import logging -import concurrent.futures -from typing import Union, Callable - -import pebble - -import paho.mqtt.client as mqtt - -from citadel.mqtt import Configuration - -class Scene(threading.Thread, mqtt.Client): - """ A scene run as a thread, has his own mqtt session - - Can be activated by topic: - - cmnd/scene/{mqtt_name}/activate ON|OFF|null - - And return status on topic: - - stat/scene/{mqtt_name}/state ON|OFF - - Special class attributes: - - mqtt_name: define topic name, default class name in lower case - - """ - TOPIC_CMND = "cmnd/scene/{mqtt_name}/activate" - TOPIC_STAT = "stat/scene/{mqtt_name}/activate" - - STATE_DEACTIVATE = 0 - STATE_ACTIVATE = 1 - STATE_ACTIVATING = 2 - - def __init__(self, config: Configuration, logger: logging.Logger=None): - threading.Thread.__init__(self) - mqtt.Client.__init__(self) - - self.configuration = config - - if not hasattr(self, 'mqtt_name'): - self.mqtt_name = self.__class__.__name__.lower() - - if not logger: - logger = logging.getLogger('scene.%s' % self.mqtt_name) - self.logger = logger - - self.__pub_queue = queue.Queue() - self.__state = Scene.STATE_DEACTIVATE - - self.__executor = None - self.__activating_r_futures = [] - - @property - def scene_state(self): - return self.__state - - def run(self): - self.username_pw_set(self.configuration.user, - self.configuration.password) - self.connect(self.configuration.host, - self.configuration.port) - - self.subscribe(self.TOPIC_CMND.format(mqtt_name=self.mqtt_name), 2) - - for watch in self.WATCH: - self.subscribe(watch[0], 1) - - run = True - while run: - #Mqtt client loop - self.loop() - - #Check publish queue - try: - (args, kwargs) = self.__pub_queue.get(block=False) - self.publish(*args, **kwargs) - except queue.Empty: - pass - - # Check for futures if activating - if self.scene_state == Scene.STATE_ACTIVATING: - for (action, futures) in self.__activating_r_futures: - try: - r = futures.result(timeout=0) - if r != Action.SUCCESS: - self.logger.error("Action: '%s' error", action.name) - self.logger.info('Activating error') - self.__executor.join() - self.__executor = None - self.__activating_r_futures = [] - self.__state = Scene.STATE_DEACTIVATE - self.__send_state() - break - except concurrent.futures.TimeoutError: - break - else: - self.logger.info('Activating terminate') - self.__executor.join() - self.__executor = None - self.__activating_r_futures = [] - self.__state = Scene.STATE_ACTIVATE - self.__send_state() - - - def threaded_publish(self, *args, **kwargs): - self.__pub_queue.put((args, kwargs)) - - def on_message(self, mqttc, obj, msg): - payload = msg.payload.decode().lower() - topic = msg.topic - - # on CMND - if topic == self.TOPIC_CMND.format(mqtt_name=self.mqtt_name): - if payload == 'on': - self.logger.info('CMND activate ON') - self.__activate() - elif payload == 'off': - self.logger.info('CMND activate OFF') - self.__deactivate() - elif payload == '': - self.logger.info('CMND activate state') - self.__send_state() - else: - self.logger.warn('CMND activate invalide') - - elif self.scene_state and self.__check_watched(topic, payload): - self.logger.info('Watched topic %s with invalid payload %s deactivating', topic, payload) - self.__deactivate() - - def __check_watched(self, topic, payload): - """ return True if is a watched topic with an invalid payload """ - for watch in self.WATCH: - if topic == watch[0]: - if payload != watch[1].lower(): - return True - return False - return False - - def __activate(self): - if self.scene_state: - self.logger.info('Already activate/activating ignore cmnd') - return - if self.__executor != None: - self.logger.error('Trying to activate but __executor not None') - return - - self.__state = Scene.STATE_ACTIVATING - self.__executor = pebble.ThreadPool() - - for a in self.ON: - self.__activating_r_futures.append( - (a, self.__executor.schedule(a.setup(self).run)) - ) - - self.__executor.close() - self.__send_state() - - def __deactivate(self): - self.__state = Scene.STATE_DEACTIVATE - self.__send_state() - - def __send_state(self): - self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name), - payload='ON' if self.scene_state else 'OFF', - qos=2, - retain=False) - -class WaitForTopic(threading.Thread, mqtt.Client): - - def __init__(self, - config: Configuration, - topic: str, - payload: Union[str, Callable[[str],bool]], - logger: logging.Logger=None): - """ payload: str or function stop if payload is received or function return True, the function must have this signature func(payload: str) -> bool """ - - threading.Thread.__init__(self) - mqtt.Client.__init__(self) - - self.configuration = config - - self.topic = topic - - if callable(payload): - self.check_payload = payload - else: - self.check_payload = lambda x: x == payload - - self.daemon = True - - if not logger: - logger = logging.getLogger('watcher') - self.logger = logger - - self.__run = True - - self.username_pw_set(self.configuration.user, - self.configuration.password) - self.connect(self.configuration.host, - self.configuration.port) - - self.subscribe(self.topic, 1) - - self.start() - - def on_message(self, mqttc, obj, msg): - payload = msg.payload.decode() - topic = msg.topic - self.logger.info('Topic: %s, Payload: %s', topic, payload) - - if topic != self.topic: - self.logger.error('Invaliad topic: %s', topic) - return - - if self.check_payload(payload): - self.__run = False - - def stop(self): - self.__run = False - - def run(self): - while self.__run: - self.loop() - self.disconnect() - -class Action(object): - SUCCESS = 0 - FAILED = 1 - - def setup(self, proxy: Scene): - """ - proxy: a thread safe proxy object with publish, subscribe method - """ - self.proxy = proxy - return self - - def ok(self): - return Action.SUCCESS - - def failed(self): - return Action.FAILED - - @property - def name(self): - return self.__class__.__name__ - -class Publish(Action): - - def __init__(self, topic: str): - Action.__init__(self) - self.topic = topic - self.payload = '' - self.qos = 0 - - self.check_response = None - - def Payload(self, payload): - self.payload = payload - return self - - def Qos(self, qos): - self.qos = qos - return self - - def CheckResponse(self, topic: str, payload: str=None, timeout: int=None): - self.check_response= {'topic': topic, 'payload':payload, 'timeout': timeout} - return self - - def run(self): - checker = None - if self.check_response: - checker = WaitForTopic(self.proxy.configuration, - self.check_response['topic'], - self.check_response['payload']) - - self.proxy.threaded_publish(self.topic, - payload=self.payload, qos=self.qos, retain=False) - - if self.check_response: - checker.join(timeout=self.check_response['timeout']) - if checker.is_alive(): #timeout occur - checker.stop() - return self.failed() - - return self.ok() - -class TasmotaShutter(Action): - - TOPIC_FORMAT = "cmnd/tasmota/shutter/{shutter_name}/{action}" - TOPIC_CHECK_FORMAT = "stat/tasmota/shutter/{shutter_name}/SHUTTER1" - ACTION_OPEN = 'ShutterOpen' - ACTION_CLOSE = 'ShutterClose' - - def __init__(self, shutter_name: str): - Action.__init__(self) - self.shutter_name = shutter_name - self.action = None - - def Close(self): - self.action = self.ACTION_CLOSE - return self - - def Open(self): - self.action = self.ACTION_OPEN - return self - - def run(self): - if self.action is None: - raise RuntimeError('No action specified') - - topic = self.TOPIC_FORMAT.format( - shutter_name = self.shutter_name, - action = self.action - ) - - self.proxy.threaded_publish(topic, qos=1, retain=False) - - checker = WaitForTopic(self.proxy.configuration, - self.TOPIC_CHECK_FORMAT.format(shutter_name = self.shutter_name), - payload= '100' if self.action == self.ACTION_OPEN else '0') - - checker.join(timeout=30) - if checker.is_alive(): - return self.failed() - - return self.ok() +from .action import Action, Publish, TasmotaShutter +from .scene import Scene class Sleep(Scene): @@ -342,8 +15,4 @@ class Sleep(Scene): WATCH = [ ('stat/tasmota/screen/Screen/POWER', 'OFF') - # ('cmnd/tasmota/shutter/Bedroom1x1/ShutterClose', ''), - # ('cmnd/tasmota/shutter/Bedroom1x2/ShutterClose', ''), - # ('cmnd/tasmota/shutter/Bathroom/ShutterClose', ''), - # ('cmnd/tasmota/shutter/Staicase1/ShutterClose', ''), ] diff --git a/src/citadel/scene/__main__.py b/src/citadel/scene/__main__.py index 0295531..fefe078 100644 --- a/src/citadel/scene/__main__.py +++ b/src/citadel/scene/__main__.py @@ -4,6 +4,7 @@ import typer import systemd.journal from citadel.mqtt import Configuration + from . import Sleep def main(\ diff --git a/src/citadel/scene/action.py b/src/citadel/scene/action.py new file mode 100644 index 0000000..95f3d72 --- /dev/null +++ b/src/citadel/scene/action.py @@ -0,0 +1,108 @@ +import time + +from .helper import WaitForTopic + +class Action(object): + SUCCESS = 0 + FAILED = 1 + + def setup(self, proxy): + """ + proxy: a thread safe proxy object with publish, subscribe method + """ + self.proxy = proxy + return self + + def ok(self): + return Action.SUCCESS + + def failed(self): + return Action.FAILED + + @property + def name(self): + return self.__class__.__name__ + + +class Publish(Action): + + def __init__(self, topic: str): + Action.__init__(self) + self.topic = topic + self.payload = '' + self.qos = 0 + + self.check_response = None + + def Payload(self, payload): + self.payload = payload + return self + + def Qos(self, qos): + self.qos = qos + return self + + def CheckResponse(self, topic: str, payload: str=None, timeout: int=None): + self.check_response= {'topic': topic, 'payload':payload, 'timeout': timeout} + return self + + def run(self): + checker = None + if self.check_response: + checker = WaitForTopic(self.proxy.configuration, + self.check_response['topic'], + self.check_response['payload']) + + self.proxy.threaded_publish(self.topic, + payload=self.payload, qos=self.qos, retain=False) + + if self.check_response: + checker.join(timeout=self.check_response['timeout']) + if checker.is_alive(): #timeout occur + checker.stop() + return self.failed() + + return self.ok() + +class TasmotaShutter(Action): + + TOPIC_FORMAT = "cmnd/tasmota/shutter/{shutter_name}/{action}" + TOPIC_CHECK_FORMAT = "stat/tasmota/shutter/{shutter_name}/RESULT" + ACTION_OPEN = 'ShutterOpen' + ACTION_CLOSE = 'ShutterClose' + + def __init__(self, shutter_name: str): + Action.__init__(self) + self.shutter_name = shutter_name + self.action = None + + def Close(self): + self.action = self.ACTION_CLOSE + return self + + def Open(self): + self.action = self.ACTION_OPEN + return self + + def run(self): + if self.action is None: + raise RuntimeError('No action specified') + + topic = self.TOPIC_FORMAT.format( + shutter_name = self.shutter_name, + action = self.action + ) + + checker = WaitForTopic(self.proxy.configuration, + self.TOPIC_CHECK_FORMAT.format(shutter_name = self.shutter_name), + payload= '{"Shutter1":{"Position":100,"Direction":0,"Target":100}}' \ + if self.action == self.ACTION_OPEN \ + else '{"Shutter1":{"Position":0,"Direction":0,"Target":0}}') + time.sleep(1) + self.proxy.threaded_publish(topic, qos=1, retain=False) + + checker.join(timeout=35) + if checker.is_alive(): + return self.failed() + + return self.ok() diff --git a/src/citadel/scene/helper.py b/src/citadel/scene/helper.py new file mode 100644 index 0000000..2d9cca1 --- /dev/null +++ b/src/citadel/scene/helper.py @@ -0,0 +1,65 @@ +import threading +import logging +from typing import Union, Callable + +import paho.mqtt.client as mqtt + +from citadel.mqtt import Configuration + +class WaitForTopic(threading.Thread, mqtt.Client): + + def __init__(self, + config: Configuration, + topic: str, + payload: Union[str, Callable[[str],bool]], + logger: logging.Logger=None): + """ payload: str or function stop if payload is received or function return True, the function must have this signature func(payload: str) -> bool """ + + threading.Thread.__init__(self) + mqtt.Client.__init__(self) + + self.configuration = config + + self.topic = topic + + if callable(payload): + self.check_payload = payload + else: + self.check_payload = lambda x: x == payload + + self.daemon = True + + if not logger: + logger = logging.getLogger('watcher') + self.logger = logger + + self.__run = True + + self.username_pw_set(self.configuration.user, + self.configuration.password) + self.connect(self.configuration.host, + self.configuration.port) + + self.subscribe(self.topic, 1) + + self.start() + + def on_message(self, mqttc, obj, msg): + payload = msg.payload.decode() + topic = msg.topic + self.logger.debug('Topic: %s, Payload: %s', topic, payload) + + if topic != self.topic: + self.logger.error('Invaliad topic: %s', topic) + return + + if self.check_payload(payload): + self.__run = False + + def stop(self): + self.__run = False + + def run(self): + while self.__run: + self.loop() + self.disconnect() diff --git a/src/citadel/scene/scene.py b/src/citadel/scene/scene.py new file mode 100644 index 0000000..8f88506 --- /dev/null +++ b/src/citadel/scene/scene.py @@ -0,0 +1,169 @@ +import threading +import logging +import queue +import concurrent.futures + +import pebble + +import paho.mqtt.client as mqtt + +from citadel.mqtt import Configuration + +from .action import Action + +class Scene(threading.Thread, mqtt.Client): + """ A scene run as a thread, has his own mqtt session + + Can be activated by topic: + + cmnd/scene/{mqtt_name}/activate ON|OFF|null + + And return status on topic: + + stat/scene/{mqtt_name}/state ON|OFF + + Special class attributes: + + mqtt_name: define topic name, default class name in lower case + + """ + TOPIC_CMND = "cmnd/scene/{mqtt_name}/activate" + TOPIC_STAT = "stat/scene/{mqtt_name}/activate" + + STATE_DEACTIVATE = 0 + STATE_ACTIVATE = 1 + STATE_ACTIVATING = 2 + + def __init__(self, config: Configuration, logger: logging.Logger=None): + threading.Thread.__init__(self) + mqtt.Client.__init__(self) + + self.configuration = config + + if not hasattr(self, 'mqtt_name'): + self.mqtt_name = self.__class__.__name__.lower() + + if not logger: + logger = logging.getLogger('scene.%s' % self.mqtt_name) + self.logger = logger + + self.__pub_queue = queue.Queue() + self.__state = Scene.STATE_DEACTIVATE + + self.__executor = None + self.__activating_r_futures = [] + + @property + def scene_state(self): + return self.__state + + def run(self): + self.username_pw_set(self.configuration.user, + self.configuration.password) + self.connect(self.configuration.host, + self.configuration.port) + + self.subscribe(self.TOPIC_CMND.format(mqtt_name=self.mqtt_name), 2) + + for watch in self.WATCH: + self.subscribe(watch[0], 1) + + run = True + while run: + #Mqtt client loop + self.loop() + + #Check publish queue + try: + (args, kwargs) = self.__pub_queue.get(block=False) + self.publish(*args, **kwargs) + except queue.Empty: + pass + + # Check for futures if activating + if self.scene_state == Scene.STATE_ACTIVATING: + for (action, futures) in self.__activating_r_futures: + try: + r = futures.result(timeout=0) + if r != Action.SUCCESS: + self.logger.error("Action: '%s' error", action.name) + self.logger.info('Activating error') + self.__executor.join() + self.__executor = None + self.__activating_r_futures = [] + self.__state = Scene.STATE_DEACTIVATE + self.__send_state() + break + except concurrent.futures.TimeoutError: + break + else: + self.logger.info('Activating terminate') + self.__executor.join() + self.__executor = None + self.__activating_r_futures = [] + self.__state = Scene.STATE_ACTIVATE + self.__send_state() + + + def threaded_publish(self, *args, **kwargs): + self.__pub_queue.put((args, kwargs)) + + def on_message(self, mqttc, obj, msg): + payload = msg.payload.decode().lower() + topic = msg.topic + + # on CMND + if topic == self.TOPIC_CMND.format(mqtt_name=self.mqtt_name): + if payload == 'on': + self.logger.info('CMND activate ON') + self.__activate() + elif payload == 'off': + self.logger.info('CMND activate OFF') + self.__deactivate() + elif payload == '': + self.logger.info('CMND activate state') + self.__send_state() + else: + self.logger.warn('CMND activate invalide') + + elif self.scene_state and self.__check_watched(topic, payload): + self.logger.info('Watched topic %s with invalid payload %s deactivating', topic, payload) + self.__deactivate() + + def __check_watched(self, topic, payload): + """ return True if is a watched topic with an invalid payload """ + for watch in self.WATCH: + if topic == watch[0]: + if payload != watch[1].lower(): + return True + return False + return False + + def __activate(self): + if self.scene_state: + self.logger.info('Already activate/activating ignore cmnd') + return + if self.__executor != None: + self.logger.error('Trying to activate but __executor not None') + return + + self.__state = Scene.STATE_ACTIVATING + self.__executor = pebble.ThreadPool() + + for a in self.ON: + self.__activating_r_futures.append( + (a, self.__executor.schedule(a.setup(self).run)) + ) + + self.__executor.close() + self.__send_state() + + def __deactivate(self): + self.__state = Scene.STATE_DEACTIVATE + self.__send_state() + + def __send_state(self): + self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name), + payload='ON' if self.scene_state else 'OFF', + qos=2, + retain=False)