initial commit
This commit is contained in:
commit
d9a3a0aa31
8
.gitignore
vendored
Normal file
8
.gitignore
vendored
Normal file
@ -0,0 +1,8 @@
|
||||
# Source for the following rules: https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
|
||||
*.egg-info/
|
4
pyproject.toml
Normal file
4
pyproject.toml
Normal file
@ -0,0 +1,4 @@
|
||||
[build-system]
|
||||
requires = ["setuptools", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
14
setup.cfg
Normal file
14
setup.cfg
Normal file
@ -0,0 +1,14 @@
|
||||
[metadata]
|
||||
name = citadel.mqtt
|
||||
version = 0.0.1
|
||||
|
||||
[options]
|
||||
package_dir =
|
||||
=src
|
||||
packages = find_namespace:
|
||||
install_requires =
|
||||
paho-mqtt==1.5.1
|
||||
|
||||
[options.packages.find]
|
||||
where = src
|
||||
|
78
src/citadel/mqtt/__init__.py
Normal file
78
src/citadel/mqtt/__init__.py
Normal file
@ -0,0 +1,78 @@
|
||||
import logging
|
||||
import queue
|
||||
import functools
|
||||
from collections.abc import Callable
|
||||
from typing import NoReturn
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
class MQTTApp:
|
||||
""" Manage MQTT connection and on topic callback with help of the @topic decorator """
|
||||
|
||||
def __init__(self, logger : logging.Logger=None):
|
||||
"""logger: specify a logger to use, default: mqttapp"""
|
||||
self._mqtt_topic_route = {}
|
||||
self._mqttc = None
|
||||
|
||||
self._mqtt_host = None
|
||||
self._mqtt_port = None
|
||||
|
||||
self._exception_queue = queue.Queue() #get exceptions from callback
|
||||
|
||||
self.logger = logger
|
||||
|
||||
if self.logger is None:
|
||||
self.logger = logging.getLogger('mqttapp')
|
||||
|
||||
def topic(self,mqtt_topic : str) -> Callable:
|
||||
""" A decorator for subscribe to a topic with decorated func as callback
|
||||
and catch all exception inside the func to raise it when check is_healthy
|
||||
method """
|
||||
def wrap(f):
|
||||
@functools.wraps(f)
|
||||
def wraped_f(*arg, **kwarg):
|
||||
#add logger to userdata
|
||||
arg[1]['logger'] = self.logger
|
||||
try:
|
||||
return f(*arg, **kwarg)
|
||||
except Exception as e:
|
||||
self._exception_queue.put(e)
|
||||
self._mqtt_topic_route[mqtt_topic] = wraped_f
|
||||
return wraped_f
|
||||
return wrap
|
||||
|
||||
def setup(self, host: str, port: int, user: str, pwd: str) -> NoReturn:
|
||||
""" Setup mqtt connection, call it before loop_start() """
|
||||
self._mqttc = mqtt.Client(clean_session=True)
|
||||
self._mqttc.username_pw_set(user, password=pwd)
|
||||
self._mqttc.on_connect = self._on_connect
|
||||
self._mqtt_host = host
|
||||
self._mqtt_port = port
|
||||
self._mqttc.user_data_set({})
|
||||
|
||||
def _on_connect(self, client, userdata, flags, rc):
|
||||
for topic, func in self._mqtt_topic_route.items():
|
||||
self.logger.info("Add callback %s on %s" % (func.__name__,topic))
|
||||
self._mqttc.message_callback_add(topic, func)
|
||||
self._mqttc.subscribe(topic)
|
||||
|
||||
def loop_start(self) -> NoReturn:
|
||||
self._mqttc.connect(self._mqtt_host, self._mqtt_port, 60)
|
||||
self._mqttc.loop_start()
|
||||
|
||||
def loop_stop(self) -> NoReturn:
|
||||
self._mqttc.loop_stop()
|
||||
|
||||
def is_healthy(self) -> bool:
|
||||
""" Check for exception in callback thread,
|
||||
and raise it in main thread """
|
||||
try:
|
||||
exc = self._exception_queue.get(block=False)
|
||||
except queue.Empty:
|
||||
return True
|
||||
else:
|
||||
raise exc
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
return self._mqttc
|
Loading…
x
Reference in New Issue
Block a user