File split

This commit is contained in:
Nicolas Duhamel 2021-04-01 14:55:16 +02:00
parent 1c0f931aa0
commit cb03c12d16
5 changed files with 345 additions and 333 deletions

View File

@ -1,332 +1,5 @@
import time from .action import Action, Publish, TasmotaShutter
import threading from .scene import Scene
import queue
import logging
import concurrent.futures
from typing import Union, Callable
import pebble
import paho.mqtt.client as mqtt
from citadel.mqtt import Configuration
class Scene(threading.Thread, mqtt.Client):
""" A scene run as a thread, has his own mqtt session
Can be activated by topic:
cmnd/scene/{mqtt_name}/activate ON|OFF|null
And return status on topic:
stat/scene/{mqtt_name}/state ON|OFF
Special class attributes:
mqtt_name: define topic name, default class name in lower case
"""
TOPIC_CMND = "cmnd/scene/{mqtt_name}/activate"
TOPIC_STAT = "stat/scene/{mqtt_name}/activate"
STATE_DEACTIVATE = 0
STATE_ACTIVATE = 1
STATE_ACTIVATING = 2
def __init__(self, config: Configuration, logger: logging.Logger=None):
threading.Thread.__init__(self)
mqtt.Client.__init__(self)
self.configuration = config
if not hasattr(self, 'mqtt_name'):
self.mqtt_name = self.__class__.__name__.lower()
if not logger:
logger = logging.getLogger('scene.%s' % self.mqtt_name)
self.logger = logger
self.__pub_queue = queue.Queue()
self.__state = Scene.STATE_DEACTIVATE
self.__executor = None
self.__activating_r_futures = []
@property
def scene_state(self):
return self.__state
def run(self):
self.username_pw_set(self.configuration.user,
self.configuration.password)
self.connect(self.configuration.host,
self.configuration.port)
self.subscribe(self.TOPIC_CMND.format(mqtt_name=self.mqtt_name), 2)
for watch in self.WATCH:
self.subscribe(watch[0], 1)
run = True
while run:
#Mqtt client loop
self.loop()
#Check publish queue
try:
(args, kwargs) = self.__pub_queue.get(block=False)
self.publish(*args, **kwargs)
except queue.Empty:
pass
# Check for futures if activating
if self.scene_state == Scene.STATE_ACTIVATING:
for (action, futures) in self.__activating_r_futures:
try:
r = futures.result(timeout=0)
if r != Action.SUCCESS:
self.logger.error("Action: '%s' error", action.name)
self.logger.info('Activating error')
self.__executor.join()
self.__executor = None
self.__activating_r_futures = []
self.__state = Scene.STATE_DEACTIVATE
self.__send_state()
break
except concurrent.futures.TimeoutError:
break
else:
self.logger.info('Activating terminate')
self.__executor.join()
self.__executor = None
self.__activating_r_futures = []
self.__state = Scene.STATE_ACTIVATE
self.__send_state()
def threaded_publish(self, *args, **kwargs):
self.__pub_queue.put((args, kwargs))
def on_message(self, mqttc, obj, msg):
payload = msg.payload.decode().lower()
topic = msg.topic
# on CMND
if topic == self.TOPIC_CMND.format(mqtt_name=self.mqtt_name):
if payload == 'on':
self.logger.info('CMND activate ON')
self.__activate()
elif payload == 'off':
self.logger.info('CMND activate OFF')
self.__deactivate()
elif payload == '':
self.logger.info('CMND activate state')
self.__send_state()
else:
self.logger.warn('CMND activate invalide')
elif self.scene_state and self.__check_watched(topic, payload):
self.logger.info('Watched topic %s with invalid payload %s deactivating', topic, payload)
self.__deactivate()
def __check_watched(self, topic, payload):
""" return True if is a watched topic with an invalid payload """
for watch in self.WATCH:
if topic == watch[0]:
if payload != watch[1].lower():
return True
return False
return False
def __activate(self):
if self.scene_state:
self.logger.info('Already activate/activating ignore cmnd')
return
if self.__executor != None:
self.logger.error('Trying to activate but __executor not None')
return
self.__state = Scene.STATE_ACTIVATING
self.__executor = pebble.ThreadPool()
for a in self.ON:
self.__activating_r_futures.append(
(a, self.__executor.schedule(a.setup(self).run))
)
self.__executor.close()
self.__send_state()
def __deactivate(self):
self.__state = Scene.STATE_DEACTIVATE
self.__send_state()
def __send_state(self):
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
payload='ON' if self.scene_state else 'OFF',
qos=2,
retain=False)
class WaitForTopic(threading.Thread, mqtt.Client):
def __init__(self,
config: Configuration,
topic: str,
payload: Union[str, Callable[[str],bool]],
logger: logging.Logger=None):
""" payload: str or function stop if payload is received or function return True, the function must have this signature func(payload: str) -> bool """
threading.Thread.__init__(self)
mqtt.Client.__init__(self)
self.configuration = config
self.topic = topic
if callable(payload):
self.check_payload = payload
else:
self.check_payload = lambda x: x == payload
self.daemon = True
if not logger:
logger = logging.getLogger('watcher')
self.logger = logger
self.__run = True
self.username_pw_set(self.configuration.user,
self.configuration.password)
self.connect(self.configuration.host,
self.configuration.port)
self.subscribe(self.topic, 1)
self.start()
def on_message(self, mqttc, obj, msg):
payload = msg.payload.decode()
topic = msg.topic
self.logger.info('Topic: %s, Payload: %s', topic, payload)
if topic != self.topic:
self.logger.error('Invaliad topic: %s', topic)
return
if self.check_payload(payload):
self.__run = False
def stop(self):
self.__run = False
def run(self):
while self.__run:
self.loop()
self.disconnect()
class Action(object):
SUCCESS = 0
FAILED = 1
def setup(self, proxy: Scene):
"""
proxy: a thread safe proxy object with publish, subscribe method
"""
self.proxy = proxy
return self
def ok(self):
return Action.SUCCESS
def failed(self):
return Action.FAILED
@property
def name(self):
return self.__class__.__name__
class Publish(Action):
def __init__(self, topic: str):
Action.__init__(self)
self.topic = topic
self.payload = ''
self.qos = 0
self.check_response = None
def Payload(self, payload):
self.payload = payload
return self
def Qos(self, qos):
self.qos = qos
return self
def CheckResponse(self, topic: str, payload: str=None, timeout: int=None):
self.check_response= {'topic': topic, 'payload':payload, 'timeout': timeout}
return self
def run(self):
checker = None
if self.check_response:
checker = WaitForTopic(self.proxy.configuration,
self.check_response['topic'],
self.check_response['payload'])
self.proxy.threaded_publish(self.topic,
payload=self.payload, qos=self.qos, retain=False)
if self.check_response:
checker.join(timeout=self.check_response['timeout'])
if checker.is_alive(): #timeout occur
checker.stop()
return self.failed()
return self.ok()
class TasmotaShutter(Action):
TOPIC_FORMAT = "cmnd/tasmota/shutter/{shutter_name}/{action}"
TOPIC_CHECK_FORMAT = "stat/tasmota/shutter/{shutter_name}/SHUTTER1"
ACTION_OPEN = 'ShutterOpen'
ACTION_CLOSE = 'ShutterClose'
def __init__(self, shutter_name: str):
Action.__init__(self)
self.shutter_name = shutter_name
self.action = None
def Close(self):
self.action = self.ACTION_CLOSE
return self
def Open(self):
self.action = self.ACTION_OPEN
return self
def run(self):
if self.action is None:
raise RuntimeError('No action specified')
topic = self.TOPIC_FORMAT.format(
shutter_name = self.shutter_name,
action = self.action
)
self.proxy.threaded_publish(topic, qos=1, retain=False)
checker = WaitForTopic(self.proxy.configuration,
self.TOPIC_CHECK_FORMAT.format(shutter_name = self.shutter_name),
payload= '100' if self.action == self.ACTION_OPEN else '0')
checker.join(timeout=30)
if checker.is_alive():
return self.failed()
return self.ok()
class Sleep(Scene): class Sleep(Scene):
@ -342,8 +15,4 @@ class Sleep(Scene):
WATCH = [ WATCH = [
('stat/tasmota/screen/Screen/POWER', 'OFF') ('stat/tasmota/screen/Screen/POWER', 'OFF')
# ('cmnd/tasmota/shutter/Bedroom1x1/ShutterClose', ''),
# ('cmnd/tasmota/shutter/Bedroom1x2/ShutterClose', ''),
# ('cmnd/tasmota/shutter/Bathroom/ShutterClose', ''),
# ('cmnd/tasmota/shutter/Staicase1/ShutterClose', ''),
] ]

