wrapyfi.connect package

Submodules

wrapyfi.connect.clients module

class wrapyfi.connect.clients.Clients[source]

Bases: object

A class that holds all clients and their corresponding middleware communicators.

registry = {'AudioChunk:ros': <class 'wrapyfi.clients.ros.ROSAudioChunkClient'>, 'AudioChunk:ros2': <class 'wrapyfi.clients.ros2.ROS2AudioChunkClient'>, 'AudioChunk:yarp': <class 'wrapyfi.clients.yarp.YarpAudioChunkClient'>, 'AudioChunk:zeromq': <class 'wrapyfi.clients.zeromq.ZeroMQAudioChunkClient'>, 'Image:ros': <class 'wrapyfi.clients.ros.ROSImageClient'>, 'Image:ros2': <class 'wrapyfi.clients.ros2.ROS2ImageClient'>, 'Image:yarp': <class 'wrapyfi.clients.yarp.YarpImageClient'>, 'Image:zeromq': <class 'wrapyfi.clients.zeromq.ZeroMQImageClient'>, 'MMO:fallback': <class 'wrapyfi.clients.FallbackClient'>, 'NativeObject:ros': <class 'wrapyfi.clients.ros.ROSNativeObjectClient'>, 'NativeObject:ros2': <class 'wrapyfi.clients.ros2.ROS2NativeObjectClient'>, 'NativeObject:yarp': <class 'wrapyfi.clients.yarp.YarpNativeObjectClient'>, 'NativeObject:zeromq': <class 'wrapyfi.clients.zeromq.ZeroMQNativeObjectClient'>}
mwares = {'fallback', 'ros', 'ros2', 'yarp', 'zeromq'}
classmethod register(data_type: str, communicator: str)[source]

Register a client with the given data type and middleware communicator.

Parameters:
  • data_type – str: The data type to register the client for e.g., “NativeObject”, “Image”, “AudioChunk”, etc.

  • communicator – str: The middleware communicator to register the client for e.g., “ros”, “ros2”, “yarp”, “zeromq”, etc.

static scan()[source]

Scan for clients and add them to the registry.

class wrapyfi.connect.clients.Client(name: str, in_topic: str, carrier: str = '', **kwargs)[source]

Bases: object

A base class for clients.

__init__(name: str, in_topic: str, carrier: str = '', **kwargs)[source]

Initialize the client.

Parameters:
  • name – str: The name of the client

  • in_topic – str: The topic to listen to

  • carrier – str: The middleware carrier to use

establish()[source]

Establish the client.

request(*args, **kwargs)[source]

Send a request to the server.

close()[source]

Close the connection.

wrapyfi.connect.listeners module

class wrapyfi.connect.listeners.ListenerWatchDog(*args, **kwargs)[source]

Bases: object

A watchdog that scans for listeners and removes them from the ring if they are not established.

__init__(repeats: int = 10, inner_repeats: int = 10)[source]

Initialize the ListenerWatchDog.

Parameters:

repeats – int: The number of times to repeat the scan

param inner_repeats: int: The number of times to repeat the scan for each listener

add_listener(listener)[source]

Add a listener to the ring.

Parameters:

listener – Listener: The listener to add

remove_listener(listener)[source]

Remove a listener from the ring.

Parameters:

listener – Listener: The listener to remove

scan()[source]

Scan for listeners and remove them from the ring if they are established.

class wrapyfi.connect.listeners.Listeners[source]

Bases: object

A class that holds all listeners and their corresponding middleware communicators.

registry = {}
mwares = {}
classmethod register(data_type: str, communicator: str)[source]

Register a listener with the given data type and middleware communicator.

Parameters:
  • data_type – str: The data type to register the listener for e.g., “NativeObject”, “Image”, “AudioChunk”, etc.

  • communicator – str: The middleware communicator to register the listener for e.g., “ros”, “ros2”, “yarp”, “zeromq”, etc.

static scan()[source]

Scan for listeners and add them to the registry.

class wrapyfi.connect.listeners.Listener(name: str, in_topic: str, carrier: str = '', should_wait: bool = True, **kwargs)[source]

Bases: object

A base class for listeners.

__init__(name: str, in_topic: str, carrier: str = '', should_wait: bool = True, **kwargs)[source]

Initialize the Listener.

Parameters:
  • name – str: The name of the listener

  • in_topic – str: The topic to listen to

  • carrier – str: The middleware carrier to use

  • should_wait – bool: Whether to wait for the listener to be established or not

check_establishment(established: bool)[source]

Check if the listener is established or not.

Parameters:

established – bool: Whether the listener is established or not

Returns:

bool: Whether the listener is established or not

establish(repeats: int = -1, **kwargs)[source]

Establish the listener.

listen()[source]

Listen for incoming data.

close()[source]

Close the connection.

wrapyfi.connect.publishers module

class wrapyfi.connect.publishers.PublisherWatchDog(*args, **kwargs)[source]

Bases: object

A watchdog that scans for publishers and removes them from the ring if they are not established.

__init__(repeats: int = 10, inner_repeats: int = 10)[source]

Initialize the PublisherWatchDog.

Parameters:

repeats – int: The number of times to repeat the scan

param inner_repeats: int: The number of times to repeat the scan for each publisher

add_publisher(publisher)[source]

Add a publisher to the ring.

Parameters:

publisher – Publisher: The publisher to add

remove_publisher(publisher)[source]

Remove a publisher from the ring.

Parameters:

publisher – Publisher: The publisher to remove

scan()[source]

