commit d9a3a0aa311a02457a4dcb3cb78a9d791f754170 Author: Nicolas Duhamel Date: Mon Dec 28 10:14:37 2020 +0100 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c10d448 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +# Source for the following rules: https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + + +*.egg-info/ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..1870a2e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..cf8cbc2 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,14 @@ +[metadata] +name = citadel.mqtt +version = 0.0.1 + +[options] +package_dir = + =src +packages = find_namespace: +install_requires = + paho-mqtt==1.5.1 + +[options.packages.find] +where = src + diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..a4f49f9 --- /dev/null +++ b/setup.py @@ -0,0 +1,2 @@ +import setuptools +setuptools.setup() diff --git a/src/citadel/mqtt/__init__.py b/src/citadel/mqtt/__init__.py new file mode 100644 index 0000000..cd25ac0 --- /dev/null +++ b/src/citadel/mqtt/__init__.py @@ -0,0 +1,78 @@ +import logging +import queue +import functools +from collections.abc import Callable +from typing import NoReturn + +import paho.mqtt.client as mqtt + +class MQTTApp: + """ Manage MQTT connection and on topic callback with help of the @topic decorator """ + + def __init__(self, logger : logging.Logger=None): + """logger: specify a logger to use, default: mqttapp""" + self._mqtt_topic_route = {} + self._mqttc = None + + self._mqtt_host = None + self._mqtt_port = None + + self._exception_queue = queue.Queue() #get exceptions from callback + + self.logger = logger + + if self.logger is None: + self.logger = logging.getLogger('mqttapp') + + def topic(self,mqtt_topic : str) -> Callable: + """ A decorator for subscribe to a topic with decorated func as callback + and catch all exception inside the func to raise it when check is_healthy + method """ + def wrap(f): + @functools.wraps(f) + def wraped_f(*arg, **kwarg): + #add logger to userdata + arg[1]['logger'] = self.logger + try: + return f(*arg, **kwarg) + except Exception as e: + self._exception_queue.put(e) + self._mqtt_topic_route[mqtt_topic] = wraped_f + return wraped_f + return wrap + + def setup(self, host: str, port: int, user: str, pwd: str) -> NoReturn: + """ Setup mqtt connection, call it before loop_start() """ + self._mqttc = mqtt.Client(clean_session=True) + self._mqttc.username_pw_set(user, password=pwd) + self._mqttc.on_connect = self._on_connect + self._mqtt_host = host + self._mqtt_port = port + self._mqttc.user_data_set({}) + + def _on_connect(self, client, userdata, flags, rc): + for topic, func in self._mqtt_topic_route.items(): + self.logger.info("Add callback %s on %s" % (func.__name__,topic)) + self._mqttc.message_callback_add(topic, func) + self._mqttc.subscribe(topic) + + def loop_start(self) -> NoReturn: + self._mqttc.connect(self._mqtt_host, self._mqtt_port, 60) + self._mqttc.loop_start() + + def loop_stop(self) -> NoReturn: + self._mqttc.loop_stop() + + def is_healthy(self) -> bool: + """ Check for exception in callback thread, + and raise it in main thread """ + try: + exc = self._exception_queue.get(block=False) + except queue.Empty: + return True + else: + raise exc + + @property + def client(self): + return self._mqttc