View File

@ -4,6 +4,7 @@ import typer
import systemd.journal import systemd.journal
from citadel.mqtt import Configuration from citadel.mqtt import Configuration
from . import Sleep from . import Sleep
def main(\ def main(\

108
src/citadel/scene/action.py Normal file
View File

@ -0,0 +1,108 @@
import time
from .helper import WaitForTopic
class Action(object):
SUCCESS = 0
FAILED = 1
def setup(self, proxy):
"""
proxy: a thread safe proxy object with publish, subscribe method
"""
self.proxy = proxy
return self
def ok(self):
return Action.SUCCESS
def failed(self):
return Action.FAILED
@property
def name(self):
return self.__class__.__name__
class Publish(Action):
def __init__(self, topic: str):
Action.__init__(self)
self.topic = topic
self.payload = ''
self.qos = 0
self.check_response = None
def Payload(self, payload):
self.payload = payload
return self
def Qos(self, qos):
self.qos = qos
return self
def CheckResponse(self, topic: str, payload: str=None, timeout: int=None):
self.check_response= {'topic': topic, 'payload':payload, 'timeout': timeout}
return self
def run(self):
checker = None
if self.check_response:
checker = WaitForTopic(self.proxy.configuration,
self.check_response['topic'],
self.check_response['payload'])
self.proxy.threaded_publish(self.topic,
payload=self.payload, qos=self.qos, retain=False)
if self.check_response:
checker.join(timeout=self.check_response['timeout'])
if checker.is_alive(): #timeout occur
checker.stop()
return self.failed()
return self.ok()
class TasmotaShutter(Action):
TOPIC_FORMAT = "cmnd/tasmota/shutter/{shutter_name}/{action}"
TOPIC_CHECK_FORMAT = "stat/tasmota/shutter/{shutter_name}/RESULT"
ACTION_OPEN = 'ShutterOpen'
ACTION_CLOSE = 'ShutterClose'
def __init__(self, shutter_name: str):
Action.__init__(self)
self.shutter_name = shutter_name
self.action = None
def Close(self):
self.action = self.ACTION_CLOSE
return self
def Open(self):
self.action = self.ACTION_OPEN
return self
def run(self):
if self.action is None:
raise RuntimeError('No action specified')
topic = self.TOPIC_FORMAT.format(
shutter_name = self.shutter_name,
action = self.action
)
checker = WaitForTopic(self.proxy.configuration,
self.TOPIC_CHECK_FORMAT.format(shutter_name = self.shutter_name),
payload= '{"Shutter1":{"Position":100,"Direction":0,"Target":100}}' \
if self.action == self.ACTION_OPEN \
else '{"Shutter1":{"Position":0,"Direction":0,"Target":0}}')
time.sleep(1)
self.proxy.threaded_publish(topic, qos=1, retain=False)
checker.join(timeout=35)
if checker.is_alive():
return self.failed()
return self.ok()

View File

@ -0,0 +1,65 @@
import threading
import logging
from typing import Union, Callable
import paho.mqtt.client as mqtt
from citadel.mqtt import Configuration
class WaitForTopic(threading.Thread, mqtt.Client):
def __init__(self,
config: Configuration,
topic: str,
payload: Union[str, Callable[[str],bool]],
logger: logging.Logger=None):
""" payload: str or function stop if payload is received or function return True, the function must have this signature func(payload: str) -> bool """
threading.Thread.__init__(self)
mqtt.Client.__init__(self)
self.configuration = config
self.topic = topic
if callable(payload):
self.check_payload = payload
else:
self.check_payload = lambda x: x == payload
self.daemon = True
if not logger:
logger = logging.getLogger('watcher')
self.logger = logger
self.__run = True
self.username_pw_set(self.configuration.user,
self.configuration.password)
self.connect(self.configuration.host,
self.configuration.port)
self.subscribe(self.topic, 1)
self.start()
def on_message(self, mqttc, obj, msg):
payload = msg.payload.decode()
topic = msg.topic
self.logger.debug('Topic: %s, Payload: %s', topic, payload)
if topic != self.topic:
self.logger.error('Invaliad topic: %s', topic)
return
if self.check_payload(payload):
self.__run = False
def stop(self):
self.__run = False
def run(self):
while self.__run:
self.loop()
self.disconnect()

169
src/citadel/scene/scene.py Normal file
View File

@ -0,0 +1,169 @@
import threading
import logging
import queue
import concurrent.futures
import pebble
import paho.mqtt.client as mqtt
from citadel.mqtt import Configuration
from .action import Action
class Scene(threading.Thread, mqtt.Client):
""" A scene run as a thread, has his own mqtt session
Can be activated by topic:
cmnd/scene/{mqtt_name}/activate ON|OFF|null
And return status on topic:
stat/scene/{mqtt_name}/state ON|OFF
Special class attributes:
mqtt_name: define topic name, default class name in lower case
"""
TOPIC_CMND = "cmnd/scene/{mqtt_name}/activate"
TOPIC_STAT = "stat/scene/{mqtt_name}/activate"
STATE_DEACTIVATE = 0
STATE_ACTIVATE = 1
STATE_ACTIVATING = 2
def __init__(self, config: Configuration, logger: logging.Logger=None):
threading.Thread.__init__(self)
mqtt.Client.__init__(self)
self.configuration = config
if not hasattr(self, 'mqtt_name'):
self.mqtt_name = self.__class__.__name__.lower()
if not logger:
logger = logging.getLogger('scene.%s' % self.mqtt_name)
self.logger = logger
self.__pub_queue = queue.Queue()
self.__state = Scene.STATE_DEACTIVATE
self.__executor = None
self.__activating_r_futures = []
@property
def scene_state(self):
return self.__state
def run(self):
self.username_pw_set(self.configuration.user,
self.configuration.password)
self.connect(self.configuration.host,
self.configuration.port)
self.subscribe(self.TOPIC_CMND.format(mqtt_name=self.mqtt_name), 2)
for watch in self.WATCH:
self.subscribe(watch[0], 1)
run = True
while run:
#Mqtt client loop
self.loop()
#Check publish queue
try:
(args, kwargs) = self.__pub_queue.get(block=False)
self.publish(*args, **kwargs)
except queue.Empty:
pass
# Check for futures if activating
if self.scene_state == Scene.STATE_ACTIVATING:
for (action, futures) in self.__activating_r_futures:
try:
r = futures.result(timeout=0)
if r != Action.SUCCESS:
self.logger.error("Action: '%s' error", action.name)
self.logger.info('Activating error')
self.__executor.join()
self.__executor = None
self.__activating_r_futures = []
self.__state = Scene.STATE_DEACTIVATE
self.__send_state()
break
except concurrent.futures.TimeoutError:
break
else:
self.logger.info('Activating terminate')
self.__executor.join()
self.__executor = None
self.__activating_r_futures = []
self.__state = Scene.STATE_ACTIVATE
self.__send_state()
def threaded_publish(self, *args, **kwargs):
self.__pub_queue.put((args, kwargs))
def on_message(self, mqttc, obj, msg):
payload = msg.payload.decode().lower()
topic = msg.topic
# on CMND
if topic == self.TOPIC_CMND.format(mqtt_name=self.mqtt_name):
if payload == 'on':
self.logger.info('CMND activate ON')
self.__activate()
elif payload == 'off':
self.logger.info('CMND activate OFF')
self.__deactivate()
elif payload == '':
self.logger.info('CMND activate state')
self.__send_state()
else:
self.logger.warn('CMND activate invalide')
elif self.scene_state and self.__check_watched(topic, payload):
self.logger.info('Watched topic %s with invalid payload %s deactivating', topic, payload)
self.__deactivate()
def __check_watched(self, topic, payload):
""" return True if is a watched topic with an invalid payload """
for watch in self.WATCH:
if topic == watch[0]:
if payload != watch[1].lower():
return True
return False
return False
def __activate(self):
if self.scene_state:
self.logger.info('Already activate/activating ignore cmnd')
return
if self.__executor != None:
self.logger.error('Trying to activate but __executor not None')
return
self.__state = Scene.STATE_ACTIVATING
self.__executor = pebble.ThreadPool()
for a in self.ON:
self.__activating_r_futures.append(
(a, self.__executor.schedule(a.setup(self).run))
)
self.__executor.close()
self.__send_state()
def __deactivate(self):
self.__state = Scene.STATE_DEACTIVATE
self.__send_state()
def __send_state(self):
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
payload='ON' if self.scene_state else 'OFF',
qos=2,
retain=False)