First concurrent

This commit is contained in:
Nicolas Duhamel 2021-03-19 07:34:22 +01:00
parent 1531708338
commit 98bfac1dc3
2 changed files with 69 additions and 16 deletions

View File

@ -1,4 +1,6 @@
import time
import threading import threading
import queue
import logging import logging
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
@ -38,6 +40,8 @@ class Scene(threading.Thread, mqtt.Client):
logger = logging.getLogger('scene.%s' % self.mqtt_name) logger = logging.getLogger('scene.%s' % self.mqtt_name)
self.logger = logger self.logger = logger
self._pub_queue = queue.Queue()
@property @property
def scene_state(self): def scene_state(self):
return self._scene_state return self._scene_state
@ -56,6 +60,14 @@ class Scene(threading.Thread, mqtt.Client):
run = True run = True
while run: while run:
self.loop() self.loop()
try:
(args, kwargs) = self._pub_queue.get(block=False)
self.publish(*args, **kwargs)
except queue.Empty:
continue
def threaded_publish(self, *args, **kwargs):
self._pub_queue.put((args, kwargs))
def on_message(self, mqttc, obj, msg): def on_message(self, mqttc, obj, msg):
payload = msg.payload.decode().lower() payload = msg.payload.decode().lower()
@ -90,8 +102,15 @@ class Scene(threading.Thread, mqtt.Client):
def __activate(self): def __activate(self):
#activate scene #activate scene
for action in self.ACTIVATE: # lock for activate again or deactivate
self.publish(action[0], payload=action[1], qos=2, retain=False) # start all activate action concurently
# start result catcher thread
# result catcher set active if ok else log error
results = queue.Queue()
for a in self.ON:
a.setup(self, results).start()
self._scene_state = True self._scene_state = True
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name), self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
@ -114,21 +133,55 @@ class Scene(threading.Thread, mqtt.Client):
qos=2, qos=2,
retain=False) retain=False)
class Action(threading.Thread):
OK = 0
FAILED = 1
def __init__(self):
threading.Thread.__init__(self)
def setup(self, proxy: Scene, result: queue.Queue):
"""
proxy: a thread safe proxy object with publish, subscribe method
result: a queue for put result
"""
self.scene = proxy
self.result = result
return self
def ok(self):
self.result.put(Action.OK)
def failed(self):
self.result.put(Action.FAILED)
class Publish(Action):
def __init__(self, topic: str):
Action.__init__(self)
self.topic = topic
self.payload = ''
self.qos = 0
def Payload(self, payload):
self.payload = payload
return self
def Qos(self, qos):
self.qos = qos
return self
def run(self):
self.scene.threaded_publish(self.topic,
payload=self.payload, qos=self.qos, retain=False)
self.ok()
class Sleep(Scene): class Sleep(Scene):
ACTIVATE = [ ON = [
# Close all Shutter Publish('cmnd/tasmota/screen/Screen/POWER').Payload('OFF').Qos(2),
('cmnd/tasmota/shutter/Bedroom1x1/ShutterClose', ''), Publish('cmnd/tasmota/light/LivingroomFireplace/POWER').Payload('OFF').Qos(2),
('cmnd/tasmota/shutter/Bedroom1x2/ShutterClose', ''),
('cmnd/tasmota/shutter/Bathroom/ShutterClose', ''),
('cmnd/tasmota/shutter/Staicase1/ShutterClose', ''),
# Shutdown all light
# Set thermostat to sleepmode
# Close screen
('cmnd/tasmota/screen/Screen/POWER', 'OFF')
] ]
WATCH = [ WATCH = [

View File

@ -38,7 +38,7 @@ def main(\
port = mqtt_port port = mqtt_port
scene = Sleep(Config) scene = Sleep(Config)
scene.run() scene.start()
if __name__ == "__main__": if __name__ == "__main__":
typer.run(main) typer.run(main)