From 402cc2882224b1522375612bab1b8d3d518d9932 Mon Sep 17 00:00:00 2001 From: Nicolas Duhamel Date: Tue, 27 Jul 2021 14:52:18 +0200 Subject: [PATCH] support multiple callbacks per topic --- src/citadel/mqtt/__init__.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/citadel/mqtt/__init__.py b/src/citadel/mqtt/__init__.py index ccc1346..dfe5799 100644 --- a/src/citadel/mqtt/__init__.py +++ b/src/citadel/mqtt/__init__.py @@ -5,6 +5,7 @@ from collections.abc import Callable from typing import NoReturn import paho.mqtt.client as mqtt +from paho.mqtt.client import topic_matches_sub class Configuration: user: str @@ -20,8 +21,8 @@ class Client(mqtt.Client): if self.logger is None: self.logger = logging.getLogger('mqtt') - self._subscribe_callbacks = [] self._on_connect_callbacks = [] + self._subscribe_callbacks = {} self.on_connect = self._on_connect2 @@ -43,11 +44,26 @@ class Client(mqtt.Client): def subscribe_callback(self, topic: str, callback): self.logger.info("Add callback %s for topic %s" % (callback, topic)) - self._subscribe_callbacks.append((topic,callback)) - self.message_callback_add(topic, callback) + + if topic in self._subscribe_callbacks: + self._subscribe_callbacks[topic].append(callback) + else: + self._subscribe_callbacks[topic] = [callback] + + #register our callback for storing multiple callback for one topic + self.message_callback_add(topic, self.on_subscribe_callback) + + def on_subscribe_callback(self, client, userdata, msg): + self.logger.debug("Callback search for %s" % msg.topic) + for topic, callbacks in self._subscribe_callbacks.items(): + if topic_matches_sub(topic, msg.topic): + for func in callbacks: + self.logger.debug("Call %s for topic %s" % (func, topic)) + func(client, userdata, msg) + def _on_connect2(self, *args): - for (topic, callback) in self._subscribe_callbacks: + for topic in self._subscribe_callbacks.keys(): self.subscribe(topic) for func in self._on_connect_callbacks: func(self, *args)