Scan for publishers and remove them from the ring if they are established.

class wrapyfi.connect.publishers.Publishers[source]

Bases: object

A class that holds all publishers and their corresponding middleware communicators.

registry = {}
mwares = {}
classmethod register(data_type: str, communicator: str)[source]

Register a publisher for a given data type and middleware communicator.

Parameters:
  • data_type – str: The data type to register the publisher for e.g., “NativeObject”, “Image”, “AudioChunk”, etc.

  • communicator – str: The middleware communicator to register the publisher for e.g., “ros”, “ros2”, “yarp”, “zeromq”, etc.

Returns:

Callable[…, Any]: A decorator function that registers the decorated class as a publisher for the given data type and middleware communicator

static scan()[source]

Scan for publishers and add them to the registry.

class wrapyfi.connect.publishers.Publisher(name: str, out_topic: str, carrier: str = '', should_wait: bool = True, **kwargs)[source]

Bases: object

A base class for all publishers.

__init__(name: str, out_topic: str, carrier: str = '', should_wait: bool = True, **kwargs)[source]

Initialize the Publisher.

Parameters:
  • name – str: The name of the publisher

  • out_topic – str: The name of the output topic

  • carrier – str: The name of the carrier to use

  • should_wait – bool: Whether to wait for the publisher to be established or not

check_establishment(established: bool)[source]

Check if the publisher is established and remove it from the ring if it is.

Parameters:

established – bool: Whether the publisher is established or not

Returns:

bool: Whether the publisher is established or not

establish(repeats: int = -1, **kwargs)[source]

Establish the publisher.

publish(obj)[source]

Publish an object.

close()[source]

Close the connection.

wrapyfi.connect.servers module

class wrapyfi.connect.servers.Servers[source]

Bases: object

A class that holds all servers and their corresponding middleware communicators.

registry = {}
mwares = {}
classmethod register(data_type: str, communicator: str)[source]

Register a server with the given data type and middleware communicator.

Parameters:
  • data_type – str: The data type to register the server for e.g., “NativeObject”, “Image”, “AudioChunk”, etc.

  • communicator – str: The middleware communicator to register the server for e.g., “ros”, “ros2”, “yarp”, “zeromq”, etc.

Returns:

Callable: A decorator that registers the server with the given data type and middleware communicator

static scan()[source]

Scan for servers and add them to the registry.

class wrapyfi.connect.servers.Server(name: str, out_topic: str, carrier: str = '', out_topic_connect: str | None = None, **kwargs)[source]

Bases: object

A base class for servers.

__init__(name: str, out_topic: str, carrier: str = '', out_topic_connect: str | None = None, **kwargs)[source]

Initialize the server.

Parameters:
  • name – str: The name of the server

  • out_topic – str: The topic to publish to

  • carrier – str: The middleware carrier to use

  • out_topic_connect – str: The topic to connect to (this is deprecated and will be removed in the future since its usage is limited to YARP)

establish()[source]

Establish the server.

await_request(*args, **kwargs)[source]

Await a request from a client.

reply(obj)[source]

Reply to a client request.

close()[source]

Close the connection.

wrapyfi.connect.wrapper module

class wrapyfi.connect.wrapper.MiddlewareCommunicator[source]

Bases: object

__init__()[source]

Initialises the middleware communicator.

classmethod register(data_type: str | List[Any], middleware: str = 'zeromq', *args, **kwargs)[source]

Registers a function to the middleware communicator, defining its communication message type and associated middleware. Note that the function returned is a wrapper that can alter the behavior of the registered function based on the communication mode set in the middleware communicator registry.

Parameters:
  • data_type – Union[str, List[Any]]: Specifies the communication message type, either as a string or a list containing specifications of the data type and middleware

  • middleware – str: Specifies the middleware to be used for the communication, defaults to DEFAULT_COMMUNICATOR

  • args – tuple: Variable positional arguments to be passed to the function

  • kwargs – dict: Additional keyword arguments to be passed to the function

Returns:

Callable[…, Any]: A wrapper function that triggers specific communication modes (e.g., publish, listen, reply, request) based on the registered function and middleware settings

Raises:

NotImplementedError: If data_type is a dictionary or an unsupported type

activate_communication(func: str | Callable[[...], Any], mode: str | List[str])[source]

Activates the communication mode for a registered function in the middleware communicator. The mode determines how the function will interact with the middleware communicator upon invocation, e.g., whether it should publish its return values, listen for values, etc. The communication mode can be set for all instances of the function or individually per instance.

Parameters:
  • func – Union[str, Callable[…, Any]]: The function or the name of the function for which the communication mode should be activated

  • mode – Union[str, List[str]]: Specifies the communication mode to be activated, either as a single mode (string) applicable to all instances, or as a list of modes (strings) per instance

Raises:

IndexError: If mode is a list and the number of elements does not match the number of instances

Raises:

ValueError: If the instance address cannot be found in the registry

close()[source]

Closes this middleware communicator instance.

static get_communicators()[source]

Returns the available middleware communicators.

Returns:

Set[str]: The available middleware communicators (excluding the fallback communicator)

classmethod close_all_instances()[source]

Closes all instances of the middleware communicator.

classmethod close_instance(instance_addr: str | None = None)[source]

Closes a specific instance of the middleware communicator. Note that the instance address is the hexadecimal representation of the instance’s id. If no instance address is provided, all instances will be closed.

Parameters:

instance_addr – str: The unique identifier of the instance to be closed, defaults to None

Raises:

ValueError: If the instance address cannot be found in the registry

Module contents