Compare commits

...

6 Commits

5 changed files with 351 additions and 183 deletions

View File

@ -1,193 +1,18 @@
import time from .action import Action, Publish, TasmotaShutter
import threading from .scene import Scene
import queue
import logging
import paho.mqtt.client as mqtt
from citadel.mqtt import Configuration
class Scene(threading.Thread, mqtt.Client):
""" A scene run as a thread, has his own mqtt session
Can be activated by topic:
cmnd/scene/{mqtt_name}/activate ON|OFF|null
And return status on topic:
stat/scene/{mqtt_name}/state ON|OFF
Special class attributes:
mqtt_name: define topic name, default class name in lower case
"""
TOPIC_CMND = "cmnd/scene/{mqtt_name}/activate"
TOPIC_STAT = "stat/scene/{mqtt_name}/activate"
def __init__(self, config: Configuration, logger: logging.Logger=None):
threading.Thread.__init__(self)
mqtt.Client.__init__(self)
self.configuration = config
self._scene_state = False
if not hasattr(self, 'mqtt_name'):
self.mqtt_name = self.__class__.__name__.lower()
if not logger:
logger = logging.getLogger('scene.%s' % self.mqtt_name)
self.logger = logger
self._pub_queue = queue.Queue()
@property
def scene_state(self):
return self._scene_state
def run(self):
self.username_pw_set(self.configuration.user,
self.configuration.password)
self.connect(self.configuration.host,
self.configuration.port)
self.subscribe(self.TOPIC_CMND.format(mqtt_name=self.mqtt_name), 2)
for watch in self.WATCH:
self.subscribe(watch[0], 1)
run = True
while run:
self.loop()
try:
(args, kwargs) = self._pub_queue.get(block=False)
self.publish(*args, **kwargs)
except queue.Empty:
continue
def threaded_publish(self, *args, **kwargs):
self._pub_queue.put((args, kwargs))
def on_message(self, mqttc, obj, msg):
payload = msg.payload.decode().lower()
topic = msg.topic
# on CMND
if topic == self.TOPIC_CMND.format(mqtt_name=self.mqtt_name):
if payload == 'on':
self.logger.info('CMND activate ON')
self.__activate()
elif payload == 'off':
self.logger.info('CMND activate OFF')
self.__deactivate()
elif payload == '':
self.logger.info('CMND activate state')
self.__send_state()
else:
self.logger.warn('CMND activate invalide')
elif self.scene_state and self.__check_watched(topic, payload):
self.logger.info('Watched topic %s with invalid payload %s deactivating', topic, payload)
self.__deactivate()
def __check_watched(self, topic, payload):
""" return True if is a watched topic with an invalid payload """
for watch in self.WATCH:
if topic == watch[0]:
if payload != watch[1].lower():
return True
return False
return False
def __activate(self):
#activate scene
# lock for activate again or deactivate
# start all activate action concurently
# start result catcher thread
# result catcher set active if ok else log error
results = queue.Queue()
for a in self.ON:
a.setup(self, results).start()
self._scene_state = True
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
payload='ON',
qos=2,
retain=False)
def __deactivate(self):
#deactivate scene
self._scene_state = False
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
payload='OFF',
qos=2,
retain=False)
def __send_state(self):
print(self._scene_state)
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
payload='ON' if self.scene_state else 'OFF',
qos=2,
retain=False)
class Action(threading.Thread):
OK = 0
FAILED = 1
def __init__(self):
threading.Thread.__init__(self)
def setup(self, proxy: Scene, result: queue.Queue):
"""
proxy: a thread safe proxy object with publish, subscribe method
result: a queue for put result
"""
self.scene = proxy
self.result = result
return self
def ok(self):
self.result.put(Action.OK)
def failed(self):
self.result.put(Action.FAILED)
class Publish(Action):
def __init__(self, topic: str):
Action.__init__(self)
self.topic = topic
self.payload = ''
self.qos = 0
def Payload(self, payload):
self.payload = payload
return self
def Qos(self, qos):
self.qos = qos
return self
def run(self):
self.scene.threaded_publish(self.topic,
payload=self.payload, qos=self.qos, retain=False)
self.ok()
class Sleep(Scene): class Sleep(Scene):
ON = [ ON = [
Publish('cmnd/tasmota/screen/Screen/POWER').Payload('OFF').Qos(2), Publish('cmnd/tasmota/screen/Screen/POWER').Payload('OFF').Qos(2)\
.CheckResponse('stat/tasmota/screen/Screen/POWER', 'OFF', 3),
Publish('cmnd/tasmota/light/LivingroomFireplace/POWER').Payload('OFF').Qos(2), Publish('cmnd/tasmota/light/LivingroomFireplace/POWER').Payload('OFF').Qos(2),
TasmotaShutter('Bedroom1x1').Close(),
TasmotaShutter('Bedroom1x2').Close(),
TasmotaShutter('Staircase1').Close(),
TasmotaShutter('Bathroom').Close(),
] ]
WATCH = [ WATCH = [
('stat/tasmota/screen/Screen/POWER', 'OFF') ('stat/tasmota/screen/Screen/POWER', 'OFF')
# ('cmnd/tasmota/shutter/Bedroom1x1/ShutterClose', ''),
# ('cmnd/tasmota/shutter/Bedroom1x2/ShutterClose', ''),
# ('cmnd/tasmota/shutter/Bathroom/ShutterClose', ''),
# ('cmnd/tasmota/shutter/Staicase1/ShutterClose', ''),
] ]

