Source code for wrapyfi.connect.servers

import logging
import os
from glob import glob
from typing import Optional
from pathlib import Path

from wrapyfi.utils.core_utils import (
    dynamic_module_import,
    scan_external,
    WRAPYFI_MWARE_PATHS,
)


[docs] class Servers(object): """ A class that holds all servers and their corresponding middleware communicators. """ registry = {} mwares = set()
[docs] @classmethod def register(cls, data_type: str, communicator: str): """ Register a server with the given data type and middleware communicator. :param data_type: str: The data type to register the server for e.g., "NativeObject", "Image", "AudioChunk", etc. :param communicator: str: The middleware communicator to register the server for e.g., "ros", "ros2", "yarp", "zeromq", etc. :return: Callable: A decorator that registers the server with the given data type and middleware communicator """ def decorator(cls_): cls.registry[data_type + ":" + communicator] = cls_ cls.mwares.add(communicator) return cls_ return decorator
[docs] @staticmethod def scan(): """ Scan for servers and add them to the registry. """ base_dir = Path(__file__).parent.parent / "servers" modules = glob(str(base_dir / "*.py"), recursive=True) modules = [ "wrapyfi.servers." + Path(module).relative_to(base_dir).as_posix() for module in modules ] dynamic_module_import(modules, globals()) scan_external(os.environ.get(WRAPYFI_MWARE_PATHS, ""), "servers")
[docs] class Server(object): """ A base class for servers. """
[docs] def __init__( self, name: str, out_topic: str, carrier: str = "", out_topic_connect: Optional[str] = None, **kwargs, ): """ Initialize the server. :param name: str: The name of the server :param out_topic: str: The topic to publish to :param carrier: str: The middleware carrier to use :param out_topic_connect: str: The topic to connect to (this is deprecated and will be removed in the future since its usage is limited to YARP) """ self.__name__ = name self.out_topic = out_topic self.carrier = carrier self.out_topic_connect = ( out_topic + ":out" if out_topic_connect is None else out_topic_connect ) self.established = False
[docs] def establish(self): """ Establish the server. """ raise NotImplementedError
[docs] def await_request(self, *args, **kwargs): """ Await a request from a client. """ raise NotImplementedError
[docs] def reply(self, obj): """ Reply to a client request. """ raise NotImplementedError
[docs] def close(self): """ Close the connection. """ raise NotImplementedError