wrapyfi.connect package
Submodules
wrapyfi.connect.clients module
- class wrapyfi.connect.clients.Clients[source]
Bases:
object
A class that holds all clients and their corresponding middleware communicators.
- registry = {'AudioChunk:ros': <class 'wrapyfi.clients.ros.ROSAudioChunkClient'>, 'AudioChunk:ros2': <class 'wrapyfi.clients.ros2.ROS2AudioChunkClient'>, 'AudioChunk:yarp': <class 'wrapyfi.clients.yarp.YarpAudioChunkClient'>, 'AudioChunk:zeromq': <class 'wrapyfi.clients.zeromq.ZeroMQAudioChunkClient'>, 'Image:ros': <class 'wrapyfi.clients.ros.ROSImageClient'>, 'Image:ros2': <class 'wrapyfi.clients.ros2.ROS2ImageClient'>, 'Image:yarp': <class 'wrapyfi.clients.yarp.YarpImageClient'>, 'Image:zeromq': <class 'wrapyfi.clients.zeromq.ZeroMQImageClient'>, 'MMO:fallback': <class 'wrapyfi.clients.FallbackClient'>, 'NativeObject:ros': <class 'wrapyfi.clients.ros.ROSNativeObjectClient'>, 'NativeObject:ros2': <class 'wrapyfi.clients.ros2.ROS2NativeObjectClient'>, 'NativeObject:yarp': <class 'wrapyfi.clients.yarp.YarpNativeObjectClient'>, 'NativeObject:zeromq': <class 'wrapyfi.clients.zeromq.ZeroMQNativeObjectClient'>}
- mwares = {'fallback', 'ros', 'ros2', 'yarp', 'zeromq'}
- classmethod register(data_type: str, communicator: str)[source]
Register a client with the given data type and middleware communicator.
- Parameters:
data_type – str: The data type to register the client for e.g., “NativeObject”, “Image”, “AudioChunk”, etc.
communicator – str: The middleware communicator to register the client for e.g., “ros”, “ros2”, “yarp”, “zeromq”, etc.
- class wrapyfi.connect.clients.Client(name: str, in_topic: str, carrier: str = '', **kwargs)[source]
Bases:
object
A base class for clients.
wrapyfi.connect.listeners module
- class wrapyfi.connect.listeners.ListenerWatchDog(*args, **kwargs)[source]
Bases:
object
A watchdog that scans for listeners and removes them from the ring if they are not established.
- __init__(repeats: int = 10, inner_repeats: int = 10)[source]
Initialize the ListenerWatchDog.
- Parameters:
repeats – int: The number of times to repeat the scan
param inner_repeats: int: The number of times to repeat the scan for each listener
- add_listener(listener)[source]
Add a listener to the ring.
- Parameters:
listener – Listener: The listener to add
- class wrapyfi.connect.listeners.Listeners[source]
Bases:
object
A class that holds all listeners and their corresponding middleware communicators.
- registry = {}
- mwares = {}
- classmethod register(data_type: str, communicator: str)[source]
Register a listener with the given data type and middleware communicator.
- Parameters:
data_type – str: The data type to register the listener for e.g., “NativeObject”, “Image”, “AudioChunk”, etc.
communicator – str: The middleware communicator to register the listener for e.g., “ros”, “ros2”, “yarp”, “zeromq”, etc.
- class wrapyfi.connect.listeners.Listener(name: str, in_topic: str, carrier: str = '', should_wait: bool = True, **kwargs)[source]
Bases:
object
A base class for listeners.
- __init__(name: str, in_topic: str, carrier: str = '', should_wait: bool = True, **kwargs)[source]
Initialize the Listener.
- Parameters:
name – str: The name of the listener
in_topic – str: The topic to listen to
carrier – str: The middleware carrier to use
should_wait – bool: Whether to wait for the listener to be established or not
wrapyfi.connect.publishers module
- class wrapyfi.connect.publishers.PublisherWatchDog(*args, **kwargs)[source]
Bases:
object
A watchdog that scans for publishers and removes them from the ring if they are not established.
- __init__(repeats: int = 10, inner_repeats: int = 10)[source]
Initialize the PublisherWatchDog.
- Parameters:
repeats – int: The number of times to repeat the scan
param inner_repeats: int: The number of times to repeat the scan for each publisher
- add_publisher(publisher)[source]
Add a publisher to the ring.
- Parameters:
publisher – Publisher: The publisher to add
- class wrapyfi.connect.publishers.Publishers[source]
Bases:
object
A class that holds all publishers and their corresponding middleware communicators.
- registry = {}
- mwares = {}
- classmethod register(data_type: str, communicator: str)[source]
Register a publisher for a given data type and middleware communicator.
- Parameters:
data_type – str: The data type to register the publisher for e.g., “NativeObject”, “Image”, “AudioChunk”, etc.
communicator – str: The middleware communicator to register the publisher for e.g., “ros”, “ros2”, “yarp”, “zeromq”, etc.
- Returns:
Callable[…, Any]: A decorator function that registers the decorated class as a publisher for the given data type and middleware communicator
- class wrapyfi.connect.publishers.Publisher(name: str, out_topic: str, carrier: str = '', should_wait: bool = True, **kwargs)[source]
Bases:
object
A base class for all publishers.
- __init__(name: str, out_topic: str, carrier: str = '', should_wait: bool = True, **kwargs)[source]
Initialize the Publisher.
- Parameters:
name – str: The name of the publisher
out_topic – str: The name of the output topic
carrier – str: The name of the carrier to use
should_wait – bool: Whether to wait for the publisher to be established or not
wrapyfi.connect.servers module
- class wrapyfi.connect.servers.Servers[source]
Bases:
object
A class that holds all servers and their corresponding middleware communicators.
- registry = {}
- mwares = {}
- classmethod register(data_type: str, communicator: str)[source]
Register a server with the given data type and middleware communicator.
- Parameters:
data_type – str: The data type to register the server for e.g., “NativeObject”, “Image”, “AudioChunk”, etc.
communicator – str: The middleware communicator to register the server for e.g., “ros”, “ros2”, “yarp”, “zeromq”, etc.
- Returns:
Callable: A decorator that registers the server with the given data type and middleware communicator
- class wrapyfi.connect.servers.Server(name: str, out_topic: str, carrier: str = '', out_topic_connect: str | None = None, **kwargs)[source]
Bases:
object
A base class for servers.
- __init__(name: str, out_topic: str, carrier: str = '', out_topic_connect: str | None = None, **kwargs)[source]
Initialize the server.
- Parameters:
name – str: The name of the server
out_topic – str: The topic to publish to
carrier – str: The middleware carrier to use
out_topic_connect – str: The topic to connect to (this is deprecated and will be removed in the future since its usage is limited to YARP)
wrapyfi.connect.wrapper module
- class wrapyfi.connect.wrapper.MiddlewareCommunicator[source]
Bases:
object
- classmethod register(data_type: str | List[Any], middleware: str = 'zeromq', *args, **kwargs)[source]
Registers a function to the middleware communicator, defining its communication message type and associated middleware. Note that the function returned is a wrapper that can alter the behavior of the registered function based on the communication mode set in the middleware communicator registry.
- Parameters:
data_type – Union[str, List[Any]]: Specifies the communication message type, either as a string or a list containing specifications of the data type and middleware
middleware – str: Specifies the middleware to be used for the communication, defaults to DEFAULT_COMMUNICATOR
args – tuple: Variable positional arguments to be passed to the function
kwargs – dict: Additional keyword arguments to be passed to the function
- Returns:
Callable[…, Any]: A wrapper function that triggers specific communication modes (e.g., publish, listen, reply, request) based on the registered function and middleware settings
- Raises:
NotImplementedError: If data_type is a dictionary or an unsupported type
- activate_communication(func: str | Callable[[...], Any], mode: str | List[str])[source]
Activates the communication mode for a registered function in the middleware communicator. The mode determines how the function will interact with the middleware communicator upon invocation, e.g., whether it should publish its return values, listen for values, etc. The communication mode can be set for all instances of the function or individually per instance.
- Parameters:
func – Union[str, Callable[…, Any]]: The function or the name of the function for which the communication mode should be activated
mode – Union[str, List[str]]: Specifies the communication mode to be activated, either as a single mode (string) applicable to all instances, or as a list of modes (strings) per instance
- Raises:
IndexError: If mode is a list and the number of elements does not match the number of instances
- Raises:
ValueError: If the instance address cannot be found in the registry
- static get_communicators()[source]
Returns the available middleware communicators.
- Returns:
Set[str]: The available middleware communicators (excluding the fallback communicator)
- classmethod close_instance(instance_addr: str | None = None)[source]
Closes a specific instance of the middleware communicator. Note that the instance address is the hexadecimal representation of the instance’s id. If no instance address is provided, all instances will be closed.
- Parameters:
instance_addr – str: The unique identifier of the instance to be closed, defaults to None
- Raises:
ValueError: If the instance address cannot be found in the registry