View File

@ -4,6 +4,7 @@ import typer
import systemd.journal import systemd.journal
from citadel.mqtt import Configuration from citadel.mqtt import Configuration
from . import Sleep from . import Sleep
def main(\ def main(\

108
src/citadel/scene/action.py Normal file
View File

@ -0,0 +1,108 @@
import time
from .helper import WaitForTopic
class Action(object):
SUCCESS = 0
FAILED = 1
def setup(self, proxy):
"""
proxy: a thread safe proxy object with publish, subscribe method
"""
self.proxy = proxy
return self
def ok(self):
return Action.SUCCESS
def failed(self):
return Action.FAILED
@property
def name(self):
return self.__class__.__name__
class Publish(Action):
def __init__(self, topic: str):
Action.__init__(self)
self.topic = topic
self.payload = ''
self.qos = 0
self.check_response = None
def Payload(self, payload):
self.payload = payload
return self
def Qos(self, qos):
self.qos = qos
return self
def CheckResponse(self, topic: str, payload: str=None, timeout: int=None):
self.check_response= {'topic': topic, 'payload':payload, 'timeout': timeout}
return self
def run(self):
checker = None
if self.check_response:
checker = WaitForTopic(self.proxy.configuration,
self.check_response['topic'],
self.check_response['payload'])
self.proxy.threaded_publish(self.topic,
payload=self.payload, qos=self.qos, retain=False)
if self.check_response:
checker.join(timeout=self.check_response['timeout'])
if checker.is_alive(): #timeout occur
checker.stop()
return self.failed()
return self.ok()
class TasmotaShutter(Action):
TOPIC_FORMAT = "cmnd/tasmota/shutter/{shutter_name}/{action}"
TOPIC_CHECK_FORMAT = "stat/tasmota/shutter/{shutter_name}/RESULT"
ACTION_OPEN = 'ShutterOpen'
ACTION_CLOSE = 'ShutterClose'
def __init__(self, shutter_name: str):
Action.__init__(self)
self.shutter_name = shutter_name
self.action = None
def Close(self):
self.action = self.ACTION_CLOSE
return self
def Open(self):
self.action = self.ACTION_OPEN
return self
def run(self):
if self.action is None:
raise RuntimeError('No action specified')
topic = self.TOPIC_FORMAT.format(
shutter_name = self.shutter_name,
action = self.action
)
checker = WaitForTopic(self.proxy.configuration,
self.TOPIC_CHECK_FORMAT.format(shutter_name = self.shutter_name),
payload= '{"Shutter1":{"Position":100,"Direction":0,"Target":100}}' \
if self.action == self.ACTION_OPEN \
else '{"Shutter1":{"Position":0,"Direction":0,"Target":0}}')
time.sleep(1)
self.proxy.threaded_publish(topic, qos=1, retain=False)
checker.join(timeout=35)
if checker.is_alive():
return self.failed()
return self.ok()

View File

@ -0,0 +1,65 @@
import threading
import logging
from typing import Union, Callable
import paho.mqtt.client as mqtt
from citadel.mqtt import Configuration
class WaitForTopic(threading.Thread, mqtt.Client):
def __init__(self,
config: Configuration,
topic: str,
payload: Union[str, Callable[[str],bool]],
logger: logging.Logger=None):
""" payload: str or function stop if payload is received or function return True, the function must have this signature func(payload: str) -> bool """
threading.Thread.__init__(self)
mqtt.Client.__init__(self)
self.configuration = config
self.topic = topic
if callable(payload):
self.check_payload = payload
else:
self.check_payload = lambda x: x == payload
self.daemon = True
if not logger:
logger = logging.getLogger('watcher')
self.logger = logger
self.__run = True
self.username_pw_set(self.configuration.user,
self.configuration.password)
self.connect(self.configuration.host,
self.configuration.port)
self.subscribe(self.topic, 1)
self.start()
def on_message(self, mqttc, obj, msg):
payload = msg.payload.decode()
topic = msg.topic
self.logger.debug('Topic: %s, Payload: %s', topic, payload)
if topic != self.topic:
self.logger.error('Invaliad topic: %s', topic)
return
if self.check_payload(payload):
self.__run = False
def stop(self):
self.__run = False
def run(self):
while self.__run:
self.loop()
self.disconnect()

169
src/citadel/scene/scene.py Normal file
View File

@ -0,0 +1,169 @@
import threading
import logging
import queue
import concurrent.futures
import pebble
import paho.mqtt.client as mqtt
from citadel.mqtt import Configuration
from .action import Action
class Scene(threading.Thread, mqtt.Client):
""" A scene run as a thread, has his own mqtt session
Can be activated by topic:
cmnd/scene/{mqtt_name}/activate ON|OFF|null
And return status on topic:
stat/scene/{mqtt_name}/state ON|OFF
Special class attributes:
mqtt_name: define topic name, default class name in lower case
"""
TOPIC_CMND = "cmnd/scene/{mqtt_name}/activate"
TOPIC_STAT = "stat/scene/{mqtt_name}/activate"
STATE_DEACTIVATE = 0
STATE_ACTIVATE = 1
STATE_ACTIVATING = 2
def __init__(self, config: Configuration, logger: logging.Logger=None):
threading.Thread.__init__(self)
mqtt.Client.__init__(self)
self.configuration = config
if not hasattr(self, 'mqtt_name'):
self.mqtt_name = self.__class__.__name__.lower()
if not logger:
logger = logging.getLogger('scene.%s' % self.mqtt_name)
self.logger = logger
self.__pub_queue = queue.Queue()
self.__state = Scene.STATE_DEACTIVATE
self.__executor = None
self.__activating_r_futures = []
@property
def scene_state(self):
return self.__state
def run(self):
self.username_pw_set(self.configuration.user,
self.configuration.password)
self.connect(self.configuration.host,
self.configuration.port)
self.subscribe(self.TOPIC_CMND.format(mqtt_name=self.mqtt_name), 2)
for watch in self.WATCH:
self.subscribe(watch[0], 1)
run = True
while run:
#Mqtt client loop
self.loop()
#Check publish queue
try:
(args, kwargs) = self.__pub_queue.get(block=False)
self.publish(*args, **kwargs)
except queue.Empty:
pass
# Check for futures if activating
if self.scene_state == Scene.STATE_ACTIVATING:
for (action, futures) in self.__activating_r_futures:
try:
r = futures.result(timeout=0)
if r != Action.SUCCESS:
self.logger.error("Action: '%s' error", action.name)
self.logger.info('Activating error')
self.__executor.join()
self.__executor = None
self.__activating_r_futures = []
self.__state = Scene.STATE_DEACTIVATE
self.__send_state()
break
except concurrent.futures.TimeoutError:
break
else:
self.logger.info('Activating terminate')
self.__executor.join()
self.__executor = None
self.__activating_r_futures = []
self.__state = Scene.STATE_ACTIVATE
self.__send_state()
def threaded_publish(self, *args, **kwargs):
self.__pub_queue.put((args, kwargs))
def on_message(self, mqttc, obj, msg):
payload = msg.payload.decode().lower()
topic = msg.topic
# on CMND
if topic == self.TOPIC_CMND.format(mqtt_name=self.mqtt_name):
if payload == 'on':
self.logger.info('CMND activate ON')
self.__activate()
elif payload == 'off':
self.logger.info('CMND activate OFF')
self.__deactivate()
elif payload == '':
self.logger.info('CMND activate state')
self.__send_state()
else:
self.logger.warn('CMND activate invalide')
elif self.scene_state and self.__check_watched(topic, payload):
self.logger.info('Watched topic %s with invalid payload %s deactivating', topic, payload)
self.__deactivate()
def __check_watched(self, topic, payload):
""" return True if is a watched topic with an invalid payload """
for watch in self.WATCH:
if topic == watch[0]:
if payload != watch[1].lower():
return True
return False
return False
def __activate(self):
if self.scene_state:
self.logger.info('Already activate/activating ignore cmnd')
return
if self.__executor != None:
self.logger.error('Trying to activate but __executor not None')
return
self.__state = Scene.STATE_ACTIVATING
self.__executor = pebble.ThreadPool()
for a in self.ON:
self.__activating_r_futures.append(
(a, self.__executor.schedule(a.setup(self).run))
)
self.__executor.close()
self.__send_state()
def __deactivate(self):
self.__state = Scene.STATE_DEACTIVATE
self.__send_state()
def __send_state(self):
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
payload='ON' if self.scene_state else 'OFF',
qos=2,
retain=False)