Source code for wrapyfi.middlewares.zenoh

import logging
import threading
import atexit
import json

import zenoh

from wrapyfi.utils.core_utils import SingletonOptimized
from wrapyfi.connect.wrapper import MiddlewareCommunicator


[docs] class ZenohMiddlewarePubSub(metaclass=SingletonOptimized): """ Zenoh middleware wrapper with singleton pattern. The `activate` method initializes the middleware, and `deinit` handles cleanup. Configurations are initialized by merging any provided keyword arguments, environment variables, and direct `zenoh.Config` parameters. """ _instance = None # Singleton instance
[docs] @staticmethod def activate(config: zenoh.Config = None, **kwargs): """ Activates the Zenoh middleware. Initializes the Zenoh session by merging a provided configuration, environment variables, and additional keyword arguments. :param config: zenoh.Config: Optional Zenoh configuration; merged with environment and `kwargs` :param kwargs: dict: Additional settings for customization :return: ZenohMiddlewarePubSub instance """ zenoh.init_log_from_env_or("error") if ZenohMiddlewarePubSub._instance is None: ZenohMiddlewarePubSub._instance = ZenohMiddlewarePubSub( config=config, **kwargs ) return ZenohMiddlewarePubSub._instance
[docs] def __init__(self, config: zenoh.Config = None, **kwargs): """ Initializes the Zenoh session and sets up a clean exit with deinitialization. :param config: zenoh.Config: Configuration for Zenoh session """ logging.info("Initializing Zenoh middleware") # Initialize Zenoh session with configuration or default values self.config = config or self._merge_config_with_env(kwargs) self.session = zenoh.open(self.config) self.subscribers = {} # Ensure cleanup at exit atexit.register(MiddlewareCommunicator.close_all_instances) atexit.register(self.deinit)
def _merge_config_with_env(self, config_kwargs): """ Merges given configuration parameters with environment-based defaults. :param config_kwargs: dict: Direct configuration values to merge :return: zenoh.Config: Complete Zenoh configuration instance """ config = zenoh.Config() for key, value in config_kwargs.items(): config.insert_json5(key, json.dumps(value)) return config
[docs] def register_callback(self, topic: str, callback): """ Registers an event handler for a specific topic. :param topic: str: The topic to subscribe to :param callback: callable: Function to call upon receiving a message """ if topic not in self.subscribers: self.subscribers[topic] = self.session.declare_subscriber(topic, callback) logging.info(f"[ZenohMiddlewarePubSub] Registered callback for topic {topic}")
[docs] def is_connected(self) -> bool: """ Checks if the Zenoh session is active. :return: bool: True if connected, False otherwise """ return self.session is not None and not self.session.is_closed()
[docs] def deinit(self): """ Closes the Zenoh session upon exit. """ logging.info("[ZenohMiddlewarePubSub] Closing Zenoh session") self.session.close()