Use threadpool for activating actions of scene

This commit is contained in:
Nicolas Duhamel 2021-03-23 20:03:40 +01:00
parent c0446c8c7f
commit 2d2de33304

View File

@ -2,11 +2,10 @@ import time
import threading import threading
import queue import queue
import logging import logging
from concurrent.futures import wait, ALL_COMPLETED import concurrent.futures
import pebble import pebble
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from citadel.mqtt import Configuration from citadel.mqtt import Configuration
@ -82,12 +81,23 @@ class Scene(threading.Thread, mqtt.Client):
# Check for futures if activating # Check for futures if activating
if self.scene_state == Scene.STATE_ACTIVATING: if self.scene_state == Scene.STATE_ACTIVATING:
if self.__activating_r_futures: for (action, futures) in self.__activating_r_futures:
r = wait(self.__activating_r_futures, timeout=1, return_when=ALL_COMPLETED) try:
r = futures.result(timeout=0)
if r != Action.SUCCESS:
self.logger.error("Action error")
# empty the futures list to jump on else statement
self.__activating_r_futures = [] self.__activating_r_futures = []
break
except concurrent.futures.TimeoutError:
break
else:
self.logger.info('Activating terminate')
self.__executor.join()
self.__executor = None
self.__state = Scene.STATE_ACTIVATE self.__state = Scene.STATE_ACTIVATE
for r in r.done: self.__send_state()
print(r.result())
def threaded_publish(self, *args, **kwargs): def threaded_publish(self, *args, **kwargs):
self.__pub_queue.put((args, kwargs)) self.__pub_queue.put((args, kwargs))
@ -127,17 +137,22 @@ class Scene(threading.Thread, mqtt.Client):
if self.scene_state: if self.scene_state:
self.logger.info('Already activate/activating ignore cmnd') self.logger.info('Already activate/activating ignore cmnd')
return return
if self.__executor != None:
with pebble.ThreadPool() as executor: self.logger.error('Trying to activate but __executor not None')
for a in self.ON: return
self.__activating_r_futures.append(
executor.schedule(a.setup(self).run))
self.__state = Scene.STATE_ACTIVATING self.__state = Scene.STATE_ACTIVATING
self.__executor = pebble.ThreadPool()
for a in self.ON:
self.__activating_r_futures.append(
(a, self.__executor.schedule(a.setup(self).run))
)
self.__executor.close()
self.__send_state() self.__send_state()
def __deactivate(self): def __deactivate(self):
#TODO cancel activating...
self.__state = Scene.STATE_DEACTIVATE self.__state = Scene.STATE_DEACTIVATE
self.__send_state() self.__send_state()
@ -148,7 +163,7 @@ class Scene(threading.Thread, mqtt.Client):
retain=False) retain=False)
class Action(object): class Action(object):
OK = 0 SUCCESS = 0
FAILED = 1 FAILED = 1
def setup(self, proxy: Scene): def setup(self, proxy: Scene):
@ -159,7 +174,7 @@ class Action(object):
return self return self
def ok(self): def ok(self):
return Action.OK return Action.SUCCESS
def failed(self): def failed(self):
return Action.FAILED return Action.FAILED