Source code for wrapyfi.middlewares.mqtt

import logging
import atexit
import threading

import paho.mqtt.client as mqtt_client

from wrapyfi.utils.core_utils import SingletonOptimized
from wrapyfi.connect.wrapper import MiddlewareCommunicator


[docs] class MqttMiddlewarePubSub(metaclass=SingletonOptimized): """ MQTT middleware wrapper. This class is a singleton, so it can be instantiated only once. The `activate` method should be called to initialize the middleware. The `deinit` method should be called to deinitialize the middleware and destroy all connections. """ _instance = None # Singleton instance
[docs] @staticmethod def activate( broker_address: str = "broker.emqx.io", broker_port: int = 1883, **kwargs, ): """ Activate the MQTT middleware. Initializes the MQTT client with the provided address and options. :param broker_address: str: The MQTT broker address :param broker_port: int: The MQTT broker port :param kwargs: dict: Additional keyword arguments for the middleware """ if MqttMiddlewarePubSub._instance is None: MqttMiddlewarePubSub._instance = MqttMiddlewarePubSub( broker_address=broker_address, broker_port=broker_port, **kwargs ) return MqttMiddlewarePubSub._instance
[docs] def __init__( self, broker_address: str = "broker.emqx.io", broker_port: int = 1883, client_id: str = None, **kwargs, ): """ Initialize the MQTT middleware. This method is automatically called when the class is instantiated. :param broker_address: str: The MQTT broker address :param broker_port: int: The MQTT broker port :param client_id: str: The MQTT client ID :param kwargs: dict: Additional keyword arguments for compatibility with the interface """ logging.info(f"Initializing MQTT middleware on {broker_address}:{broker_port}") # Store arguments, even if unused (for compatibility) self.broker_address = broker_address self.port = broker_port self.client_id = client_id # Create a MQTT client self.mqtt_client = mqtt_client.Client( mqtt_client.CallbackAPIVersion.VERSION2, client_id=self.client_id ) # Set up callbacks self.mqtt_client.on_connect = self._on_connect self.mqtt_client.on_disconnect = self._on_disconnect self.mqtt_client.on_message = self._on_message # Dictionary to store topic-specific callbacks self.topic_callbacks = {} # Track connection status self.connected = False # Start the connection in a background thread self.client_thread = threading.Thread(target=self._connect_client) self.client_thread.daemon = True self.client_thread.start() # Ensure cleanup at exit atexit.register(MiddlewareCommunicator.close_all_instances) atexit.register(self.deinit)
def _connect_client(self): """Connect the MQTT client.""" try: self.mqtt_client.connect(self.broker_address, self.port) # Use loop_start instead of loop_forever for non-blocking behavior self.mqtt_client.loop_start() except Exception as e: logging.error( f"[MqttMiddlewarePubSub] Error connecting to {self.broker_address}:{self.port}: {e}" ) def _on_connect(self, client, userdata, flags, rc, properties): """Callback for when the MQTT client connects to the broker.""" logging.info( f"[MqttMiddlewarePubSub] Connected to {self.broker_address}:{self.port}" ) self.connected = True def _on_disconnect(self, client, userdata, rc): """Callback for when the MQTT client disconnects from the broker.""" logging.info( f"[MqttMiddlewarePubSub] Disconnected from {self.broker_address}:{self.port}" ) self.connected = False def _on_message(self, client, userdata, msg): """Callback for when the MQTT client receives a message.""" if msg.topic in self.topic_callbacks: callback = self.topic_callbacks[msg.topic] callback(client, userdata, msg)
[docs] def register_callback(self, topic: str, callback): """ Register an event handler for a specific topic. :param topic: str: The topic to subscribe to :param callback: callable: The function to call when a message is received on this topic """ self.mqtt_client.subscribe(topic) self.topic_callbacks[topic] = callback logging.info(f"[MqttMiddlewarePubSub] Registered callback for topic {topic}")
[docs] def is_connected(self) -> bool: """ Check whether the MQTT client is connected. :return: bool: True if connected, False otherwise. """ return self.connected
[docs] @staticmethod def deinit(): """ Deinitialize the MQTT middleware. This method is automatically called when the program exits. """ logging.info("[MqttMiddlewarePubSub] Deinitializing MQTT client") if MqttMiddlewarePubSub._instance: MqttMiddlewarePubSub._instance.mqtt_client.loop_stop() MqttMiddlewarePubSub._instance.mqtt_client.disconnect() MqttMiddlewarePubSub._instance.connected = False