From 98bfac1dc310fa754c84a3541567306f248edbf0 Mon Sep 17 00:00:00 2001 From: Nicolas Duhamel Date: Fri, 19 Mar 2021 07:34:22 +0100 Subject: [PATCH] First concurrent --- src/citadel/scene/__init__.py | 83 ++++++++++++++++++++++++++++------- src/citadel/scene/__main__.py | 2 +- 2 files changed, 69 insertions(+), 16 deletions(-) diff --git a/src/citadel/scene/__init__.py b/src/citadel/scene/__init__.py index 81304c1..8c1c8f1 100644 --- a/src/citadel/scene/__init__.py +++ b/src/citadel/scene/__init__.py @@ -1,4 +1,6 @@ +import time import threading +import queue import logging import paho.mqtt.client as mqtt @@ -38,6 +40,8 @@ class Scene(threading.Thread, mqtt.Client): logger = logging.getLogger('scene.%s' % self.mqtt_name) self.logger = logger + self._pub_queue = queue.Queue() + @property def scene_state(self): return self._scene_state @@ -56,6 +60,14 @@ class Scene(threading.Thread, mqtt.Client): run = True while run: self.loop() + try: + (args, kwargs) = self._pub_queue.get(block=False) + self.publish(*args, **kwargs) + except queue.Empty: + continue + + def threaded_publish(self, *args, **kwargs): + self._pub_queue.put((args, kwargs)) def on_message(self, mqttc, obj, msg): payload = msg.payload.decode().lower() @@ -90,8 +102,15 @@ class Scene(threading.Thread, mqtt.Client): def __activate(self): #activate scene - for action in self.ACTIVATE: - self.publish(action[0], payload=action[1], qos=2, retain=False) + # lock for activate again or deactivate + # start all activate action concurently + # start result catcher thread + # result catcher set active if ok else log error + + results = queue.Queue() + + for a in self.ON: + a.setup(self, results).start() self._scene_state = True self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name), @@ -114,21 +133,55 @@ class Scene(threading.Thread, mqtt.Client): qos=2, retain=False) +class Action(threading.Thread): + OK = 0 + FAILED = 1 + + def __init__(self): + threading.Thread.__init__(self) + + def setup(self, proxy: Scene, result: queue.Queue): + """ + proxy: a thread safe proxy object with publish, subscribe method + result: a queue for put result + """ + self.scene = proxy + self.result = result + return self + + def ok(self): + self.result.put(Action.OK) + + def failed(self): + self.result.put(Action.FAILED) + +class Publish(Action): + + def __init__(self, topic: str): + Action.__init__(self) + self.topic = topic + self.payload = '' + self.qos = 0 + + def Payload(self, payload): + self.payload = payload + return self + + def Qos(self, qos): + self.qos = qos + return self + + def run(self): + self.scene.threaded_publish(self.topic, + payload=self.payload, qos=self.qos, retain=False) + self.ok() + + class Sleep(Scene): - ACTIVATE = [ - # Close all Shutter - ('cmnd/tasmota/shutter/Bedroom1x1/ShutterClose', ''), - ('cmnd/tasmota/shutter/Bedroom1x2/ShutterClose', ''), - ('cmnd/tasmota/shutter/Bathroom/ShutterClose', ''), - ('cmnd/tasmota/shutter/Staicase1/ShutterClose', ''), - - # Shutdown all light - - # Set thermostat to sleepmode - - # Close screen - ('cmnd/tasmota/screen/Screen/POWER', 'OFF') + ON = [ + Publish('cmnd/tasmota/screen/Screen/POWER').Payload('OFF').Qos(2), + Publish('cmnd/tasmota/light/LivingroomFireplace/POWER').Payload('OFF').Qos(2), ] WATCH = [ diff --git a/src/citadel/scene/__main__.py b/src/citadel/scene/__main__.py index cff99ac..0295531 100644 --- a/src/citadel/scene/__main__.py +++ b/src/citadel/scene/__main__.py @@ -38,7 +38,7 @@ def main(\ port = mqtt_port scene = Sleep(Config) - scene.run() + scene.start() if __name__ == "__main__": typer.run(main)