diff --git a/src/citadel/scene/__init__.py b/src/citadel/scene/__init__.py index 1dd872e..8135717 100644 --- a/src/citadel/scene/__init__.py +++ b/src/citadel/scene/__init__.py @@ -2,11 +2,10 @@ import time import threading import queue import logging -from concurrent.futures import wait, ALL_COMPLETED +import concurrent.futures import pebble - import paho.mqtt.client as mqtt from citadel.mqtt import Configuration @@ -82,12 +81,23 @@ class Scene(threading.Thread, mqtt.Client): # Check for futures if activating if self.scene_state == Scene.STATE_ACTIVATING: - if self.__activating_r_futures: - r = wait(self.__activating_r_futures, timeout=1, return_when=ALL_COMPLETED) - self.__activating_r_futures = [] - self.__state = Scene.STATE_ACTIVATE - for r in r.done: - print(r.result()) + for (action, futures) in self.__activating_r_futures: + try: + r = futures.result(timeout=0) + if r != Action.SUCCESS: + self.logger.error("Action error") + # empty the futures list to jump on else statement + self.__activating_r_futures = [] + break + except concurrent.futures.TimeoutError: + break + else: + self.logger.info('Activating terminate') + self.__executor.join() + self.__executor = None + self.__state = Scene.STATE_ACTIVATE + self.__send_state() + def threaded_publish(self, *args, **kwargs): self.__pub_queue.put((args, kwargs)) @@ -127,17 +137,22 @@ class Scene(threading.Thread, mqtt.Client): if self.scene_state: self.logger.info('Already activate/activating ignore cmnd') return - - with pebble.ThreadPool() as executor: - for a in self.ON: - self.__activating_r_futures.append( - executor.schedule(a.setup(self).run)) + if self.__executor != None: + self.logger.error('Trying to activate but __executor not None') + return self.__state = Scene.STATE_ACTIVATING + self.__executor = pebble.ThreadPool() + + for a in self.ON: + self.__activating_r_futures.append( + (a, self.__executor.schedule(a.setup(self).run)) + ) + + self.__executor.close() self.__send_state() def __deactivate(self): - #TODO cancel activating... self.__state = Scene.STATE_DEACTIVATE self.__send_state() @@ -148,7 +163,7 @@ class Scene(threading.Thread, mqtt.Client): retain=False) class Action(object): - OK = 0 + SUCCESS = 0 FAILED = 1 def setup(self, proxy: Scene): @@ -159,7 +174,7 @@ class Action(object): return self def ok(self): - return Action.OK + return Action.SUCCESS def failed(self): return Action.FAILED