WIP todo: timeout
This commit is contained in:
parent
98bfac1dc3
commit
c0446c8c7f
@ -2,6 +2,10 @@ import time
|
||||
import threading
|
||||
import queue
|
||||
import logging
|
||||
from concurrent.futures import wait, ALL_COMPLETED
|
||||
|
||||
import pebble
|
||||
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
@ -26,12 +30,15 @@ class Scene(threading.Thread, mqtt.Client):
|
||||
TOPIC_CMND = "cmnd/scene/{mqtt_name}/activate"
|
||||
TOPIC_STAT = "stat/scene/{mqtt_name}/activate"
|
||||
|
||||
STATE_DEACTIVATE = 0
|
||||
STATE_ACTIVATE = 1
|
||||
STATE_ACTIVATING = 2
|
||||
|
||||
def __init__(self, config: Configuration, logger: logging.Logger=None):
|
||||
threading.Thread.__init__(self)
|
||||
mqtt.Client.__init__(self)
|
||||
|
||||
self.configuration = config
|
||||
self._scene_state = False
|
||||
|
||||
if not hasattr(self, 'mqtt_name'):
|
||||
self.mqtt_name = self.__class__.__name__.lower()
|
||||
@ -40,11 +47,15 @@ class Scene(threading.Thread, mqtt.Client):
|
||||
logger = logging.getLogger('scene.%s' % self.mqtt_name)
|
||||
self.logger = logger
|
||||
|
||||
self._pub_queue = queue.Queue()
|
||||
self.__pub_queue = queue.Queue()
|
||||
self.__state = Scene.STATE_DEACTIVATE
|
||||
|
||||
self.__executor = None
|
||||
self.__activating_r_futures = []
|
||||
|
||||
@property
|
||||
def scene_state(self):
|
||||
return self._scene_state
|
||||
return self.__state
|
||||
|
||||
def run(self):
|
||||
self.username_pw_set(self.configuration.user,
|
||||
@ -59,15 +70,27 @@ class Scene(threading.Thread, mqtt.Client):
|
||||
|
||||
run = True
|
||||
while run:
|
||||
#Mqtt client loop
|
||||
self.loop()
|
||||
|
||||
#Check publish queue
|
||||
try:
|
||||
(args, kwargs) = self._pub_queue.get(block=False)
|
||||
(args, kwargs) = self.__pub_queue.get(block=False)
|
||||
self.publish(*args, **kwargs)
|
||||
except queue.Empty:
|
||||
continue
|
||||
pass
|
||||
|
||||
# Check for futures if activating
|
||||
if self.scene_state == Scene.STATE_ACTIVATING:
|
||||
if self.__activating_r_futures:
|
||||
r = wait(self.__activating_r_futures, timeout=1, return_when=ALL_COMPLETED)
|
||||
self.__activating_r_futures = []
|
||||
self.__state = Scene.STATE_ACTIVATE
|
||||
for r in r.done:
|
||||
print(r.result())
|
||||
|
||||
def threaded_publish(self, *args, **kwargs):
|
||||
self._pub_queue.put((args, kwargs))
|
||||
self.__pub_queue.put((args, kwargs))
|
||||
|
||||
def on_message(self, mqttc, obj, msg):
|
||||
payload = msg.payload.decode().lower()
|
||||
@ -101,59 +124,45 @@ class Scene(threading.Thread, mqtt.Client):
|
||||
return False
|
||||
|
||||
def __activate(self):
|
||||
#activate scene
|
||||
# lock for activate again or deactivate
|
||||
# start all activate action concurently
|
||||
# start result catcher thread
|
||||
# result catcher set active if ok else log error
|
||||
if self.scene_state:
|
||||
self.logger.info('Already activate/activating ignore cmnd')
|
||||
return
|
||||
|
||||
results = queue.Queue()
|
||||
with pebble.ThreadPool() as executor:
|
||||
for a in self.ON:
|
||||
self.__activating_r_futures.append(
|
||||
executor.schedule(a.setup(self).run))
|
||||
|
||||
for a in self.ON:
|
||||
a.setup(self, results).start()
|
||||
|
||||
self._scene_state = True
|
||||
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
|
||||
payload='ON',
|
||||
qos=2,
|
||||
retain=False)
|
||||
self.__state = Scene.STATE_ACTIVATING
|
||||
self.__send_state()
|
||||
|
||||
def __deactivate(self):
|
||||
#deactivate scene
|
||||
self._scene_state = False
|
||||
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
|
||||
payload='OFF',
|
||||
qos=2,
|
||||
retain=False)
|
||||
#TODO cancel activating...
|
||||
self.__state = Scene.STATE_DEACTIVATE
|
||||
self.__send_state()
|
||||
|
||||
def __send_state(self):
|
||||
print(self._scene_state)
|
||||
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
|
||||
payload='ON' if self.scene_state else 'OFF',
|
||||
qos=2,
|
||||
retain=False)
|
||||
|
||||
class Action(threading.Thread):
|
||||
class Action(object):
|
||||
OK = 0
|
||||
FAILED = 1
|
||||
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
def setup(self, proxy: Scene, result: queue.Queue):
|
||||
def setup(self, proxy: Scene):
|
||||
"""
|
||||
proxy: a thread safe proxy object with publish, subscribe method
|
||||
result: a queue for put result
|
||||
"""
|
||||
self.scene = proxy
|
||||
self.result = result
|
||||
self.proxy = proxy
|
||||
return self
|
||||
|
||||
def ok(self):
|
||||
self.result.put(Action.OK)
|
||||
return Action.OK
|
||||
|
||||
def failed(self):
|
||||
self.result.put(Action.FAILED)
|
||||
return Action.FAILED
|
||||
|
||||
class Publish(Action):
|
||||
|
||||
@ -172,9 +181,9 @@ class Publish(Action):
|
||||
return self
|
||||
|
||||
def run(self):
|
||||
self.scene.threaded_publish(self.topic,
|
||||
self.proxy.threaded_publish(self.topic,
|
||||
payload=self.payload, qos=self.qos, retain=False)
|
||||
self.ok()
|
||||
return self.ok()
|
||||
|
||||
|
||||
class Sleep(Scene):
|
||||
|
Loading…
x
Reference in New Issue
Block a user