Compare commits
No commits in common. "cb03c12d160f916437f8d8e8dca7befd8452ba47" and "98bfac1dc310fa754c84a3541567306f248edbf0" have entirely different histories.
cb03c12d16
...
98bfac1dc3
@ -1,18 +1,193 @@
|
|||||||
from .action import Action, Publish, TasmotaShutter
|
import time
|
||||||
from .scene import Scene
|
import threading
|
||||||
|
import queue
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
|
||||||
|
from citadel.mqtt import Configuration
|
||||||
|
|
||||||
|
class Scene(threading.Thread, mqtt.Client):
|
||||||
|
""" A scene run as a thread, has his own mqtt session
|
||||||
|
|
||||||
|
Can be activated by topic:
|
||||||
|
|
||||||
|
cmnd/scene/{mqtt_name}/activate ON|OFF|null
|
||||||
|
|
||||||
|
And return status on topic:
|
||||||
|
|
||||||
|
stat/scene/{mqtt_name}/state ON|OFF
|
||||||
|
|
||||||
|
Special class attributes:
|
||||||
|
|
||||||
|
mqtt_name: define topic name, default class name in lower case
|
||||||
|
|
||||||
|
"""
|
||||||
|
TOPIC_CMND = "cmnd/scene/{mqtt_name}/activate"
|
||||||
|
TOPIC_STAT = "stat/scene/{mqtt_name}/activate"
|
||||||
|
|
||||||
|
def __init__(self, config: Configuration, logger: logging.Logger=None):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
mqtt.Client.__init__(self)
|
||||||
|
|
||||||
|
self.configuration = config
|
||||||
|
self._scene_state = False
|
||||||
|
|
||||||
|
if not hasattr(self, 'mqtt_name'):
|
||||||
|
self.mqtt_name = self.__class__.__name__.lower()
|
||||||
|
|
||||||
|
if not logger:
|
||||||
|
logger = logging.getLogger('scene.%s' % self.mqtt_name)
|
||||||
|
self.logger = logger
|
||||||
|
|
||||||
|
self._pub_queue = queue.Queue()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def scene_state(self):
|
||||||
|
return self._scene_state
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.username_pw_set(self.configuration.user,
|
||||||
|
self.configuration.password)
|
||||||
|
self.connect(self.configuration.host,
|
||||||
|
self.configuration.port)
|
||||||
|
|
||||||
|
self.subscribe(self.TOPIC_CMND.format(mqtt_name=self.mqtt_name), 2)
|
||||||
|
|
||||||
|
for watch in self.WATCH:
|
||||||
|
self.subscribe(watch[0], 1)
|
||||||
|
|
||||||
|
run = True
|
||||||
|
while run:
|
||||||
|
self.loop()
|
||||||
|
try:
|
||||||
|
(args, kwargs) = self._pub_queue.get(block=False)
|
||||||
|
self.publish(*args, **kwargs)
|
||||||
|
except queue.Empty:
|
||||||
|
continue
|
||||||
|
|
||||||
|
def threaded_publish(self, *args, **kwargs):
|
||||||
|
self._pub_queue.put((args, kwargs))
|
||||||
|
|
||||||
|
def on_message(self, mqttc, obj, msg):
|
||||||
|
payload = msg.payload.decode().lower()
|
||||||
|
topic = msg.topic
|
||||||
|
|
||||||
|
# on CMND
|
||||||
|
if topic == self.TOPIC_CMND.format(mqtt_name=self.mqtt_name):
|
||||||
|
if payload == 'on':
|
||||||
|
self.logger.info('CMND activate ON')
|
||||||
|
self.__activate()
|
||||||
|
elif payload == 'off':
|
||||||
|
self.logger.info('CMND activate OFF')
|
||||||
|
self.__deactivate()
|
||||||
|
elif payload == '':
|
||||||
|
self.logger.info('CMND activate state')
|
||||||
|
self.__send_state()
|
||||||
|
else:
|
||||||
|
self.logger.warn('CMND activate invalide')
|
||||||
|
|
||||||
|
elif self.scene_state and self.__check_watched(topic, payload):
|
||||||
|
self.logger.info('Watched topic %s with invalid payload %s deactivating', topic, payload)
|
||||||
|
self.__deactivate()
|
||||||
|
|
||||||
|
def __check_watched(self, topic, payload):
|
||||||
|
""" return True if is a watched topic with an invalid payload """
|
||||||
|
for watch in self.WATCH:
|
||||||
|
if topic == watch[0]:
|
||||||
|
if payload != watch[1].lower():
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __activate(self):
|
||||||
|
#activate scene
|
||||||
|
# lock for activate again or deactivate
|
||||||
|
# start all activate action concurently
|
||||||
|
# start result catcher thread
|
||||||
|
# result catcher set active if ok else log error
|
||||||
|
|
||||||
|
results = queue.Queue()
|
||||||
|
|
||||||
|
for a in self.ON:
|
||||||
|
a.setup(self, results).start()
|
||||||
|
|
||||||
|
self._scene_state = True
|
||||||
|
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
|
||||||
|
payload='ON',
|
||||||
|
qos=2,
|
||||||
|
retain=False)
|
||||||
|
|
||||||
|
def __deactivate(self):
|
||||||
|
#deactivate scene
|
||||||
|
self._scene_state = False
|
||||||
|
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
|
||||||
|
payload='OFF',
|
||||||
|
qos=2,
|
||||||
|
retain=False)
|
||||||
|
|
||||||
|
def __send_state(self):
|
||||||
|
print(self._scene_state)
|
||||||
|
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
|
||||||
|
payload='ON' if self.scene_state else 'OFF',
|
||||||
|
qos=2,
|
||||||
|
retain=False)
|
||||||
|
|
||||||
|
class Action(threading.Thread):
|
||||||
|
OK = 0
|
||||||
|
FAILED = 1
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
|
||||||
|
def setup(self, proxy: Scene, result: queue.Queue):
|
||||||
|
"""
|
||||||
|
proxy: a thread safe proxy object with publish, subscribe method
|
||||||
|
result: a queue for put result
|
||||||
|
"""
|
||||||
|
self.scene = proxy
|
||||||
|
self.result = result
|
||||||
|
return self
|
||||||
|
|
||||||
|
def ok(self):
|
||||||
|
self.result.put(Action.OK)
|
||||||
|
|
||||||
|
def failed(self):
|
||||||
|
self.result.put(Action.FAILED)
|
||||||
|
|
||||||
|
class Publish(Action):
|
||||||
|
|
||||||
|
def __init__(self, topic: str):
|
||||||
|
Action.__init__(self)
|
||||||
|
self.topic = topic
|
||||||
|
self.payload = ''
|
||||||
|
self.qos = 0
|
||||||
|
|
||||||
|
def Payload(self, payload):
|
||||||
|
self.payload = payload
|
||||||
|
return self
|
||||||
|
|
||||||
|
def Qos(self, qos):
|
||||||
|
self.qos = qos
|
||||||
|
return self
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self.scene.threaded_publish(self.topic,
|
||||||
|
payload=self.payload, qos=self.qos, retain=False)
|
||||||
|
self.ok()
|
||||||
|
|
||||||
|
|
||||||
class Sleep(Scene):
|
class Sleep(Scene):
|
||||||
|
|
||||||
ON = [
|
ON = [
|
||||||
Publish('cmnd/tasmota/screen/Screen/POWER').Payload('OFF').Qos(2)\
|
Publish('cmnd/tasmota/screen/Screen/POWER').Payload('OFF').Qos(2),
|
||||||
.CheckResponse('stat/tasmota/screen/Screen/POWER', 'OFF', 3),
|
|
||||||
Publish('cmnd/tasmota/light/LivingroomFireplace/POWER').Payload('OFF').Qos(2),
|
Publish('cmnd/tasmota/light/LivingroomFireplace/POWER').Payload('OFF').Qos(2),
|
||||||
TasmotaShutter('Bedroom1x1').Close(),
|
|
||||||
TasmotaShutter('Bedroom1x2').Close(),
|
|
||||||
TasmotaShutter('Staircase1').Close(),
|
|
||||||
TasmotaShutter('Bathroom').Close(),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
WATCH = [
|
WATCH = [
|
||||||
('stat/tasmota/screen/Screen/POWER', 'OFF')
|
('stat/tasmota/screen/Screen/POWER', 'OFF')
|
||||||
|
# ('cmnd/tasmota/shutter/Bedroom1x1/ShutterClose', ''),
|
||||||
|
# ('cmnd/tasmota/shutter/Bedroom1x2/ShutterClose', ''),
|
||||||
|
# ('cmnd/tasmota/shutter/Bathroom/ShutterClose', ''),
|
||||||
|
# ('cmnd/tasmota/shutter/Staicase1/ShutterClose', ''),
|
||||||
]
|
]
|
||||||
|
@ -4,7 +4,6 @@ import typer
|
|||||||
import systemd.journal
|
import systemd.journal
|
||||||
|
|
||||||
from citadel.mqtt import Configuration
|
from citadel.mqtt import Configuration
|
||||||
|
|
||||||
from . import Sleep
|
from . import Sleep
|
||||||
|
|
||||||
def main(\
|
def main(\
|
||||||
|
@ -1,108 +0,0 @@
|
|||||||
import time
|
|
||||||
|
|
||||||
from .helper import WaitForTopic
|
|
||||||
|
|
||||||
class Action(object):
|
|
||||||
SUCCESS = 0
|
|
||||||
FAILED = 1
|
|
||||||
|
|
||||||
def setup(self, proxy):
|
|
||||||
"""
|
|
||||||
proxy: a thread safe proxy object with publish, subscribe method
|
|
||||||
"""
|
|
||||||
self.proxy = proxy
|
|
||||||
return self
|
|
||||||
|
|
||||||
def ok(self):
|
|
||||||
return Action.SUCCESS
|
|
||||||
|
|
||||||
def failed(self):
|
|
||||||
return Action.FAILED
|
|
||||||
|
|
||||||
@property
|
|
||||||
def name(self):
|
|
||||||
return self.__class__.__name__
|
|
||||||
|
|
||||||
|
|
||||||
class Publish(Action):
|
|
||||||
|
|
||||||
def __init__(self, topic: str):
|
|
||||||
Action.__init__(self)
|
|
||||||
self.topic = topic
|
|
||||||
self.payload = ''
|
|
||||||
self.qos = 0
|
|
||||||
|
|
||||||
self.check_response = None
|
|
||||||
|
|
||||||
def Payload(self, payload):
|
|
||||||
self.payload = payload
|
|
||||||
return self
|
|
||||||
|
|
||||||
def Qos(self, qos):
|
|
||||||
self.qos = qos
|
|
||||||
return self
|
|
||||||
|
|
||||||
def CheckResponse(self, topic: str, payload: str=None, timeout: int=None):
|
|
||||||
self.check_response= {'topic': topic, 'payload':payload, 'timeout': timeout}
|
|
||||||
return self
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
checker = None
|
|
||||||
if self.check_response:
|
|
||||||
checker = WaitForTopic(self.proxy.configuration,
|
|
||||||
self.check_response['topic'],
|
|
||||||
self.check_response['payload'])
|
|
||||||
|
|
||||||
self.proxy.threaded_publish(self.topic,
|
|
||||||
payload=self.payload, qos=self.qos, retain=False)
|
|
||||||
|
|
||||||
if self.check_response:
|
|
||||||
checker.join(timeout=self.check_response['timeout'])
|
|
||||||
if checker.is_alive(): #timeout occur
|
|
||||||
checker.stop()
|
|
||||||
return self.failed()
|
|
||||||
|
|
||||||
return self.ok()
|
|
||||||
|
|
||||||
class TasmotaShutter(Action):
|
|
||||||
|
|
||||||
TOPIC_FORMAT = "cmnd/tasmota/shutter/{shutter_name}/{action}"
|
|
||||||
TOPIC_CHECK_FORMAT = "stat/tasmota/shutter/{shutter_name}/RESULT"
|
|
||||||
ACTION_OPEN = 'ShutterOpen'
|
|
||||||
ACTION_CLOSE = 'ShutterClose'
|
|
||||||
|
|
||||||
def __init__(self, shutter_name: str):
|
|
||||||
Action.__init__(self)
|
|
||||||
self.shutter_name = shutter_name
|
|
||||||
self.action = None
|
|
||||||
|
|
||||||
def Close(self):
|
|
||||||
self.action = self.ACTION_CLOSE
|
|
||||||
return self
|
|
||||||
|
|
||||||
def Open(self):
|
|
||||||
self.action = self.ACTION_OPEN
|
|
||||||
return self
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
if self.action is None:
|
|
||||||
raise RuntimeError('No action specified')
|
|
||||||
|
|
||||||
topic = self.TOPIC_FORMAT.format(
|
|
||||||
shutter_name = self.shutter_name,
|
|
||||||
action = self.action
|
|
||||||
)
|
|
||||||
|
|
||||||
checker = WaitForTopic(self.proxy.configuration,
|
|
||||||
self.TOPIC_CHECK_FORMAT.format(shutter_name = self.shutter_name),
|
|
||||||
payload= '{"Shutter1":{"Position":100,"Direction":0,"Target":100}}' \
|
|
||||||
if self.action == self.ACTION_OPEN \
|
|
||||||
else '{"Shutter1":{"Position":0,"Direction":0,"Target":0}}')
|
|
||||||
time.sleep(1)
|
|
||||||
self.proxy.threaded_publish(topic, qos=1, retain=False)
|
|
||||||
|
|
||||||
checker.join(timeout=35)
|
|
||||||
if checker.is_alive():
|
|
||||||
return self.failed()
|
|
||||||
|
|
||||||
return self.ok()
|
|
@ -1,65 +0,0 @@
|
|||||||
import threading
|
|
||||||
import logging
|
|
||||||
from typing import Union, Callable
|
|
||||||
|
|
||||||
import paho.mqtt.client as mqtt
|
|
||||||
|
|
||||||
from citadel.mqtt import Configuration
|
|
||||||
|
|
||||||
class WaitForTopic(threading.Thread, mqtt.Client):
|
|
||||||
|
|
||||||
def __init__(self,
|
|
||||||
config: Configuration,
|
|
||||||
topic: str,
|
|
||||||
payload: Union[str, Callable[[str],bool]],
|
|
||||||
logger: logging.Logger=None):
|
|
||||||
""" payload: str or function stop if payload is received or function return True, the function must have this signature func(payload: str) -> bool """
|
|
||||||
|
|
||||||
threading.Thread.__init__(self)
|
|
||||||
mqtt.Client.__init__(self)
|
|
||||||
|
|
||||||
self.configuration = config
|
|
||||||
|
|
||||||
self.topic = topic
|
|
||||||
|
|
||||||
if callable(payload):
|
|
||||||
self.check_payload = payload
|
|
||||||
else:
|
|
||||||
self.check_payload = lambda x: x == payload
|
|
||||||
|
|
||||||
self.daemon = True
|
|
||||||
|
|
||||||
if not logger:
|
|
||||||
logger = logging.getLogger('watcher')
|
|
||||||
self.logger = logger
|
|
||||||
|
|
||||||
self.__run = True
|
|
||||||
|
|
||||||
self.username_pw_set(self.configuration.user,
|
|
||||||
self.configuration.password)
|
|
||||||
self.connect(self.configuration.host,
|
|
||||||
self.configuration.port)
|
|
||||||
|
|
||||||
self.subscribe(self.topic, 1)
|
|
||||||
|
|
||||||
self.start()
|
|
||||||
|
|
||||||
def on_message(self, mqttc, obj, msg):
|
|
||||||
payload = msg.payload.decode()
|
|
||||||
topic = msg.topic
|
|
||||||
self.logger.debug('Topic: %s, Payload: %s', topic, payload)
|
|
||||||
|
|
||||||
if topic != self.topic:
|
|
||||||
self.logger.error('Invaliad topic: %s', topic)
|
|
||||||
return
|
|
||||||
|
|
||||||
if self.check_payload(payload):
|
|
||||||
self.__run = False
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self.__run = False
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
while self.__run:
|
|
||||||
self.loop()
|
|
||||||
self.disconnect()
|
|
@ -1,169 +0,0 @@
|
|||||||
import threading
|
|
||||||
import logging
|
|
||||||
import queue
|
|
||||||
import concurrent.futures
|
|
||||||
|
|
||||||
import pebble
|
|
||||||
|
|
||||||
import paho.mqtt.client as mqtt
|
|
||||||
|
|
||||||
from citadel.mqtt import Configuration
|
|
||||||
|
|
||||||
from .action import Action
|
|
||||||
|
|
||||||
class Scene(threading.Thread, mqtt.Client):
|
|
||||||
""" A scene run as a thread, has his own mqtt session
|
|
||||||
|
|
||||||
Can be activated by topic:
|
|
||||||
|
|
||||||
cmnd/scene/{mqtt_name}/activate ON|OFF|null
|
|
||||||
|
|
||||||
And return status on topic:
|
|
||||||
|
|
||||||
stat/scene/{mqtt_name}/state ON|OFF
|
|
||||||
|
|
||||||
Special class attributes:
|
|
||||||
|
|
||||||
mqtt_name: define topic name, default class name in lower case
|
|
||||||
|
|
||||||
"""
|
|
||||||
TOPIC_CMND = "cmnd/scene/{mqtt_name}/activate"
|
|
||||||
TOPIC_STAT = "stat/scene/{mqtt_name}/activate"
|
|
||||||
|
|
||||||
STATE_DEACTIVATE = 0
|
|
||||||
STATE_ACTIVATE = 1
|
|
||||||
STATE_ACTIVATING = 2
|
|
||||||
|
|
||||||
def __init__(self, config: Configuration, logger: logging.Logger=None):
|
|
||||||
threading.Thread.__init__(self)
|
|
||||||
mqtt.Client.__init__(self)
|
|
||||||
|
|
||||||
self.configuration = config
|
|
||||||
|
|
||||||
if not hasattr(self, 'mqtt_name'):
|
|
||||||
self.mqtt_name = self.__class__.__name__.lower()
|
|
||||||
|
|
||||||
if not logger:
|
|
||||||
logger = logging.getLogger('scene.%s' % self.mqtt_name)
|
|
||||||
self.logger = logger
|
|
||||||
|
|
||||||
self.__pub_queue = queue.Queue()
|
|
||||||
self.__state = Scene.STATE_DEACTIVATE
|
|
||||||
|
|
||||||
self.__executor = None
|
|
||||||
self.__activating_r_futures = []
|
|
||||||
|
|
||||||
@property
|
|
||||||
def scene_state(self):
|
|
||||||
return self.__state
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
self.username_pw_set(self.configuration.user,
|
|
||||||
self.configuration.password)
|
|
||||||
self.connect(self.configuration.host,
|
|
||||||
self.configuration.port)
|
|
||||||
|
|
||||||
self.subscribe(self.TOPIC_CMND.format(mqtt_name=self.mqtt_name), 2)
|
|
||||||
|
|
||||||
for watch in self.WATCH:
|
|
||||||
self.subscribe(watch[0], 1)
|
|
||||||
|
|
||||||
run = True
|
|
||||||
while run:
|
|
||||||
#Mqtt client loop
|
|
||||||
self.loop()
|
|
||||||
|
|
||||||
#Check publish queue
|
|
||||||
try:
|
|
||||||
(args, kwargs) = self.__pub_queue.get(block=False)
|
|
||||||
self.publish(*args, **kwargs)
|
|
||||||
except queue.Empty:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Check for futures if activating
|
|
||||||
if self.scene_state == Scene.STATE_ACTIVATING:
|
|
||||||
for (action, futures) in self.__activating_r_futures:
|
|
||||||
try:
|
|
||||||
r = futures.result(timeout=0)
|
|
||||||
if r != Action.SUCCESS:
|
|
||||||
self.logger.error("Action: '%s' error", action.name)
|
|
||||||
self.logger.info('Activating error')
|
|
||||||
self.__executor.join()
|
|
||||||
self.__executor = None
|
|
||||||
self.__activating_r_futures = []
|
|
||||||
self.__state = Scene.STATE_DEACTIVATE
|
|
||||||
self.__send_state()
|
|
||||||
break
|
|
||||||
except concurrent.futures.TimeoutError:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.logger.info('Activating terminate')
|
|
||||||
self.__executor.join()
|
|
||||||
self.__executor = None
|
|
||||||
self.__activating_r_futures = []
|
|
||||||
self.__state = Scene.STATE_ACTIVATE
|
|
||||||
self.__send_state()
|
|
||||||
|
|
||||||
|
|
||||||
def threaded_publish(self, *args, **kwargs):
|
|
||||||
self.__pub_queue.put((args, kwargs))
|
|
||||||
|
|
||||||
def on_message(self, mqttc, obj, msg):
|
|
||||||
payload = msg.payload.decode().lower()
|
|
||||||
topic = msg.topic
|
|
||||||
|
|
||||||
# on CMND
|
|
||||||
if topic == self.TOPIC_CMND.format(mqtt_name=self.mqtt_name):
|
|
||||||
if payload == 'on':
|
|
||||||
self.logger.info('CMND activate ON')
|
|
||||||
self.__activate()
|
|
||||||
elif payload == 'off':
|
|
||||||
self.logger.info('CMND activate OFF')
|
|
||||||
self.__deactivate()
|
|
||||||
elif payload == '':
|
|
||||||
self.logger.info('CMND activate state')
|
|
||||||
self.__send_state()
|
|
||||||
else:
|
|
||||||
self.logger.warn('CMND activate invalide')
|
|
||||||
|
|
||||||
elif self.scene_state and self.__check_watched(topic, payload):
|
|
||||||
self.logger.info('Watched topic %s with invalid payload %s deactivating', topic, payload)
|
|
||||||
self.__deactivate()
|
|
||||||
|
|
||||||
def __check_watched(self, topic, payload):
|
|
||||||
""" return True if is a watched topic with an invalid payload """
|
|
||||||
for watch in self.WATCH:
|
|
||||||
if topic == watch[0]:
|
|
||||||
if payload != watch[1].lower():
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
return False
|
|
||||||
|
|
||||||
def __activate(self):
|
|
||||||
if self.scene_state:
|
|
||||||
self.logger.info('Already activate/activating ignore cmnd')
|
|
||||||
return
|
|
||||||
if self.__executor != None:
|
|
||||||
self.logger.error('Trying to activate but __executor not None')
|
|
||||||
return
|
|
||||||
|
|
||||||
self.__state = Scene.STATE_ACTIVATING
|
|
||||||
self.__executor = pebble.ThreadPool()
|
|
||||||
|
|
||||||
for a in self.ON:
|
|
||||||
self.__activating_r_futures.append(
|
|
||||||
(a, self.__executor.schedule(a.setup(self).run))
|
|
||||||
)
|
|
||||||
|
|
||||||
self.__executor.close()
|
|
||||||
self.__send_state()
|
|
||||||
|
|
||||||
def __deactivate(self):
|
|
||||||
self.__state = Scene.STATE_DEACTIVATE
|
|
||||||
self.__send_state()
|
|
||||||
|
|
||||||
def __send_state(self):
|
|
||||||
self.publish(self.TOPIC_STAT.format(mqtt_name=self.mqtt_name),
|
|
||||||
payload='ON' if self.scene_state else 'OFF',
|
|
||||||
qos=2,
|
|
||||||
retain=False)
|
|
Loading…
x
Reference in New Issue
Block a user