From c0446c8c7f16733795ccc11827a81f224a9a4283 Mon Sep 17 00:00:00 2001 From: Nicolas Duhamel Date: Tue, 23 Mar 2021 13:04:03 +0100 Subject: [PATCH] WIP todo: timeout --- src/citadel/scene/__init__.py | 87 +++++++++++++++++++---------------- 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/src/citadel/scene/__init__.py b/src/citadel/scene/__init__.py index 8c1c8f1..1dd872e 100644 --- a/src/citadel/scene/__init__.py +++ b/src/citadel/scene/__init__.py @@ -2,6 +2,10 @@ import time import threading import queue import logging +from concurrent.futures import wait, ALL_COMPLETED + +import pebble + import paho.mqtt.client as mqtt @@ -26,12 +30,15 @@ class Scene(threading.Thread, mqtt.Client): TOPIC_CMND = "cmnd/scene/{mqtt_name}/activate" TOPIC_STAT = "stat/scene/{mqtt_name}/activate" + STATE_DEACTIVATE = 0 + STATE_ACTIVATE = 1 + STATE_ACTIVATING = 2 + def __init__(self, config: Configuration, logger: logging.Logger=None): threading.Thread.__init__(self) mqtt.Client.__init__(self) self.configuration = config - self._scene_state = False if not hasattr(self, 'mqtt_name'): self.mqtt_name = self.__class__.__name__.lower() @@ -40,11 +47,15 @@ class Scene(threading.Thread, mqtt.Client): logger = logging.getLogger('scene.%s' % self.mqtt_name) self.logger = logger - self._pub_queue = queue.Queue() + self.__pub_queue = queue.Queue() + self.__state = Scene.STATE_DEACTIVATE + + self.__executor = None + self.__activating_r_futures = [] @property def scene_state(self): - return self._scene_state + return self.__state def run(self): self.username_pw_set(self.configuration.user, @@ -59,15 +70,27 @@ class Scene(threading.Thread, mqtt.Client): run = True while run: + #Mqtt client loop self.loop() + + #Check publish queue try: - (args, kwargs) = self._pub_queue.get(block=False) + (args, kwargs) = self.__pub_queue.get(block=False) self.publish(*args, **kwargs) except queue.Empty: - continue + pass + + # Check for futures if activating + if self.scene_state == Scene.STATE_ACTIVATING: + if self.__activating_r_futures: + r = wait(self.__activating_r_futures, timeout=1, return_when=ALL_COMPLETED) + self.__activating_r_futures = [] + self.__state = Scene.STATE_ACTIVATE + for r in r.done: + print(r.result()) def threaded_publish(self, *args, **kwargs): - self._pub_queue.put((args, kwargs)) + self.__pub_queue.put((args, kwargs)) def on_message(self, mqttc, obj, msg): payload = msg.payload.decode().lower() @@ -101,59 +124,45 @@ class Scene(threading.Thread, mqtt.Client): return False def __activate(self): - #activate scene - # lock for activate again or deactivate - # start all activate action concurently - # start result catcher thread - # result catcher set active if ok else log error + if self.scene_state: + self.logger.info('Already activate/activating ignore cmnd') + return - results = queue.Queue() + with pebble.ThreadPool() as executor: + for a in self.ON: + self.__activating_r_futures.append( + executor.schedule(a.setup(self).run)) - for a in self.ON: - a.setup(self, results).start() - - self._scene_state = True - self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name), - payload='ON', - qos=2, - retain=False) + self.__state = Scene.STATE_ACTIVATING + self.__send_state() def __deactivate(self): - #deactivate scene - self._scene_state = False - self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name), - payload='OFF', - qos=2, - retain=False) + #TODO cancel activating... + self.__state = Scene.STATE_DEACTIVATE + self.__send_state() def __send_state(self): - print(self._scene_state) self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name), payload='ON' if self.scene_state else 'OFF', qos=2, retain=False) -class Action(threading.Thread): +class Action(object): OK = 0 FAILED = 1 - def __init__(self): - threading.Thread.__init__(self) - - def setup(self, proxy: Scene, result: queue.Queue): + def setup(self, proxy: Scene): """ proxy: a thread safe proxy object with publish, subscribe method - result: a queue for put result """ - self.scene = proxy - self.result = result + self.proxy = proxy return self def ok(self): - self.result.put(Action.OK) + return Action.OK def failed(self): - self.result.put(Action.FAILED) + return Action.FAILED class Publish(Action): @@ -172,9 +181,9 @@ class Publish(Action): return self def run(self): - self.scene.threaded_publish(self.topic, + self.proxy.threaded_publish(self.topic, payload=self.payload, qos=self.qos, retain=False) - self.ok() + return self.ok() class Sleep(Scene):