From 20bbd9661a714df74bb005a7f79da376ffe44f2d Mon Sep 17 00:00:00 2001 From: Nicolas Duhamel Date: Tue, 5 Jan 2021 17:58:02 +0100 Subject: [PATCH] Add new Client interface --- src/citadel/mqtt/__init__.py | 51 +++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/src/citadel/mqtt/__init__.py b/src/citadel/mqtt/__init__.py index cd25ac0..cf5d515 100644 --- a/src/citadel/mqtt/__init__.py +++ b/src/citadel/mqtt/__init__.py @@ -6,11 +6,56 @@ from typing import NoReturn import paho.mqtt.client as mqtt +class Client(mqtt.Client): + def __init__(self, logger: logging.Logger=None): + super().__init__() + + self.logger = logger + if self.logger is None: + self.logger = logging.getLogger('mqtt') + + self._subscribe_callbacks = [] + + def setup(self, host: str, port: int, username: str, password: str): + self._username = username + self._password = password + self._host = host + self._port = port + + def connect(self): + self.reconnect() + + def is_alive(self): + """ Check for loop thread health """ + return self._thread.is_alive() + + def join(self): + return self._thread.join() + + def subscribe_callback(self, topic: str, callback): + self.logger.info("Add callback %s for topic %s" % (callback, topic)) + self._subscribe_callbacks.append((topic,callback)) + self.message_callback_add(topic, callback) + + @property + def on_connect(self): + #TODO: called twice because of a boolean check before call + for (topic, callback) in self._subscribe_callbacks: + self.subscribe(topic) + return self._on_connect + + @on_connect.setter + def on_connect(self, func): + with self._callback_mutex: + self._on_connect = func + class MQTTApp: """ Manage MQTT connection and on topic callback with help of the @topic decorator """ def __init__(self, logger : logging.Logger=None): """logger: specify a logger to use, default: mqttapp""" + self.context = {} + self._mqtt_topic_route = {} self._mqttc = None @@ -24,6 +69,8 @@ class MQTTApp: if self.logger is None: self.logger = logging.getLogger('mqttapp') + self.context['logger'] = self.logger + def topic(self,mqtt_topic : str) -> Callable: """ A decorator for subscribe to a topic with decorated func as callback and catch all exception inside the func to raise it when check is_healthy @@ -31,8 +78,6 @@ class MQTTApp: def wrap(f): @functools.wraps(f) def wraped_f(*arg, **kwarg): - #add logger to userdata - arg[1]['logger'] = self.logger try: return f(*arg, **kwarg) except Exception as e: @@ -48,7 +93,7 @@ class MQTTApp: self._mqttc.on_connect = self._on_connect self._mqtt_host = host self._mqtt_port = port - self._mqttc.user_data_set({}) + self._mqttc.user_data_set(self.context) def _on_connect(self, client, userdata, flags, rc): for topic, func in self._mqtt_topic_route.items():