Source code for wrapyfi.connect.publishers

import logging
import os
from glob import glob
from pathlib import Path

from wrapyfi.utils.core_utils import (
    SingletonOptimized,
    dynamic_module_import,
    scan_external,
    WRAPYFI_MWARE_PATHS,
)


[docs] class PublisherWatchDog(metaclass=SingletonOptimized): """ A watchdog that scans for publishers and removes them from the ring if they are not established. """
[docs] def __init__(self, repeats: int = 10, inner_repeats: int = 10): """ Initialize the PublisherWatchDog. :param repeats: int: The number of times to repeat the scan param inner_repeats: int: The number of times to repeat the scan for each publisher """ self.repeats = repeats self.inner_repeats = inner_repeats self.publisher_ring = []
[docs] def add_publisher(self, publisher): """ Add a publisher to the ring. :param publisher: Publisher: The publisher to add """ self.publisher_ring.append(publisher)
[docs] def remove_publisher(self, publisher): """ Remove a publisher from the ring. :param publisher: Publisher: The publisher to remove """ try: self.publisher_ring.remove(publisher) except ValueError: pass
[docs] def scan(self): """ Scan for publishers and remove them from the ring if they are established. """ repeats = self.repeats while self.publisher_ring and (repeats > 0 | repeats <= -1): repeats -= 1 for publisher in self.publisher_ring: found_publisher = publisher.establish(repeats=self.inner_repeats) if found_publisher: self.publisher_ring.remove(publisher)
[docs] class Publishers(object): """ A class that holds all publishers and their corresponding middleware communicators. """ registry = {} mwares = set()
[docs] @classmethod def register(cls, data_type: str, communicator: str): """ Register a publisher for a given data type and middleware communicator. :param data_type: str: The data type to register the publisher for e.g., "NativeObject", "Image", "AudioChunk", etc. :param communicator: str: The middleware communicator to register the publisher for e.g., "ros", "ros2", "yarp", "zeromq", etc. :return: Callable[..., Any]: A decorator function that registers the decorated class as a publisher for the given data type and middleware communicator """ def decorator(cls_): cls.registry[data_type + ":" + communicator] = cls_ cls.mwares.add(communicator) return cls_ return decorator
[docs] @staticmethod def scan(): """ Scan for publishers and add them to the registry. """ base_dir = Path(__file__).parent.parent / "publishers" modules = glob(str(base_dir / "*.py"), recursive=True) modules = [ "wrapyfi.publishers." + Path(module).relative_to(base_dir).as_posix() for module in modules ] dynamic_module_import(modules, globals()) scan_external(os.environ.get(WRAPYFI_MWARE_PATHS, ""), "publishers")
[docs] class Publisher(object): """ A base class for all publishers. """
[docs] def __init__( self, name: str, out_topic: str, carrier: str = "", should_wait: bool = True, **kwargs, ): """ Initialize the Publisher. :param name: str: The name of the publisher :param out_topic: str: The name of the output topic :param carrier: str: The name of the carrier to use :param should_wait: bool: Whether to wait for the publisher to be established or not """ self.__name__ = name self.out_topic = out_topic self.carrier = carrier self.should_wait = should_wait self.established = False
[docs] def check_establishment(self, established: bool): """ Check if the publisher is established and remove it from the ring if it is. :param established: bool: Whether the publisher is established or not :return: bool: Whether the publisher is established or not """ if established: self.established = True if not self.should_wait: PublisherWatchDog().remove_publisher(self) elif not self.should_wait: PublisherWatchDog().scan() if self in PublisherWatchDog().publisher_ring: established = False else: established = True return established
[docs] def establish(self, repeats: int = -1, **kwargs): """ Establish the publisher. """ raise NotImplementedError
[docs] def publish(self, obj): """ Publish an object. """ raise NotImplementedError
[docs] def close(self): """ Close the connection. """ raise NotImplementedError