wrapyfi.listeners package

Submodules

wrapyfi.listeners.mqtt module

class wrapyfi.listeners.mqtt.MqttListener(name: str, in_topic: str, should_wait: bool = True, broker_address: str = 'broker.emqx.io', broker_port: int = 1883, mqtt_kwargs: dict | None = None, **kwargs)[source]

Bases: Listener

__init__(name: str, in_topic: str, should_wait: bool = True, broker_address: str = 'broker.emqx.io', broker_port: int = 1883, mqtt_kwargs: dict | None = None, **kwargs)[source]

Initialize the subscriber.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • broker_address – str: Address of the MQTT broker. Default is ‘broker.emqx.io’

  • broker_port – int: Port of the MQTT broker. Default is 1883

  • mqtt_kwargs – dict: Additional kwargs for the MQTT middleware

  • kwargs – dict: Additional kwargs for the subscriber

await_connection(in_topic: str | None = None, repeats: int | None = None)[source]

Wait until the MQTT connection is established.

Parameters:
  • in_topic – str: The topic to monitor for connection

  • repeats – int: The number of times to check for the connection, None for infinite.
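The `repeats` semantics described above (a bounded number of checks, or `None` to retry indefinitely) can be illustrated with a small polling loop. This is a minimal sketch, not Wrapyfi's implementation: `poll_until_connected`, `is_connected`, and `interval` are hypothetical names introduced only for illustration.

```python
import time
from typing import Callable, Optional


def poll_until_connected(is_connected: Callable[[], bool],
                         repeats: Optional[int] = None,
                         interval: float = 0.0) -> bool:
    """Call is_connected() up to `repeats` times; None means retry forever."""
    attempt = 0
    while repeats is None or attempt < repeats:
        if is_connected():
            return True
        attempt += 1
        time.sleep(interval)  # pause between checks
    return False


# A fake connection check that succeeds on its third call.
calls = {"n": 0}


def fake_check() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


result_limited = poll_until_connected(lambda: False, repeats=2)  # gives up
result_success = poll_until_connected(fake_check, repeats=5)     # connects
```

With `repeats=None` and a check that never succeeds, the loop would not return, which is why a finite `repeats` is useful when the broker may be unreachable.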

close()[source]

Close the subscriber.

class wrapyfi.listeners.mqtt.MqttNativeObjectListener(name: str, in_topic: str, should_wait: bool = True, deserializer_kwargs: dict | None = None, **kwargs)[source]

Bases: MqttListener

__init__(name: str, in_topic: str, should_wait: bool = True, deserializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObject listener using the MQTT message construct assuming the data is serialized as a JSON string. Deserializes the data (including plugins) using the decoder and parses it to a native object.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • deserializer_kwargs – dict: Additional kwargs for the deserializer

on_message(client, userdata, msg)[source]

Callback for handling incoming MQTT messages.

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

listen()[source]

Listen for a message.

Returns:

Any: The received message as a native Python object
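The documented methods can be combined into a receive-once helper. The sketch below relies only on the signatures listed in this section and assumes the default public broker is reachable. The helper name `receive_once`, the listener name, and the topic are hypothetical. In typical Wrapyfi applications, listeners may be created for you by higher-level decorators rather than instantiated directly.

```python
from typing import Any, Optional


def receive_once(in_topic: str = "example/native_topic",
                 repeats: int = 5) -> Optional[Any]:
    """Receive one native object over MQTT, or None if no connection is made."""
    # Imported lazily so this sketch loads without Wrapyfi or a broker present.
    from wrapyfi.listeners.mqtt import MqttNativeObjectListener

    listener = MqttNativeObjectListener(
        name="example_listener",  # hypothetical subscriber name
        in_topic=in_topic,        # hypothetical topic
        should_wait=True,
    )
    try:
        # establish() is documented to return True once connected.
        if not listener.establish(repeats=repeats):
            return None
        return listener.listen()
    finally:
        listener.close()
```

Calling `receive_once()` requires a publisher on the same topic; without one, the outcome depends on `should_wait` and `repeats`.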

class wrapyfi.listeners.mqtt.MqttImageListener(name: str, in_topic: str, should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

Bases: MqttNativeObjectListener

__init__(name: str, in_topic: str, should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

The Image listener using the MQTT message construct parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • width – int: Width of the image. Default is -1 (use the width of the received image)

  • height – int: Height of the image. Default is -1 (use the height of the received image)

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be decompressed from JPG. Default is False

on_message(client, userdata, msg)[source]

Callback for handling incoming image messages.

listen() numpy.ndarray | None[source]

Listen for a message.

Returns:

np.ndarray: The received image as a numpy array formatted as a cv2 image np.ndarray[img_height, img_width, channels]
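The `width`, `height`, and `rgb` parameters describe the expected image layout, with -1 meaning "accept whatever arrives". A caller-side check of a received array's shape might look like the sketch below. It is an assumption-laden illustration: `shape_matches` is a hypothetical helper, grayscale images are assumed to be either 2-D or single-channel, and RGB images are assumed to have three channels. It does not describe how Wrapyfi validates images internally.

```python
from typing import Tuple


def shape_matches(shape: Tuple[int, ...], width: int = -1,
                  height: int = -1, rgb: bool = True) -> bool:
    """Check a (img_height, img_width[, channels]) shape against expectations."""
    img_height, img_width = shape[0], shape[1]
    channels = shape[2] if len(shape) > 2 else 1
    if height != -1 and img_height != height:
        return False
    if width != -1 and img_width != width:
        return False
    # Assumed convention: 3 channels for RGB, 1 for grayscale.
    return channels == (3 if rgb else 1)


any_size_ok = shape_matches((480, 640, 3))                   # -1 accepts any size
wrong_width = shape_matches((480, 640, 3), width=320)        # width mismatch
gray_ok = shape_matches((480, 640), rgb=False)               # 2-D grayscale
```

In practice the tuple would come from the `shape` attribute of the array returned by `listen()`.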

class wrapyfi.listeners.mqtt.MqttAudioChunkListener(name: str, in_topic: str, should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: MqttNativeObjectListener

__init__(name: str, in_topic: str, should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunk listener using the MQTT message construct parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • channels – int: Number of channels in the audio. Default is 1

  • rate – int: Sampling rate of the audio. Default is 44100

  • chunk – int: Number of samples in the audio chunk. Default is -1 (use the chunk size of the received audio)

on_message(client, userdata, msg)[source]

Callback for handling incoming audio chunk messages.

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

listen() Tuple[numpy.ndarray | None, int][source]

Listen for a message.

Returns:

Tuple[np.ndarray, int]: The received audio chunk as (np.ndarray[audio_chunk, channels], int[samplerate])
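Because `listen()` returns a tuple whose first element may be `None`, callers usually unpack and guard before using the audio. The sketch below uses nested lists as a stand-in for the `[audio_chunk, channels]` array so that it runs without NumPy. `describe_chunk` is a hypothetical helper, not part of Wrapyfi.

```python
from typing import Dict, Optional, Sequence, Tuple

AudioResult = Tuple[Optional[Sequence[Sequence[float]]], int]


def describe_chunk(result: AudioResult) -> Optional[Dict[str, float]]:
    """Summarize an (audio, rate) tuple; return None when no audio arrived."""
    audio, rate = result
    if audio is None:
        return None
    n_samples = len(audio)
    n_channels = len(audio[0]) if n_samples else 0
    return {
        "samples": n_samples,
        "channels": n_channels,
        "seconds": n_samples / rate,  # chunk duration at the given rate
    }


mono_second = describe_chunk(([[0.0]] * 44100, 44100))
nothing = describe_chunk((None, 44100))
```

With a real array, `audio.shape` gives the same sample and channel counts directly.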

class wrapyfi.listeners.mqtt.MqttPropertiesListener(name, in_topic, **kwargs)[source]

Bases: MqttNativeObjectListener

__init__(name, in_topic, **kwargs)[source]

The NativeObject listener using the MQTT message construct assuming the data is serialized as a JSON string. Deserializes the data (including plugins) using the decoder and parses it to a native object.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • deserializer_kwargs – dict: Additional kwargs for the deserializer

wrapyfi.listeners.ros module

class wrapyfi.listeners.ros.ROSListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, ros_kwargs: dict | None = None, **kwargs)[source]

Bases: Listener

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, ros_kwargs: dict | None = None, **kwargs)[source]

Initialize the subscriber.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

  • ros_kwargs – dict: Additional kwargs for the ROS middleware

  • kwargs – dict: Additional kwargs for the subscriber

close()[source]

Close the subscriber.

class wrapyfi.listeners.ros.ROSNativeObjectListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, deserializer_kwargs: dict | None = None, **kwargs)[source]

Bases: ROSListener

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, deserializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObject listener using the ROS String message assuming the data is serialized as a JSON string. Deserializes the data (including plugins) using the decoder and parses it to a Python object.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

  • deserializer_kwargs – dict: Additional kwargs for the deserializer

establish()[source]

Establish the subscriber.

listen() Any[source]

Listen for a message.

Returns:

Any: The received message as a native Python object
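The description above says the payload is a JSON string that is decoded, plugins included, into a Python object. The general mechanism can be sketched with the standard library's `object_hook`. The `__type__` tagging scheme and the `decode_plugin` function below are hypothetical and chosen only for illustration; they are not Wrapyfi's actual wire format or decoder.

```python
import json
from typing import Any


def decode_plugin(obj: dict) -> Any:
    """Turn tagged dictionaries back into richer Python objects."""
    # Hypothetical tag: {"__type__": "complex", "real": ..., "imag": ...}
    if obj.get("__type__") == "complex":
        return complex(obj["real"], obj["imag"])
    return obj  # untagged dictionaries pass through unchanged


payload = '{"id": 7, "value": {"__type__": "complex", "real": 1.0, "imag": -2.0}}'

# object_hook is applied to every decoded JSON object, innermost first.
message = json.loads(payload, object_hook=decode_plugin)
```

Here `message["value"]` is a Python `complex`, while the plain `id` field is left untouched.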

class wrapyfi.listeners.ros.ROSImageListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

Bases: ROSListener

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

The Image listener using the ROS Image message parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

  • width – int: Width of the image. Default is -1 (use the width of the received image)

  • height – int: Height of the image. Default is -1 (use the height of the received image)

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be decompressed from JPG. Default is False

establish()[source]

Establish the subscriber.

listen()[source]

Listen for a message.

Returns:

np.ndarray: The received message as a numpy array formatted as a cv2 image np.ndarray[img_height, img_width, channels]

class wrapyfi.listeners.ros.ROSAudioChunkListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: ROSListener

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunk listener using the ROS Image message parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

  • channels – int: Number of channels in the audio. Default is 1

  • rate – int: Sampling rate of the audio. Default is 44100

  • chunk – int: Number of samples in the audio chunk. Default is -1 (use the chunk size of the received audio)

establish()[source]

Establish the subscriber.

listen()[source]

Listen for a message.

Returns:

Tuple[np.ndarray, int]: The received message as a numpy array formatted as (np.ndarray[audio_chunk, channels], int[samplerate])

class wrapyfi.listeners.ros.ROSPropertiesListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, **kwargs)[source]

Bases: ROSListener

Gets rospy parameters. Behaves differently from other data types by directly acquiring ROS parameters. Note that the listener is not guaranteed to receive the updated signal, since the listener can trigger before the property is set. The return values of the property-decorated method may be native Python objects (excluding None), but care should be taken when using dictionaries, since they are analogous to node namespaces: http://wiki.ros.org/rospy/Overview/Parameter%20Server

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, **kwargs)[source]

The PropertiesListener using the ROS Parameter Server.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for a parameter to be set. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

await_connection(in_topic: int | None = None, repeats: int | None = None)[source]

Wait for a parameter to be set.

Parameters:
  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • repeats – int: Number of times to check for the parameter. None for infinite. Default is None

establish(repeats: int | None = None, **kwargs)[source]

Establish the subscriber.

Parameters:

repeats – int: Number of times to check for the parameter. None for infinite. Default is None

listen()[source]

Listen for a message.

Returns:

Any: The received message as a native Python object
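The class description notes that dictionaries are analogous to namespaces on the ROS Parameter Server. The sketch below shows what that means in practice by flattening a nested dictionary into slash-separated parameter names. `flatten_params` and the example parameter names are hypothetical; no ROS installation is needed to run it.

```python
from typing import Any, Dict


def flatten_params(params: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Expand nested dictionaries into slash-separated parameter paths."""
    flat: Dict[str, Any] = {}
    for key, value in params.items():
        path = f"{prefix}/{key}"
        if isinstance(value, dict):
            # A nested dictionary behaves like a namespace, not a single value.
            flat.update(flatten_params(value, path))
        else:
            flat[path] = value
    return flat


flat = flatten_params(
    {"gains": {"p": 1.0, "i": 0.1}, "enabled": True},
    prefix="/robot",
)
```

A dictionary stored under one name is therefore visible as several individual parameters, which is worth keeping in mind when choosing keys.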

class wrapyfi.listeners.ros.ROSMessageListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, **kwargs)[source]

Bases: ROSListener

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, queue_size: int = 5, **kwargs)[source]

The ROSMessageListener using the ROS message type inferred from the message type. Supports standard ROS msgs.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ROS currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for a message to be published. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

establish()[source]

Establish the subscriber.

listen()[source]

Listen for a message.

Returns:

rospy.msg: The received message as a ROS message object

wrapyfi.listeners.ros2 module

class wrapyfi.listeners.ros2.ROS2Listener(name: str, in_topic: str, should_wait: bool = True, queue_size: int = 5, ros2_kwargs: dict | None = None, **kwargs)[source]

Bases: Listener, Node

__init__(name: str, in_topic: str, should_wait: bool = True, queue_size: int = 5, ros2_kwargs: dict | None = None, **kwargs)[source]

Initialize the subscriber.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

  • ros2_kwargs – dict: Additional kwargs for the ROS 2 middleware

  • kwargs – dict: Additional kwargs for the subscriber

close()[source]

Close the subscriber.

class wrapyfi.listeners.ros2.ROS2NativeObjectListener(name: str, in_topic: str, should_wait: bool = True, queue_size: int = 5, deserializer_kwargs: dict | None = None, **kwargs)[source]

Bases: ROS2Listener

__init__(name: str, in_topic: str, should_wait: bool = True, queue_size: int = 5, deserializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObject listener using the ROS 2 String message assuming the data is serialized as a JSON string. Deserializes the data (including plugins) using the decoder and parses it to a native object.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

  • deserializer_kwargs – dict: Additional kwargs for the deserializer

establish()[source]

Establish the subscriber.

listen()[source]

Listen for a message.

Returns:

Any: The received message as a native Python object

class wrapyfi.listeners.ros2.ROS2ImageListener(name: str, in_topic: str, should_wait: bool = True, queue_size: int = 5, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

Bases: ROS2Listener

__init__(name: str, in_topic: str, should_wait: bool = True, queue_size: int = 5, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

The Image listener using the ROS 2 Image message parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

  • width – int: Width of the image. Default is -1 (use the width of the received image)

  • height – int: Height of the image. Default is -1 (use the height of the received image)

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be decompressed from JPG. Default is False

establish()[source]

Establish the subscriber.

listen()[source]

Listen for a message.

Returns:

np.ndarray: The received message as a numpy array formatted as a cv2 image np.ndarray[img_height, img_width, channels]

class wrapyfi.listeners.ros2.ROS2AudioChunkListener(name: str, in_topic: str, should_wait: bool = True, queue_size: int = 5, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: ROS2Listener

__init__(name: str, in_topic: str, should_wait: bool = True, queue_size: int = 5, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunk listener using the ROS 2 Audio message parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

  • channels – int: Number of channels in the audio. Default is 1

  • rate – int: Sampling rate of the audio. Default is 44100

  • chunk – int: Number of samples in the audio chunk. Default is -1 (use the chunk size of the received audio)

establish()[source]

Establish the subscriber.

listen()[source]

Listen for a message.

Returns:

Tuple[np.ndarray, int]: The received message as a numpy array formatted as (np.ndarray[audio_chunk, channels], int[samplerate])

class wrapyfi.listeners.ros2.ROS2PropertiesListener(name, in_topic, **kwargs)[source]

Bases: ROS2Listener

__init__(name, in_topic, **kwargs)[source]

Initialize the subscriber.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

  • ros2_kwargs – dict: Additional kwargs for the ROS 2 middleware

  • kwargs – dict: Additional kwargs for the subscriber

class wrapyfi.listeners.ros2.ROS2MessageListener(name: str, in_topic: str, should_wait: bool = True, queue_size: int = 5, **kwargs)[source]

Bases: ROS2Listener

__init__(name: str, in_topic: str, should_wait: bool = True, queue_size: int = 5, **kwargs)[source]

The ROS2MessageListener using the ROS 2 message type inferred from the message type. Supports standard ROS 2 msgs.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

get_topic_type(topic_name)[source]

Get the type of a specific topic.

Parameters:

topic_name – str: Name of the topic to get its type

Returns:

str or None: The topic type as a string, or None if the topic does not exist
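The "str or None" contract can be illustrated with a lookup over the `(name, [types])` pairs that `rclpy`'s `Node.get_topic_names_and_types()` returns. The sketch below works on plain data so it runs without ROS 2. `find_topic_type` and the sample topics are hypothetical, and this is not necessarily how `get_topic_type` is implemented in Wrapyfi.

```python
from typing import List, Optional, Tuple

TopicList = List[Tuple[str, List[str]]]


def find_topic_type(topics: TopicList, topic_name: str) -> Optional[str]:
    """Return the first advertised type for topic_name, or None if absent."""
    for name, types in topics:
        if name == topic_name and types:
            return types[0]
    return None


# Sample data shaped like the output of get_topic_names_and_types().
sample_topics: TopicList = [
    ("/chatter", ["std_msgs/msg/String"]),
    ("/camera", ["sensor_msgs/msg/Image"]),
]

chatter_type = find_topic_type(sample_topics, "/chatter")
missing_type = find_topic_type(sample_topics, "/does_not_exist")
```

Callers should handle the `None` case, for example by waiting until a publisher has advertised the topic.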

establish()[source]

Establish the subscriber.

listen()[source]

Listen for a message.

Returns:

ROS2Message: The received message as a ROS 2 message object

wrapyfi.listeners.websocket module

class wrapyfi.listeners.websocket.WebSocketListener(name: str, in_topic: str, should_wait: bool = True, socket_ip: str = '127.0.0.1', socket_port: int = 5000, namespace: str = '/', websocket_kwargs: dict | None = None, **kwargs)[source]

Bases: Listener

__init__(name: str, in_topic: str, should_wait: bool = True, socket_ip: str = '127.0.0.1', socket_port: int = 5000, namespace: str = '/', websocket_kwargs: dict | None = None, **kwargs)[source]

Initialize the subscriber.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic (e.g., ‘topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • socket_ip – str: IP address of the socket. Default is ‘127.0.0.1’

  • socket_port – int: Port of the socket for publishing. Default is 5000

  • namespace – str: Namespace of the WebSocket. Default is ‘/’

  • websocket_kwargs – dict: Additional kwargs for the WebSocket middleware

  • kwargs – dict: Additional kwargs for the subscriber

await_connection(in_topic: str | None = None, repeats: int | None = None)[source]

Wait until the WebSocket connection is established.

Parameters:
  • in_topic – str: The topic to monitor for connection

  • repeats – int: The number of times to check for the connection, None for infinite.

close()[source]

Close the subscriber.

class wrapyfi.listeners.websocket.WebSocketNativeObjectListener(name: str, in_topic: str, should_wait: bool = True, deserializer_kwargs: dict | None = None, **kwargs)[source]

Bases: WebSocketListener

__init__(name: str, in_topic: str, should_wait: bool = True, deserializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObject listener using the WebSocket message construct assuming the data is serialized as a JSON string. Deserializes the data (including plugins) using the decoder and parses it to a native object.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic (e.g., ‘topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • deserializer_kwargs – dict: Additional kwargs for the deserializer

on_message(data)[source]

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

listen()[source]

Listen for a message.

Returns:

Any: The received message as a native Python object
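`on_message` is a callback invoked when data arrives, whereas `listen()` is called by the consumer. One common way to bridge a callback and a blocking read is a thread-safe queue. The sketch below is a generic illustration of that pattern and makes no claim about Wrapyfi's internals; `CallbackBuffer` and its `timeout` argument are hypothetical.

```python
import json
import queue
from typing import Any, Optional


class CallbackBuffer:
    """Hand messages from a callback to a consumer through a queue."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[Any]" = queue.Queue()

    def on_message(self, data: str) -> None:
        # Called by the transport; decode and store the message.
        self._queue.put(json.loads(data))

    def listen(self, timeout: Optional[float] = None) -> Optional[Any]:
        # Block until a message is available or the timeout expires.
        try:
            return self._queue.get(timeout=timeout)
        except queue.Empty:
            return None


buffer = CallbackBuffer()
buffer.on_message('{"status": "ok", "count": 3}')
first = buffer.listen(timeout=0.1)   # message delivered by the callback
second = buffer.listen(timeout=0.1)  # nothing left, so None
```

A queue preserves arrival order; a design that only cares about the newest message could keep a single slot instead.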

class wrapyfi.listeners.websocket.WebSocketImageListener(name: str, in_topic: str, should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

Bases: WebSocketNativeObjectListener

__init__(name: str, in_topic: str, should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

The Image listener using the WebSocket message construct parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic (e.g., ‘topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • width – int: Width of the image. Default is -1 (use the width of the received image)

  • height – int: Height of the image. Default is -1 (use the height of the received image)

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be decompressed from JPG. Default is False

on_message(data)[source]

Callback for handling incoming image messages.

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

listen()[source]

Listen for a message.

Returns:

np.ndarray: The received image as a numpy array formatted as a cv2 image np.ndarray[img_height, img_width, channels]

class wrapyfi.listeners.websocket.WebSocketAudioChunkListener(name: str, in_topic: str, should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: WebSocketNativeObjectListener

__init__(name: str, in_topic: str, should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunk listener using the WebSocket message construct parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic (e.g., ‘topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • channels – int: Number of channels in the audio. Default is 1

  • rate – int: Sampling rate of the audio. Default is 44100

  • chunk – int: Number of samples in the audio chunk. Default is -1 (use the chunk size of the received audio)

on_message(data)[source]

Callback for handling incoming audio messages.

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

listen()[source]

Listen for a message.

Returns:

Tuple[np.ndarray, int]: The received audio chunk as (np.ndarray[audio_chunk, channels], int[samplerate])

class wrapyfi.listeners.websocket.WebSocketPropertiesListener(name, in_topic, **kwargs)[source]

Bases: WebSocketNativeObjectListener

__init__(name, in_topic, **kwargs)[source]

The NativeObject listener using the WebSocket message construct assuming the data is serialized as a JSON string. Deserializes the data (including plugins) using the decoder and parses it to a native object.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic (e.g., ‘topic’)

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • deserializer_kwargs – dict: Additional kwargs for the deserializer

wrapyfi.listeners.yarp module

class wrapyfi.listeners.yarp.YarpListener(name: str, in_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, yarp_kwargs: dict | None = None, **kwargs)[source]

Bases: Listener

__init__(name: str, in_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, yarp_kwargs: dict | None = None, **kwargs)[source]

Initialize the subscriber.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • persistent – bool: Whether the subscriber port should remain connected after closure. Default is True

  • yarp_kwargs – dict: Additional kwargs for the Yarp middleware

  • kwargs – dict: Additional kwargs for the subscriber
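The `carrier` parameter is annotated as `Literal['tcp', 'udp', 'mcast']`. Type checkers enforce this statically, but a value read from configuration at run time can be checked against the same annotation. The sketch below does this with `typing.get_args`; `Carrier` and `is_supported_carrier` are hypothetical names, not Wrapyfi APIs.

```python
from typing import Literal, get_args

# Mirrors the annotation in the signature above.
Carrier = Literal["tcp", "udp", "mcast"]


def is_supported_carrier(carrier: str) -> bool:
    """Return True if carrier is one of the values allowed by the annotation."""
    return carrier in get_args(Carrier)


tcp_ok = is_supported_carrier("tcp")
shmem_ok = is_supported_carrier("shmem")  # not listed in the annotation
```

Deriving the allowed values from the `Literal` keeps the run-time check and the type hint in sync.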

await_connection(in_topic: str | None = None, repeats: int | None = None)[source]

Wait for the publisher to connect to the subscriber.

Parameters:
  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • repeats – int: Number of times to check for the connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

read_port(port)[source]

Read the port.

Parameters:

port – yarp.Port: Port to read from

Returns:

yarp.Value: Value read from the port

close()[source]

Close the subscriber.

class wrapyfi.listeners.yarp.YarpNativeObjectListener(name: str, in_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, deserializer_kwargs: dict | None = None, **kwargs)[source]

Bases: YarpListener

__init__(name: str, in_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, deserializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObject listener using the BufferedPortBottle string construct assuming the data is serialized as a JSON string. Deserializes the data (including plugins) using the decoder and parses it to a Python object.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • persistent – bool: Whether the subscriber port should remain connected after closure. Default is True

  • deserializer_kwargs – dict: Additional kwargs for the deserializer

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

listen()[source]

Listen for a message.

Returns:

Any: The received message as a native Python object

class wrapyfi.listeners.yarp.YarpImageListener(name: str, in_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

Bases: YarpListener

__init__(name: str, in_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

The Image listener using the BufferedPortImage construct parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • persistent – bool: Whether the subscriber port should remain connected after closure. Default is True

  • width – int: Width of the image. Default is -1 (use the width of the received image)

  • height – int: Height of the image. Default is -1 (use the height of the received image)

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool: True if the image should be decompressed from JPG. Default is False

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

listen()[source]

Listen for a message.

Returns:

np.ndarray: The received message as a numpy array formatted as a cv2 image np.ndarray[img_height, img_width, channels]
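
As a rough illustration of how the `width`, `height`, and `rgb` parameters could relate to the returned `[img_height, img_width, channels]` layout, the sketch below checks a shape tuple against the expected values, treating -1 as "accept whatever arrives". The function name and the channel-count rule (three for RGB, one for grayscale) are assumptions made for this example; the library's own validation may differ.

```python
from typing import Tuple


def shape_is_accepted(shape: Tuple[int, ...], width: int = -1,
                      height: int = -1, rgb: bool = True) -> bool:
    """Check an (img_height, img_width[, channels]) shape against expectations."""
    img_height, img_width = shape[0], shape[1]
    channels = shape[2] if len(shape) > 2 else 1
    # -1 means "use the size of the received image", so any value passes.
    if width != -1 and img_width != width:
        return False
    if height != -1 and img_height != height:
        return False
    # Assumption: RGB means three channels, grayscale means one.
    return channels == (3 if rgb else 1)


any_size_ok = shape_is_accepted((480, 640, 3))
wrong_width = shape_is_accepted((480, 640, 3), width=320)
gray_ok = shape_is_accepted((480, 640), rgb=False)
```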

class wrapyfi.listeners.yarp.YarpAudioChunkListener(name: str, in_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: YarpListener

__init__(name: str, in_topic: str, carrier: Literal['tcp', 'udp', 'mcast'] = 'tcp', should_wait: bool = True, persistent: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunk listener using the Sound construct parsed as a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • persistent – bool: Whether the subscriber port should remain connected after closure. Default is True

  • channels – int: Number of channels in the audio. Default is 1

  • rate – int: Sampling rate of the audio. Default is 44100

  • chunk – int: Number of samples in the audio chunk. Default is -1 (use the chunk size of the received audio)

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

listen()[source]

Listen for a message.

Returns:

Tuple[np.ndarray, int]: The received message as a numpy array formatted as (np.ndarray[audio_chunk, channels], int[samplerate])
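
The returned pair can be unpacked into the chunk and its sampling rate, from which the chunk duration follows as frames divided by rate. The sketch below uses nested lists in place of `np.ndarray` so that it runs without numpy; `describe_chunk` and `AudioMessage` are hypothetical helpers, not wrapyfi names.

```python
from typing import List, Tuple

# Stand-in for the documented return value: (chunk[audio_chunk, channels], samplerate).
# Nested lists replace np.ndarray here so the sketch runs without numpy.
AudioMessage = Tuple[List[List[float]], int]


def describe_chunk(message: AudioMessage) -> dict:
    """Summarise a received (chunk, rate) pair."""
    chunk, rate = message
    num_frames = len(chunk)
    channels = len(chunk[0]) if chunk else 0
    return {
        "frames": num_frames,
        "channels": channels,
        "seconds": num_frames / rate,
    }


# 4410 stereo frames of silence at 44100 Hz.
silence: AudioMessage = ([[0.0, 0.0] for _ in range(4410)], 44100)
summary = describe_chunk(silence)
```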

class wrapyfi.listeners.yarp.YarpPropertiesListener(name, in_topic, **kwargs)[source]

Bases: YarpListener

__init__(name, in_topic, **kwargs)[source]

Initialize the subscriber.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • persistent – bool: Whether the subscriber port should remain connected after closure. Default is True

  • yarp_kwargs – dict: Additional kwargs for the Yarp middleware

  • kwargs – dict: Additional kwargs for the subscriber

wrapyfi.listeners.zenoh module

class wrapyfi.listeners.zenoh.ZenohListener(name: str, in_topic: str, should_wait: bool = True, ip: str = '127.0.0.1', port: int = 7447, mode: str = 'peer', zenoh_kwargs: dict | None = None, **kwargs)[source]

Bases: Listener

Base Zenoh listener class that configures and initializes Zenoh middleware. Merges listener-specific settings and environment configurations, and awaits connection.

__init__(name: str, in_topic: str, should_wait: bool = True, ip: str = '127.0.0.1', port: int = 7447, mode: str = 'peer', zenoh_kwargs: dict | None = None, **kwargs)[source]

Initializes the Zenoh listener with environment or parameter-based configurations and waits for connection if specified.

Parameters:
  • name – str: Name of the listener

  • in_topic – str: Topic name

  • should_wait – bool: Whether to block until a message is received

  • ip – str: IP address for the Zenoh connection. Default is ‘127.0.0.1’

  • port – int: Port for the Zenoh connection. Default is 7447

  • mode – str: Mode for Zenoh session (peer or client)

  • zenoh_kwargs – dict: Additional Zenoh configuration options, overridden by env variables

  • kwargs – dict: Additional options for the listener
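
One way to read the precedence described above (constructor defaults, then `zenoh_kwargs`, then environment variables on top) is sketched below. The environment variable names `EXAMPLE_ZENOH_IP`, `EXAMPLE_ZENOH_PORT`, and `EXAMPLE_ZENOH_MODE` are placeholders invented for this example, as is the function itself; consult the library for the variables it actually reads.

```python
import os
from typing import Mapping, Optional


def resolve_zenoh_settings(ip: str = "127.0.0.1", port: int = 7447,
                           mode: str = "peer",
                           zenoh_kwargs: Optional[dict] = None,
                           env: Optional[Mapping[str, str]] = None) -> dict:
    """Merge settings with the precedence: defaults < zenoh_kwargs < environment."""
    env = os.environ if env is None else env
    settings = {"ip": ip, "port": port, "mode": mode}
    settings.update(zenoh_kwargs or {})
    # Hypothetical variable names; the real library may use different ones.
    if "EXAMPLE_ZENOH_IP" in env:
        settings["ip"] = env["EXAMPLE_ZENOH_IP"]
    if "EXAMPLE_ZENOH_PORT" in env:
        settings["port"] = int(env["EXAMPLE_ZENOH_PORT"])
    if "EXAMPLE_ZENOH_MODE" in env:
        settings["mode"] = env["EXAMPLE_ZENOH_MODE"]
    return settings


defaults_only = resolve_zenoh_settings(env={})
overridden = resolve_zenoh_settings(
    zenoh_kwargs={"mode": "client", "port": 7000},
    env={"EXAMPLE_ZENOH_PORT": "7500"},
)
```

Passing `env` explicitly keeps the function easy to test; in normal use it falls back to `os.environ`.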

await_connection(in_topic: str | None = None, repeats: int = -1)[source]

Waits for the Zenoh connection to be established.

Parameters:
  • in_topic – str: Topic name for connection

  • repeats – int: Number of retry attempts

Returns:

bool: True if connection is established, False otherwise

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

close()[source]

Closes the Zenoh listener. This can be overridden by child classes to add cleanup operations.

class wrapyfi.listeners.zenoh.ZenohNativeObjectListener(name: str, in_topic: str, should_wait: bool = True, deserializer_kwargs: dict | None = None, **kwargs)[source]

Bases: ZenohListener

__init__(name: str, in_topic: str, should_wait: bool = True, deserializer_kwargs: dict | None = None, **kwargs)[source]

Zenoh NativeObject listener for handling JSON-encoded native objects. Decodes incoming messages to native Python objects using JsonDecodeHook.

Parameters:
  • name – str: Name of the listener

  • in_topic – str: Name of the input topic

  • should_wait – bool: Whether to wait for messages

  • deserializer_kwargs – dict: Keyword arguments for the JSON deserializer

on_message(sample)[source]

Handles incoming messages by decoding JSON into native objects using JsonDecodeHook.

Parameters:

sample – zenoh.Sample: The Zenoh sample received
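
The idea of decoding a JSON payload into native objects through a hook can be illustrated with the standard library's `json.loads(..., object_hook=...)`. This is not wrapyfi's JsonDecodeHook: the `__type__` tagging scheme and the `decode_tagged` function are assumptions made up for this sketch.

```python
import json


def decode_tagged(obj: dict):
    """object_hook: rebuild types that plain JSON cannot represent."""
    if obj.get("__type__") == "set":
        return set(obj["items"])
    if obj.get("__type__") == "tuple":
        return tuple(obj["items"])
    return obj


# Bytes as they might arrive in a message payload (format is hypothetical).
payload = (b'{"ids": {"__type__": "set", "items": [1, 2, 2]}, '
           b'"pos": {"__type__": "tuple", "items": [3, 4]}}')
message = json.loads(payload.decode("utf-8"), object_hook=decode_tagged)
```

The hook runs on every decoded JSON object, innermost first, so nested tagged values are rebuilt before their parent dictionary is returned.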

listen()[source]

Listen for a message, ensuring the connection is established.

Returns:

Any: The received message as a native Python object

class wrapyfi.listeners.zenoh.ZenohImageListener(name: str, in_topic: str, should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, jpg: bool | dict = False, **kwargs)[source]

Bases: ZenohNativeObjectListener

__init__(name: str, in_topic: str, should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, jpg: bool | dict = False, **kwargs)[source]

Zenoh Image listener for handling image messages. Converts incoming data to OpenCV images, supporting JPEG and raw formats.

Parameters:
  • name – str: Name of the listener

  • in_topic – str: Name of the input topic

  • should_wait – bool: Whether to wait for messages

  • width – int: Expected image width, -1 to use received width

  • height – int: Expected image height, -1 to use received height

  • rgb – bool: True if the image is RGB, False if grayscale

  • jpg – bool | dict: True if the image is JPEG-compressed

on_message(sample)[source]

Handles incoming image messages, converting data to OpenCV format.

Parameters:

sample – zenoh.Sample: Zenoh sample payload

listen()[source]

Listen for a message, ensuring the connection is established.

Returns:

np.ndarray: The received image as an OpenCV-formatted array

class wrapyfi.listeners.zenoh.ZenohAudioChunkListener(name: str, in_topic: str, should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: ZenohNativeObjectListener

__init__(name: str, in_topic: str, should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Zenoh AudioChunk listener for handling audio messages. Converts incoming data to numpy arrays for audio processing.

Parameters:
  • name – str: Name of the listener

  • in_topic – str: Name of the input topic

  • should_wait – bool: Whether to wait for messages

  • channels – int: Number of audio channels

  • rate – int: Sampling rate of the audio

  • chunk – int: Number of samples in the audio chunk

on_message(sample)[source]

Processes incoming audio messages into structured numpy arrays.

Parameters:

sample – zenoh.Sample: Zenoh sample payload

listen()[source]

Listen for a message, ensuring the connection is established.

Returns:

Tuple[np.ndarray, int]: The received audio chunk and sampling rate

wrapyfi.listeners.zeromq module

class wrapyfi.listeners.zeromq.ZeroMQListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, socket_ip: str = '127.0.0.1', socket_pub_port: int = 5555, pubsub_monitor_topic: str = 'ZEROMQ/CONNECTIONS', start_pubsub_monitor_broker: bool = True, pubsub_monitor_listener_spawn: str | None = 'process', zeromq_kwargs: dict | None = None, **kwargs)[source]

Bases: Listener

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, socket_ip: str = '127.0.0.1', socket_pub_port: int = 5555, pubsub_monitor_topic: str = 'ZEROMQ/CONNECTIONS', start_pubsub_monitor_broker: bool = True, pubsub_monitor_listener_spawn: str | None = 'process', zeromq_kwargs: dict | None = None, **kwargs)[source]

Initialize the subscriber.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ZeroMQ currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • socket_ip – str: IP address of the socket. Default is ‘127.0.0.1’

  • socket_pub_port – int: Port of the socket for publishing. Note that the subscriber listens directly to this port, which is proxied. Default is 5555

  • pubsub_monitor_topic – str: Topic to monitor the connections. Default is ‘ZEROMQ/CONNECTIONS’

  • start_pubsub_monitor_broker – bool: Whether to start a topic monitor to enable connection establishment checking. Default is True

  • pubsub_monitor_listener_spawn – str: Whether to spawn the PUB/SUB monitor listener as a process or thread. Default is ‘process’

  • zeromq_kwargs – dict: Additional kwargs for the ZeroMQ middleware

  • kwargs – dict: Additional kwargs for the subscriber
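
ZeroMQ endpoints follow the `transport://address:port` form, so the `carrier`, `socket_ip`, and `socket_pub_port` parameters above would plausibly combine as in the sketch below. The helper `zmq_endpoint` is hypothetical and only shows the string format; it does not open a socket, and the library may assemble its address differently.

```python
def zmq_endpoint(carrier: str = "tcp", socket_ip: str = "127.0.0.1",
                 socket_pub_port: int = 5555) -> str:
    """Build a ZeroMQ-style endpoint string from the documented defaults."""
    if not 0 < socket_pub_port < 65536:
        raise ValueError("port must be in the range 1-65535")
    return f"{carrier}://{socket_ip}:{socket_pub_port}"


default_endpoint = zmq_endpoint()
remote_endpoint = zmq_endpoint(socket_ip="192.168.0.10", socket_pub_port=6000)
```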

await_connection(socket=None, in_topic: str | None = None, repeats: int | None = None)[source]

Wait for the connection to be established.

Parameters:
  • socket – zmq.Socket: Socket to await connection to

  • in_topic – str: Name of the input topic

  • repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

read_socket(socket)[source]

Read the socket.

Parameters:

socket – zmq.Socket: Socket to read from

Returns:

bytes: Data read from the socket

close()[source]

Close the subscriber.

class wrapyfi.listeners.zeromq.ZeroMQNativeObjectListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, deserializer_kwargs: dict | None = None, **kwargs)[source]

Bases: ZeroMQListener

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, deserializer_kwargs: dict | None = None, **kwargs)[source]

The NativeObject listener using the ZeroMQ message construct assuming the data is serialized as a JSON string. Deserializes the data (including plugins) using the decoder and parses it to a native object.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ZeroMQ currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • deserializer_kwargs – dict: Additional kwargs for the deserializer

establish(repeats: int | None = None, **kwargs)[source]

Establish the connection to the publisher.

Parameters:

repeats – int: Number of repeats to await connection. None for infinite. Default is None

Returns:

bool: True if connection established, False otherwise

listen()[source]

Listen for a message.

Returns:

Any: The received message as a native python object

class wrapyfi.listeners.zeromq.ZeroMQImageListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

Bases: ZeroMQNativeObjectListener

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, fp: bool = False, jpg: bool | dict = False, **kwargs)[source]

The Image listener using the ZeroMQ message construct parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol (e.g. ‘tcp’). Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • width – int: Width of the image. Default is -1 (use the width of the received image)

  • height – int: Height of the image. Default is -1 (use the height of the received image)

  • rgb – bool: True if the image is RGB, False if it is grayscale. Default is True

  • fp – bool: True if the image is floating point, False if it is integer. Default is False

  • jpg – bool | dict: True if the image should be decompressed from JPG. Default is False

listen()[source]

Listen for a message.

Returns:

np.ndarray: The received message as a numpy array formatted as a cv2 image np.ndarray[img_height, img_width, channels]

class wrapyfi.listeners.zeromq.ZeroMQAudioChunkListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

Bases: ZeroMQImageListener

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs)[source]

The AudioChunk listener using the ZeroMQ message construct parsed to a numpy array.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ZeroMQ currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • queue_size – int: Size of the queue for the subscriber. Default is 5

  • channels – int: Number of channels in the audio. Default is 1

  • rate – int: Sampling rate of the audio. Default is 44100

  • chunk – int: Number of samples in the audio chunk. Default is -1 (use the chunk size of the received audio)

listen()[source]

Listen for a message.

Returns:

Tuple[np.ndarray, int]: The received message as a numpy array formatted as (np.ndarray[audio_chunk, channels], int[samplerate])

class wrapyfi.listeners.zeromq.ZeroMQPropertiesListener(name, in_topic, **kwargs)[source]

Bases: ZeroMQListener

__init__(name, in_topic, **kwargs)[source]

Initialize the subscriber.

Parameters:
  • name – str: Name of the subscriber

  • in_topic – str: Name of the input topic preceded by ‘/’ (e.g. ‘/topic’)

  • carrier – str: Carrier protocol. ZeroMQ currently only supports TCP for PUB/SUB pattern. Default is ‘tcp’

  • should_wait – bool: Whether the subscriber should wait for the publisher to transmit a message. Default is True

  • socket_ip – str: IP address of the socket. Default is ‘127.0.0.1’

  • socket_pub_port – int: Port of the socket for publishing. Note that the subscriber listens directly to this port, which is proxied. Default is 5555

  • pubsub_monitor_topic – str: Topic to monitor the connections. Default is ‘ZEROMQ/CONNECTIONS’

  • start_pubsub_monitor_broker – bool: Whether to start a topic monitor to enable connection establishment checking. Default is True

  • pubsub_monitor_listener_spawn – str: Whether to spawn the PUB/SUB monitor listener as a process or thread. Default is ‘process’

  • zeromq_kwargs – dict: Additional kwargs for the ZeroMQ middleware

  • kwargs – dict: Additional kwargs for the subscriber

Module contents

class wrapyfi.listeners.FallbackListener(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, missing_middleware_object: str = '', **kwargs)[source]

Bases: Listener

__init__(name: str, in_topic: str, carrier: str = 'tcp', should_wait: bool = True, missing_middleware_object: str = '', **kwargs)[source]

Initialize the Listener.

Parameters:
  • name – str: The name of the listener

  • in_topic – str: The topic to listen to

  • carrier – str: The middleware carrier to use

  • should_wait – bool: Whether to wait for the listener to be established or not

establish(repeats: int = -1, **kwargs)[source]

Establish the listener.

listen()[source]

Listen for incoming data.

close()[source]

Close the connection.
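
A fallback class of this kind is typically returned when the requested middleware or data type is not available. The sketch below shows that general pattern with a plain dictionary registry; `MissingMiddlewareListener`, `get_listener`, the registry key format, and the no-op behaviour of each method are all assumptions for illustration and do not reproduce the library's implementation.

```python
class MissingMiddlewareListener:
    """Placeholder returned when the requested middleware is unavailable."""

    def __init__(self, name, in_topic, missing_middleware_object="", **kwargs):
        self.name = name
        self.in_topic = in_topic
        self.missing_middleware_object = missing_middleware_object

    def establish(self, repeats=-1, **kwargs):
        # Nothing to connect to, so report failure.
        return False

    def listen(self):
        # No data can be received without the middleware.
        return None

    def close(self):
        return None


def get_listener(registry, data_type, middleware, *args, **kwargs):
    """Look up a listener class, falling back to the placeholder if absent."""
    key = f"{data_type}:{middleware}"
    cls = registry.get(key)
    if cls is None:
        return MissingMiddlewareListener(
            *args, missing_middleware_object=key, **kwargs)
    return cls(*args, **kwargs)


# An empty registry simulates a machine with no middleware installed.
listener = get_listener({}, "NativeObject", "zenoh", "my_listener", "/topic")
```

Returning a placeholder instead of raising at lookup time lets the rest of a program start up and report the missing dependency where the listener is first used.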